class Fluent::Plugin::Filter

Attributes

has_filter_with_time[R]

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::PluginLoggerMixin::new
# File lib/fluent/plugin/filter.rb, line 35
# Sets up the per-instance state of a filter plugin.
#
# After delegating to the superclass initializer, this caches the result of
# `has_filter_with_time?` (which may raise for a misconfigured plugin class),
# creates a mutex, and leaves the metric handles unset with size metrics off
# until `configure` assigns them.
def initialize
  super
  @has_filter_with_time = has_filter_with_time?
  # Metric handles stay nil until `configure` creates them.
  @emit_records_metrics = @emit_size_metrics = nil
  @counter_mutex = Mutex.new
  @enable_size_metrics = false
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::PluginLoggerMixin#configure
# File lib/fluent/plugin/filter.rb, line 52
# Applies the configuration and creates the metric handles for this filter.
#
# conf is forwarded unchanged to the superclass implementation. Afterwards the
# "emit_records" and "emit_size" metrics are created via `metrics_create`, and
# the `enable_size_metrics` system setting is stored as a strict true/false.
def configure(conf)
  super

  # Both metrics live under the same namespace and subsystem.
  shared = { namespace: "fluentd", subsystem: "filter" }
  @emit_records_metrics = metrics_create(**shared, name: "emit_records", help_text: "Number of count emit records")
  @emit_size_metrics = metrics_create(**shared, name: "emit_size", help_text: "Total size of emit events")

  # Double negation coerces nil or any truthy setting into false/true.
  @enable_size_metrics = !!system_config.enable_size_metrics
end
emit_records() click to toggle source
# File lib/fluent/plugin/filter.rb, line 44
# Returns the current value reported by the "emit_records" metric handle.
# The handle is nil until `configure` has run.
def emit_records
  records_metric = @emit_records_metrics
  records_metric.get
end
emit_size() click to toggle source
# File lib/fluent/plugin/filter.rb, line 48
# Returns the current value reported by the "emit_size" metric handle.
# The handle is nil until `configure` has run.
def emit_size
  size_metric = @emit_size_metrics
  size_metric.get
end
filter(tag, time, record) click to toggle source
# File lib/fluent/plugin/filter.rb, line 74
# Placeholder for the per-record filter hook; this base version always raises
# NotImplementedError.
#
# `filter_stream` adds the value returned by an overriding implementation to
# the output stream when it is truthy, and drops the event otherwise.
def filter(tag, time, record)
  message = "BUG: filter plugins MUST implement this method"
  raise NotImplementedError, message
end
filter_stream(tag, es) click to toggle source
# File lib/fluent/plugin/filter.rb, line 82
# Runs every event of `es` through this plugin and returns the surviving
# events as a new MultiEventStream.
#
# Depending on the flag cached at construction time, each event goes through
# either `filter_with_time` (which may replace both time and record) or
# `filter` (which keeps the original time). An event is dropped when the hook
# returns a falsy record (or a falsy time, for `filter_with_time`).
# Errors caught by the bare `rescue` are reported through
# `router.emit_error_event` and the offending event is skipped.
def filter_stream(tag, es)
  result = MultiEventStream.new
  # Read the mode once so it cannot change while the stream is processed.
  with_time = @has_filter_with_time

  es.each do |time, record|
    begin
      if with_time
        new_time, new_record = filter_with_time(tag, time, record)
        result.add(new_time, new_record) if new_time && new_record
      else
        new_record = filter(tag, time, record)
        result.add(time, new_record) if new_record
      end
    rescue => e
      router.emit_error_event(tag, time, record, e)
    end
  end

  result
end
filter_with_time(tag, time, record) click to toggle source
# File lib/fluent/plugin/filter.rb, line 78
# Placeholder for the time-aware filter hook; this base version always raises
# NotImplementedError.
#
# `filter_stream` destructures the value returned by an overriding
# implementation into a time and a record, and keeps the event only when both
# are truthy.
def filter_with_time(tag, time, record)
  message = "BUG: filter plugins MUST implement this method"
  raise NotImplementedError, message
end
measure_metrics(es) click to toggle source
# File lib/fluent/plugin/filter.rb, line 69
# Adds the given event stream to the emit metrics.
#
# The record counter is always increased by `es.size`. The byte size of
# `es.to_msgpack_stream` is added to the size metric only when size metrics
# are enabled; otherwise the method returns nil.
def measure_metrics(es)
  @emit_records_metrics.add(es.size)
  # Serializing the stream is skipped entirely unless size metrics are on.
  return unless @enable_size_metrics

  @emit_size_metrics.add(es.to_msgpack_stream.bytesize)
end
statistics() click to toggle source
# File lib/fluent/plugin/filter.rb, line 60
# Returns a snapshot of this filter's metrics as a nested hash of the form
# { 'filter' => { 'emit_records' => ..., 'emit_size' => ... } }, with the
# values read from the metric handles at call time.
def statistics
  {
    'filter' => {
      'emit_records' => @emit_records_metrics.get,
      'emit_size' => @emit_size_metrics.get,
    },
  }
end

Private Instance Methods

has_filter_with_time?() click to toggle source
# File lib/fluent/plugin/filter.rb, line 108
# Decides which per-record hook the concrete plugin class provides.
#
# Only methods defined directly on the plugin's own class are considered
# (`instance_methods(false)`). Returns:
#   nil   - the class overrides `filter_stream`, so no check is made
#   false - the class defines only `filter`
#   true  - the class defines only `filter_with_time`
# Raises RuntimeError when both hooks are defined and NotImplementedError
# when neither is.
def has_filter_with_time?
  own_methods = self.class.instance_methods(false)

  # Plugins that override `filter_stream` are not checked, because they may
  # never call `filter` or `filter_with_time`
  # (for example fluentd/lib/fluent/plugin/filter_record_transformer.rb).
  return nil if own_methods.include?(:filter_stream)

  defines_filter = own_methods.include?(:filter)
  defines_filter_with_time = own_methods.include?(:filter_with_time)

  if defines_filter && defines_filter_with_time
    raise "BUG: Filter plugins MUST implement either `filter` or `filter_with_time`"
  elsif defines_filter
    false
  elsif defines_filter_with_time
    true
  else
    raise NotImplementedError, "BUG: Filter plugins MUST implement either `filter` or `filter_with_time`"
  end
end