class Fluent::Plugin::Filter
Attributes
has_filter_with_time[R]
Public Class Methods
new()
click to toggle source
Calls superclass method
Fluent::PluginLoggerMixin::new
# File lib/fluent/plugin/filter.rb, line 35 def initialize super @has_filter_with_time = has_filter_with_time? @emit_records_metrics = nil @emit_size_metrics = nil @counter_mutex = Mutex.new @enable_size_metrics = false end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::PluginLoggerMixin#configure
# File lib/fluent/plugin/filter.rb, line 52 def configure(conf) super @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "filter", name: "emit_records", help_text: "Number of count emit records") @emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "filter", name: "emit_size", help_text: "Total size of emit events") @enable_size_metrics = !!system_config.enable_size_metrics end
emit_records()
click to toggle source
# File lib/fluent/plugin/filter.rb, line 44 def emit_records @emit_records_metrics.get end
emit_size()
click to toggle source
# File lib/fluent/plugin/filter.rb, line 48 def emit_size @emit_size_metrics.get end
filter(tag, time, record)
click to toggle source
# File lib/fluent/plugin/filter.rb, line 74 def filter(tag, time, record) raise NotImplementedError, "BUG: filter plugins MUST implement this method" end
filter_stream(tag, es)
click to toggle source
# File lib/fluent/plugin/filter.rb, line 82 def filter_stream(tag, es) new_es = MultiEventStream.new if @has_filter_with_time es.each do |time, record| begin filtered_time, filtered_record = filter_with_time(tag, time, record) new_es.add(filtered_time, filtered_record) if filtered_time && filtered_record rescue => e router.emit_error_event(tag, time, record, e) end end else es.each do |time, record| begin filtered_record = filter(tag, time, record) new_es.add(time, filtered_record) if filtered_record rescue => e router.emit_error_event(tag, time, record, e) end end end new_es end
filter_with_time(tag, time, record)
click to toggle source
# File lib/fluent/plugin/filter.rb, line 78 def filter_with_time(tag, time, record) raise NotImplementedError, "BUG: filter plugins MUST implement this method" end
measure_metrics(es)
click to toggle source
# File lib/fluent/plugin/filter.rb, line 69 def measure_metrics(es) @emit_records_metrics.add(es.size) @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics end
statistics()
click to toggle source
# File lib/fluent/plugin/filter.rb, line 60 def statistics stats = { 'emit_records' => @emit_records_metrics.get, 'emit_size' => @emit_size_metrics.get, } { 'filter' => stats } end
Private Instance Methods
has_filter_with_time?()
click to toggle source
# File lib/fluent/plugin/filter.rb, line 108 def has_filter_with_time? implmented_methods = self.class.instance_methods(false) # Plugins that override `filter_stream` don't need check, # because they may not call `filter` or `filter_with_time` # for example fluentd/lib/fluent/plugin/filter_record_transformer.rb return nil if implmented_methods.include?(:filter_stream) case when [:filter, :filter_with_time].all? { |e| implmented_methods.include?(e) } raise "BUG: Filter plugins MUST implement either `filter` or `filter_with_time`" when implmented_methods.include?(:filter) false when implmented_methods.include?(:filter_with_time) true else raise NotImplementedError, "BUG: Filter plugins MUST implement either `filter` or `filter_with_time`" end end