class Fluent::Plugin::DummyInput
Constants
- BIN_NUM
- DEFAULT_STORAGE_TYPE
Public Class Methods
new()
click to toggle source
Calls superclass method
Fluent::Compat::Input.new
# File lib/fluent/plugin/in_dummy.rb, line 58
# Runs the superclass initializer and then marks the storage handle as
# not yet created; #configure assigns the real storage object later.
def initialize
  super

  @storage = nil
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::PluginLoggerMixin#configure
# File lib/fluent/plugin/in_dummy.rb, line 63
# Applies the configuration via the superclass, rewinds the dummy record
# cursor to the first entry, and creates the 'suspend' storage.
#
# The first child element of +conf+ named 'storage' (nil when there is
# none) is passed to storage_create as its +conf+; DEFAULT_STORAGE_TYPE is
# passed as the default type.
def configure(conf)
  super

  @dummy_index = 0
  storage_section = conf.elements.find { |element| element.name == 'storage' }
  @storage = storage_create(
    usage: 'suspend',
    conf: storage_section,
    default_type: DEFAULT_STORAGE_TYPE
  )
end
emit(num)
click to toggle source
# File lib/fluent/plugin/in_dummy.rb, line 104
# Emits +num+ rounds of generated records to the router under @tag.
#
# When @size is greater than 1, each round is a single emit_array call
# carrying @size freshly generated [time, record] pairs; otherwise each
# round is a single emit call with one record.
#
# Any StandardError raised while emitting is deliberately swallowed so that
# a failing emit does not stop later emits.
def emit(num)
  batched = @size > 1

  num.times do
    if batched
      router.emit_array(@tag, Array.new(@size) { [Fluent::Engine.now, generate] })
    else
      router.emit(@tag, Fluent::Engine.now, generate)
    end
  end
rescue => _
  # ignore all errors not to stop emits by emit errors
end
generate()
click to toggle source
# File lib/fluent/plugin/in_dummy.rb, line 118
# Returns the next record from @dummy, cycling back to the first entry once
# the cursor (@dummy_index) runs past the last one.
#
# When @auto_increment_key is set, the record is duplicated and the copy
# gets that key assigned to the stored :auto_increment_value counter after
# incrementing it by one; the entry held in @dummy is left untouched.
def generate
  # Wrap around when the cursor no longer points at an entry.
  @dummy_index = 0 unless @dummy[@dummy_index]
  record = @dummy[@dummy_index]
  @dummy_index += 1

  return record unless @auto_increment_key

  record.dup.tap do |copy|
    copy[@auto_increment_key] = @storage.update(:auto_increment_value) { |value| value + 1 }
  end
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/in_dummy.rb, line 70
# Always answers true.
# NOTE(review): presumably this tells the framework the plugin may run under
# multiple workers - confirm against the plugin base class.
def multi_workers_ready?
  true
end
run()
click to toggle source
# File lib/fluent/plugin/in_dummy.rb, line 87
# Emit loop; #start hands this method to thread_create.
#
# @rate is split into BIN_NUM equal batches plus a remainder. On every pass
# of the outer loop, up to BIN_NUM batches are emitted (each inside a
# wait(0.1) slot), then the remainder is emitted, then the loop idles in
# 10 ms steps until the wall-clock second changes. Every stage stops early
# as soon as thread_current_running? turns false.
def run
  per_bin  = (@rate / BIN_NUM).to_i
  leftover = (@rate % BIN_NUM)

  while thread_current_running?
    second = Time.now.to_i
    # True while the thread should keep going and the clock has not yet
    # moved past the second this pass started in.
    within_second = -> { thread_current_running? && Time.now.to_i <= second }

    BIN_NUM.times do
      break unless within_second.call

      wait(0.1) { emit(per_bin) }
    end

    emit(leftover) if thread_current_running?

    # wait for next second
    sleep 0.01 while within_second.call
  end
end
start()
click to toggle source
Calls superclass method
Fluent::Compat::Input#start
# File lib/fluent/plugin/in_dummy.rb, line 74
# Starts the plugin: runs the superclass start, seeds any missing storage
# entries, and then creates the :dummy_input thread running #run.
#
# Seeded values: :increment_value and :dummy_index default to 0;
# :auto_increment_value defaults to -1, and only when @auto_increment_key
# is set. Entries for which @storage.get already returns a truthy value are
# left alone.
def start
  super

  %i[increment_value dummy_index].each do |key|
    @storage.put(key, 0) unless @storage.get(key)
  end

  needs_counter = @auto_increment_key && !@storage.get(:auto_increment_value)
  @storage.put(:auto_increment_value, -1) if needs_counter

  thread_create(:dummy_input, &method(:run))
end
wait(time) { || ... }
click to toggle source
# File lib/fluent/plugin/in_dummy.rb, line 132
# Yields to the given block, then sleeps for whatever part of +time+ the
# block did not use up. If the block took +time+ seconds or longer, no
# sleep happens.
#
# time - slot length in seconds (Numeric).
#
# Returns the result of sleep when a sleep happened, nil otherwise.
#
# Elapsed time is taken from the monotonic clock rather than Time.now, so a
# wall-clock adjustment while the block runs cannot inflate or shrink the
# computed sleep.
def wait(time)
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  remaining = time - (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at)
  sleep remaining if remaining > 0
end