class Fluent::Plugin::SyslogInput

Constants

DEFAULT_PARSER
FACILITY_MAP
PRIORITY_MAP
SYSLOG_REGEXP

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::PluginLoggerMixin#configure
# File lib/fluent/plugin/in_syslog.rb, line 107
# Configures the syslog input from +conf+.
#
# Converts compat-style parser parameters, delegates to the superclass,
# creates the parser and then validates the source address / hostname
# related options.
#
# Raises Fluent::ConfigError when include_source_host is combined with
# source_address_key, or when source_hostname_key is set while
# resolve_hostname is explicitly false.
def configure(conf)
  compat_parameters_convert(conf, :parser)

  super

  @use_default = false

  @parser = parser_create
  # The parser handles the priority itself only if it exposes #with_priority
  # and that returns a truthy value.
  @parser_parse_priority = @parser.respond_to?(:with_priority) ? @parser.with_priority : false

  if @include_source_host
    raise Fluent::ConfigError, "specify either source_address_key or include_source_host" if @source_address_key

    @source_address_key = @source_host_key
  end

  if @source_hostname_key
    # Not configured at all: turn hostname resolution on.
    @resolve_hostname = true if @resolve_hostname.nil?
    # Still falsy here means the user specified "false" in config.
    raise Fluent::ConfigError, "resolve_hostname must be true with source_hostname_key" unless @resolve_hostname
  end

  @_event_loop_run_timeout = @blocking_timeout
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/in_syslog.rb, line 134
# Always returns true.
# NOTE(review): presumably this tells Fluentd the plugin may run under
# multiple workers — confirm against the plugin base class.
def multi_workers_ready?
  true
end
start() click to toggle source
Calls superclass method Fluent::Compat::Input#start
# File lib/fluent/plugin/in_syslog.rb, line 138
# Starts the plugin: runs the superclass start hook, logs the listening
# endpoint and launches the server matching @protocol_type (:udp or :tcp).
#
# Raises RuntimeError for any other @protocol_type value.
def start
  super

  log.info "listening syslog socket on #{@bind}:#{@port} with #{@protocol_type}"

  if @protocol_type == :udp
    start_udp_server
  elsif @protocol_type == :tcp
    start_tcp_server
  else
    raise "BUG: invalid protocol_type value:#{@protocol_type}"
  end
end
start_tcp_server() click to toggle source
# File lib/fluent/plugin/in_syslog.rb, line 156
# Starts the TCP listener and splits the incoming stream into syslog messages,
# handing each complete message to #message_handler.
#
# Framing is selected by @frame_type:
# * :octet_count - every message arrives as "<length> <message>"
# * anything else - messages are terminated by "\n"
#
# Data that does not yet form a complete message is left in the connection
# buffer until more data arrives.
#
# Integer() raises ArgumentError when an octet-count prefix is not numeric.
def start_tcp_server
  octet_count_frame = @frame_type == :octet_count

  # syslog family adds "\n" to each message when transport is TCP and traditional frame
  delimiter = octet_count_frame ? " " : "\n"
  delimiter_size = delimiter.size
  server_create_connection(:in_syslog_tcp_server, @port, bind: @bind, resolve_name: @resolve_hostname) do |conn|
    conn.data do |data|
      buffer = conn.buffer
      buffer << data
      pos = 0
      if octet_count_frame
        while idx = buffer.index(delimiter, pos)
          # The slice includes the trailing delimiter; Integer() tolerates
          # surrounding whitespace.
          num = Integer(buffer[pos..idx])
          # Take exactly `num` characters after the delimiter.
          # NOTE(review): length is measured in characters; assumes the
          # connection buffer is a binary string so that this equals octets.
          msg = buffer[idx + delimiter_size, num]
          # Frame not fully received yet: keep it buffered for the next chunk.
          break if msg.size != num

          pos = idx + delimiter_size + num
          message_handler(msg, conn)
        end
      else
        while idx = buffer.index(delimiter, pos)
          msg = buffer[pos...idx]
          pos = idx + delimiter_size
          message_handler(msg, conn)
        end
      end
      # Drop only what has been fully consumed.
      buffer.slice!(0, pos) if pos > 0
    end
  end
end
start_udp_server() click to toggle source
# File lib/fluent/plugin/in_syslog.rb, line 150
# Starts the UDP listener. Every received datagram has its trailing line
# terminator removed and is then passed to #message_handler together with
# the socket it arrived on.
def start_udp_server
  udp_options = { bind: @bind, max_bytes: @message_length_limit, resolve_name: @resolve_hostname }
  server_create_udp(:in_syslog_udp_server, @port, **udp_options) do |payload, socket|
    message_handler(payload.chomp, socket)
  end
end

Private Instance Methods

emit(tag, time, record) click to toggle source
# File lib/fluent/plugin/in_syslog.rb, line 224
# Emits one event through the router.
#
# Any StandardError raised while emitting is logged (with the tag and the
# record serialized by Yajl) instead of being propagated to the caller.
def emit(tag, time, record)
  begin
    router.emit(tag, time, record)
  rescue => error
    log.error "syslog failed to emit", error: error, tag: tag, record: Yajl.dump(record)
  end
end
message_handler(data, sock) click to toggle source
# File lib/fluent/plugin/in_syslog.rb, line 188
# Parses one raw syslog message and emits the resulting event.
#
# data - the raw message string
# sock - the connection/socket the message came from; queried for
#        remote_addr / remote_host when the matching record keys are set
#
# Unless the parser handles the priority itself, the leading priority value
# is extracted with SYSLOG_REGEXP and only the remainder is passed to the
# parser. The priority is split into facility (pri >> 3) and severity
# (lowest three bits), which also form the emitted tag
# "<tag>.<facility>.<priority>".
#
# Unparsable messages are logged and dropped; any StandardError is logged
# rather than raised.
def message_handler(data, sock)
  if @parser_parse_priority
    pri = nil
    text = data
  else
    matched = SYSLOG_REGEXP.match(data)
    if matched.nil?
      log.warn "invalid syslog message: #{data.dump}"
      return
    end
    pri = matched[1].to_i
    text = matched[2]
  end

  @parser.parse(text) do |time, record|
    if !time || !record
      log.warn "failed to parse message", data: data
      return
    end

    # When the parser extracted the priority, it is carried in the record.
    pri = record.delete('pri') unless pri
    facility = FACILITY_MAP[pri >> 3]
    priority = PRIORITY_MAP[pri & 7]

    if @priority_key
      record[@priority_key] = priority
    end
    if @facility_key
      record[@facility_key] = facility
    end
    if @source_address_key
      record[@source_address_key] = sock.remote_addr
    end
    if @source_hostname_key
      record[@source_hostname_key] = sock.remote_host
    end

    emit("#{@tag}.#{facility}.#{priority}", time, record)
  end
rescue => error
  log.error "invalid input", data: data, error: error
  log.error_backtrace
end