module Fluent::ServerModule
Public Instance Methods
after_run()
click to toggle source
# File lib/fluent/supervisor.rb, line 77
# Tears down what the supervisor started: the Windows event thread (on
# Windows only), the RPC server (when an RPC endpoint was configured) and
# the counter server (when one was created). Finally asks
# Fluent::Supervisor to clean up its resources.
def after_run
  stop_windows_event_thread if Fluent.windows?
  stop_rpc_server if @rpc_endpoint
  stop_counter_server if @counter

  Fluent::Supervisor.cleanup_resources
end
before_run()
click to toggle source
# File lib/fluent/supervisor.rb, line 46
# Prepares the supervisor process. In order, it:
#   1. remembers the configuration text and resets the RPC/counter state,
#   2. starts the RPC server when config[:rpc_endpoint] is set,
#   3. installs the Windows event handler or the signal handlers,
#   4. starts the counter server when config[:counter_server] is set,
#   5. opens the ServerEngine socket manager and exports its path through
#      ENV['SERVERENGINE_SOCKETMANAGER_PATH'], unless
#      config[:disable_shared_socket] is set.
def before_run
  @fluentd_conf = config[:fluentd_conf]
  @rpc_endpoint = nil
  @rpc_server = nil
  @counter = nil

  endpoint = config[:rpc_endpoint]
  if endpoint
    @rpc_endpoint = endpoint
    @enable_get_dump = config[:enable_get_dump]
    run_rpc_server
  end

  if Fluent.windows?
    install_windows_event_handler
  else
    install_supervisor_signal_handlers
  end

  counter_conf = config[:counter_server]
  run_counter_server(counter_conf) if counter_conf

  if config[:disable_shared_socket]
    $log.info "shared socket for multiple workers is disabled"
  else
    manager_path = ServerEngine::SocketManager::Server.generate_path
    ServerEngine::SocketManager::Server.open(manager_path)
    ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = manager_path.to_s
  end
end
install_supervisor_signal_handlers()
click to toggle source
# File lib/fluent/supervisor.rb, line 172
# Registers the supervisor's own HUP, USR1 and USR2 signal handlers.
# Does nothing on Windows. Each handler logs a debug line and then
# delegates to the matching supervisor_sig*_handler method.
def install_supervisor_signal_handlers
  return if Fluent.windows?

  trap(:HUP) do
    $log.debug "fluentd supervisor process get SIGHUP"
    supervisor_sighup_handler
  end

  trap(:USR1) do
    $log.debug "fluentd supervisor process get SIGUSR1"
    supervisor_sigusr1_handler
  end

  trap(:USR2) do
    $log.debug 'fluentd supervisor process got SIGUSR2'
    supervisor_sigusr2_handler
  end
end
install_windows_event_handler()
click to toggle source
# File lib/fluent/supervisor.rb, line 209
# Starts a background thread that waits on named Win32 events and maps
# them to supervisor actions. Does nothing outside Windows.
#
# The index returned by Win32::Ipc#wait_any is treated as 1-based (the
# event is looked up with events[idx - 1]), which gives this layout:
#   1    -> "<pid_signame>_STOP_EVENT_THREAD": leave the loop
#   2, 6 -> "<name>":      stop(true)
#   3, 7 -> "<name>_HUP":  supervisor_sighup_handler
#   4, 8 -> "<name>_USR1": supervisor_sigusr1_handler
#   5, 9 -> "<name>_USR2": supervisor_sigusr2_handler
# Indexes 2-5 use the pid-based name "fluentd_<pid>"; indexes 6-9 exist
# only when config[:signame] is set and use that name instead.
def install_windows_event_handler
  return unless Fluent.windows?

  @pid_signame = "fluentd_#{$$}"
  @signame = config[:signame]

  Thread.new do
    ipc = Win32::Ipc.new(nil)

    # Suffix order fixes the index layout documented above.
    suffixes = ["", "_HUP", "_USR1", "_USR2"]
    events = [Win32::Event.new("#{@pid_signame}_STOP_EVENT_THREAD")]
    events.concat(suffixes.map { |suffix| Win32::Event.new("#{@pid_signame}#{suffix}") })
    if @signame
      events.concat(suffixes.map { |suffix| Win32::Event.new("#{@signame}#{suffix}") })
    end

    begin
      loop do
        idx = ipc.wait_any(events, Windows::Synchronize::INFINITE)
        if idx <= 0 || idx > events.length
          $log.warn("Unexpected reutrn value of Win32::Ipc#wait_any: #{idx}")
        else
          $log.debug("Got Win32 event \"#{events[idx - 1].name}\"")
        end

        case idx
        when 1 then break
        when 2, 6 then stop(true)
        when 3, 7 then supervisor_sighup_handler
        when 4, 8 then supervisor_sigusr1_handler
        when 5, 9 then supervisor_sigusr2_handler
        end
      end
    ensure
      # Close every event handle however the loop ends.
      events.each(&:close)
    end
  end
end
kill_worker()
click to toggle source
# File lib/fluent/supervisor.rb, line 305
# Kills every worker listed in config[:worker_pid]: KILL on Windows,
# TERM elsewhere. The pid table is copied and then emptied before any
# signal is sent. Does nothing when config[:worker_pid] is not set.
def kill_worker
  return unless config[:worker_pid]

  worker_pids = config[:worker_pid].clone
  config[:worker_pid].clear

  worker_pids.each_value do |pid|
    Process.kill(Fluent.windows? ? :KILL : :TERM, pid)
  end
end
reload()
click to toggle source
# File lib/fluent/supervisor.rb, line 202
# Sends the "RELOAD" command to every entry in @monitors.
def reload
  @monitors.each { |monitor| monitor.send_command("RELOAD\n") }
end
restart(graceful)
click to toggle source
Override some methods of ServerEngine::MultiSpawnWorker. Since Fluentd's Supervisor
doesn't use ServerEngine's HUP, USR1 and USR2 handlers (see install_supervisor_signal_handlers),
they should also be disabled on Windows; just send commands to workers instead.
# File lib/fluent/supervisor.rb, line 196
# Sends a restart command to every entry in @monitors.
#
# graceful - truthy sends "GRACEFUL_RESTART", falsy sends
#            "IMMEDIATE_RESTART".
def restart(graceful)
  command = graceful ? "GRACEFUL_RESTART\n" : "IMMEDIATE_RESTART\n"
  @monitors.each { |monitor| monitor.send_command(command) }
end
run_counter_server(counter_conf)
click to toggle source
# File lib/fluent/supervisor.rb, line 160
# Creates the counter server, stores it in @counter and starts it.
#
# counter_conf - object that provides scope, bind, port and backup_path.
def run_counter_server(counter_conf)
  scope = counter_conf.scope
  server_options = {
    host: counter_conf.bind,
    port: counter_conf.port,
    log: $log,
    path: counter_conf.backup_path,
  }

  @counter = Fluent::Counter::Server.new(scope, server_options)
  @counter.start
end
run_rpc_server()
click to toggle source
# File lib/fluent/supervisor.rb, line 84
# Creates the RPC server on @rpc_endpoint, mounts the built-in endpoints
# and starts it. Most endpoints signal this process with Process.kill;
# the flush and reload endpoints call the supervisor handlers directly
# on Windows. Every endpoint block returns nil except
# /api/config.getDump, which is mounted only when @enable_get_dump is
# set and returns [nil, nil, res] with the config dump as the body.
def run_rpc_server
  @rpc_server = RPC::Server.new(@rpc_endpoint, $log)

  # built-in RPC for signals
  @rpc_server.mount_proc('/api/processes.interruptWorkers') do |_req, _res|
    $log.debug "fluentd RPC got /api/processes.interruptWorkers request"
    Process.kill :INT, $$
    nil
  end

  @rpc_server.mount_proc('/api/processes.killWorkers') do |_req, _res|
    $log.debug "fluentd RPC got /api/processes.killWorkers request"
    Process.kill :TERM, $$
    nil
  end

  @rpc_server.mount_proc('/api/processes.flushBuffersAndKillWorkers') do |_req, _res|
    $log.debug "fluentd RPC got /api/processes.flushBuffersAndKillWorkers request"
    if Fluent.windows?
      supervisor_sigusr1_handler
      stop(true)
    else
      Process.kill :USR1, $$
      Process.kill :TERM, $$
    end
    nil
  end

  @rpc_server.mount_proc('/api/plugins.flushBuffers') do |_req, _res|
    $log.debug "fluentd RPC got /api/plugins.flushBuffers request"
    if Fluent.windows?
      supervisor_sigusr1_handler
    else
      Process.kill :USR1, $$
    end
    nil
  end

  @rpc_server.mount_proc('/api/config.reload') do |_req, _res|
    $log.debug "fluentd RPC got /api/config.reload request"
    if Fluent.windows?
      # restart worker with auto restarting by killing
      kill_worker
    else
      Process.kill :HUP, $$
    end
    nil
  end

  @rpc_server.mount_proc('/api/config.dump') do |_req, _res|
    $log.debug "fluentd RPC got /api/config.dump request"
    $log.info "dump in-memory config"
    supervisor_dump_config_handler
    nil
  end

  @rpc_server.mount_proc('/api/config.gracefulReload') do |_req, _res|
    $log.debug "fluentd RPC got /api/config.gracefulReload request"
    if Fluent.windows?
      supervisor_sigusr2_handler
    else
      Process.kill :USR2, $$
    end
    nil
  end

  if @enable_get_dump
    @rpc_server.mount_proc('/api/config.getDump') do |_req, res|
      $log.debug "fluentd RPC got /api/config.getDump request"
      $log.info "get dump in-memory config via HTTP"
      res.body = supervisor_get_dump_config_handler
      [nil, nil, res]
    end
  end

  @rpc_server.start
end
stop_counter_server()
click to toggle source
# File lib/fluent/supervisor.rb, line 168
# Stops the counter server held in @counter and returns whatever its
# stop method returns.
def stop_counter_server
  @counter.stop
end
stop_rpc_server()
click to toggle source
# File lib/fluent/supervisor.rb, line 156
# Shuts down the RPC server held in @rpc_server.
#
# Safe to call when no server was created: @rpc_server is reset to nil
# in before_run and is only assigned once RPC::Server.new succeeds,
# while after_run decides whether to call this method from
# @rpc_endpoint. Safe navigation keeps teardown from raising
# NoMethodError on nil in that window.
#
# Returns the result of the server's shutdown, or nil when there is no
# server.
def stop_rpc_server
  @rpc_server&.shutdown
end
stop_windows_event_thread()
click to toggle source
# File lib/fluent/supervisor.rb, line 260
# Signals the "<pid_signame>_STOP_EVENT_THREAD" Win32 event and closes
# the handle. This is the event at index 1 of the list built by
# install_windows_event_handler, whose loop breaks on it. Does nothing
# outside Windows.
def stop_windows_event_thread
  return unless Fluent.windows?

  stop_event = Win32::Event.open("#{@pid_signame}_STOP_EVENT_THREAD")
  stop_event.set
  stop_event.close
end
supervisor_dump_config_handler()
click to toggle source
# File lib/fluent/supervisor.rb, line 319
# Writes the configuration text held in @fluentd_conf to the log at
# info level.
def supervisor_dump_config_handler
  $log.info(@fluentd_conf)
end
supervisor_get_dump_config_handler()
click to toggle source
# File lib/fluent/supervisor.rb, line 323
# Returns the configuration text held in @fluentd_conf wrapped in a
# Hash under the :conf key.
def supervisor_get_dump_config_handler
  dump = {}
  dump[:conf] = @fluentd_conf
  dump
end
supervisor_sighup_handler()
click to toggle source
# File lib/fluent/supervisor.rb, line 268
# HUP handling for the supervisor: delegates to kill_worker and returns
# its result.
def supervisor_sighup_handler
  kill_worker
end
supervisor_sigusr1_handler()
click to toggle source
# File lib/fluent/supervisor.rb, line 272
# USR1 handling for the supervisor: reopens the supervisor's log first,
# then forwards USR1 to the workers.
def supervisor_sigusr1_handler
  reopen_log
  send_signal_to_workers(:USR1)
end
supervisor_sigusr2_handler()
click to toggle source
# File lib/fluent/supervisor.rb, line 277
# USR2 handling for the supervisor: reloads the configuration.
#
# The new configuration is built and applied to Fluent::Engine inside a
# separate thread. Joining that thread re-raises any error it hit, so a
# failed reload ends up in the rescue clause below and is only logged.
# On success the log is reopened, USR2 is forwarded to the workers and
# @fluentd_conf is replaced with the new configuration text.
def supervisor_sigusr2_handler
  new_conf = nil
  loader = Thread.new do
    $log.info 'Reloading new config'

    # Validate that loading config is valid at first
    new_conf = Fluent::Config.build(
      config_path: config[:config_path],
      encoding: config[:conf_encoding],
      additional_config: config[:inline_config],
      use_v1_config: config[:use_v1_config],
    )

    Fluent::VariableStore.try_to_reset { Fluent::Engine.reload_config(new_conf, supervisor: true) }
  end
  loader.report_on_exception = false # Error is handled by myself
  loader.join

  reopen_log
  send_signal_to_workers(:USR2)
  @fluentd_conf = new_conf.to_s
rescue => e
  $log.error "Failed to reload config file: #{e}"
end
Private Instance Methods
reopen_log()
click to toggle source
# File lib/fluent/supervisor.rb, line 329
# Reopens the log through config[:logger_initializer], if one is set.
# Returns the Thread doing the reopen, or nil when there is no
# initializer.
def reopen_log
  initializer = config[:logger_initializer]
  return unless initializer

  # Creating new thread due to mutex can't lock
  # in main thread during trap context
  Thread.new { initializer.reopen! }
end
send_command_to_workers(signal)
click to toggle source
# File lib/fluent/supervisor.rb, line 352
# Translates a signal name into the equivalent worker command:
#   :HUP  -> restart(false)
#   :USR1 -> restart(true)
#   :USR2 -> reload
# Any other value is ignored and nil is returned.
def send_command_to_workers(signal)
  # Use ServerEngine's CommandSender on Windows
  case signal
  when :HUP then restart(false)
  when :USR1 then restart(true)
  when :USR2 then reload
  end
end
send_signal_to_workers(signal)
click to toggle source
# File lib/fluent/supervisor.rb, line 339
# Delivers a signal to every worker listed in config[:worker_pid].
# On Windows the signal is translated into a worker command instead of
# being sent with Process.kill. Does nothing when config[:worker_pid]
# is not set.
def send_signal_to_workers(signal)
  return unless config[:worker_pid]
  return send_command_to_workers(signal) if Fluent.windows?

  config[:worker_pid].each_value do |pid|
    # don't rescue Errno::ESRCH here (invalid status)
    Process.kill(signal, pid)
  end
end