class Fluent::Test::InputTestDriver
Attributes
emit_streams[R]
event_streams[R]
expected_emits_length[RW]
run_timeout[RW]
Public Class Methods
new(klass, &block)
click to toggle source
Calls superclass method
Fluent::Test::TestDriver.new
# File lib/fluent/test/input_test.rb, line 24
# Sets up a driver for +klass+. The class and the optional block are handed
# unchanged to the superclass initializer before the driver's own state is
# prepared.
def initialize(klass, &block)
  super(klass, &block)

  # Captured output; both lists start empty.
  @emit_streams, @event_streams = [], []

  # Expected [tag, time, record] triples; nil until one is registered.
  @expects = nil
  # for checking only the number of emitted records during run
  @expected_emits_length = nil

  @run_post_conditions = []
  @run_timeout = 5
end
Public Instance Methods
emits()
click to toggle source
# File lib/fluent/test/input_test.rb, line 48
# Flattens every captured stream into a single list of
# [tag, time, record] triples, preserving capture order.
#
# Returns a newly built Array; @emit_streams is only read.
def emits
  @emit_streams.flat_map do |tag, stream_events|
    stream_events.map { |time, record| [tag, time, record] }
  end
end
events()
click to toggle source
# File lib/fluent/test/input_test.rb, line 58
# Concatenates the event lists of every captured stream, dropping the tags.
#
# Returns a newly built Array whose elements are the entries stored in each
# stream's event list, in capture order.
def events
  # The block parameter is deliberately not called `events`, so it does not
  # shadow this method's own name.
  @emit_streams.flat_map { |_tag, stream_events| stream_events }
end
expect_emit(tag, time, record)
click to toggle source
# File lib/fluent/test/input_test.rb, line 35
# Appends one expected [tag, time, record] triple to the expectation list,
# creating the list on first use.
#
# Returns self.
def expect_emit(tag, time, record)
  @expects ||= []
  @expects.push([tag, time, record])
  self
end
expected_emits()
click to toggle source
# File lib/fluent/test/input_test.rb, line 40
# Returns the list of expected emits. When none has been registered yet, an
# empty list is created, stored in @expects, and returned.
def expected_emits
  @expects = [] unless @expects
  @expects
end
records()
click to toggle source
# File lib/fluent/test/input_test.rb, line 66
# Collects only the record part of every captured event, across all streams,
# in capture order.
#
# Returns a newly built Array; @emit_streams is only read.
def records
  @emit_streams.flat_map do |_tag, stream_events|
    stream_events.map { |_time, record| record }
  end
end
register_run_breaking_condition(&block)
click to toggle source
# File lib/fluent/test/input_test.rb, line 82
# Stores +block+ as a breaking condition, creating the list on first use.
# Does nothing (and returns nil) when called without a block.
def register_run_breaking_condition(&block)
  return unless block

  (@run_breaking_conditions ||= []) << block
end
register_run_post_condition(&block)
click to toggle source
# File lib/fluent/test/input_test.rb, line 76
# Stores +block+ as a post condition. Does nothing (and returns nil) when
# called without a block.
#
# The list is created on demand, so registering also works when
# @run_post_conditions is nil instead of raising NoMethodError.
def register_run_post_condition(&block)
  return unless block

  (@run_post_conditions ||= []) << block
end
run(num_waits = 10, &block)
click to toggle source
Calls superclass method
Fluent::Test::TestDriver#run
# File lib/fluent/test/input_test.rb, line 112
# Runs the plugin under test and checks what it emitted.
#
# num_waits - forwarded unchanged to the superclass #run.
# block     - optional; called once inside the superclass run block, before
#             the verification loop starts.
#
# Inside the superclass run block this method walks @emit_streams until
# #run_should_stop? is true. Each event is compared against @expects (when
# set) via assert_equal, and the total count is checked at the end.
#
# Returns self.
#
# NOTE(review): every call registers a fresh post condition and breaking
# condition, and nothing in the visible code removes them afterwards, so
# they accumulate if one driver is run repeatedly — confirm whether reuse
# is supported.
def run(num_waits = 10, &block)
  # Route emit_stream calls on Engine and on the plugin's router to this
  # driver's #emit_stream. The wrapper module is prepended only once per
  # singleton class.
  m = method(:emit_stream)
  unless Engine.singleton_class.ancestors.include?(EmitStreamWrapper)
    Engine.singleton_class.prepend EmitStreamWrapper
  end
  Engine.emit_stream_callee = m
  unless instance.router.singleton_class.ancestors.include?(EmitStreamWrapper)
    instance.router.singleton_class.prepend EmitStreamWrapper
  end
  instance.router.emit_stream_callee = m

  super(num_waits) do
    block.call if block

    if @expected_emits_length || @expects || @run_post_conditions
      # counters for emits and emit_streams
      i, j = 0, 0

      # Events of expected length will be emitted at the end.
      max_length = @expected_emits_length
      max_length ||= @expects.length if @expects
      if max_length
        register_run_post_condition do
          i == max_length
        end
      end

      # Set running timeout to avoid infinite loop caused by some errors.
      # The elapsed time is taken from the monotonic clock so that wall-clock
      # adjustments cannot shorten or prolong the timeout.
      started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      register_run_breaking_condition do
        Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at >= @run_timeout
      end

      until run_should_stop?
        # Nothing new captured yet: wait briefly and check again.
        if j >= @emit_streams.length
          sleep 0.01
          next
        end

        tag, events = @emit_streams[j]
        events.each do |time, record|
          if @expects
            assert_equal(@expects[i], [tag, time, record])
            assert_equal_event_time(@expects[i][1], time) if @expects[i][1].is_a?(Fluent::EventTime)
          end
          i += 1
        end
        j += 1
      end
      assert_equal(@expects.length, i) if @expects
    end
  end
  self
end
run_should_stop?()
click to toggle source
# File lib/fluent/test/input_test.rb, line 89
# Decides whether the verification loop should end.
#
# Returns true when there is no post-condition list at all, when every post
# condition returns a truthy value (an empty list counts as satisfied), or
# when any breaking condition returns a truthy value. Returns false
# otherwise.
def run_should_stop?
  # No post-condition list: nothing to wait for.
  return true unless @run_post_conditions

  # Every post condition holds.
  return true if @run_post_conditions.all?(&:call)

  # A breaking condition may end the run even though some post conditions
  # are still unmet.
  breakers = @run_breaking_conditions || []
  breakers.any?(&:call)
end
Private Instance Methods
emit_stream(tag, es)
click to toggle source
# File lib/fluent/test/input_test.rb, line 168
# Records one emitted stream: the stream object itself goes to
# @event_streams, and a [tag, es.to_a] pair goes to @emit_streams.
#
# Returns @emit_streams.
def emit_stream(tag, es)
  @event_streams.push(es)
  @emit_streams.push([tag, es.to_a])
end