class Fluent::Supervisor

Constants

RUBY_ENCODING_OPTIONS_REGEX

Public Class Methods

cleanup_resources() click to toggle source
# File lib/fluent/supervisor.rb, line 585
# Removes the ServerEngine socket-manager path recorded in
# SERVERENGINE_SOCKETMANAGER_PATH, if any. Does nothing on Windows.
#
# Returns nil when nothing was removed, otherwise the result of FileUtils.rm_f.
def self.cleanup_resources
  return if Fluent.windows?

  socket_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
  FileUtils.rm_f(socket_path) if socket_path
end
default_options() click to toggle source
# File lib/fluent/supervisor.rb, line 557
# Default command-line options for the supervisor.
#
# A new Hash (with new inner Arrays) is built on every call, so one caller's
# changes never leak into another's copy.
#
# Returns a Symbol-keyed Hash.
def self.default_options
  {
    config_path:                  Fluent::DEFAULT_CONFIG_PATH,
    plugin_dirs:                  [Fluent::DEFAULT_PLUGIN_DIR],
    log_level:                    Fluent::Log::LEVEL_INFO,
    log_path:                     nil,
    daemonize:                    nil,
    libs:                         [],
    setup_path:                   nil,
    chuser:                       nil,
    chgroup:                      nil,
    root_dir:                     nil,
    suppress_interval:            0,
    suppress_repeated_stacktrace: true,
    ignore_repeated_log_interval: nil,
    without_source:               nil,
    enable_input_metrics:         nil,
    enable_size_metrics:          nil,
    use_v1_config:                true,
    strict_config_value:          nil,
    supervise:                    true,
    standalone_worker:            false,
    signame:                      nil,
    conf_encoding:                'utf-8',
    disable_shared_socket:        nil
  }
end
load_config(path, params = {}) click to toggle source
# File lib/fluent/supervisor.rb, line 380
# Builds the ServerEngine configuration hash for the supervisor process.
#
# In this file it is called from the ServerEngine.create block in #supervise.
# As a side effect it (re)initializes the supervisor logger (LoggerInitializer#init
# replaces $log). It also writes the bookkeeping keys 'pre_loadtime',
# 'pre_config_mtime' and 'pre_conf' back into +params+, so a later call can
# return the previous result unchanged.
#
# path   - path of the Fluentd configuration file; its mtime is compared
#          against the previous load.
# params - String-keyed option Hash; mutated in place (see above). Missing
#          keys simply become nil entries in the result.
#
# Returns a Symbol-keyed Hash for ServerEngine. When the previous load was
# less than 5 seconds ago and the file mtime is unchanged, returns
# params['pre_conf'] instead.
def self.load_config(path, params = {})
  pre_loadtime = 0
  pre_loadtime = params['pre_loadtime'].to_i if params['pre_loadtime']
  pre_config_mtime = nil
  pre_config_mtime = params['pre_config_mtime'] if params['pre_config_mtime']
  config_mtime = File.mtime(path)

  # reuse previous config if last load time is within 5 seconds and mtime of the config file is not changed
  if (Time.now - Time.at(pre_loadtime) < 5) && (config_mtime == pre_config_mtime)
    return params['pre_conf']
  end

  log_level = params['log_level']
  suppress_repeated_stacktrace = params['suppress_repeated_stacktrace']
  ignore_repeated_log_interval = params['ignore_repeated_log_interval']
  ignore_same_log_interval = params['ignore_same_log_interval']

  log_path = params['log_path']
  chuser = params['chuser']
  chgroup = params['chgroup']
  log_rotate_age = params['log_rotate_age']
  log_rotate_size = params['log_rotate_size']

  log_opts = {suppress_repeated_stacktrace: suppress_repeated_stacktrace, ignore_repeated_log_interval: ignore_repeated_log_interval,
              ignore_same_log_interval: ignore_same_log_interval}
  logger_initializer = Supervisor::LoggerInitializer.new(
    log_path, log_level, chuser, chgroup, log_opts,
    log_rotate_age: log_rotate_age,
    log_rotate_size: log_rotate_size
  )
  # this #init sets initialized logger to $log
  logger_initializer.init(:supervisor, 0)
  logger_initializer.apply_options(format: params['log_format'], time_format: params['log_time_format'])
  logger = $log

  # Windows has no USR1/USR2-style signals here, so commands go through a pipe instead.
  command_sender = Fluent.windows? ? "pipe" : "signal"

  # ServerEngine's "daemonize" option is boolean, and path of pid file is brought by "pid_path"
  pid_path = params['daemonize']
  daemonize = !!params['daemonize']

  se_config = {
    worker_type: 'spawn',
    workers: params['workers'],
    log_stdin: false,
    log_stdout: false,
    log_stderr: false,
    enable_heartbeat: true,
    auto_heartbeat: false,
    # Workers exit with 2 for unrecoverable errors (see main_process).
    unrecoverable_exit_codes: [2],
    stop_immediately_at_unrecoverable_exit: true,
    root_dir: params['root_dir'],
    logger: logger,
    log: logger.out,
    log_path: log_path,
    log_level: log_level,
    logger_initializer: logger_initializer,
    chuser: chuser,
    chgroup: chgroup,
    chumask: 0,
    suppress_repeated_stacktrace: suppress_repeated_stacktrace,
    ignore_repeated_log_interval: ignore_repeated_log_interval,
    ignore_same_log_interval: ignore_same_log_interval,
    daemonize: daemonize,
    rpc_endpoint: params['rpc_endpoint'],
    counter_server: params['counter_server'],
    enable_get_dump: params['enable_get_dump'],
    # Presumably the command ServerEngine uses to launch the daemon on Windows (daemon.rb).
    # Element 5 carries the JSON-encoded params and is overwritten below.
    windows_daemon_cmdline: [ServerEngine.ruby_bin_path,
                             File.join(File.dirname(__FILE__), 'daemon.rb'),
                             ServerModule.name,
                             WorkerModule.name,
                             path,
                             JSON.dump(params)],
    command_sender: command_sender,
    fluentd_conf: params['fluentd_conf'],
    conf_encoding: params['conf_encoding'],
    inline_config: params['inline_config'],
    config_path: path,
    main_cmd: params['main_cmd'],
    signame: params['signame'],
    disable_shared_socket: params['disable_shared_socket']
  }
  if daemonize
    se_config[:pid_path] = pid_path
  end
  # Snapshot params before the bookkeeping keys below are written into them.
  pre_params = params.dup
  # Remember when/what was loaded so the reuse check at the top can short-circuit.
  params['pre_loadtime'] = Time.now.to_i
  params['pre_config_mtime'] = config_mtime
  params['pre_conf'] = se_config
  # Drop pre_conf from the snapshot. Otherwise the JSON in windows_daemon_cmdline
  # would embed the previous config, and it would grow with every reload.
  pre_params['pre_conf'] = nil
  params['pre_conf'][:windows_daemon_cmdline][5] = JSON.dump(pre_params)

  se_config
end
new(opt) click to toggle source
# File lib/fluent/supervisor.rb, line 593
# Captures the command-line options for the supervisor / worker.
#
# opt - Symbol-keyed options (default_options lists the defaults).
#
# If the config file exists, it is parsed once here. The only purpose is to let
# <system><log> rotate_age / rotate_size fill in values missing from +opt+
# before the logger initializer is built. The parsed tree is not kept;
# #configure parses the configuration again.
def initialize(opt)
  @daemonize          = opt[:daemonize]
  @standalone_worker  = opt[:standalone_worker]
  @config_path        = opt[:config_path]
  @inline_config      = opt[:inline_config]
  @use_v1_config      = opt[:use_v1_config]
  @conf_encoding      = opt[:conf_encoding]
  @log_path           = opt[:log_path]
  @show_plugin_config = opt[:show_plugin_config]
  @libs               = opt[:libs]
  @plugin_dirs        = opt[:plugin_dirs]
  @chgroup            = opt[:chgroup]
  @chuser             = opt[:chuser]

  @log_rotate_age  = opt[:log_rotate_age]
  @log_rotate_size = opt[:log_rotate_size]
  @signame         = opt[:signame]

  @cl_opt = opt
  @conf = nil
  # parse configuration immediately to initialize logger in early stage
  if @config_path && File.exist?(@config_path)
    early_conf = Fluent::Config.build(
      config_path: @config_path,
      encoding: @conf_encoding || 'utf-8',
      additional_config: @inline_config || nil,
      use_v1_config: !!@use_v1_config
    )
    @system_config = build_system_config(early_conf)
    log_section = @system_config.log
    if log_section
      # Values already present in +opt+ win; <system><log> only fills the gaps.
      @log_rotate_age ||= log_section.rotate_age
      @log_rotate_size ||= log_section.rotate_size
    end
  end

  log_opts = {
    suppress_repeated_stacktrace: opt[:suppress_repeated_stacktrace],
    ignore_repeated_log_interval: opt[:ignore_repeated_log_interval],
    ignore_same_log_interval: opt[:ignore_same_log_interval],
  }
  @log = LoggerInitializer.new(
    @log_path, opt[:log_level], @chuser, @chgroup, log_opts,
    log_rotate_age: @log_rotate_age,
    log_rotate_size: @log_rotate_size
  )
  @finished = false
end

Public Instance Methods

configure(supervisor: false) click to toggle source
# File lib/fluent/supervisor.rb, line 723
# Initializes logging for this process, parses the configuration and loads
# extra libraries and plugin directories.
#
# supervisor - true in the supervisor process. Otherwise the logger is
#              initialized as :standalone, :worker0 or :workers, based on
#              @standalone_worker and SERVERENGINE_WORKER_ID.
#
# In the supervisor, the versions of fluentd and fluent-plugin-*/fluent-mixin-*
# gems are also logged.
def configure(supervisor: false)
  if supervisor
    @log.init(:supervisor, 0)
  else
    worker_id = ENV['SERVERENGINE_WORKER_ID'].to_i
    process_type =
      if @standalone_worker
        :standalone
      elsif worker_id.zero?
        :worker0
      else
        :workers
      end
    @log.init(process_type, worker_id)
  end

  show_plugin_config if @show_plugin_config

  if @inline_config == '-'
    $log.warn('the value "-" for `inline_config` is deprecated. See https://github.com/fluent/fluentd/issues/2711')
    @inline_config = STDIN.read
  end
  @conf = Fluent::Config.build(
    config_path: @config_path,
    encoding: @conf_encoding,
    additional_config: @inline_config,
    use_v1_config: @use_v1_config
  )
  @system_config = build_system_config(@conf)

  @log.level = @system_config.log_level
  log_section = @system_config.log
  @log.apply_options(
    format: log_section.format,
    time_format: log_section.time_format,
    log_dir_perm: @system_config.dir_permission,
    ignore_repeated_log_interval: @system_config.ignore_repeated_log_interval,
    ignore_same_log_interval: @system_config.ignore_same_log_interval
  )

  $log.info :supervisor, 'parsing config file is succeeded', path: @config_path

  @libs.each { |lib| require lib }

  @plugin_dirs.each do |dir|
    next unless Dir.exist?(dir)

    Fluent::Plugin.add_plugin_dir(File.expand_path(dir))
  end

  return unless supervisor

  # plugins / configuration dumps
  fluent_gems = Gem::Specification.find_all.select { |spec| spec.name =~ /^fluent(d|-(plugin|mixin)-.*)$/ }
  fluent_gems.each do |spec|
    $log.info("gem '#{spec.name}' version '#{spec.version}'")
  end
end
options() click to toggle source
# File lib/fluent/supervisor.rb, line 680
# Summary of the effective options, keyed by String.
#
# 'pid_file' carries the --daemon value; 'root_dir' comes from the system config.
def options
  root_dir = @system_config.root_dir
  {
    'config_path' => @config_path,
    'pid_file'    => @daemonize,
    'plugin_dirs' => @plugin_dirs,
    'log_path'    => @log_path,
    'root_dir'    => root_dir,
  }
end
run_supervisor(dry_run: false) click to toggle source
# File lib/fluent/supervisor.rb, line 637
# Validates the worker count and root directory, then configures the engine
# in supervisor mode.
#
# dry_run - when true, only checks the configuration and exits with status 0.
#
# A Fluent::ConfigError during engine setup is logged and the process exits
# immediately with exit!(1). Otherwise control passes to #supervise.
#
# Raises Fluent::ConfigError when fewer than one worker is configured.
# Raises Fluent::InvalidRootDirectory when root_dir exists but is not a
# directory, or cannot be created.
def run_supervisor(dry_run: false)
  $log.info "starting fluentd-#{Fluent::VERSION} as dry run mode", ruby: RUBY_VERSION if dry_run

  if @system_config.workers < 1
    raise Fluent::ConfigError, "invalid number of workers (must be > 0):#{@system_config.workers}"
  end

  root_dir = @system_config.root_dir
  if root_dir && File.exist?(root_dir)
    raise Fluent::InvalidRootDirectory, "non directory entry exists:#{root_dir}" unless Dir.exist?(root_dir)
  elsif root_dir
    begin
      # Create the missing root directory with the configured (or default) permission.
      FileUtils.mkdir_p(root_dir, mode: @system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION)
    rescue => e
      raise Fluent::InvalidRootDirectory, "failed to create root directory:#{root_dir}, #{e.inspect}"
    end
  end

  begin
    ServerEngine::Privilege.change(@chuser, @chgroup)
    MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support)
    Fluent::Engine.init(@system_config, supervisor_mode: true)
    Fluent::Engine.run_configure(@conf, dry_run: dry_run)
  rescue Fluent::ConfigError => e
    $log.error 'config error', file: @config_path, error: e
    $log.debug_backtrace
    exit!(1)
  end

  return supervise unless dry_run

  $log.info 'finished dry run mode'
  exit 0
end
run_worker() click to toggle source
# File lib/fluent/supervisor.rb, line 690
# Entry point of a worker process (also used for --no-supervisor standalone mode).
#
# Installs signal handlers, then configures and runs the engine inside
# #main_process. #main_process sets the process title and turns errors into
# exit codes.
#
# Raises Fluent::ConfigError when running standalone with a worker count other than 1.
def run_worker
  begin
    require 'sigdump/setup'
  rescue ScriptError, StandardError
    # sigdump is optional. Ignore LoadError, and other (signal-related) errors that
    # its setup may raise, e.g. on Windows. SignalException/SystemExit still propagate.
  end

  if @standalone_worker && @system_config.workers != 1
    raise Fluent::ConfigError, "invalid number of workers (must be 1 or unspecified) with --no-supervisor: #{@system_config.workers}"
  end

  install_main_process_signal_handlers

  # This is the only log message for @standalone_worker
  $log.info "starting fluentd-#{Fluent::VERSION} without supervision", pid: Process.pid, ruby: RUBY_VERSION if @standalone_worker

  main_process do
    if @standalone_worker
      # No supervisor exists to do this setup, so the standalone worker does it itself.
      create_socket_manager
      ServerEngine::Privilege.change(@chuser, @chgroup)
      File.umask(0)
    end
    MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support)
    Fluent::Engine.init(@system_config)
    Fluent::Engine.run_configure(@conf)
    Fluent::Engine.run
    self.class.cleanup_resources if @standalone_worker
    exit 0
  end
end

Private Instance Methods

build_spawn_command() click to toggle source
# File lib/fluent/supervisor.rb, line 1021
# Builds the argv used by the supervisor to spawn the main (worker) process.
#
# The interpreter is ENV['TEST_RUBY_PATH'] if set, else ServerEngine.ruby_bin_path.
# Options from RUBYOPT are forwarded. Encoding switches (matching
# RUBY_ENCODING_OPTIONS_REGEX) come after the other options. If RUBYOPT has
# none, '-Eascii-8bit:ascii-8bit' is used instead. The script ($0), its
# arguments ($fluentdargv) and '--under-supervisor' are appended last.
#
# The interpreter and options are first probed with `-h`. If the probe fails, the
# error is logged and the process exits with the probe's exit status, or 1 if
# the probe was terminated by a signal and has no exit status.
#
# Returns the command as an Array of Strings.
def build_spawn_command
  fluentd_spawn_cmd = [ENV['TEST_RUBY_PATH'] || ServerEngine.ruby_bin_path]

  encoding_opts = ['-Eascii-8bit:ascii-8bit']
  rubyopt = ENV['RUBYOPT']
  if rubyopt
    encodes, others = rubyopt.split(' ').partition { |opt| opt.match?(RUBY_ENCODING_OPTIONS_REGEX) }
    fluentd_spawn_cmd.concat(others)
    encoding_opts = encodes unless encodes.empty?
  end
  fluentd_spawn_cmd.concat(encoding_opts)

  # Adding `-h` so that it can avoid ruby's command blocking
  # e.g. `ruby -Eascii-8bit:ascii-8bit` will block. but `ruby -Eascii-8bit:ascii-8bit -h` won't.
  _, err, status = Open3.capture3(*fluentd_spawn_cmd, "-h")
  unless status.success?
    $log.error('Invalid option is passed to RUBYOPT', command: fluentd_spawn_cmd, error: err)
    # exitstatus is nil when the child was killed by a signal; Kernel#exit rejects nil.
    exit(status.exitstatus || 1)
  end

  fluentd_spawn_cmd << $0
  fluentd_spawn_cmd.concat($fluentdargv)
  fluentd_spawn_cmd << '--under-supervisor'
end
build_system_config(conf) click to toggle source
# File lib/fluent/supervisor.rb, line 1001
# Creates the SystemConfig for +conf+ and overrides it with non-nil
# command-line values (@cl_opt) for every known system-config parameter.
#
# A command-line log_level equal to Fluent::Log::LEVEL_INFO is not applied,
# because INFO is the CLI default and <system>'s log_level should win over it.
#
# Returns the resulting SystemConfig.
def build_system_config(conf)
  system_config = SystemConfig.create(conf, @cl_opt[:strict_config_value])

  overrides = Fluent::SystemConfig::SYSTEM_CONFIG_PARAMETERS.each_with_object({}) do |param, acc|
    next unless @cl_opt.key?(param)

    value = @cl_opt[param]
    next if value.nil?
    # info level can't be specified via command line option, so it is the default here.
    next if param == :log_level && value == Fluent::Log::LEVEL_INFO

    acc[param] = value
  end

  system_config.overwrite_variables(**overrides)
  system_config
end
create_socket_manager() click to toggle source
# File lib/fluent/supervisor.rb, line 779
# Starts a ServerEngine socket-manager server on a freshly generated path and
# publishes that path in SERVERENGINE_SOCKETMANAGER_PATH.
#
# Returns the path as a String.
def create_socket_manager
  server = ServerEngine::SocketManager::Server
  path = server.generate_path
  server.open(path)
  ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = path.to_s
end
flush_buffer() click to toggle source
# File lib/fluent/supervisor.rb, line 903
# Reopens the log and force-flushes all buffered events on a background thread.
#
# Called from the SIGUSR1 trap (and the Windows "GRACEFUL_RESTART" command).
# A Mutex cannot be locked from the main thread while in trap context, so the
# work runs on a new thread. Any error is logged as a warning.
#
# Returns the Thread doing the flush.
def flush_buffer
  flusher = Thread.new do
    begin
      $log.debug "fluentd main process get SIGUSR1"
      $log.info "force flushing buffered events"
      @log.reopen!
      Fluent::Engine.flush!
      $log.debug "flushing thread: flushed"
    rescue Exception => error
      # NOTE(review): rescuing Exception also swallows SystemExit/SignalException raised in this thread.
      $log.warn "flushing thread error: #{error}"
    end
  end
  flusher
end
install_main_process_command_handlers() click to toggle source
# File lib/fluent/supervisor.rb, line 872
# Reads worker control commands line by line from the process's original stdin.
# install_main_process_signal_handlers uses this on Windows instead of the
# USR1/USR2 traps.
#
# $stdin itself is redirected to the null device, so the command channel is
# only read here. Commands are handled on a background thread until EOF or a
# stop command:
#   GRACEFUL_STOP / IMMEDIATE_STOP - stop the engine and end the loop
#   GRACEFUL_RESTART               - flush_buffer
#   RELOAD                         - reload_config
#
# Returns the reader Thread.
def install_main_process_command_handlers
  command_pipe = $stdin.dup
  $stdin.reopen(File::NULL, "rb")
  command_pipe.binmode
  command_pipe.sync = true

  Thread.new do
    loop do
      line = command_pipe.gets
      break unless line

      # Non-bang chomp: String#chomp! returns nil when there is no trailing newline
      # (e.g. the last command before EOF), which made valid commands look unknown.
      cmd = line.chomp
      case cmd
      when "GRACEFUL_STOP", "IMMEDIATE_STOP"
        $log.debug "fluentd main process get #{cmd} command"
        @finished = true
        $log.debug "getting start to shutdown main process"
        Fluent::Engine.stop
        break
      when "GRACEFUL_RESTART"
        $log.debug "fluentd main process get #{cmd} command"
        flush_buffer
      when "RELOAD"
        $log.debug "fluentd main process get #{cmd} command"
        reload_config
      else
        $log.warn "fluentd main process get unknown command [#{cmd}]"
      end
    end
  end
end
install_main_process_signal_handlers() click to toggle source
# File lib/fluent/supervisor.rb, line 831
# Installs the signal handlers of a Fluentd worker (main) process.
#
# INT stops the engine only in standalone (--no-supervisor) mode. TERM stops
# the engine once. Outside Windows, USR1 triggers flush_buffer and USR2
# triggers reload_config. On Windows, those two are replaced by
# install_main_process_command_handlers, which reads commands from stdin.
def install_main_process_signal_handlers
  # Fluentd worker processes (ServerEngine workers) do not use ServerEngine's code for
  # setting signal handlers, because it does almost nothing for them.
  # This method is the only place where signal handlers are set in a Fluentd worker process.

  # When the user presses Ctrl+C (rather than sending SIGINT explicitly), SIGINT is sent to every
  # process in the same process group. The ServerEngine server process reacts to that SIGINT by
  # sending SIGTERM to its child (spawned) processes, so a supervised worker SHOULD NOT act on
  # SIGINT and should just ignore it.
  trap :INT do
    $log.debug "fluentd main process get SIGINT"

    # When Fluentd is launched without supervisor, worker should handle ctrl-c by itself
    # NOTE(review): unlike the TERM handler this does not check @finished, so a repeated
    # SIGINT calls Fluent::Engine.stop again — confirm that is harmless.
    if @standalone_worker
      @finished = true
      $log.debug "getting start to shutdown main process"
      Fluent::Engine.stop
    end
  end

  # Stop only once; later SIGTERMs are just logged.
  trap :TERM do
    $log.debug "fluentd main process get SIGTERM"
    unless @finished
      @finished = true
      $log.debug "getting start to shutdown main process"
      Fluent::Engine.stop
    end
  end

  if Fluent.windows?
    install_main_process_command_handlers
  else
    # SIGUSR1: reopen logs and force-flush buffered events on a background thread.
    trap :USR1 do
      flush_buffer
    end

    # SIGUSR2: rebuild the configuration and reload it into the engine on a background thread.
    trap :USR2 do
      reload_config
    end
  end
end
logging_with_console_output() { |$log| ... } click to toggle source
# File lib/fluent/supervisor.rb, line 945
# Yields the process logger ($log). Unless that logger already writes to
# stdout, it then yields a second, debug-enabled Fluent::Log that writes to
# STDOUT at the system log level. Errors reported through the block are
# therefore also visible on the console.
def logging_with_console_output
  yield $log
  return if @log.stdout?

  console_log = Fluent::Log.new(ServerEngine::DaemonLogger.new(STDOUT))
  console_log.level = @system_config.log_level
  yield console_log.enable_debug
end
main_process(&block) click to toggle source
# File lib/fluent/supervisor.rb, line 956
# Runs +block+ as the body of a worker process and always terminates the
# process with exit!.
#
# If process_name is configured, the process title becomes
# "worker:<process_name>". The worker id from SERVERENGINE_WORKER_ID is
# appended when there is more than one worker.
#
# A ConfigError, UnrecoverableError or ScriptError is logged (to the console
# as well) and exits with status 2, which is unrecoverable for the
# supervisor. Any other StandardError, or returning normally from the block,
# exits with status 1. Exceptions outside StandardError/ScriptError (e.g. the
# SystemExit raised by `exit 0` in the block) propagate unchanged.
def main_process(&block)
  process_name = @system_config.process_name
  if process_name
    worker_suffix = @system_config.workers > 1 ? ENV['SERVERENGINE_WORKER_ID'] : nil
    Process.setproctitle("worker:#{process_name}#{worker_suffix}")
  end

  exit_code =
    begin
      block.call
      1
    rescue Fluent::ConfigError => e
      logging_with_console_output do |log|
        log.error "config error", file: @config_path, error: e
        log.debug_backtrace
      end
      2
    rescue Fluent::UnrecoverableError => e
      logging_with_console_output do |log|
        log.error e.message, error: e
        log.error_backtrace
      end
      2
    rescue ScriptError => e # LoadError, NotImplementedError, SyntaxError
      logging_with_console_output do |log|
        if e.respond_to?(:path)
          log.error e.message, path: e.path, error: e
        else
          log.error e.message, error: e
        end
        log.error_backtrace
      end
      2
    rescue => e
      logging_with_console_output do |log|
        log.error "unexpected error", error: e
        log.error_backtrace
      end
      1
    end

  exit!(exit_code)
end
reload_config() click to toggle source
# File lib/fluent/supervisor.rb, line 919
# Rebuilds the configuration from @config_path / @inline_config and reloads it
# into the engine on a background thread (triggered by SIGUSR2, or by the
# "RELOAD" command on Windows).
#
# On success @conf is replaced with the new configuration. On a StandardError
# the error is logged and @conf is left untouched.
#
# Returns the Thread. Its value is the new configuration, or nil on failure.
def reload_config
  Thread.new do
    $log.debug('worker got SIGUSR2')

    begin
      new_conf = Fluent::Config.build(
        config_path: @config_path,
        encoding: @conf_encoding,
        additional_config: @inline_config,
        use_v1_config: @use_v1_config,
      )
      Fluent::VariableStore.try_to_reset { Fluent::Engine.reload_config(new_conf) }
    rescue => e
      # The supervisor validates the config file first, but that is not atomic with this
      # reload because signals are used to communicate between worker and supervisor.
      # So this rescue is still needed.
      $log.error("failed to reload config: #{e}")
      nil
    else
      @conf = new_conf
    end
  end
end
show_plugin_config() click to toggle source
# File lib/fluent/supervisor.rb, line 785
# Handles the deprecated --show-plugin-config option. It logs the equivalent
# fluent-plugin-config-format invocation and exits with status 0.
# @show_plugin_config has the form "<name>:<type>", e.g. "input:tail".
def show_plugin_config
  plugin_name, plugin_type = @show_plugin_config.split(":")
  $log.info "show_plugin_config option is deprecated. Use fluent-plugin-config-format --format=txt #{plugin_name} #{plugin_type}"
  exit 0
end
supervise() click to toggle source
# File lib/fluent/supervisor.rb, line 791
# Runs the supervisor. It sets the process title, logs startup and builds the
# worker spawn command. It then starts ServerEngine with a config block that
# calls Fluent::Supervisor.load_config(@config_path, params).
#
# +params+ is the String-keyed Hash that load_config reads. Every key
# load_config consumes must be provided here; missing keys become nil.
def supervise
  Process.setproctitle("supervisor:#{@system_config.process_name}") if @system_config.process_name
  $log.info "starting fluentd-#{Fluent::VERSION}", pid: Process.pid, ruby: RUBY_VERSION

  fluentd_spawn_cmd = build_spawn_command
  $log.info "spawn command to main: ", cmdline: fluentd_spawn_cmd

  params = {
    'main_cmd' => fluentd_spawn_cmd,
    'daemonize' => @daemonize,
    'inline_config' => @inline_config,
    'log_path' => @log_path,
    'log_rotate_age' => @log_rotate_age,
    'log_rotate_size' => @log_rotate_size,
    'chuser' => @chuser,
    'chgroup' => @chgroup,
    'use_v1_config' => @use_v1_config,
    'conf_encoding' => @conf_encoding,
    'signame' => @signame,
    'fluentd_conf' => @conf.to_s,

    'workers' => @system_config.workers,
    'root_dir' => @system_config.root_dir,
    'log_level' => @system_config.log_level,
    'suppress_repeated_stacktrace' => @system_config.suppress_repeated_stacktrace,
    'ignore_repeated_log_interval' => @system_config.ignore_repeated_log_interval,
    # load_config reads this key when it rebuilds the supervisor logger (which
    # replaces $log). Without it, <system> ignore_same_log_interval was dropped there.
    'ignore_same_log_interval' => @system_config.ignore_same_log_interval,
    'rpc_endpoint' => @system_config.rpc_endpoint,
    'enable_get_dump' => @system_config.enable_get_dump,
    'counter_server' => @system_config.counter_server,
    'log_format' => @system_config.log.format,
    'log_time_format' => @system_config.log.time_format,
    'disable_shared_socket' => @system_config.disable_shared_socket
  }

  se = ServerEngine.create(ServerModule, WorkerModule){
    Fluent::Supervisor.load_config(@config_path, params)
  }
  se.run
end