class ServerEngine::ProcessManager
Constants
- CONFIG_PARAMS
- HEARTBEAT_MESSAGE
Attributes
auto_heartbeat[R]
auto_tick[R]
auto_tick_interval[R]
cloexec_mode[RW]
command_sender[RW]
enable_heartbeat[R]
graceful_kill_signal[R]
immediate_kill_signal[R]
logger[RW]
Public Class Methods
new(config={})
click to toggle source
# File lib/serverengine/process_manager.rb, line 24
#
# Builds a process manager from a configuration hash.
#
# Keys read directly from +config+:
#   :cloexec_mode          - stored as-is (consulted by new_pipe_pair)
#   :graceful_kill_signal  - defaults to :TERM
#   :immediate_kill_signal - defaults to :QUIT
#   :auto_tick             - defaults to true; coerced to a boolean
#   :auto_tick_interval    - defaults to 1
#   :enable_heartbeat      - coerced to a boolean; forced to false when
#                            ServerEngine.windows? is true
#   :auto_heartbeat        - defaults to true; coerced to a boolean
#   :on_heartbeat_error    - nil (no-op handler), a Proc (used as the
#                            handler) or :abort (handler calls `exit 1`)
#
# The whole hash is additionally handed to #configure.
#
# Raises ArgumentError when :on_heartbeat_error is anything other than
# nil, a Proc or :abort.
def initialize(config={})
  @monitors = []
  @rpipes = {}
  @heartbeat_time = {}

  @cloexec_mode = config[:cloexec_mode]
  @graceful_kill_signal = config[:graceful_kill_signal] || :TERM
  @immediate_kill_signal = config[:immediate_kill_signal] || :QUIT

  @auto_tick = !!config.fetch(:auto_tick, true)
  @auto_tick_interval = config[:auto_tick_interval] || 1

  @enable_heartbeat = !!config[:enable_heartbeat]
  if ServerEngine.windows?
    # heartbeat is not supported on Windows platform. See also spawn method.
    @enable_heartbeat = false
  end
  @auto_heartbeat = !!config.fetch(:auto_heartbeat, true)

  case op = config[:on_heartbeat_error]
  when nil
    @heartbeat_error_proc = lambda {|t| }
  when Proc
    @heartbeat_error_proc = op
  when :abort
    @heartbeat_error_proc = lambda {|t| exit 1 }
  else
    # The message lists the values the branches above actually accept.
    raise ArgumentError, "unexpected :on_heartbeat_error option (expected nil, Proc or :abort but got #{op.inspect})"
  end

  configure(config)

  @closed = false
  @read_buffer = ''

  if @auto_tick
    # #tick is passed as the block of the TickThread.
    TickThread.new(@auto_tick_interval, &method(:tick))
  end
end
Public Instance Methods
close()
click to toggle source
# File lib/serverengine/process_manager.rb, line 234
#
# Flags the manager as closed and closes every pipe that is currently a
# key of @rpipes. The entries themselves are left in @rpipes.
# Always returns nil.
def close
  @closed = true
  @rpipes.keys.each(&:close)
  nil
end
configure(config, opts={})
click to toggle source
# File lib/serverengine/process_manager.rb, line 95
#
# Applies +config+ to every parameter named in CONFIG_PARAMS.
#
# For each key of CONFIG_PARAMS the matching "<key>=" writer is invoked
# with config[:"<prefix><key>"], which is nil when the entry is absent.
#
# config - Hash of settings.
# opts   - Hash; opts[:prefix] (default "") is prepended to each key
#          when looking it up in +config+.
def configure(config, opts={})
  key_prefix = opts[:prefix] || ""
  CONFIG_PARAMS.keys.each do |name|
    value = config[:"#{key_prefix}#{name}"]
    send("#{name}=", value)
  end
end
fork(&block)
click to toggle source
# File lib/serverengine/process_manager.rb, line 117
#
# Forks a child process that runs +block+ and returns a Monitor for it.
#
# A new pipe pair is created first. In the child, #close is called, the
# write end is wrapped in a Target, a HeartbeatThread is started when both
# @enable_heartbeat and @auto_heartbeat are set, and the block is called
# with the Target. The child leaves through `exit! 0` after the block
# returns; a StandardError is passed to ServerEngine.dump_uncaught_error
# and the `ensure` then exits with status 1.
#
# In the parent, the Monitor is appended to @monitors and registered in
# @rpipes under the read end. The parent's copy of the write end is always
# closed; the read end is closed only if it was never registered.
#
# Raises NotImplementedError when ServerEngine.windows? is true.
def fork(&block)
  if ServerEngine.windows?
    raise NotImplementedError, "fork is not available on this platform. Please use spawn (worker_type: 'spawn')."
  end

  rpipe, wpipe = new_pipe_pair

  begin
    child_pid = Process.fork do
      close
      begin
        target = Target.new(wpipe)
        heartbeat_wanted = @enable_heartbeat && @auto_heartbeat
        HeartbeatThread.new(@heartbeat_interval, target, @heartbeat_error_proc) if heartbeat_wanted

        block.call(target)
        exit! 0
      rescue => error
        ServerEngine.dump_uncaught_error(error)
      ensure
        exit! 1
      end
    end

    monitor = Monitor.new(child_pid, monitor_options)
    @monitors << monitor
    @rpipes[rpipe] = monitor
    # The read end now belongs to @rpipes; keep the ensure below from closing it.
    rpipe = nil
    return monitor
  ensure
    wpipe.close
    rpipe.close if rpipe
  end
end
monitor_options()
click to toggle source
# File lib/serverengine/process_manager.rb, line 102
#
# Returns a Hash of the settings handed to Monitor.new by #fork and #spawn.
# Each key maps to the current value of the instance variable of the same
# name; the key order below is the order of the resulting Hash.
def monitor_options
  option_names = [
    :enable_heartbeat,
    :heartbeat_timeout,
    :graceful_kill_signal,
    :graceful_kill_timeout,
    :graceful_kill_interval,
    :graceful_kill_interval_increment,
    :immediate_kill_signal,
    :immediate_kill_timeout,
    :immediate_kill_interval,
    :immediate_kill_interval_increment,
  ]

  option_names.each_with_object({}) do |name, options|
    options[name] = instance_variable_get("@#{name}")
  end
end
new_pipe_pair()
click to toggle source
# File lib/serverengine/process_manager.rb, line 213
#
# Creates a pipe and returns [read_end, write_end], both with sync enabled.
#
# When Fcntl defines F_SETFD and FD_CLOEXEC, FD_CLOEXEC is set through
# fcntl according to @cloexec_mode:
#   :target_only  - on the write end only
#   :monitor_only - on the read end only
#   anything else - on both ends (read end first)
def new_pipe_pair
  read_end, write_end = IO.pipe

  if Fcntl.const_defined?(:F_SETFD) && Fcntl.const_defined?(:FD_CLOEXEC)
    flagged_ends =
      case @cloexec_mode
      when :target_only then [write_end]
      when :monitor_only then [read_end]
      else [read_end, write_end]
      end

    flagged_ends.each do |io|
      io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
    end
  end

  read_end.sync = true
  write_end.sync = true

  [read_end, write_end]
end
spawn(*args)
click to toggle source
# File lib/serverengine/process_manager.rb, line 157
#
# Spawns a child with Process.spawn and returns a Monitor for it.
#
# args - the arguments for Process.spawn. A leading Hash is taken as the
#        environment and a trailing Hash as the spawn options; both are
#        duplicated before being modified, so the caller's hashes are
#        left untouched.
#
# Unless ServerEngine.windows? is true, a pipe pair is created, its write
# end is passed to the child through the spawn options, and (when
# @enable_heartbeat is set) its descriptor number is exported as
# SERVERENGINE_HEARTBEAT_PIPE. When @command_sender is "pipe", a second
# pipe is created whose read end becomes the child's stdin and whose write
# end is stored on the Monitor as command_sender_pipe.
#
# Any exception raised by Process.spawn or Monitor.new propagates; every
# pipe end opened here that was not handed over to a Monitor is closed
# before it does.
def spawn(*args)
  if args.first.is_a?(Hash)
    env = args.shift.dup
  else
    env = {}
  end

  if args.last.is_a?(Hash)
    options = args.pop.dup
  else
    options = {}
  end

  # pipe is necessary even if @enable_heartbeat == false because
  # parent process detects shutdown of a child process using it
  begin
    unless ServerEngine.windows?
      # heartbeat is not supported on Windows platform
      rpipe, wpipe = new_pipe_pair
      options[[wpipe.fileno]] = wpipe
      if @enable_heartbeat
        env['SERVERENGINE_HEARTBEAT_PIPE'] = wpipe.fileno.to_s
      end
    end

    command_sender_pipe = nil
    if @command_sender == "pipe"
      inpipe, command_sender_pipe = IO.pipe
      command_sender_pipe.sync = true
      command_sender_pipe.binmode
      options[:in] = inpipe
    end

    env['SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN'] = SocketManager::INTERNAL_TOKEN
    pid = Process.spawn(env, *args, options)

    if @command_sender == "pipe"
      # The child has its own copy of the read end by now.
      inpipe.close
    end

    m = Monitor.new(pid, monitor_options)
    m.command_sender_pipe = command_sender_pipe
    @monitors << m

    unless ServerEngine.windows?
      @rpipes[rpipe] = m
      rpipe = nil
    end

    # The monitor owns the command pipe from here on; keep the ensure
    # below from closing it.
    command_sender_pipe = nil

    return m

  ensure
    wpipe.close if wpipe
    rpipe.close if rpipe
    # Reached with these still set only when something above raised:
    # without this both ends of the command pipe would stay open.
    inpipe.close if inpipe && !inpipe.closed?
    command_sender_pipe.close if command_sender_pipe
  end
end
tick(blocking_timeout=0)
click to toggle source
# File lib/serverengine/process_manager.rb, line 240
#
# Polls the registered pipes once and then ticks every monitor.
#
# blocking_timeout - seconds IO.select may wait for a readable pipe
#                    (default 0). With no registered pipes it is the time
#                    slept instead, when positive.
#
# For each readable pipe up to 1024 bytes are read into @read_buffer.
# EAGAIN/EINTR skip the pipe. Any other StandardError (EOFError in
# practice, per the original comment) unregisters the pipe, calls
# start_immediate_stop! on its monitor and closes the pipe. After a
# successful read the monitor's last_heartbeat_time is set to the time
# taken before the select. Finally, monitors whose #tick returns a falsy
# value are removed from @monitors.
#
# Returns nil. Raises AlreadyClosedError once the manager is closed, and a
# RuntimeError when pipes are registered while ServerEngine.windows? is true.
def tick(blocking_timeout=0)
  raise AlreadyClosedError.new if @closed

  if @rpipes.empty?
    sleep blocking_timeout if blocking_timeout > 0
    return nil
  end

  if ServerEngine.windows?
    raise "heartbeat is not supported on Windows platform. @rpipes must be empty."
  end

  now = Time.now
  readable, = IO.select(@rpipes.keys, nil, nil, blocking_timeout)

  (readable || []).each do |pipe|
    begin
      pipe.read_nonblock(1024, @read_buffer)
    rescue Errno::EAGAIN, Errno::EINTR
      next
    rescue #EOFError
      monitor = @rpipes.delete(pipe)
      monitor.start_immediate_stop!
      begin
        pipe.close
      rescue
        # closing is best effort
      end
      next
    end

    monitor = @rpipes[pipe]
    monitor.last_heartbeat_time = now if monitor
  end

  @monitors.delete_if { |monitor| !monitor.tick(now) }

  nil
end