class Fluent::Plugin::ForwardInput

Constants

HEARTBEAT_UDP_PAYLOAD
LISTEN_PORT

Public Instance Methods

add_source_info(es, conn) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 361
# Copies every event of +es+ into a new stream, writing the peer's address
# and/or hostname into each record under the configured key(s).
#
# Records are modified in place before being added to the new stream.
#
# @param es [#each] source of (time, record) pairs
# @param conn [#remote_addr, #remote_host] connection the events arrived on
# @return [Fluent::MultiEventStream] stream holding the annotated events
# @raise [RuntimeError] when neither @source_address_key nor
#   @source_hostname_key is set
def add_source_info(es, conn)
  address_key = @source_address_key
  hostname_key = @source_hostname_key
  raise "BUG: don't call this method in this case" unless address_key || hostname_key

  # Query the connection only for the attributes that will be injected.
  peer_address = conn.remote_addr if address_key
  peer_hostname = conn.remote_host if hostname_key

  annotated = Fluent::MultiEventStream.new
  es.each do |time, record|
    record[address_key] = peer_address if address_key
    record[hostname_key] = peer_hostname if hostname_key
    annotated.add(time, record)
  end
  annotated
end
check_and_skip_invalid_event(tag, es, remote_host) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 349
# Builds a new stream containing only the events that invalid_event? accepts;
# every rejected event is reported with a warning and left out.
#
# @param tag [Object] tag the events were received with
# @param es [#each] source of (time, record) pairs
# @param remote_host [Object] peer host, used only in the warning
# @return [Fluent::MultiEventStream] stream with the accepted events
def check_and_skip_invalid_event(tag, es, remote_host)
  accepted = Fluent::MultiEventStream.new
  es.each do |time, record|
    if invalid_event?(tag, time, record)
      log.warn "skip invalid event:", host: remote_host, tag: tag, time: time, record: record
    else
      accepted.add(time, record)
    end
  end
  accepted
end
check_ping(message, remote_addr, user_auth_salt, nonce) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 407
# Validates the PING message a client sends during the authentication
# handshake.
#
# Expected layout of +message+:
#   ['PING', self_hostname, shared_key_salt,
#    sha512_hex(shared_key_salt + self_hostname + nonce + shared_key),
#    username || '', sha512_hex(auth_salt + username + password) || '']
#
# The message comes straight from the peer, so its shape and the types of
# the fields fed into the digest are checked before use; a malformed message
# is reported as an authentication failure instead of raising.
#
# @param message [Object] decoded object received from the peer
# @param remote_addr [String] peer address, matched against the entries of @nodes
# @param user_auth_salt [String] salt used for the password digest
# @param nonce [String] nonce used for the shared key digest
# @return [Array] [true, shared_key_salt, shared_key] on success,
#   [false, reason, nil] on failure
def check_ping(message, remote_addr, user_auth_salt, nonce)
  log.debug "checking ping"
  unless message.is_a?(Array) && message.size == 6 && message[0] == 'PING'
    return false, 'invalid ping message', nil
  end
  _ping, hostname, shared_key_salt, shared_key_hexdigest, username, password_digest = message

  # Both values are passed to Digest#update below, which only accepts strings.
  unless hostname.is_a?(String) && shared_key_salt.is_a?(String)
    return false, 'invalid ping message', nil
  end

  node = @nodes.find do |n|
    begin
      n[:address].include?(remote_addr)
    rescue StandardError
      # an address that cannot be compared simply does not match this node
      false
    end
  end
  if !node && !@security.allow_anonymous_source
    log.warn "Anonymous client disallowed", address: remote_addr, hostname: hostname
    return false, "anonymous source host '#{remote_addr}' denied", nil
  end

  # A matching client entry supplies the shared key; otherwise the global one is used.
  shared_key = node ? node[:shared_key] : @security.shared_key
  serverside = Digest::SHA512.new.update(shared_key_salt).update(hostname).update(nonce).update(shared_key).hexdigest
  if shared_key_hexdigest != serverside
    log.warn "Shared key mismatch", address: remote_addr, hostname: hostname
    return false, 'shared_key mismatch', nil
  end

  if @security.user_auth
    users = select_authenticate_users(node, username)
    success = users.any? do |user|
      passhash = Digest::SHA512.new.update(user_auth_salt).update(username).update(user[:password]).hexdigest
      passhash == password_digest
    end
    unless success
      log.warn "Authentication failed", address: remote_addr, hostname: hostname, username: username
      return false, 'username/password mismatch', nil
    end
  end

  return true, shared_key_salt, shared_key
end
configure(conf) click to toggle source
Calls superclass method Fluent::PluginLoggerMixin#configure
# File lib/fluent/plugin/in_forward.rb, line 94
# Applies the configuration and validates the hostname-injection and
# <security> settings.
#
# Sets @resolve_hostname (forced on when @source_hostname_key is set and it
# was left unspecified), @enable_field_injection, and - when @security is
# present - @nodes: one hash per client with :address (IPAddr), :shared_key
# and :users.
#
# @param conf [Object] configuration, handed to the superclass implementation
# @raise [Fluent::ConfigError] on inconsistent or unresolvable settings
def configure(conf)
  super

  if @source_hostname_key
    # TODO: add test
    case @resolve_hostname
    when nil
      @resolve_hostname = true
    when false # user specifies "false" in config
      raise Fluent::ConfigError, "resolve_hostname must be true with source_hostname_key"
    end
  end
  @enable_field_injection = @source_address_key || @source_hostname_key

  return unless @security

  raise Fluent::ConfigError, "<user> sections required if user_auth enabled" if @security.user_auth && @security.users.empty?
  raise Fluent::ConfigError, "<client> sections required if allow_anonymous_source disabled" if !@security.allow_anonymous_source && @security.clients.empty?

  @nodes = []

  @security.clients.each do |client|
    host = client.host
    network = client.network
    raise Fluent::ConfigError, "both of 'host' and 'network' are specified for client" if host && network
    raise Fluent::ConfigError, "Either of 'host' and 'network' must be specified for client" unless host || network

    # A host name is resolved once here; a network is used as written.
    resolved = nil
    if host
      begin
        resolved = IPSocket.getaddress(host)
      rescue SocketError
        raise Fluent::ConfigError, "host '#{host}' cannot be resolved"
      end
    end

    address = begin
                IPAddr.new(resolved || network)
              rescue ArgumentError
                raise Fluent::ConfigError, "network '#{network}' address format is invalid"
              end

    @nodes << {
      address: address,
      shared_key: (client.shared_key || @security.shared_key),
      users: client.users
    }
  end
end
generate_helo(nonce, user_auth_salt) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 401
# Builds the HELO message: ['HELO', options(hash)].
#
# @param nonce [Object] value placed under 'nonce'
# @param user_auth_salt [Object] value placed under 'auth' when @security is
#   set; otherwise 'auth' is an empty string
# @return [Array] the HELO message
def generate_helo(nonce, user_auth_salt)
  log.debug "generating helo"
  auth_salt = @security ? user_auth_salt : ''
  options = {
    'nonce' => nonce,
    'auth' => auth_salt,
    'keepalive' => !@deny_keepalive
  }
  ['HELO', options]
end
generate_pong(auth_result, reason_or_salt, nonce, shared_key) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 444
# Builds the PONG message:
#   ['PONG', bool(authentication result), 'reason if authentication failed',
#    self_hostname, sha512_hex(salt + self_hostname + nonce + sharedkey)]
#
# @param auth_result [Boolean] whether authentication succeeded
# @param reason_or_salt [String] failure reason when +auth_result+ is falsy,
#   otherwise the salt fed into the digest
# @param nonce [String] nonce fed into the digest
# @param shared_key [String] shared key fed into the digest
# @return [Array] the PONG message
def generate_pong(auth_result, reason_or_salt, nonce, shared_key)
  log.debug "generating pong"
  return ['PONG', false, reason_or_salt, '', ''] unless auth_result

  self_hostname = @security.self_hostname
  digest = Digest::SHA512.new
  [reason_or_salt, self_hostname, nonce, shared_key].each { |part| digest.update(part) }
  ['PONG', true, '', self_hostname, digest.hexdigest]
end
generate_salt() click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 397
# Produces a fresh random salt.
#
# @return [String] 16 random bytes from SecureRandom
def generate_salt
  salt_length = 16
  ::SecureRandom.random_bytes(salt_length)
end
handle_connection(conn) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 178
# Serves one accepted connection.
#
# When @security is set, a HELO is written first and the first incoming
# message(s) are treated as PING until one passes check_ping. Afterwards
# (or immediately, without @security) every message goes to on_message and,
# if it asks for one, an ack is written back.
#
# @param conn [Object] connection; must provide write, on, close,
#   remote_addr and remote_port
# @return [Object] whatever read_messages returns
def handle_connection(conn)
  reply = lambda do |serializer, payload|
    conn.write(serializer.call(payload))
  end

  log.trace "connected fluent socket", addr: conn.remote_addr, port: conn.remote_port

  nonce = nil
  user_auth_salt = nil
  state = :established

  if @security
    # security enabled session MUST use MessagePack as serialization format
    state = :helo
    nonce = generate_salt
    user_auth_salt = generate_salt
    reply.call(:to_msgpack.to_proc, generate_helo(nonce, user_auth_salt))
    state = :pingpong
  end

  log.trace "accepted fluent socket", addr: conn.remote_addr, port: conn.remote_port

  read_messages(conn) do |msg, chunk_size, serializer|
    if state == :pingpong
      success, reason_or_salt, shared_key = check_ping(msg, conn.remote_addr, user_auth_salt, nonce)
      if success
        reply.call(serializer, generate_pong(true, reason_or_salt, nonce, shared_key))

        log.debug "connection established", address: conn.remote_addr, port: conn.remote_port
        state = :established
      else
        # Register the close handler before writing the rejection.
        conn.on(:write_complete) { |c| c.close_after_write_complete }
        reply.call(serializer, generate_pong(false, reason_or_salt, nonce, shared_key))
      end
    elsif state == :established
      options = on_message(msg, chunk_size, conn)
      ack = options && response(options)
      if ack
        log.trace "sent response to fluent socket", address: conn.remote_addr, response: ack
        conn.on(:write_complete) { |c| c.close } if @deny_keepalive
        reply.call(serializer, ack)
      elsif @deny_keepalive
        conn.close
      end
    else
      raise "BUG: unknown session state: #{state}"
    end
  end
end
invalid_event?(tag, time, record) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 345
# Tells whether an event fails the basic type checks: the time must be an
# Integer or a Fluent::EventTime, the record a Hash and the tag a String.
#
# @param tag [Object]
# @param time [Object]
# @param record [Object]
# @return [Boolean] true when any of the three checks fails
def invalid_event?(tag, time, record)
  time_ok = time.is_a?(Integer) || time.is_a?(::Fluent::EventTime)
  return true unless time_ok
  return true unless record.is_a?(Hash)

  !tag.is_a?(String)
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 146
# Always answers true.
#
# NOTE(review): presumably this tells the framework that the input may run
# under multiple workers (#start shares its sockets when more than one
# worker is configured) - confirm against the Fluentd plugin API.
#
# @return [true]
def multi_workers_ready?
  true
end
on_message(msg, chunk_size, conn) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 267
# Handles one decoded message and emits its events through the router.
#
# Three message shapes are recognised by the type of the second element:
#   PackedForward: [tag, String(packed entries), option]
#   Forward:       [tag, Array([time, record], ...), option]
#   Message:       [tag, time, record, option]
#
# The option comes straight from the peer. Anything other than a Hash is
# treated as "no option", so a malformed value can neither raise on the
# key lookups nor be returned for an ack.
#
# @param msg [Object] decoded message; nil and non-Array values are ignored
# @param chunk_size [Integer] number of bytes read for this message
# @param conn [Object] connection the message arrived on (remote_host /
#   remote_addr are read for logging and field injection)
# @return [Hash, nil] the message's option Hash, or nil when there is none
#   or the message was ignored/dropped
def on_message(msg, chunk_size, conn)
  if msg.nil?
    # for future TCP heartbeat_request
    return
  end

  # TODO: raise an exception if broken chunk is generated by recoverable situation
  unless msg.is_a?(Array)
    log.warn "incoming chunk is broken:", host: conn.remote_host, msg: msg
    return
  end

  tag = msg[0]
  entries = msg[1]

  if @chunk_size_limit && (chunk_size > @chunk_size_limit)
    log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, host: conn.remote_host, limit: @chunk_size_limit, size: chunk_size
    return
  elsif @chunk_size_warn_limit && (chunk_size > @chunk_size_warn_limit)
    log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, host: conn.remote_host, limit: @chunk_size_warn_limit, size: chunk_size
  end

  case entries
  when String
    # PackedForward
    option = msg[2]
    option = nil unless option.is_a?(Hash)
    size = (option && option['size']) || 0
    # a size that cannot be converted is treated like a missing one
    size = size.respond_to?(:to_i) ? size.to_i : 0
    es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream
    es = es_class.new(entries, nil, size)
    es = check_and_skip_invalid_event(tag, es, conn.remote_host) if @skip_invalid_event
    if @enable_field_injection
      es = add_source_info(es, conn)
    end
    router.emit_stream(tag, es)

  when Array
    # Forward
    es = if @skip_invalid_event
           check_and_skip_invalid_event(tag, entries, conn.remote_host)
         else
           stream = Fluent::MultiEventStream.new
           entries.each { |e|
             record = e[1]
             next if record.nil?
             time = e[0]
             time = Fluent::Engine.now if time.nil? || time.to_i == 0 # `to_i == 0` for empty EventTime
             stream.add(time, record)
           }
           stream
         end
    if @enable_field_injection
      es = add_source_info(es, conn)
    end
    router.emit_stream(tag, es)
    option = msg[2]
    option = nil unless option.is_a?(Hash)

  else
    # Message
    time = msg[1]
    record = msg[2]
    option = msg[3]
    option = nil unless option.is_a?(Hash)
    if @skip_invalid_event && invalid_event?(tag, time, record)
      log.warn "got invalid event and drop it:", host: conn.remote_host, tag: tag, time: time, record: record
      return option # retry never succeeded so return ack and drop incoming event.
    end
    return if record.nil?
    time = Fluent::Engine.now if time.to_i == 0
    if @enable_field_injection
      record[@source_address_key] = conn.remote_addr if @source_address_key
      record[@source_hostname_key] = conn.remote_host if @source_hostname_key
    end
    router.emit(tag, time, record)
  end

  # return option for response
  option
end
read_messages(conn, &block) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 227
# Registers a data callback on +conn+ that decodes the incoming bytes and
# calls +block+ with (object, bytes, serializer) for every decoded object.
#
# The format is chosen once, from the first character of the first data
# received: '{' or '[' selects JSON, anything else MessagePack. +bytes+ is
# the number of bytes received since the previous object was delivered, and
# +serializer+ is the matching :to_json / :to_msgpack proc.
#
# @param conn [#data] connection to read from
# @return [Object] whatever conn.data returns
def read_messages(conn, &block)
  feeder = nil
  serializer = nil
  bytes = 0

  # Hands one decoded object to the caller, then restarts the byte count.
  deliver = lambda do |obj|
    block.call(obj, bytes, serializer)
    bytes = 0
  end

  conn.data do |data|
    # only for first call of callback
    if feeder.nil?
      lead = data[0]
      if lead == '{' || lead == '[' # json
        json_parser = Yajl::Parser.new
        json_parser.on_parse_complete = deliver
        serializer = :to_json.to_proc
        feeder = ->(chunk) { json_parser << chunk }
      else # msgpack
        unpacker = Fluent::Engine.msgpack_factory.unpacker
        serializer = :to_msgpack.to_proc
        feeder = lambda do |chunk|
          unpacker.feed_each(chunk) { |obj| deliver.call(obj) }
        end
      end
    end

    bytes += data.bytesize
    feeder.call(data)
  end
end
response(option) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 260
# Builds the ack for a message whose option carries a 'chunk' id.
#
# The option originates from the peer, so only a Hash is consulted: other
# values (which would raise on, or accidentally match, the 'chunk' lookup)
# produce no ack.
#
# @param option [Object] option taken from the received message
# @return [Hash, nil] { 'ack' => chunk id }, or nil when no ack is requested
def response(option)
  return nil unless option.is_a?(Hash)

  chunk_id = option['chunk']
  chunk_id ? { 'ack' => chunk_id } : nil
end
select_authenticate_users(node, username) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 389
# Picks the configured users that may authenticate as +username+.
#
# Users are matched by name. When +node+ lists users of its own, a match
# must also appear in that list; a nil node or an empty list imposes no
# restriction.
#
# @param node [Hash, nil] client entry with a :users list, or nil
# @param username [Object] name presented by the peer
# @return [Array] matching entries of @security.users
def select_authenticate_users(node, username)
  unrestricted = node.nil? || node[:users].empty?
  @security.users.select do |user|
    next false unless user.username == username

    unrestricted || node[:users].include?(user.username)
  end
end
start() click to toggle source
Calls superclass method Fluent::Compat::Input#start
# File lib/fluent/plugin/in_forward.rb, line 152
# Starts the plugin: opens the connection server that feeds
# handle_connection and a UDP server on the same port that answers every
# datagram with HEARTBEAT_UDP_PAYLOAD.
#
# Sockets are created as shared when more than one worker is configured.
def start
  super

  shared_socket = system_config.workers > 1

  log.info "listening port", port: @port, bind: @bind
  server_create_connection(
    :in_forward_server, @port,
    bind: @bind,
    shared: shared_socket,
    resolve_name: @resolve_hostname,
    linger_timeout: @linger_timeout,
    backlog: @backlog,
    &method(:handle_connection)
  )

  server_create(:in_forward_server_udp_heartbeat, @port, shared: shared_socket, proto: :udp, bind: @bind, resolve_name: @resolve_hostname, max_bytes: 128) do |data, sock|
    log.trace "heartbeat udp data arrived", host: sock.remote_host, port: sock.remote_port, data: data
    begin
      sock.write HEARTBEAT_UDP_PAYLOAD
    rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR => e
      # A failed heartbeat reply is only traced; the exception must be bound
      # here so that it can be included in the log entry.
      log.trace "error while heartbeat response", host: sock.remote_host, error: e
    end
  end
end