class Fluent::EventRouter::Pipeline::FilterOptimizer

Public Class Methods

new(filters = []) click to toggle source
# File lib/fluent/event_router.rb, line 163
# Builds an optimizer around the given filter chain.
#
# filters - ordered collection of filter objects (defaults to an empty chain).
#
# The optimizability verdict starts out as nil, meaning "not decided yet";
# it is computed the first time it is asked for.
def initialize(filters = [])
  @optimizable = nil
  @filters = filters
end

Public Instance Methods

filter_stream(tag, es) click to toggle source
# File lib/fluent/event_router.rb, line 173
# Runs the event stream +es+ through the filter chain.
#
# When the chain is optimizable, the work is delegated to
# optimized_filter_stream. Otherwise each filter's own +filter_stream+ is
# invoked in chain order, with the output of one filter becoming the input of
# the next; an empty chain returns +es+ itself.
#
# tag - tag handed to every filter.
# es  - the incoming event stream.
def filter_stream(tag, es)
  return optimized_filter_stream(tag, es) if optimizable?

  @filters.reduce(es) { |stream, current| current.filter_stream(tag, stream) }
end
filters=(filters) click to toggle source
# File lib/fluent/event_router.rb, line 168
# Replaces the filter chain.
#
# After storing the new chain, the cached optimization state is cleared via
# reset_optimization so that it is recomputed against the new filters the
# next time it is needed.
def filters=(filters)
  @filters = filters
  reset_optimization
end

Private Instance Methods

filters_having_filter_stream() click to toggle source
# File lib/fluent/event_router.rb, line 228
# Returns the filters whose own class defines +filter_stream+, memoizing the
# result (an empty result is cached as well).
#
# Only methods defined directly on the filter's class are considered, because
# +instance_methods(false)+ excludes anything inherited.
# NOTE(review): a filter that inherits a +filter_stream+ override from an
# intermediate superclass or a mixin is therefore not detected here — confirm
# that this is acceptable.
def filters_having_filter_stream
  return @filters_having_filter_stream if @filters_having_filter_stream

  @filters_having_filter_stream = @filters.select do |candidate|
    own_methods = candidate.class.instance_methods(false)
    own_methods.include?(:filter_stream)
  end
end
optimizable?() click to toggle source
# File lib/fluent/event_router.rb, line 214
# Tells whether the chain can be run record-by-record through
# optimized_filter_stream. The verdict is cached in @optimizable until it is
# reset to nil.
#
# The chain is optimizable only when no filter appears in
# filters_having_filter_stream. When it is not, an info message naming the
# offending filter classes is logged, but only for chains of more than one
# filter.
def optimizable?
  return @optimizable unless @optimizable.nil?

  stream_filters = filters_having_filter_stream
  return @optimizable = true if stream_filters.empty?

  # skip log message when filter is only 1, because its performance is same as non optimized chain.
  # (stream_filters is known to be non-empty at this point.)
  if @filters.size > 1
    $log.info "disable filter chain optimization because #{stream_filters.map(&:class)} uses `#filter_stream` method."
  end
  @optimizable = false
end
optimized_filter_stream(tag, es) click to toggle source
# File lib/fluent/event_router.rb, line 183
# Pushes every event of +es+ through the whole filter chain, one record at a
# time, and collects the surviving events in a new MultiEventStream.
#
# For each filter, +has_filter_with_time+ selects between +filter_with_time+
# (which may replace both time and record) and +filter+ (which may replace the
# record only). A falsy result drops the event and skips the remaining
# filters. When a filter raises, the error is passed to that filter's router
# through +emit_error_event+ and the event carries on to the next filter with
# the time and record it had before the failing call.
#
# NOTE(review): an event whose filter raised is thus both reported as an
# error event and still added to the returned stream — confirm this is the
# intended contract.
#
# tag - tag handed to every filter call.
# es  - event stream whose +each+ yields time/record pairs.
#
# Returns the newly built MultiEventStream.
def optimized_filter_stream(tag, es)
  new_es = MultiEventStream.new
  es.each do |time, record|
    current_time = time
    current_record = record
    alive = true

    @filters.each do |filter|
      # Decided outside the rescue so a failure here is not swallowed.
      uses_time = filter.has_filter_with_time
      begin
        if uses_time
          current_time, current_record = filter.filter_with_time(tag, current_time, current_record)
          alive = current_record && current_time
        else
          current_record = filter.filter(tag, current_time, current_record)
          alive = current_record
        end
      rescue => e
        # The raising call never assigned anything, so the previous
        # time/record (and `alive`) are still in effect.
        filter.router.emit_error_event(tag, current_time, current_record, e)
      end
      break unless alive
    end

    new_es.add(current_time, current_record) if alive
  end
  new_es
end
reset_optimization() click to toggle source
# File lib/fluent/event_router.rb, line 234
# Forgets the cached optimizability verdict and the cached list of filters
# defining +filter_stream+, so both are recomputed on next use.
def reset_optimization
  @optimizable = @filters_having_filter_stream = nil
end