class Fluent::Plugin::CopyOutput
Attributes
ignore_errors[R]
ignore_if_prev_successes[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
Fluent::Plugin::MultiOutput::new
# File lib/fluent/plugin/out_copy.rb, line 32 def initialize super @ignore_errors = [] @ignore_if_prev_successes = [] end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::Plugin::MultiOutput#configure
# File lib/fluent/plugin/out_copy.rb, line 38 def configure(conf) super @copy_proc = gen_copy_proc @stores.each_with_index { |store, i| if i == 0 && store.arg.include?('ignore_if_prev_success') raise Fluent::ConfigError, "ignore_if_prev_success must specify 2nd or later <store> directives" end @ignore_errors << (store.arg.include?('ignore_error')) @ignore_if_prev_successes << (store.arg.include?('ignore_if_prev_success')) } if @ignore_errors.uniq.size == 1 && @ignore_errors.include?(true) && !@ignore_if_prev_successes.include?(true) log.warn "ignore_errors are specified in all <store>, but ignore_if_prev_success is not specified. Is this intended?" end end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_copy.rb, line 54 def multi_workers_ready? true end
process(tag, es)
click to toggle source
# File lib/fluent/plugin/out_copy.rb, line 58 def process(tag, es) unless es.repeatable? m = Fluent::MultiEventStream.new es.each {|time,record| m.add(time, record) } es = m end success = Array.new(outputs.size) outputs.each_with_index do |output, i| begin if i > 0 && success[i - 1] && @ignore_if_prev_successes[i] log.debug "ignore copy because prev_success in #{output.plugin_id}", index: i else output.emit_events(tag, @copy_proc ? @copy_proc.call(es) : es) success[i] = true end rescue => e if @ignore_errors[i] log.error "ignore emit error in #{output.plugin_id}", error: e else raise e end end end end
Private Instance Methods
gen_copy_proc()
click to toggle source
# File lib/fluent/plugin/out_copy.rb, line 87 def gen_copy_proc @copy_mode = :shallow if @deep_copy case @copy_mode when :no_copy nil when :shallow Proc.new { |es| es.dup } when :deep Proc.new { |es| packer = Fluent::MessagePackFactory.msgpack_packer times = [] records = [] es.each { |time, record| times << time packer.pack(record) } Fluent::MessagePackFactory.msgpack_unpacker.feed_each(packer.full_pack) { |record| records << record } Fluent::MultiEventStream.new(times, records) } when :marshal Proc.new { |es| new_es = Fluent::MultiEventStream.new es.each { |time, record| new_es.add(time, Marshal.load(Marshal.dump(record))) } new_es } end end