class Fluent::Plugin::ForwardOutput::ConnectionManager

Constants

RequestInfo

Public Class Methods

new(log:, secure:, connection_factory:, socket_cache:) click to toggle source

@param log [Logger] @param secure [Boolean] @param connection_factory [Proc] @param socket_cache [Fluent::ForwardOutput::SocketCache]

# File lib/fluent/plugin/out_forward/connection_manager.rb, line 28
# Stores the collaborators the other methods of this manager rely on.
#
# @param log [Logger] logger; #connect writes a debug line to it
# @param secure [Boolean] when truthy, freshly created connections are
#   reported in the :helo state instead of :established
# @param connection_factory [Proc] invoked as call(host, port, hostname)
#   whenever a new socket has to be opened
# @param socket_cache [Object, nil] cache used for socket reuse; when it is
#   nil the manager opens a socket per #connect call
def initialize(log:, secure:, connection_factory:, socket_cache:)
  @socket_cache = socket_cache
  @connection_factory = connection_factory
  @secure = secure
  @log = log
end

Public Instance Methods

close(sock) click to toggle source
# File lib/fluent/plugin/out_forward/connection_manager.rb, line 72
# Releases a socket that is no longer needed.
#
# When a socket cache is configured the socket is checked back in to it;
# otherwise the socket is shut down and any StandardError raised while
# doing so is swallowed.
#
# @param sock [#close_write, #close] the socket to release
# @return [Object, nil] the cache's checkin result, or the result of
#   sock.close (nil when closing raised)
def close(sock)
  return @socket_cache.checkin(sock) if @socket_cache

  # Best-effort teardown: a failure in one step must not prevent the next
  # step nor reach the caller.
  begin
    sock.close_write
  rescue StandardError
    nil
  end

  begin
    sock.close
  rescue StandardError
    nil
  end
end
connect(host:, port:, hostname:, ack: nil) { |socket, request_info| ... } click to toggle source

@param ack [Fluent::Plugin::ForwardOutput::AckHandler::Ack|nil]

# File lib/fluent/plugin/out_forward/connection_manager.rb, line 40
# Obtains a socket for the given destination.
#
# With a socket cache configured the whole call is delegated to
# #connect_keepalive. Otherwise a new socket is created through the
# connection factory on every call.
#
# Without a block the socket and its RequestInfo are returned and the caller
# owns the socket. With a block, both are yielded and, once the block
# finishes (normally or by raising), the socket is either handed to
# ack.enqueue or closed on a best-effort basis.
#
# @param host [Object] passed through to the connection factory
# @param port [Object] passed through to the connection factory
# @param hostname [Object] passed through to the connection factory
# @param ack [#enqueue, nil] when given, receives the socket after the block
#   instead of the socket being closed
# @yield [socket, request_info]
# @return [Array, Object] [socket, request_info] when no block is given,
#   otherwise the block's return value
def connect(host:, port:, hostname:, ack: nil, &block)
  # Keepalive mode: socket reuse is handled entirely by #connect_keepalive.
  return connect_keepalive(host: host, port: port, hostname: hostname, ack: ack, &block) if @socket_cache

  @log.debug('connect new socket')
  sock = @connection_factory.call(host, port, hostname)

  # A secure connection starts in the :helo state; a plain one is usable
  # right away.
  initial_state = if @secure
                    :helo
                  else
                    :established
                  end
  info = RequestInfo.new(initial_state)

  return [sock, info] unless block_given?

  begin
    yield(sock, info)
  ensure
    if ack
      ack.enqueue(sock)
    else
      # Best-effort teardown; errors while closing are deliberately ignored.
      begin
        sock.close_write
      rescue StandardError
        nil
      end

      begin
        sock.close
      rescue StandardError
        nil
      end
    end
  end
end
purge_obsolete_socks() click to toggle source
# File lib/fluent/plugin/out_forward/connection_manager.rb, line 65
# Asks the socket cache to drop its obsolete sockets.
#
# Only meaningful when a socket cache is configured.
#
# @raise [RuntimeError] when no socket cache is configured
# @return [Object] whatever the cache's purge_obsolete_socks returns
def purge_obsolete_socks
  raise "Do not call this method without keepalive option" unless @socket_cache

  @socket_cache.purge_obsolete_socks
end
stop() click to toggle source
# File lib/fluent/plugin/out_forward/connection_manager.rb, line 35
# Clears the socket cache, if one is configured.
#
# @return [Object, nil] the result of the cache's clear, or the falsy value
#   stored in @socket_cache when there is no cache
def stop
  cache = @socket_cache
  return cache unless cache

  cache.clear
end

Private Instance Methods

connect_keepalive(host:, port:, hostname:, ack: nil) { |socket, request_info| ... } click to toggle source
# File lib/fluent/plugin/out_forward/connection_manager.rb, line 83
# Obtains a socket through the socket cache, creating one via the connection
# factory only when the cache invokes the fallback block.
#
# Without a block the socket and its RequestInfo are returned to the caller.
# With a block, both are yielded; afterwards the socket is
# - revoked from the cache if the block raised a StandardError (which is
#   then re-raised),
# - handed to ack.enqueue if an ack was supplied, or
# - checked back in to the cache otherwise.
#
# @param host [Object] part of the cache key; passed to the factory
# @param port [Object] part of the cache key; passed to the factory
# @param hostname [Object] part of the cache key; passed to the factory
# @param ack [#enqueue, nil] optional receiver of the socket after success
# @yield [socket, request_info]
# @return [Array, Object] [socket, request_info] when no block is given,
#   otherwise the block's return value
def connect_keepalive(host:, port:, hostname:, ack: nil)
  # Default for a socket served from the cache; replaced below when the
  # cache falls back to creating a new connection.
  info = RequestInfo.new(:established)
  cache_key = [host, port, hostname]

  sock = @socket_cache.checkout_or(cache_key) do
    fresh = @connection_factory.call(host, port, hostname)
    info = RequestInfo.new(@secure ? :helo : :established)
    fresh
  end

  return [sock, info] unless block_given?

  begin
    result = yield(sock, info)
  rescue StandardError
    # The socket may be in an unknown state after a failure: drop it from
    # the cache and let the caller see the original error.
    @socket_cache.revoke(sock)
    raise
  end

  # Success path only. This runs outside the rescue above, so an error
  # raised by the hand-off itself does not trigger a revoke.
  if ack
    ack.enqueue(sock)
  else
    @socket_cache.checkin(sock)
  end

  result
end