class Fluent::Compat::SocketUtil::BaseInput

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Compat::Input.new
# File lib/fluent/compat/socket_util.rb, line 97
# Builds the input by delegating to the superclass constructor, then loads
# the parser library.
def initialize
  super
  # Loaded here rather than at file load time; presumably to avoid a
  # load-order / circular-require problem -- TODO confirm before hoisting.
  require 'fluent/parser'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::PluginLoggerMixin#configure
# File lib/fluent/compat/socket_util.rb, line 114
# Applies the superclass configuration, then creates the parser selected by
# @format and configures it with the same +conf+.
#
# conf - configuration object, passed unchanged to both the superclass and
#        the parser.
#
# Returns whatever the parser's #configure returns.
def configure(conf)
  super

  parser = Plugin.new_parser(@format)
  # Publish the parser before configuring it so @parser is set even when
  # the parser's #configure raises.
  @parser = parser
  parser.configure(conf)
end
run() click to toggle source
# File lib/fluent/compat/socket_util.rb, line 139
# Runs the event loop held in @loop, passing @blocking_timeout to it.
# Any StandardError raised by the loop is logged (message plus backtrace)
# instead of propagating to the caller.
def run
  begin
    @loop.run(@blocking_timeout)
  rescue StandardError => e
    log.error("unexpected error", error: e)
    log.error_backtrace
  end
end
shutdown() click to toggle source
Calls superclass method Fluent::Compat::Input#shutdown
# File lib/fluent/compat/socket_util.rb, line 130
# Tears the input down in this order: detach every watcher from the event
# loop, stop the loop if it reports itself as running, close the handler,
# wait for the worker thread, and finally run the superclass shutdown.
def shutdown
  @loop.watchers.each(&:detach)

  # The running flag is read straight from the loop's @running instance
  # variable.
  loop_running = @loop.instance_variable_get("@running")
  @loop.stop if loop_running

  @handler.close
  @thread.join

  super
end
start() click to toggle source
Calls superclass method Fluent::Compat::Input#start
# File lib/fluent/compat/socket_util.rb, line 121
# Starts the input: runs the superclass start, creates a Coolio event loop,
# obtains a handler from #listen (which is handed #on_message as its
# callback), attaches that handler to the loop, and launches a thread that
# executes #run.
def start
  super

  # @loop is assigned before #listen is invoked, so it is already set by
  # the time the handler is built.
  @loop = Coolio::Loop.new
  message_callback = method(:on_message)
  @handler = listen(message_callback)
  @loop.attach(@handler)
  @thread = Thread.new { run }
end

Private Instance Methods

on_message(msg, addr) click to toggle source
# File lib/fluent/compat/socket_util.rb, line 148
# Handles one received message: parses +msg+ with @parser and emits each
# parsed (time, record) pair to the router under @tag.
#
# msg  - raw message handed to the parser.
# addr - peer address; element 3 is used as the source host value.
#
# When the parser yields a falsy time or record, a warning is logged and
# the method returns immediately. Any StandardError raised while parsing or
# emitting is logged together with the dumped message and the source host.
def on_message(msg, addr)
  @parser.parse(msg) do |time, record|
    if time && record
      # Only tag the record with the sender when a key name is configured.
      record[@source_host_key] = addr[3] if @source_host_key
      router.emit(@tag, time, record)
    else
      log.warn "pattern not match: #{msg.inspect}"
      return
    end
  end
rescue => e
  log.error msg.dump, error: e, host: addr[3]
  log.error_backtrace
end