class Fluent::Supervisor

Public Class Methods

cleanup_resources() click to toggle source
# File lib/fluent/supervisor.rb, line 428
# Deletes the file named by the SERVERENGINE_SOCKETMANAGER_PATH environment
# variable, if that variable is set.
#
# Nothing is done on Windows or when the variable is absent.
def self.cleanup_resources
  return if Fluent.windows?

  socket_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
  FileUtils.rm_f(socket_path) if socket_path
end
default_options() click to toggle source
# File lib/fluent/supervisor.rb, line 406
# Returns a newly built Hash of default supervisor options, keyed by Symbol.
#
# The literals are grouped for readability only; merging them in this order
# keeps the keys in the same sequence as a single flat literal would.
def self.default_options
  # Configuration, plugin and library locations plus basic logging/daemon setup.
  startup = {
    config_path: Fluent::DEFAULT_CONFIG_PATH,
    plugin_dirs: [Fluent::DEFAULT_PLUGIN_DIR],
    log_level: Fluent::Log::LEVEL_INFO,
    log_path: nil,
    daemonize: nil,
    libs: [],
    setup_path: nil,
  }

  # User/group to switch to and the root directory; unset by default.
  identity = { chuser: nil, chgroup: nil, root_dir: nil }

  # Remaining switches.
  switches = {
    suppress_interval: 0,
    suppress_repeated_stacktrace: true,
    without_source: nil,
    use_v1_config: true,
    supervise: true,
    standalone_worker: false,
    signame: nil,
  }

  startup.merge(identity).merge(switches)
end
load_config(path, params = {}) click to toggle source
# File lib/fluent/supervisor.rb, line 236
# Builds the ServerEngine configuration Hash for the supervisor from the
# config file at +path+ and the launch parameters in +params+.
# (#supervise passes a call to this method as the block of ServerEngine.create.)
#
# path::   path of the Fluentd config file; it is stat'ed and read here.
# params:: String-keyed Hash of launch parameters ('workers', 'log_path',
#          'daemonize', 'main_cmd', ...). This Hash is MUTATED: the keys
#          'pre_loadtime', 'pre_config_mtime' and 'pre_conf' are written so a
#          later call with the same Hash can reuse the previous result.
#
# Returns the Symbol-keyed config Hash (+se_config+), or the previously stored
# params['pre_conf'] when the last load was less than 5 seconds ago and the
# file's mtime is unchanged.
#
# Side effect: unless the cached config is returned, a LoggerInitializer is
# created and initialized, which sets $log.
def self.load_config(path, params = {})

  # State remembered from the previous call (absent on the first call).
  pre_loadtime = 0
  pre_loadtime = params['pre_loadtime'].to_i if params['pre_loadtime']
  pre_config_mtime = nil
  pre_config_mtime = params['pre_config_mtime'] if params['pre_config_mtime']
  config_mtime = File.mtime(path)

  # reuse previous config if last load time is within 5 seconds and mtime of the config file is not changed
  if Time.now - Time.at(pre_loadtime) < 5 and config_mtime == pre_config_mtime
    return params['pre_conf']
  end

  config_fname = File.basename(path)
  config_basedir = File.dirname(path)
  # Assume fluent.conf encoding is UTF-8
  config_data = File.open(path, "r:utf-8:utf-8") {|f| f.read }
  # Inline config is appended after the file content: '-' means "read it from
  # STDIN"; otherwise literal backslash-n sequences are turned into newlines.
  inline_config = params['inline_config']
  if inline_config == '-'
    config_data << "\n" << STDIN.read
  elsif inline_config
    config_data << "\n" << inline_config.gsub("\\n","\n")
  end
  fluentd_conf = Fluent::Config.parse(config_data, config_fname, config_basedir, params['use_v1_config'])
  system_config = SystemConfig.create(fluentd_conf)

  # these params must NOT be configured via system config here.
  # these may be overridden by command line params.
  workers = params['workers']
  root_dir = params['root_dir']
  log_level = params['log_level']
  suppress_repeated_stacktrace = params['suppress_repeated_stacktrace']

  log_path = params['log_path']
  chuser = params['chuser']
  chgroup = params['chgroup']
  log_rotate_age = params['log_rotate_age']
  log_rotate_size = params['log_rotate_size']
  # These three come from the parsed system config rather than from params.
  rpc_endpoint = system_config.rpc_endpoint
  enable_get_dump = system_config.enable_get_dump
  counter_server = system_config.counter_server

  log_opts = {suppress_repeated_stacktrace: suppress_repeated_stacktrace}
  logger_initializer = Supervisor::LoggerInitializer.new(
    log_path, log_level, chuser, chgroup, log_opts,
    log_rotate_age: log_rotate_age,
    log_rotate_size: log_rotate_size
  )
  # this #init sets initialized logger to $log
  logger_initializer.init(:supervisor, 0)
  logger = $log

  command_sender = Fluent.windows? ? "pipe" : "signal"

  # ServerEngine's "daemonize" option is boolean, and path of pid file is brought by "pid_path"
  pid_path = params['daemonize']
  daemonize = !!params['daemonize']
  main_cmd = params['main_cmd']
  signame = params['signame']

  se_config = {
      worker_type: 'spawn',
      workers: workers,
      log_stdin: false,
      log_stdout: false,
      log_stderr: false,
      enable_heartbeat: true,
      auto_heartbeat: false,
      unrecoverable_exit_codes: [2],
      stop_immediately_at_unrecoverable_exit: true,
      root_dir: root_dir,
      logger: logger,
      log: logger.out,
      log_path: log_path,
      log_level: log_level,
      logger_initializer: logger_initializer,
      chuser: chuser,
      chgroup: chgroup,
      chumask: 0,
      suppress_repeated_stacktrace: suppress_repeated_stacktrace,
      daemonize: daemonize,
      rpc_endpoint: rpc_endpoint,
      counter_server: counter_server,
      enable_get_dump: enable_get_dump,
      # Element 5 (the JSON dump of params) is overwritten further below.
      windows_daemon_cmdline: [ServerEngine.ruby_bin_path,
                               File.join(File.dirname(__FILE__), 'daemon.rb'),
                               ServerModule.name,
                               WorkerModule.name,
                               path,
                               JSON.dump(params)],
      command_sender: command_sender,
      fluentd_conf: fluentd_conf,
      main_cmd: main_cmd,
      signame: signame,
  }
  if daemonize
    se_config[:pid_path] = pid_path
  end
  # Snapshot params BEFORE the new pre_* entries are written; the snapshot
  # (with its 'pre_conf' cleared) is what gets serialized into the command line.
  pre_params = params.dup
  params['pre_loadtime'] = Time.now.to_i
  params['pre_config_mtime'] = config_mtime
  params['pre_conf'] = se_config
  # prevent pre_conf from being too big by reloading many times.
  pre_params['pre_conf'] = nil
  # params['pre_conf'] is se_config itself, so this replaces the JSON argument
  # inside the Hash that is returned.
  params['pre_conf'][:windows_daemon_cmdline][5] = JSON.dump(pre_params)

  return se_config
end
new(opt) click to toggle source
# File lib/fluent/supervisor.rb, line 436
# Copies the supervisor options out of +opt+ (a Symbol-keyed Hash) into
# instance variables and builds the LoggerInitializer stored in @log.
#
# Keys missing from +opt+ simply leave the corresponding variable nil.
def initialize(opt)
  # Launch mode.
  @daemonize, @supervise, @standalone_worker =
    opt.values_at(:daemonize, :supervise, :standalone_worker)
  @dry_run, @show_plugin_config = opt.values_at(:dry_run, :show_plugin_config)

  # Configuration sources.
  @config_path, @inline_config, @use_v1_config =
    opt.values_at(:config_path, :inline_config, :use_v1_config)
  @libs, @plugin_dirs = opt.values_at(:libs, :plugin_dirs)

  # Process-level settings.
  @chuser, @chgroup = opt.values_at(:chuser, :chgroup)
  @workers, @root_dir, @signame = opt.values_at(:workers, :root_dir, :signame)
  @without_source = opt[:without_source]

  # Logging settings.
  @log_path, @log_level = opt.values_at(:log_path, :log_level)
  @log_rotate_age, @log_rotate_size = opt.values_at(:log_rotate_age, :log_rotate_size)
  @suppress_interval, @suppress_config_dump, @log_event_verbose =
    opt.values_at(:suppress_interval, :suppress_config_dump, :log_event_verbose)
  @suppress_repeated_stacktrace = opt[:suppress_repeated_stacktrace]

  # Not taken from +opt+; these start out unset.
  @rpc_server = nil
  @process_name = nil

  logger_options = {suppress_repeated_stacktrace: @suppress_repeated_stacktrace}
  @log = LoggerInitializer.new(
    @log_path, @log_level, @chuser, @chgroup, logger_options,
    log_rotate_age: @log_rotate_age,
    log_rotate_size: @log_rotate_size
  )
  @finished = false
end

Public Instance Methods

options() click to toggle source
# File lib/fluent/supervisor.rb, line 505
# Returns a String-keyed Hash exposing a few of the supervisor's settings.
# Note that 'pid_file' carries the value of @daemonize.
def options
  [
    ['config_path', @config_path],
    ['pid_file', @daemonize],
    ['plugin_dirs', @plugin_dirs],
    ['log_path', @log_path],
    ['root_dir', @root_dir],
  ].to_h
end
run_supervisor() click to toggle source
# File lib/fluent/supervisor.rb, line 474
# Entry point for the supervisor process: initializes logging, loads the
# configuration, validates the worker count and root directory, then either
# performs a dry run (when @dry_run is set) or starts supervising.
#
# Raises Fluent::ConfigError when @workers is below 1, and
# Fluent::InvalidRootDirectory when @root_dir exists but is not a directory
# or cannot be created.
def run_supervisor
  @log.init(:supervisor, 0)
  show_plugin_config if @show_plugin_config
  read_config
  set_system_config
  @log.apply_options(
    format: @system_config.log.format,
    time_format: @system_config.log.time_format
  )

  $log.info :supervisor, "parsing config file is succeeded", path: @config_path

  raise Fluent::ConfigError, "invalid number of workers (must be > 0):#{@workers}" if @workers < 1

  if @root_dir
    if !File.exist?(@root_dir)
      # Nothing at that path yet: create it, wrapping any failure.
      begin
        FileUtils.mkdir_p(@root_dir)
      rescue => mkdir_error
        raise Fluent::InvalidRootDirectory, "failed to create root directory:#{@root_dir}, #{mkdir_error.inspect}"
      end
    elsif !Dir.exist?(@root_dir)
      # Something exists at that path, but it is not a directory.
      raise Fluent::InvalidRootDirectory, "non directory entry exists:#{@root_dir}"
    end
  end

  dry_run_cmd if @dry_run
  supervise
end
run_worker() click to toggle source
# File lib/fluent/supervisor.rb, line 515
# Entry point for a worker process (or for a standalone, unsupervised one).
#
# Initializes logging for the worker identified by SERVERENGINE_WORKER_ID,
# loads the configuration, installs signal handlers and then runs the engine
# inside #main_process.
#
# Raises Fluent::ConfigError when running standalone with @workers other than 1.
def run_worker
  begin
    require 'sigdump/setup'
  rescue Exception
    # ignore LoadError and others (related with signals): it may raise these errors in Windows
  end

  # A missing environment variable yields nil.to_i == 0.
  worker_id = ENV['SERVERENGINE_WORKER_ID'].to_i
  process_type =
    if @standalone_worker
      :standalone
    elsif worker_id == 0
      :worker0
    else
      :workers
    end

  @log.init(process_type, worker_id)
  show_plugin_config if @show_plugin_config
  read_config
  set_system_config
  @log.apply_options(
    format: @system_config.log.format,
    time_format: @system_config.log.time_format
  )

  Process.setproctitle("worker:#{@process_name}") if @process_name

  if @standalone_worker && @workers != 1
    raise Fluent::ConfigError, "invalid number of workers (must be 1 or unspecified) with --no-supervisor: #{@workers}"
  end

  install_main_process_signal_handlers

  # This is the only log message for @standalone_worker
  if @standalone_worker
    $log.info "starting fluentd-#{Fluent::VERSION} without supervision", pid: Process.pid, ruby: RUBY_VERSION
  end

  main_process do
    # Only a standalone worker creates the socket manager and changes
    # privilege here.
    if @standalone_worker
      create_socket_manager
      change_privilege
    end
    init_engine
    run_configure
    run_engine
    self.class.cleanup_resources if @standalone_worker
    exit 0
  end
end

Private Instance Methods

change_privilege() click to toggle source
# File lib/fluent/supervisor.rb, line 775
# Asks ServerEngine to change privilege to the configured user and group
# (@chuser / @chgroup) and returns whatever that call returns.
def change_privilege
  user = @chuser
  group = @chgroup
  ServerEngine::Privilege.change(user, group)
end
create_socket_manager() click to toggle source
# File lib/fluent/supervisor.rb, line 557
# Opens a ServerEngine socket manager server on a freshly generated path and
# publishes that path (as a String) in SERVERENGINE_SOCKETMANAGER_PATH.
def create_socket_manager
  server_class = ServerEngine::SocketManager::Server
  generated_path = server_class.generate_path
  server_class.open(generated_path)
  ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = generated_path.to_s
end
dry_run() click to toggle source

Sets the Engine's dry_run_mode to true in order to override the target_id of every worker section.

# File lib/fluent/supervisor.rb, line 574
# Configures the engine with Fluent::Engine.dry_run_mode switched on, and
# always switches it back off afterwards.
#
# A Fluent::ConfigError is logged and terminates the process immediately with
# status 1; any other error propagates to the caller.
def dry_run
  Fluent::Engine.dry_run_mode = true
  change_privilege
  init_engine
  run_configure
rescue Fluent::ConfigError => config_error
  $log.error "config error", file: @config_path, error: config_error
  $log.debug_backtrace
  exit!(1)
ensure
  Fluent::Engine.dry_run_mode = false
end
dry_run_cmd() click to toggle source
# File lib/fluent/supervisor.rb, line 563
# Performs a dry run and then exits the process: status 0 when #dry_run
# returns, status 1 when it raises a StandardError (which is logged).
# The config dump is suppressed for the duration.
def dry_run_cmd
  begin
    $log.info "starting fluentd-#{Fluent::VERSION} as dry run mode", ruby: RUBY_VERSION
    @system_config.suppress_config_dump = true
    dry_run
    exit 0
  rescue => failure
    # SystemExit raised by `exit 0` is not a StandardError, so it passes through.
    $log.error "dry run failed: #{failure}"
    exit 1
  end
end
flush_buffer() click to toggle source
# File lib/fluent/supervisor.rb, line 691
# Reopens the log and force-flushes the engine's buffered events on a
# separate thread, returning that thread.
#
# Any exception raised while flushing is logged as a warning instead of
# propagating out of the thread.
def flush_buffer
  # Creating new thread due to mutex can't lock
  # in main thread during trap context
  flusher = Thread.new do
    begin
      $log.debug "fluentd main process get SIGUSR1"
      $log.info "force flushing buffered events"
      @log.reopen!
      Fluent::Engine.flush!
      $log.debug "flushing thread: flushed"
    rescue Exception => flush_error
      $log.warn "flushing thread error: #{flush_error}"
    end
  end
  flusher.run
end
init_engine() click to toggle source
# File lib/fluent/supervisor.rb, line 779
# Initializes Fluent::Engine with the system config, requires every library
# in @libs, and registers each existing directory in @plugin_dirs (as an
# absolute path) as a plugin directory. Non-existent directories are skipped.
def init_engine
  Fluent::Engine.init(@system_config)

  @libs.each { |library| require library }

  @plugin_dirs.each do |plugin_dir|
    next unless Dir.exist?(plugin_dir)

    Fluent::Engine.add_plugin_dir(File.expand_path(plugin_dir))
  end
end
install_main_process_signal_handlers() click to toggle source
# File lib/fluent/supervisor.rb, line 635
# Installs the signal handlers of the Fluentd main (worker) process.
#
# * INT  - only acted upon by a standalone worker, which stops the engine.
# * TERM - stops the engine once (guarded by @finished).
# * USR1 - flushes buffers via #flush_buffer (not installed on Windows).
#
# On Windows, commands are additionally read line by line from a duplicate of
# STDIN on a background thread ("GRACEFUL_STOP" / "IMMEDIATE_STOP" stop the
# engine); STDIN itself is reopened on the null device.
def install_main_process_signal_handlers
  # Fluentd worker process (worker of ServerEngine) doesn't use code in serverengine to set signal handlers,
  # because it does almost nothing.
  # This method is the only method to set signal handlers in Fluentd worker process.

  # When a user presses Ctrl + C (rather than sending SIGINT to one process), SIGINT is sent to all
  # processes in the same process group.
  # ServerEngine server process will send SIGTERM to child(spawned) processes by that SIGINT, so
  # worker process SHOULD NOT do anything with SIGINT, SHOULD just ignore.
  trap :INT do
    $log.debug "fluentd main process get SIGINT"

    # When Fluentd is launched without supervisor, worker should handle ctrl-c by itself
    if @standalone_worker
      @finished = true
      $log.debug "getting start to shutdown main process"
      Fluent::Engine.stop
    end
  end

  trap :TERM do
    $log.debug "fluentd main process get SIGTERM"
    unless @finished
      @finished = true
      $log.debug "getting start to shutdown main process"
      Fluent::Engine.stop
    end
  end

  trap :USR1 do
    flush_buffer
  end unless Fluent.windows?

  if Fluent.windows?
    command_pipe = STDIN.dup
    STDIN.reopen(File::NULL, "rb")
    command_pipe.binmode
    command_pipe.sync = true

    Thread.new do
      loop do
        line = command_pipe.gets
        # IO#gets returns nil at end of input (the writing side closed the
        # pipe). Stop reading instead of crashing the thread on nil.chomp.
        break if line.nil?

        cmd = line.chomp
        case cmd
        when "GRACEFUL_STOP", "IMMEDIATE_STOP"
          $log.debug "fluentd main process get #{cmd} command"
          @finished = true
          $log.debug "getting start to shutdown main process"
          Fluent::Engine.stop
          break
        else
          $log.warn "fluentd main process get unknown command [#{cmd}]"
        end
      end
    end
  end
end
logging_with_console_output() { |$log| ... } click to toggle source
# File lib/fluent/supervisor.rb, line 707
# Yields $log, and - unless @log already writes to stdout - yields a second,
# debug-enabled Fluent::Log bound to STDOUT at @log_level, so the caller's
# block can emit the same messages to the console as well.
def logging_with_console_output
  yield $log
  return if @log.stdout?

  console_log = Fluent::Log.new(ServerEngine::DaemonLogger.new(STDOUT))
  console_log.level = @log_level
  yield console_log.enable_debug
end
main_process(&block) click to toggle source
# File lib/fluent/supervisor.rb, line 718
# Runs +block+ as the body of the main process and terminates the process
# with exit! once the block returns or raises a rescued error.
#
# Exit status is 2 for Fluent::ConfigError, Fluent::UnrecoverableError and
# ScriptError, and 1 otherwise (including a block that returns normally).
# Exceptions not rescued here - e.g. SystemExit from `exit 0` inside the
# block - propagate to the caller.
def main_process(&block)
  Process.setproctitle("worker:#{@process_name}") if @process_name

  exit_status = 1

  begin
    block.call
  rescue Fluent::ConfigError => config_error
    logging_with_console_output do |log|
      log.error "config error", file: @config_path, error: config_error
      log.debug_backtrace
    end
    exit_status = 2
  rescue Fluent::UnrecoverableError => fatal_error
    logging_with_console_output do |log|
      log.error fatal_error.message, error: fatal_error
      log.error_backtrace
    end
    exit_status = 2
  rescue ScriptError => script_error # LoadError, NotImplementedError, SyntaxError
    logging_with_console_output do |log|
      if script_error.respond_to?(:path)
        log.error script_error.message, path: script_error.path, error: script_error
      else
        log.error script_error.message, error: script_error
      end
      log.error_backtrace
    end
    exit_status = 2
  rescue => unexpected_error
    logging_with_console_output do |log|
      log.error "unexpected error", error: unexpected_error
      log.error_backtrace
    end
  end

  exit!(exit_status)
end
read_config() click to toggle source
# File lib/fluent/supervisor.rb, line 757
# Reads the config file at @config_path as UTF-8, appends the inline config
# (if any) and stores the parsed result in @conf.
#
# @inline_config of '-' means "read the extra config from STDIN"; any other
# value is appended with literal backslash-n sequences turned into newlines.
def read_config
  @config_basedir, @config_fname = File.split(@config_path)
  @config_data = File.open(@config_path, "r:utf-8:utf-8", &:read)

  inline_text =
    if @inline_config == '-'
      STDIN.read
    elsif @inline_config
      @inline_config.gsub("\\n", "\n")
    end
  @config_data << "\n" << inline_text if inline_text

  @conf = Fluent::Config.parse(@config_data, @config_fname, @config_basedir, @use_v1_config)
end
run_configure() click to toggle source
# File lib/fluent/supervisor.rb, line 794
# Hands the parsed configuration (@conf) to Fluent::Engine.run_configure.
def run_configure
  parsed_conf = @conf
  Fluent::Engine.run_configure(parsed_conf)
end
run_engine() click to toggle source
# File lib/fluent/supervisor.rb, line 798
# Delegates to Fluent::Engine.run and returns its result.
def run_engine
  engine = Fluent::Engine
  engine.run
end
set_system_config() click to toggle source
# File lib/fluent/supervisor.rb, line 769
# Builds the SystemConfig from @conf (set by #read_config), stores it in
# @system_config, then calls attach and apply on it with this supervisor.
def set_system_config
  system_config = SystemConfig.create(@conf) # @conf is set in read_config
  @system_config = system_config
  system_config.attach(self)
  system_config.apply(self)
end
show_plugin_config() click to toggle source
# File lib/fluent/supervisor.rb, line 589
# Logs a hint to use the fluent-plugin-config-format command for the plugin
# named in @show_plugin_config (e.g. "input:tail"), then exits with status 0.
def show_plugin_config
  plugin_kind, plugin_name = @show_plugin_config.split(":") # input:tail
  $log.info "Use fluent-plugin-config-format --format=txt #{plugin_kind} #{plugin_name}"
  exit 0
end
supervise() click to toggle source
# File lib/fluent/supervisor.rb, line 595
# Starts the ServerEngine supervisor.
#
# After a dry run, builds the command line used to spawn the main process
# (this Ruby, RUBYOPT if set, the current script, the original arguments and
# "--under-supervisor"), collects the launch parameters and runs a
# ServerEngine server whose config is produced by Supervisor.load_config.
def supervise
  # Make dumpable conf, which is set corresponding_proxies for all elements in all worker sections
  dry_run

  Process.setproctitle("supervisor:#{@process_name}") if @process_name
  $log.info "starting fluentd-#{Fluent::VERSION}", pid: Process.pid, ruby: RUBY_VERSION

  rubyopt = ENV["RUBYOPT"]
  spawn_cmd = [ServerEngine.ruby_bin_path, "-Eascii-8bit:ascii-8bit"]
  spawn_cmd.push(rubyopt) if rubyopt
  spawn_cmd.push($0)
  spawn_cmd.concat($fluentdargv)
  spawn_cmd.push("--under-supervisor")

  $log.info "spawn command to main: ", cmdline: spawn_cmd

  params = {
    'main_cmd' => spawn_cmd,
    'daemonize' => @daemonize,
    'inline_config' => @inline_config,
    'log_path' => @log_path,
    'log_rotate_age' => @log_rotate_age,
    'log_rotate_size' => @log_rotate_size,
    'chuser' => @chuser,
    'chgroup' => @chgroup,
    'use_v1_config' => @use_v1_config,
    # system config parameters
    'workers' => @workers,
    'root_dir' => @root_dir,
    'log_level' => @log_level,
    'suppress_repeated_stacktrace' => @suppress_repeated_stacktrace,
    'signame' => @signame,
  }

  server = ServerEngine.create(ServerModule, WorkerModule) do
    Fluent::Supervisor.load_config(@config_path, params)
  end
  server.run
end