class Fluent::Compat::BufferedOutput

Constants

BUFFER_PARAMS

Private Class Methods

new() click to toggle source
Calls superclass method Fluent::Plugin::Output::new
# File lib/fluent/compat/output.rb, line 387
def initialize
  super
  unless self.class.ancestors.include?(Fluent::Compat::CallSuperMixin)
    self.class.prepend Fluent::Compat::CallSuperMixin
  end
end
propagate_default_params() click to toggle source
# File lib/fluent/compat/output.rb, line 248
def self.propagate_default_params
  BUFFER_PARAMS
end

Private Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::Plugin::Output#configure
# File lib/fluent/compat/output.rb, line 253
def configure(conf)
  bufconf = CompatOutputUtils.buffer_section(conf)
  config_style = (bufconf ? :v1 : :v0)
  if config_style == :v0
    buf_params = {
      "flush_mode" => "interval",
      "retry_type" => "exponential_backoff",
    }
    BUFFER_PARAMS.each do |older, newer|
      next unless newer
      if conf.has_key?(older)
        if older == 'buffer_queue_full_action' && conf[older] == 'exception'
          buf_params[newer] = 'throw_exception'
        else
          buf_params[newer] = conf[older]
        end
      end
    end

    conf.elements << Fluent::Config::Element.new('buffer', '', buf_params, [])
  end

  @includes_record_filter = self.class.ancestors.include?(Fluent::Compat::RecordFilterMixin)

  methods_of_plugin = self.class.instance_methods(false)
  @overrides_emit = methods_of_plugin.include?(:emit)
  # RecordFilter mixin uses its own #format_stream method implementation
  @overrides_format_stream = methods_of_plugin.include?(:format_stream) || @includes_record_filter

  ParserUtils.convert_parser_conf(conf)
  FormatterUtils.convert_formatter_conf(conf)

  super

  if config_style == :v1
    unless @buffer_config.chunk_keys.empty?
      raise Fluent::ConfigError, "this plugin '#{self.class}' cannot handle arguments for <buffer ...> section"
    end
  end

  self.extend BufferedChunkMixin

  if @overrides_emit
    self.singleton_class.module_eval do
      attr_accessor :last_emit_via_buffer
    end
    output_plugin = self
    m = Module.new do
      define_method(:emit) do |key, data, chain|
        # receivers of this method are buffer instances
        output_plugin.last_emit_via_buffer = [key, data]
      end
    end
    @buffer.extend m
  end
end
detach_multi_process(&block) click to toggle source
# File lib/fluent/compat/output.rb, line 412
def detach_multi_process(&block)
  log.warn "detach_process is not supported in this version. ignored."
  block.call
end
detach_process(&block) click to toggle source
# File lib/fluent/compat/output.rb, line 407
def detach_process(&block)
  log.warn "detach_process is not supported in this version. ignored."
  block.call
end
emit(tag, es, chain, key="") click to toggle source

original implementation of v0.12 BufferedOutput

# File lib/fluent/compat/output.rb, line 311
def emit(tag, es, chain, key="")
  # this method will not be used except for the case that plugin calls super
  @emit_count_metrics.inc
  data = format_stream(tag, es)
  if @buffer.emit(key, data, chain)
    submit_flush
  end
end
extract_placeholders(str, metadata) click to toggle source
# File lib/fluent/compat/output.rb, line 383
def extract_placeholders(str, metadata)
  raise "BUG: compat plugin does not support extract_placeholders: use newer plugin API"
end
format_stream(tag, es) click to toggle source
# File lib/fluent/compat/output.rb, line 324
def format_stream(tag, es)
  # this method will not be used except for the case that plugin calls super
  out = ''
  es.each do |time, record|
    out << format(tag, time, record)
  end
  out
end
handle_stream_simple(tag, es, enqueue: false) click to toggle source

This method overrides Fluent::Plugin::Output#handle_stream_simple because v0.12 BufferedOutput may overrides format_stream, but original handle_stream_simple method doesn’t consider about it

# File lib/fluent/compat/output.rb, line 338
def handle_stream_simple(tag, es, enqueue: false)
  if @overrides_emit
    current_emit_count = @emit_count_metrics.get
    size = es.size
    key = data = nil
    begin
      emit(tag, es, NULL_OUTPUT_CHAIN)
      key, data = self.last_emit_via_buffer
    ensure
      @emit_count_metrics.set(current_emit_count)
      self.last_emit_via_buffer = nil
    end
    # on-the-fly key assignment can be done, and it's not configurable if Plugin#emit does it dynamically
    meta = @buffer.metadata(variables: (key && !key.empty? ? {key: key} : nil))
    write_guard do
      @buffer.write({meta => data}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
    end
    @emit_records_metrics.add(es.size)
    @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
    return [meta]
  end

  if @overrides_format_stream
    meta = metadata(nil, nil, nil)
    size = es.size
    bulk = format_stream(tag, es)
    write_guard do
      @buffer.write({meta => bulk}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
    end
    @emit_records_metrics.add(es.size)
    @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
    return [meta]
  end

  meta = metadata(nil, nil, nil)
  size = es.size
  data = es.map{|time,record| format(tag, time, record) }
  write_guard do
    @buffer.write({meta => data}, enqueue: enqueue)
  end
  @emit_records_metrics.add(es.size)
  @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
  [meta]
end
start() click to toggle source
Calls superclass method Fluent::Plugin::Output#start
# File lib/fluent/compat/output.rb, line 394
def start
  super

  if instance_variable_defined?(:@formatter) && @inject_config
    unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
      if @formatter.respond_to?(:owner) && !@formatter.owner
        @formatter.owner = self
        @formatter.singleton_class.prepend FormatterUtils::InjectMixin
      end
    end
  end
end
submit_flush() click to toggle source
# File lib/fluent/compat/output.rb, line 320
def submit_flush
  # nothing todo: blank method to be called from #emit of 3rd party plugins
end
support_in_v12_style?(feature) click to toggle source
# File lib/fluent/compat/output.rb, line 210
def support_in_v12_style?(feature)
  case feature
  when :synchronous    then false
  when :buffered       then true
  when :delayed_commit then false
  when :custom_format  then true
  end
end