class Fluent::PluginHelper::ServiceDiscovery::Manager

Public Class Methods

new(log:, load_balancer: nil, custom_build_method: nil) click to toggle source
# File lib/fluent/plugin_helper/service_discovery/manager.rb, line 24
# Sets up a manager with no discoveries and an empty service table.
#
# log                 - logger kept for later info/warn/error reporting.
# load_balancer       - balancer to delegate to; when nil (or false) a new
#                       RoundRobinBalancer is created instead.
# custom_build_method - optional callable; when set, build_service passes
#                       every discovered service through it.
def initialize(log:, load_balancer: nil, custom_build_method: nil)
  # Collaborators supplied by the caller.
  @log = log
  @custom_build_method = custom_build_method
  @load_balancer = load_balancer || RoundRobinBalancer.new

  # Internal state, populated by #configure and #run_once.
  @services = {}
  @discoveries = []
  @queue = Queue.new

  # Remains true until #configure sees a discovery whose type is not :static.
  @static_config = true
end

Public Instance Methods

configure(configs, parent: nil) click to toggle source
# File lib/fluent/plugin_helper/service_discovery/manager.rb, line 35
# Instantiates and configures one service discovery plugin per entry of
# +configs+, registers the services each plugin reports, and then rebalances.
#
# configs - enumerable of config objects. An entry that has a :conf key is
#           read as { type:, conf: } (initial API); any other entry is used
#           as the plugin config itself, with its type taken from '@type'.
# parent  - forwarded to Fluent::Plugin.new_sd as the +parent+ keyword.
#
# Returns whatever #rebalance returns.
def configure(configs, parent: nil)
  configs.each do |config|
    # for compatibility with initial API
    if config.has_key?(:conf)
      type = config[:type]
      conf = config[:conf]
    else
      type = config['@type']
      conf = config
    end

    discovery = Fluent::Plugin.new_sd(type, parent: parent)
    discovery.configure(conf)

    # Index the plugin's initial services by their discovery id.
    discovery.services.each do |service|
      @services[service.discovery_id] = build_service(service)
    end
    @discoveries << discovery

    # The first non-static discovery flips the flag; after that the type is
    # no longer inspected.
    @static_config = false if @static_config && type.to_sym != :static
  end

  rebalance
end
rebalance() click to toggle source
# File lib/fluent/plugin_helper/service_discovery/manager.rb, line 103
# Passes the current list of services to the load balancer's #rebalance and
# returns that call's result.
def rebalance
  current = services
  @load_balancer.rebalance(current)
end
run_once() click to toggle source
# File lib/fluent/plugin_helper/service_discovery/manager.rb, line 77
# Drains the messages that were queued at the moment of the call, applies
# each one through #handle_message, and then rebalances.
#
# Returns nil without rebalancing when the queue was empty; otherwise returns
# the result of #rebalance.
def run_once
  # The size is only a snapshot; racing with producers is intentionally
  # not guarded against here.
  pending = @queue.size
  return if pending == 0

  pending.times do
    message = @queue.pop

    if message.is_a?(Fluent::Plugin::ServiceDiscovery::DiscoveryMessage)
      begin
        handle_message(message)
      rescue => e
        # A failing message is logged and must not stop the remaining ones.
        @log.error(e)
      end
    else
      # Anything else on the queue is unexpected; report it and move on.
      @log.warn("BUG: #{message}")
    end
  end

  rebalance
end
select_service(&block) click to toggle source
# File lib/fluent/plugin_helper/service_discovery/manager.rb, line 107
# Forwards the given block to the load balancer's #select_service and
# returns whatever that call returns.
def select_service(&block)
  @load_balancer.select_service(&block)
end
services() click to toggle source
# File lib/fluent/plugin_helper/service_discovery/manager.rb, line 111
# Returns an array of the currently registered services, i.e. the values of
# the @services hash (which is keyed by each service's discovery_id).
def services
  @services.values
end
start() click to toggle source
# File lib/fluent/plugin_helper/service_discovery/manager.rb, line 63
# Starts every configured discovery, handing each one the manager's queue.
#
# Returns the @discoveries array.
def start
  @discoveries.each { |discovery| discovery.start(@queue) }
end
static_config?() click to toggle source
# File lib/fluent/plugin_helper/service_discovery/manager.rb, line 59
# Returns the @static_config flag: set to true in #initialize and switched to
# false by #configure once a discovery whose type is not :static is seen.
def static_config?
  @static_config
end

Private Instance Methods

build_service(n) click to toggle source
# File lib/fluent/plugin_helper/service_discovery/manager.rb, line 140
# Converts a discovered service into the object stored by the manager.
#
# Returns +n+ unchanged when no custom build method was given; otherwise
# returns the result of calling the custom build method with +n+.
def build_service(n)
  return n unless @custom_build_method

  @custom_build_method.call(n)
end
handle_message(msg) click to toggle source
# File lib/fluent/plugin_helper/service_discovery/manager.rb, line 117
# Applies a single discovery message to the service table.
#
# msg - object responding to #type and #service; the service must respond to
#       #discovery_id, #name, #host and #port.
#
# SERVICE_IN  : builds the service via #build_service and stores it under its
#               discovery_id. Raises RuntimeError when the build result is
#               nil/false.
# SERVICE_OUT : removes the entry for the service's discovery_id and logs a
#               warning when no such entry existed.
# other types : logged as an error; the service table is left untouched.
def handle_message(msg)
  service = msg.service

  case msg.type
  when Fluent::Plugin::ServiceDiscovery::SERVICE_IN
    if (n = build_service(service))
      @log.info("Service in: name=#{service.name} #{service.host}:#{service.port}")
      @services[service.discovery_id] = n
    else
      raise "failed to build service in name=#{service.name} #{service.host}:#{service.port}"
    end
  when Fluent::Plugin::ServiceDiscovery::SERVICE_OUT
    s = @services.delete(service.discovery_id)
    if s
      @log.info("Service out: name=#{service.name} #{service.host}:#{service.port}")
    else
      @log.warn("Not found service: name=#{service.name} #{service.host}:#{service.port}")
    end
  else
    @log.error("BUG: unknown message type: #{msg.type}")
  end
end