class Fluent::EventRouter
EventRouter
is responsible for routing events to a collector.
It has a list of MatchPattern
and Collector pairs:
+----------------+     +-----------------+
|  MatchPattern  |     |    Collector    |
+----------------+     +-----------------+
|   access.**  --------->  type forward  |
|     logs.**  --------->  type copy     |
|  archive.**  --------->  type s3       |
+----------------+     +-----------------+
EventRouter
does:
1) receive an event at `#emit` methods
2) match the event's tag with the MatchPatterns
3) forward the event to the corresponding Collector
Collector is one of Output, Filter, or another EventRouter.
Attributes
default_collector[RW]
emit_error_handler[RW]
Public Class Methods
new(default_collector, emit_error_handler)
click to toggle source
# File lib/fluent/event_router.rb, line 45
#
# Builds a router with no match rules, a fresh MatchCache and no registered
# metric callbacks.
#
# default_collector::  stored in @default_collector; used as the fallback
#                      when no rule produces a collector.
# emit_error_handler:: stored in @emit_error_handler; receives errors raised
#                      while emitting and explicit error events.
def initialize(default_collector, emit_error_handler)
  @match_rules = []
  @match_cache = MatchCache.new
  @default_collector = default_collector
  @emit_error_handler = emit_error_handler
  @metric_callbacks = {}
  # No caller is selected until caller_plugin_id= is invoked.
  @caller_plugin_id = nil
end
Public Instance Methods
add_metric_callbacks(caller_plugin_id, callback)
click to toggle source
# File lib/fluent/event_router.rb, line 88
#
# Registers +callback+ under +caller_plugin_id+, replacing any callback
# previously stored for the same id.
#
# caller_plugin_id:: key used to look the callback up again.
# callback::         object stored as-is; emit_stream invokes it with #call.
#
# Returns the stored callback.
def add_metric_callbacks(caller_plugin_id, callback)
  @metric_callbacks[caller_plugin_id] = callback
end
add_rule(pattern, collector)
click to toggle source
Called by Agent to add a new match pattern and collector.
# File lib/fluent/event_router.rb, line 84
#
# Appends a new Rule built from +pattern+ and +collector+ to the end of the
# rule list, so rules keep the order in which they were added.
#
# pattern::   match pattern handed to Rule.new.
# collector:: collector handed to Rule.new.
def add_rule(pattern, collector)
  @match_rules << Rule.new(pattern, collector)
end
caller_plugin_id=(caller_plugin_id)
click to toggle source
# File lib/fluent/event_router.rb, line 92
#
# Records which plugin id is currently emitting. find_callback uses this
# value as the key into the registered metric callbacks.
#
# caller_plugin_id:: id to remember; nil clears the selection.
def caller_plugin_id=(caller_plugin_id)
  @caller_plugin_id = caller_plugin_id
end
emit(tag, time, record)
click to toggle source
# File lib/fluent/event_router.rb, line 104
#
# Emits a single event by wrapping it in a OneEventStream and delegating to
# emit_stream.
#
# tag::    tag passed through to emit_stream.
# time::   event time passed to OneEventStream.new.
# record:: event record; a nil record is silently ignored.
#
# Returns nil when +record+ is nil, otherwise whatever emit_stream returns.
def emit(tag, time, record)
  # A nil record carries nothing to route, so skip it entirely.
  return if record.nil?

  emit_stream(tag, OneEventStream.new(time, record))
end
emit_array(tag, array)
click to toggle source
# File lib/fluent/event_router.rb, line 110
#
# Emits a batch of events by wrapping +array+ in an ArrayEventStream and
# delegating to emit_stream.
#
# tag::   tag passed through to emit_stream.
# array:: value handed to ArrayEventStream.new unchanged.
def emit_array(tag, array)
  emit_stream(tag, ArrayEventStream.new(array))
end
emit_error_event(tag, time, record, error)
click to toggle source
# File lib/fluent/event_router.rb, line 123
#
# Forwards a failed event, together with the error that caused the failure,
# to the configured emit error handler. All four arguments are passed
# through unchanged.
#
# Returns whatever the handler's emit_error_event returns.
def emit_error_event(tag, time, record, error)
  @emit_error_handler.emit_error_event(tag, time, record, error)
end
emit_stream(tag, es)
click to toggle source
# File lib/fluent/event_router.rb, line 114
#
# Routes an event stream: looks up the collector for +tag+ via match, hands
# the stream to it, then invokes the metric callback of the current caller
# plugin (if one is registered) with the same stream.
#
# tag:: tag used for the collector lookup and passed to emit_events.
# es::  event stream passed to the collector and to the callback.
#
# Any StandardError raised during lookup, emission or the callback is not
# propagated; it is handed to the emit error handler's handle_emits_error
# together with the tag and the stream.
def emit_stream(tag, es)
  match(tag).emit_events(tag, es)
  callback = find_callback
  callback.call(es) if callback
rescue => e
  @emit_error_handler.handle_emits_error(tag, es, e)
end
find_callback()
click to toggle source
# File lib/fluent/event_router.rb, line 96
#
# Looks up the metric callback registered for the current caller plugin id.
#
# Returns nil when no caller plugin id is set, or when nothing is stored
# under that id in @metric_callbacks.
def find_callback
  @metric_callbacks[@caller_plugin_id] if @caller_plugin_id
end
match(tag)
click to toggle source
# File lib/fluent/event_router.rb, line 131
#
# Returns the collector for +tag+, going through the match cache. The block
# given to the cache resolves the collector with find and falls back to the
# default collector when find returns nothing.
#
# tag:: event tag to resolve.
def match(tag)
  @match_cache.get(tag) { find(tag) || @default_collector }
end
match?(tag)
click to toggle source
# File lib/fluent/event_router.rb, line 127
#
# Reports whether find yields a collector for +tag+. The result of find is
# coerced to a strict boolean; the match cache is not consulted here.
#
# tag:: event tag to test.
def match?(tag)
  !!find(tag)
end
suppress_missing_match!()
click to toggle source
# File lib/fluent/event_router.rb, line 77
#
# Asks the default collector to suppress its missing-match handling, but
# only when that collector responds to suppress_missing_match!. Collectors
# that do not respond to it are left untouched.
#
# Returns nil when the collector does not respond to the message,
# otherwise the collector's own return value.
def suppress_missing_match!
  return unless @default_collector.respond_to?(:suppress_missing_match!)

  @default_collector.suppress_missing_match!
end
Private Instance Methods
find(tag)
click to toggle source
# File lib/fluent/event_router.rb, line 269
#
# Resolves the collector for +tag+ by scanning the rules in insertion order.
# (Listed under the private instance methods of the class.)
#
# Every matching rule whose collector is a Plugin::Filter is accumulated
# into a Pipeline. The first matching rule whose collector is not a filter
# ends the scan:
# * if filters were collected, it becomes the pipeline's output and the
#   pipeline is returned;
# * otherwise that collector is returned directly, without a pipeline.
#
# When only filters matched, the pipeline's output is set to the default
# collector. Returns nil when no rule matched at all.
#
# tag:: event tag tested against each rule with Rule#match?.
def find(tag)
  pipeline = nil

  @match_rules.each do |rule|
    next unless rule.match?(tag)

    if rule.collector.is_a?(Plugin::Filter)
      pipeline ||= Pipeline.new
      pipeline.add_filter(rule.collector)
      next
    end

    # Use Output directly when filter is not matched
    return rule.collector unless pipeline

    pipeline.set_output(rule.collector)
    return pipeline
  end

  return nil unless pipeline

  # filter is matched but no match
  pipeline.set_output(@default_collector)
  pipeline
end