class Fluent::Plugin::ForwardOutput::SocketCache

Constants

TimedSocket

Public Class Methods

new(timeout, log) click to toggle source
# File lib/fluent/plugin/out_forward/socket_cache.rb, line 24
# Builds an empty socket cache.
#
# @param timeout [Numeric, nil] amount added to Time.now when an expiry
#   time is computed for a socket; a falsy value means no expiry time is set
# @param log [#debug] logger that receives debug messages
def initialize(timeout, log)
  @timeout = timeout
  @log = log

  # Sockets currently handed out to callers, keyed by the raw socket.
  @inflight_sockets = {}
  # Idle sockets grouped by key; an unknown key gets a fresh array on first access.
  @available_sockets = Hash.new { |hash, key| hash[key] = [] }
  # Revoked sockets that are waiting to be closed.
  @inactive_sockets = []

  # Guards every access to the three collections above.
  @mutex = Mutex.new
end

Public Instance Methods

checkin(sock) click to toggle source
# File lib/fluent/plugin/out_forward/socket_cache.rb, line 50
# Returns a checked-out socket to the pool of available sockets.
#
# The entry is looked up (and removed) from the in-flight table by the raw
# socket, gets a fresh expiry time, and is appended to the list for its key.
# If the socket is not in flight, only a debug message is logged.
#
# @param sock [Object] the raw socket previously handed out by the cache
def checkin(sock)
  @mutex.synchronize do
    entry = @inflight_sockets.delete(sock)
    next @log.debug("there is no socket #{sock}") unless entry

    entry.timeout = timeout
    @available_sockets[entry.key] << entry
  end
end
checkout_or(key) { || ... } click to toggle source
# File lib/fluent/plugin/out_forward/socket_cache.rb, line 33
# Hands out a socket for +key+, reusing a cached one when possible.
#
# When pick_socket finds a usable cached socket, its raw socket is returned
# and the block is not called. Otherwise the block is called to create a
# new raw socket, which is wrapped in a TimedSocket, registered as in
# flight, and returned.
#
# @param key [Object] identifies the group of sockets to pick from
# @yieldreturn [Object] a newly created raw socket
# @return [Object] the raw socket
def checkout_or(key)
  @mutex.synchronize do
    cached = pick_socket(key)
    next cached.sock if cached

    raw = yield
    wrapped = TimedSocket.new(timeout, key, raw)
    @log.debug("connect new socket #{wrapped}")

    @inflight_sockets[raw] = wrapped
    wrapped.sock
  end
end
clear() click to toggle source
# File lib/fluent/plugin/out_forward/socket_cache.rb, line 98
# Empties the available, in-flight and inactive collections and closes
# every socket they held.
#
# The collections are emptied while holding the mutex; the sockets are
# closed after it has been released. Errors raised by close are ignored.
def clear
  doomed = @mutex.synchronize do
    drained = @available_sockets.values.flatten(1) +
              @inflight_sockets.values +
              @inactive_sockets

    [@available_sockets, @inflight_sockets, @inactive_sockets].each(&:clear)
    drained
  end

  doomed.each do |entry|
    entry.sock.close rescue nil
  end
end
purge_obsolete_socks() click to toggle source
# File lib/fluent/plugin/out_forward/socket_cache.rb, line 71
# Closes every expired available socket and every inactive socket.
#
# Expired entries are removed from the available lists, keys whose list
# becomes empty are dropped, and the inactive list is emptied, all while
# holding the mutex. The collected sockets are closed after the mutex has
# been released. Errors raised by close are ignored.
def purge_obsolete_socks
  sockets = []

  @mutex.synchronize do
    # don't touch @inflight_sockets

    now = Time.now
    @available_sockets.each_value do |socks|
      # Split first and rewrite the list afterwards: deleting from an array
      # while iterating over it skips the element after each deletion, which
      # would leave expired sockets in the pool.
      expired, alive = socks.partition { |sock| expired_socket?(sock, time: now) }
      next if expired.empty?

      sockets.concat(expired)
      socks.replace(alive)
    end

    # reuse same object (@available_sockets)
    @available_sockets.reject! { |_, v| v.empty? }

    sockets.concat(@inactive_sockets)
    @inactive_sockets.clear
  end

  sockets.each do |s|
    # Best effort: a socket that fails to close is ignored.
    s.sock.close rescue nil
  end
end
revoke(sock) click to toggle source
# File lib/fluent/plugin/out_forward/socket_cache.rb, line 61
# Takes a checked-out socket out of circulation.
#
# The entry is removed from the in-flight table and appended to the
# inactive list. If the socket is not in flight, only a debug message is
# logged.
#
# @param sock [Object] the raw socket previously handed out by the cache
def revoke(sock)
  @mutex.synchronize do
    entry = @inflight_sockets.delete(sock)
    next @log.debug("there is no socket #{sock}") unless entry

    @inactive_sockets << entry
  end
end

Private Instance Methods

expired_socket?(sock, time: Time.now) click to toggle source
# File lib/fluent/plugin/out_forward/socket_cache.rb, line 137
# Tells whether a cached socket's expiry time lies before +time+.
#
# @param sock [#timeout] cache entry to inspect
# @param time [Time] reference point, the current time by default
# @return [Boolean] false when the entry has no expiry time
def expired_socket?(sock, time: Time.now)
  return false unless sock.timeout

  sock.timeout < time
end
pick_socket(key) click to toggle source

This method is not thread-safe; call it only while holding @mutex (as checkout_or does).

# File lib/fluent/plugin/out_forward/socket_cache.rb, line 118
# Moves the first non-expired available socket for +key+ into the
# in-flight table and returns its entry.
#
# The chosen entry gets a fresh expiry time. Expired entries are left in
# the available list. This method does no locking of its own.
#
# @param key [Object] identifies the group of sockets to pick from
# @return [Object, nil] the chosen entry, or nil when none is usable
def pick_socket(key)
  pool = @available_sockets[key]
  return nil if pool.empty?

  now = Time.now
  chosen = pool.find { |candidate| !expired_socket?(candidate, time: now) }
  return nil unless chosen

  @inflight_sockets[chosen.sock] = pool.delete(chosen)
  chosen.timeout = timeout
  chosen
end
timeout() click to toggle source
# File lib/fluent/plugin/out_forward/socket_cache.rb, line 133
# Computes the expiry time for a socket that is being (re)registered now.
#
# @return [Time, nil, false] Time.now plus @timeout, or @timeout itself
#   when it is falsy
def timeout
  return @timeout unless @timeout

  Time.now + @timeout
end