class Fluent::Plugin::SampleInput

Constants

BIN_NUM
DEFAULT_STORAGE_TYPE

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Compat::Input::new
# File lib/fluent/plugin/in_sample.rb, line 59
def initialize
  super
  @storage = nil
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::Plugin::Input#configure
# File lib/fluent/plugin/in_sample.rb, line 64
def configure(conf)
  super
  @sample_index = 0
  config = conf.elements.select{|e| e.name == 'storage' }.first
  @storage = storage_create(usage: 'suspend', conf: config, default_type: DEFAULT_STORAGE_TYPE)
end
emit(num) click to toggle source
# File lib/fluent/plugin/in_sample.rb, line 106
def emit(num)
  begin
    if @size > 1
      num.times do
        router.emit_array(@tag, Array.new(@size) { [Fluent::EventTime.now, generate] })
      end
    else
      num.times { router.emit(@tag, Fluent::EventTime.now, generate) }
    end
  rescue => _
    # ignore all errors not to stop emits by emit errors
  end
end
generate() click to toggle source
# File lib/fluent/plugin/in_sample.rb, line 120
def generate
  d = @sample[@sample_index]
  unless d
    @sample_index = 0
    d = @sample[@sample_index]
  end
  @sample_index += 1
  if @auto_increment_key
    d = d.dup
    d[@auto_increment_key] = @storage.update(:auto_increment_value){|v| v + 1 }
  end
  d
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/in_sample.rb, line 71
def multi_workers_ready?
  true
end
run() click to toggle source
# File lib/fluent/plugin/in_sample.rb, line 89
def run
  batch_num    = (@rate / BIN_NUM).to_i
  residual_num = (@rate % BIN_NUM)
  while thread_current_running?
    current_time = Time.now.to_i
    BIN_NUM.times do
      break unless (thread_current_running? && Time.now.to_i <= current_time)
      wait(0.1) { emit(batch_num) }
    end
    emit(residual_num) if thread_current_running?
    # wait for next second
    while thread_current_running? && Time.now.to_i <= current_time
      sleep 0.01
    end
  end
end
start() click to toggle source
Calls superclass method Fluent::Compat::Input#start
# File lib/fluent/plugin/in_sample.rb, line 75
def start
  super

  @storage.put(:increment_value, 0) unless @storage.get(:increment_value)
  # keep 'dummy' to avoid breaking changes for existing environment. Change it in fluentd v2
  @storage.put(:dummy_index, 0) unless @storage.get(:dummy_index)

  if @auto_increment_key && !@storage.get(:auto_increment_value)
    @storage.put(:auto_increment_value, -1)
  end

  thread_create(:sample_input, &method(:run))
end
wait(time) { || ... } click to toggle source
# File lib/fluent/plugin/in_sample.rb, line 134
def wait(time)
  start_time = Time.now
  yield
  sleep_time = time - (Time.now - start_time)
  sleep sleep_time if sleep_time > 0
end