class Fluent::EngineClass
Constants
- MAINLOOP_SLEEP_INTERVAL
Attributes
root_agent[R]
supervisor_mode[R]
system_config[R]
Public Class Methods
new()
click to toggle source
# File lib/fluent/engine.rb, line 33
# Puts a fresh engine into its pre-#init state: no root agent, no log event
# router, not stopped, and every option switched off.
def initialize
  # Collaborators that are assigned later (#init, #configure).
  @root_agent = nil
  @fluent_log_event_router = nil

  # Lazily memoised by #worker_id.
  @_worker_id = nil

  # Run state.
  @engine_stopped = false
  @supervisor_mode = false

  # Option defaults; #init copies non-nil values from the system config.
  @suppress_config_dump = false
  @without_source = false
  @log_event_verbose = false

  @system_config = SystemConfig.new
end
Public Instance Methods
add_plugin_dir(dir)
click to toggle source
# File lib/fluent/engine.rb, line 118
# Deprecated shim: logs a deprecation warning through $log, then hands the
# directory to Plugin.add_plugin_dir.
#
# @param dir passed unchanged to Plugin.add_plugin_dir
# @return whatever Plugin.add_plugin_dir returns
def add_plugin_dir(dir)
  deprecation_notice = 'Deprecated method: this method is going to be deleted. Use Fluent::Plugin.add_plugin_dir'
  $log.warn(deprecation_notice)

  Plugin.add_plugin_dir(dir)
end
configure(conf)
click to toggle source
# File lib/fluent/engine.rb, line 104
# Configures the current root agent, builds the log event router for it and,
# unless config dumps are suppressed, logs the configuration text.
#
# @param conf configuration passed to @root_agent.configure; its #to_s is
#   what gets logged
# @return nil when the dump is suppressed, otherwise the result of $log.info
def configure(conf)
  @root_agent.configure(conf)

  router = FluentLogEventRouter.build(@root_agent)
  @fluent_log_event_router = router
  # Only turn on log events when the router reports it can emit them.
  $log.enable_event(true) if router.emittable?

  $log.info(:supervisor, "using configuration file: #{conf.to_s.rstrip}") unless @suppress_config_dump
end
emit(tag, time, record)
click to toggle source
# File lib/fluent/engine.rb, line 123
# Unsupported entry point: always raises, directing callers to router.emit.
# None of the arguments are used.
#
# @raise [RuntimeError] unconditionally
def emit(tag, time, record)
  raise RuntimeError, "BUG: use router.emit instead of Engine.emit"
end
emit_array(tag, array)
click to toggle source
# File lib/fluent/engine.rb, line 127
# Unsupported entry point: always raises, directing callers to
# router.emit_array. None of the arguments are used.
#
# @raise [RuntimeError] unconditionally
def emit_array(tag, array)
  raise RuntimeError, "BUG: use router.emit_array instead of Engine.emit_array"
end
emit_stream(tag, es)
click to toggle source
# File lib/fluent/engine.rb, line 131
# Unsupported entry point: always raises, directing callers to
# router.emit_stream. None of the arguments are used.
#
# @raise [RuntimeError] unconditionally
def emit_stream(tag, es)
  raise RuntimeError, "BUG: use router.emit_stream instead of Engine.emit_stream"
end
flush!()
click to toggle source
# File lib/fluent/engine.rb, line 135
# Forwards the flush request to the current root agent.
#
# @return whatever @root_agent.flush! returns
def flush!
  agent = @root_agent
  agent.flush!
end
init(system_config, supervisor_mode: false)
click to toggle source
# File lib/fluent/engine.rb, line 52
# Binds the engine to a system config and creates a fresh root agent.
#
# Each of suppress_config_dump, without_source and log_event_verbose is
# copied from the system config only when it is not nil, so an unset option
# leaves the engine's current value in place.
#
# @param system_config object responding to #suppress_config_dump,
#   #without_source and #log_event_verbose
# @param supervisor_mode stored as-is in @supervisor_mode
# @return [self]
def init(system_config, supervisor_mode: false)
  @system_config = system_config
  @supervisor_mode = supervisor_mode

  unless system_config.suppress_config_dump.nil?
    @suppress_config_dump = system_config.suppress_config_dump
  end

  unless system_config.without_source.nil?
    @without_source = system_config.without_source
  end

  unless system_config.log_event_verbose.nil?
    @log_event_verbose = system_config.log_event_verbose
  end

  @root_agent = RootAgent.new(log: log, system_config: @system_config)

  self
end
log()
click to toggle source
# File lib/fluent/engine.rb, line 66
# Accessor for the process-wide logger.
#
# @return the current value of the $log global
def log
  $log
end
now()
click to toggle source
# File lib/fluent/engine.rb, line 139
# Current time as reported by Fluent::EventTime.now.
#
# @return whatever Fluent::EventTime.now returns
def now
  # TODO thread update
  current_time = Fluent::EventTime.now
  current_time
end
parse_config(io, fname, basepath = Dir.pwd, v1_config = false)
click to toggle source
# File lib/fluent/engine.rb, line 70
# Parses a configuration source, choosing the parser from the file name.
#
# Names ending in ".rb" go to the Ruby DSL parser (loaded on demand); every
# other name goes to Config.parse.
#
# @param io source handed to the selected parser
# @param fname file name used to pick the parser
# @param basepath base directory; joined with fname for the DSL parser and
#   passed through to Config.parse otherwise (defaults to Dir.pwd)
# @param v1_config forwarded to Config.parse only
# @return the selected parser's result
def parse_config(io, fname, basepath = Dir.pwd, v1_config = false)
  # \z anchors at the true end of the string. `$` also matches before any
  # newline, which would misroute a name such as "x.rb\n.conf" to the DSL.
  if fname =~ /\.rb\z/
    require 'fluent/config/dsl' # deliberately lazy: only needed for DSL configs
    Config::DSL::Parser.parse(io, File.join(basepath, fname))
  else
    Config.parse(io, fname, basepath, v1_config)
  end
end
push_log_event(tag, time, record)
click to toggle source
# File lib/fluent/engine.rb, line 209
# Packs the three values into one array and hands it to the log event router.
#
# @param tag first element of the emitted event
# @param time second element of the emitted event
# @param record third element of the emitted event
# @return whatever @fluent_log_event_router.emit_event returns
def push_log_event(tag, time, record)
  event = [tag, time, record]
  @fluent_log_event_router.emit_event(event)
end
reload_config(conf, supervisor: false)
click to toggle source
@param conf [Fluent::Config] @param supervisor [Bool] @return nil
# File lib/fluent/engine.rb, line 167
# Swaps the running root agent for a new one built from +conf+.
#
# The new agent is created and configured before the old one is stopped. If
# any analysed plugin reports it is not reloadable, or configuring the new
# agent fails, the engine keeps (or is restored to) the old agent and the
# error propagates.
#
# @param conf configuration to load
# @param supervisor when truthy, only shuts the old agent down and returns
#   without running the stop/start phases
# @raise [Fluent::ConfigError] if a plugin answers false to
#   #reloadable_plugin?
# @return nil in supervisor mode, otherwise the result of #start_phase
def reload_config(conf, supervisor: false)
  # configure first to reduce down time while restarting
  replacement = RootAgent.new(log: log, system_config: @system_config)
  analysis = Fluent::StaticConfigAnalysis.call(conf, workers: system_config.workers)

  analysis.all_plugins.each do |plugin|
    # Plugins that do not implement the predicate are treated as reloadable.
    next unless plugin.respond_to?(:reloadable_plugin?)
    next if plugin.reloadable_plugin?

    raise Fluent::ConfigError, "Unreloadable plugin plugin: #{Fluent::Plugin.lookup_type_from_class(plugin.class)}, plugin_id: #{plugin.plugin_id}, class_name: #{plugin.class})"
  end

  # Assign @root_agent to new root_agent
  # for https://github.com/fluent/fluentd/blob/fcef949ce40472547fde295ddd2cfe297e1eddd6/lib/fluent/plugin_helper/event_emitter.rb#L50
  previous = @root_agent
  @root_agent = replacement
  begin
    @root_agent.configure(conf)
  rescue
    # Roll back so the engine keeps pointing at the old agent.
    @root_agent = previous
    raise
  end

  $log.info(:supervisor, "using configuration file: #{conf.to_s.rstrip}") unless @suppress_config_dump

  # supervisor doesn't handle actual data. so the following code is unnecessary.
  if supervisor
    previous.shutdown # to close thread created in #configure
    return
  end

  stop_phase(previous)

  $log.info('restart fluentd worker', worker: worker_id)
  start_phase(replacement)
end
run()
click to toggle source
# File lib/fluent/engine.rb, line 144
# Worker main loop: starts the engine and the log event router, sleeps in
# MAINLOOP_SLEEP_INTERVAL steps until @engine_stopped is set, then runs the
# stop phase on the current root agent.
#
# Any exception raised during start-up or the loop is logged and re-raised;
# in that case the stop phase is not run.
#
# @return the result of #stop_phase
def run
  begin
    $log.info("starting fluentd worker", pid: Process.pid, ppid: Process.ppid, worker: worker_id)
    start
    @fluent_log_event_router.start
    $log.info("fluentd worker is now running", worker: worker_id)

    until @engine_stopped
      sleep(MAINLOOP_SLEEP_INTERVAL)
    end

    $log.info("fluentd worker is now stopping", worker: worker_id)
  rescue Exception => error
    # Every exception class is caught here only to be logged; it is then
    # re-raised unchanged.
    $log.error("unexpected error", error: error)
    $log.error_backtrace
    raise
  end

  # Kept outside the begin block so failures in the stop phase are not
  # reported as "unexpected error".
  stop_phase(@root_agent)
end
run_configure(conf, dry_run: false)
click to toggle source
# File lib/fluent/engine.rb, line 79
# Configures the engine, then warns about every configuration entry that
# conf.check_not_fetched reports as never read.
#
# No warning is produced for entries of a 'system' element, nor for 'source'
# elements while @without_source is set. The warning goes to :supervisor for
# a supervisor dry run, to :worker0 for elements meant for every worker, and
# untagged for elements meant for this worker; other elements are skipped.
#
# @param conf configuration; must respond to #check_not_fetched
# @param dry_run whether this is a dry run
# @return whatever conf.check_not_fetched returns
def run_configure(conf, dry_run: false)
  configure(conf)

  conf.check_not_fetched do |key, element|
    parent, plugin = element.unused_in

    warning =
      if parent
        if plugin
          "section <#{element.name}> is not used in <#{parent}> of #{plugin} plugin"
        else
          "section <#{element.name}> is not used in <#{parent}>"
        end
      elsif element.name == 'system' || (@without_source && element.name == 'source')
        nil
      else
        "parameter '#{key}' in #{element.to_s.strip} is not used."
      end

    next unless warning

    if dry_run && @supervisor_mode
      $log.warn(:supervisor, warning)
    elsif element.for_every_workers?
      $log.warn(:worker0, warning)
    elsif element.for_this_worker?
      $log.warn(warning)
    end
  end
end
stop()
click to toggle source
# File lib/fluent/engine.rb, line 204
# Sets the stop flag that the #run loop polls.
#
# @return [nil]
def stop
  @engine_stopped = true

  nil
end
worker_id()
click to toggle source
# File lib/fluent/engine.rb, line 213
# Identifier of this worker.
#
# @return [Integer] -1 in supervisor mode; otherwise the integer value of
#   ENV['SERVERENGINE_WORKER_ID'] (0 when unset), memoised on first use
def worker_id
  return -1 if @supervisor_mode

  # if ENV doesn't have SERVERENGINE_WORKER_ID, it is a worker under --no-supervisor or in tests
  # so it's (almost) a single worker, worker_id=0
  @_worker_id ||= ENV.fetch('SERVERENGINE_WORKER_ID', 0).to_i
end
Private Instance Methods
start()
click to toggle source
# File lib/fluent/engine.rb, line 247
# Starts the current root agent.
#
# @return whatever @root_agent.start returns
def start
  agent = @root_agent
  agent.start
end
start_phase(root_agent)
click to toggle source
# File lib/fluent/engine.rb, line 238
# Builds a log event router for the given agent, enables log events when the
# router can emit them, and starts the engine's current root agent.
#
# NOTE(review): the agent that gets started is @root_agent, not the
# +root_agent+ argument, and the newly built router is not started here
# (unlike in #run) -- confirm both are intended.
#
# @param root_agent agent the new log event router is built for
# @return whatever @root_agent.start returns
def start_phase(root_agent)
  router = FluentLogEventRouter.build(root_agent)
  @fluent_log_event_router = router
  $log.enable_event(true) if router.emittable?

  @root_agent.start
end
stop_phase(root_agent)
click to toggle source
# File lib/fluent/engine.rb, line 227
# Shuts the given root agent down and stops the log event router.
#
# Unless @log_event_verbose is set, log events are disabled and the router
# is asked to stop gracefully before the shutdown begins.
#
# @param root_agent agent to shut down
# @return whatever @fluent_log_event_router.stop returns
def stop_phase(root_agent)
  unless @log_event_verbose
    $log.enable_event(false)
    @fluent_log_event_router.graceful_stop
  end

  $log.info('shutting down fluentd worker', worker: worker_id)

  root_agent.shutdown

  @fluent_log_event_router.stop
end