class ServerEngine::ProcessManager

Constants

CONFIG_PARAMS
HEARTBEAT_MESSAGE

Attributes

auto_heartbeat[R]
auto_tick[R]
auto_tick_interval[R]
cloexec_mode[RW]
command_sender[RW]
enable_heartbeat[R]
graceful_kill_signal[R]
immediate_kill_signal[R]
logger[RW]

Public Class Methods

new(config={}) click to toggle source
# File lib/serverengine/process_manager.rb, line 24
# Sets up a process manager from a configuration hash.
#
# Keys read directly here:
#   :cloexec_mode          - stored as-is; consulted when pipes are created
#   :graceful_kill_signal  - defaults to :TERM
#   :immediate_kill_signal - defaults to :QUIT
#   :auto_tick             - defaults to true
#   :auto_tick_interval    - defaults to 1
#   :enable_heartbeat      - coerced to a boolean; always false on Windows
#   :auto_heartbeat        - defaults to true
#   :on_heartbeat_error    - nil (ignore), a Proc, or :abort (exit 1)
# Every key listed in CONFIG_PARAMS is then applied through #configure.
#
# Raises ArgumentError when :on_heartbeat_error is none of nil, a Proc
# or :abort.
def initialize(config={})
  @monitors = []
  @rpipes = {}
  @heartbeat_time = {}

  @cloexec_mode = config[:cloexec_mode]

  @graceful_kill_signal = config[:graceful_kill_signal] || :TERM
  @immediate_kill_signal = config[:immediate_kill_signal] || :QUIT

  @auto_tick = !!config.fetch(:auto_tick, true)
  @auto_tick_interval = config[:auto_tick_interval] || 1

  @enable_heartbeat = !!config[:enable_heartbeat]
  if ServerEngine.windows?
    # heartbeat is not supported on Windows platform. See also spawn method.
    @enable_heartbeat = false
  end
  @auto_heartbeat = !!config.fetch(:auto_heartbeat, true)

  case op = config[:on_heartbeat_error]
  when nil
    # No handler configured: heartbeat errors are ignored.
    @heartbeat_error_proc = lambda {|t| }
  when Proc
    @heartbeat_error_proc = op
  when :abort
    @heartbeat_error_proc = lambda {|t| exit 1 }
  else
    # The message lists the values this case statement actually accepts.
    raise ArgumentError, "unexpected :on_heartbeat_error option (expected Proc, :abort or nil but got #{op.class})"
  end

  configure(config)

  @closed = false
  @read_buffer = ''

  if @auto_tick
    # Hand #tick to a TickThread together with the configured interval.
    TickThread.new(@auto_tick_interval, &method(:tick))
  end
end

Public Instance Methods

close() click to toggle source
# File lib/serverengine/process_manager.rb, line 234
# Marks the manager as closed and closes every pipe currently tracked
# in @rpipes. Always returns nil.
def close
  @closed = true
  tracked_pipes = @rpipes.keys
  tracked_pipes.each(&:close)
  nil
end
configure(config, opts={}) click to toggle source
# File lib/serverengine/process_manager.rb, line 95
# Applies every parameter named in CONFIG_PARAMS by calling its writer
# with the value found in +config+.
#
# config - Hash looked up with symbol keys.
# opts   - Hash; opts[:prefix] (default "") is prepended to each
#          parameter name to build the lookup key.
#
# A key missing from +config+ results in the writer receiving nil.
def configure(config, opts={})
  key_prefix = opts[:prefix]
  key_prefix = "" unless key_prefix

  CONFIG_PARAMS.keys.each do |param|
    lookup_key = "#{key_prefix}#{param}".to_sym
    send("#{param}=", config[lookup_key])
  end
end
fork(&block) click to toggle source
# File lib/serverengine/process_manager.rb, line 117
# Forks a child process, runs +block+ in it with a Target wrapping the
# write end of a fresh pipe, and registers a Monitor for the child in
# the parent.
#
# In the child: the manager is closed first, a HeartbeatThread is
# started when both @enable_heartbeat and @auto_heartbeat are set, and
# the process leaves through exit! (0 after the block returns, 1
# otherwise).
#
# Returns the Monitor for the new child.
# Raises NotImplementedError when ServerEngine.windows? is true.
def fork(&block)
  raise NotImplementedError, "fork is not available on this platform. Please use spawn (worker_type: 'spawn')." if ServerEngine.windows?

  reader, writer = new_pipe_pair
  reader_registered = false

  begin
    child_pid = Process.fork do
      self.close
      begin
        target = Target.new(writer)
        HeartbeatThread.new(@heartbeat_interval, target, @heartbeat_error_proc) if @enable_heartbeat && @auto_heartbeat

        block.call(target)
        exit! 0

      rescue => error
        ServerEngine.dump_uncaught_error(error)
      ensure
        # Reached only when the block did not finish normally.
        exit! 1
      end
    end

    monitor = Monitor.new(child_pid, monitor_options)

    @monitors << monitor
    @rpipes[reader] = monitor
    # From here on the read end belongs to @rpipes and must stay open.
    reader_registered = true

    return monitor

  ensure
    # The parent never writes; the read end is closed only on failure.
    writer.close
    reader.close unless reader_registered
  end
end
monitor_options() click to toggle source
# File lib/serverengine/process_manager.rb, line 102
# Collects the heartbeat and kill-related instance variables into a
# Hash keyed by the variable name (without the leading "@"). The result
# is what #fork and #spawn pass to Monitor.new.
def monitor_options
  option_names = [
    :enable_heartbeat,
    :heartbeat_timeout,
    :graceful_kill_signal,
    :graceful_kill_timeout,
    :graceful_kill_interval,
    :graceful_kill_interval_increment,
    :immediate_kill_signal,
    :immediate_kill_timeout,
    :immediate_kill_interval,
    :immediate_kill_interval_increment,
  ]

  option_names.each_with_object({}) do |name, options|
    options[name] = instance_variable_get("@#{name}")
  end
end
new_pipe_pair() click to toggle source
# File lib/serverengine/process_manager.rb, line 213
# Creates a pipe and returns [read_end, write_end], both with sync
# enabled.
#
# When Fcntl exposes F_SETFD and FD_CLOEXEC, the close-on-exec flag is
# set according to @cloexec_mode:
#   :target_only  - write end only
#   :monitor_only - read end only
#   anything else - read end, then write end
def new_pipe_pair
  reader, writer = IO.pipe

  if Fcntl.const_defined?(:F_SETFD) && Fcntl.const_defined?(:FD_CLOEXEC)
    cloexec_ends =
      case @cloexec_mode
      when :target_only then [writer]
      when :monitor_only then [reader]
      else [reader, writer]
      end

    cloexec_ends.each {|io| io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
  end

  pair = [reader, writer]
  pair.each {|io| io.sync = true }
  pair
end
spawn(*args) click to toggle source
# File lib/serverengine/process_manager.rb, line 157
# Starts a child process with Process.spawn and registers a Monitor
# for it.
#
# args - same shape as Process.spawn: an optional leading Hash of
#        environment variables, the command, and an optional trailing
#        Hash of spawn options. Both hashes are duplicated before this
#        method adds its own entries.
#
# Unless ServerEngine.windows? is true, a pipe pair is created, its
# write end is passed to the child under the same fd number, and (when
# heartbeat is enabled) that fd number is exported as
# SERVERENGINE_HEARTBEAT_PIPE. When @command_sender is "pipe", a second
# pipe is created and its read end becomes the child's stdin.
#
# Returns the Monitor for the new child. Any exception from
# Process.spawn or Monitor.new propagates after every pipe end still
# owned by this method has been closed.
def spawn(*args)
  env = args.first.is_a?(Hash) ? args.shift.dup : {}
  options = args.last.is_a?(Hash) ? args.pop.dup : {}

  # Declared up front so the ensure clause can tell which ends exist.
  rpipe = wpipe = inpipe = command_sender_pipe = nil

  # pipe is necessary even if @enable_heartbeat == false because
  # parent process detects shutdown of a child process using it
  begin
    unless ServerEngine.windows?
      # heartbeat is not supported on Windows platform
      rpipe, wpipe = new_pipe_pair
      options[[wpipe.fileno]] = wpipe
      if @enable_heartbeat
        env['SERVERENGINE_HEARTBEAT_PIPE'] = wpipe.fileno.to_s
      end
    end

    if @command_sender == "pipe"
      inpipe, command_sender_pipe = IO.pipe
      command_sender_pipe.sync = true
      command_sender_pipe.binmode
      options[:in] = inpipe
    end
    env['SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN'] = SocketManager::INTERNAL_TOKEN
    pid = Process.spawn(env, *args, options)

    m = Monitor.new(pid, monitor_options)
    m.command_sender_pipe = command_sender_pipe
    # The monitor owns the command pipe now; keep ensure from closing it.
    command_sender_pipe = nil

    @monitors << m

    unless ServerEngine.windows?
      @rpipes[rpipe] = m
      # The read end is tracked in @rpipes now; keep ensure from closing it.
      rpipe = nil
    end

    return m

  ensure
    # The parent never needs the child-side ends. The remaining ends are
    # closed only when they were not handed off above, i.e. on failure.
    wpipe.close if wpipe
    rpipe.close if rpipe
    inpipe.close if inpipe
    command_sender_pipe.close if command_sender_pipe
  end
end
tick(blocking_timeout=0) click to toggle source
# File lib/serverengine/process_manager.rb, line 240
# Runs one monitoring pass over the tracked children.
#
# blocking_timeout - seconds IO.select may wait for pipe activity; also
#                    the sleep duration when no pipes are tracked.
#
# Each readable pipe is drained with read_nonblock. A successful read
# stamps the owning monitor's last_heartbeat_time. Any read error other
# than EAGAIN/EINTR (typically EOF) removes the pipe from @rpipes,
# calls start_immediate_stop! on its monitor and closes the pipe.
# Afterwards every monitor is ticked, and those whose #tick returns a
# falsy value are dropped from @monitors.
#
# Returns nil. Raises AlreadyClosedError after #close, and a
# RuntimeError when pipes are tracked while ServerEngine.windows? is
# true.
def tick(blocking_timeout=0)
  raise AlreadyClosedError.new if @closed

  if @rpipes.empty?
    sleep blocking_timeout if blocking_timeout > 0
    return nil
  end

  raise "heartbeat is not supported on Windows platform. @rpipes must be empty." if ServerEngine.windows?

  now = Time.now
  selected = IO.select(@rpipes.keys, nil, nil, blocking_timeout)
  readable = selected ? selected.first : []

  readable.each do |pipe|
    begin
      pipe.read_nonblock(1024, @read_buffer)
    rescue Errno::EAGAIN, Errno::EINTR
      # Nothing to read after all; try again on the next pass.
      next
    rescue #EOFError
      orphaned = @rpipes.delete(pipe)
      orphaned.start_immediate_stop!
      begin
        pipe.close
      rescue
        nil
      end
      next
    end

    monitor = @rpipes[pipe]
    monitor.last_heartbeat_time = now if monitor
  end

  # Keep only the monitors whose tick reports a truthy value.
  @monitors.keep_if {|monitor| monitor.tick(now) }

  nil
end