class Fluent::Plugin::ForwardOutput::Node
Constants
- RequestInfo
Attributes
available[R]
failure[R]
host[R]
name[R]
port[R]
sockaddr[R]
standby[R]
state[R]
usock[RW]
weight[R]
Public Class Methods
new(sender, server, failure:)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 541
# Builds a node describing one forward destination.
#
# sender  - owning plugin object; its log, compress and security settings are read here
# server  - server settings object (name, host, port, weight, standby, username,
#           password, shared_key)
# failure - failure detector object, stored as-is and exposed through #failure
def initialize(sender, server, failure:)
  @sender = sender
  @log = sender.log
  @compress = sender.compress

  @name = server.name
  @host = server.host
  @port = server.port
  @weight = server.weight
  @standby = server.standby
  @failure = failure
  @available = true

  # @hostname is used for certificate verification & TLS SNI.
  # A host that parses as an IP address cannot serve as a hostname, so fall
  # back to the configured name (or nil when there is none).
  host_is_ip = (IPAddr.new(@host) rescue false)
  @hostname = host_is_ip ? (@name || nil) : @host

  @usock = nil

  @username = server.username
  @password = server.password
  # Per-server key first, then the sender-wide security key, then an empty string.
  @shared_key = server.shared_key || (sender.security && sender.security.shared_key) || ""
  @shared_key_salt = generate_salt

  @unpacker = Fluent::Engine.msgpack_unpacker

  # DNS cache state used by #resolved_host and #send_heartbeat.
  @resolved_host = nil
  @resolved_time = 0
  @resolved_once = false
end
Public Instance Methods
available?()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 588
# Reports the current value of the @available flag for this node.
def available?
  @available
end
check_helo(ri, message)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 801
# Validates a HELO message received from the peer and copies its handshake
# parameters into the request info.
#
# ri      - request info object; receives shared_key_nonce and auth
# message - decoded message, expected to be ['HELO', options(hash)]
#
# Returns true when the message is a well-formed HELO, false otherwise.
# Malformed input (non-array message, non-hash options) yields false rather
# than raising, because the message comes from the remote peer.
def check_helo(ri, message)
  @log.debug "checking helo"
  # ['HELO', options(hash)]
  return false unless message.is_a?(Array) && message.size == 2 && message[0] == 'HELO'

  opts = message[1] || {}
  return false unless opts.is_a?(Hash)

  # make shared_key_check failed (instead of error) if protocol version mismatch exist
  ri.shared_key_nonce = opts['nonce'] || ''
  ri.auth = opts['auth'] || ''
  true
end
check_pong(ri, message)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 833
# Validates a PONG message received from the peer.
#
# ri      - request info object; its shared_key_nonce is mixed into the digest
# message - decoded message, expected to be
#           ['PONG', bool(authentication result), 'reason if authentication failed',
#            self_hostname, sha512_hex(salt + self_hostname + nonce + sharedkey)]
#
# Returns a two-element array: [true, nil] on success, or [false, reason string]
# on failure. Malformed peer input is reported as a failure instead of raising.
def check_pong(ri, message)
  @log.debug "checking pong"
  unless message.is_a?(Array) && message.size == 5 && message[0] == 'PONG'
    return false, 'invalid format for PONG message'
  end
  _pong, auth_result, reason, hostname, shared_key_hexdigest = message

  unless auth_result
    # Interpolate so that a nil or non-string reason cannot raise TypeError.
    return false, "authentication failed: #{reason}"
  end

  # The hostname is fed into the digest below, which only accepts strings.
  unless hostname.is_a?(String)
    return false, 'invalid format for PONG message'
  end

  if hostname == @sender.security.self_hostname
    return false, 'same hostname between input and output: invalid configuration'
  end

  clientside = Digest::SHA512.new
                             .update(@shared_key_salt)
                             .update(hostname)
                             .update(ri.shared_key_nonce)
                             .update(@shared_key)
                             .hexdigest
  unless shared_key_hexdigest == clientside
    return false, 'shared key mismatch'
  end

  return true, nil
end
disable!()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 592
# Clears the @available flag so that #available? reports false.
def disable!
  @available = false
end
establish_connection(sock, ri)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 613
# Reads from the socket and feeds decoded messages to #on_read until the
# request reaches the :established state or the node stops being available.
#
# sock - socket to read from (must support read_nonblock)
# ri   - request info object whose state is advanced by #on_read
#
# A system call error or EOF disables the node and stops the loop.
def establish_connection(sock, ri)
  until !available? || ri.state == :established
    begin
      # TODO: On Ruby 2.2 or earlier, read_nonblock doesn't work expectedly.
      # We need rewrite around here using new socket/server plugin helper.
      received = sock.read_nonblock(@sender.read_length)
      if received.empty?
        sleep @sender.read_interval
        next
      end
      @unpacker.feed_each(received) { |data| on_read(sock, ri, data) }
    rescue IO::WaitReadable
      # If the exception is Errno::EWOULDBLOCK or Errno::EAGAIN, it is extended by IO::WaitReadable.
      # So IO::WaitReadable can be used to rescue the exceptions for retrying read_nonblock.
      # https://docs.ruby-lang.org/en/2.3.0/IO.html#method-i-read_nonblock
      sleep @sender.read_interval if ri.state != :established
    rescue SystemCallError => error
      @log.warn "disconnected by error", host: @host, port: @port, error: error
      disable!
      break
    rescue EOFError
      @log.warn "disconnected", host: @host, port: @port
      disable!
      break
    end
  end
end
generate_ping(ri)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 814
# Builds the PING message sent in reply to a HELO.
#
# ri - request info object; shared_key_nonce and auth are read from it
#
# Returns an array:
#   ['PING', self_hostname, sharedkey_salt,
#    sha512_hex(sharedkey_salt + self_hostname + nonce + shared_key),
#    username || '', sha512_hex(auth_salt + username + password) || '']
def generate_ping(ri)
  @log.debug "generating ping"
  self_hostname = @sender.security.self_hostname

  key_digest = Digest::SHA512.new
  key_digest.update(@shared_key_salt)
  key_digest.update(self_hostname)
  key_digest.update(ri.shared_key_nonce)
  key_digest.update(@shared_key)

  ping = ['PING', self_hostname, @shared_key_salt, key_digest.hexdigest]
  if ri.auth.empty?
    # No auth salt in the HELO: credentials are sent as empty strings.
    ping.push('', '')
  else
    password_digest = Digest::SHA512.new
    password_digest.update(ri.auth)
    password_digest.update(@username)
    password_digest.update(@password)
    ping.push(@username, password_digest.hexdigest)
  end
  ping
end
generate_salt()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 797
# Returns a fresh random salt: 16 random bytes rendered as a hex string.
def generate_salt
  salt_bytes = 16
  SecureRandom.hex(salt_bytes)
end
heartbeat(detect=true)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 785
# Records a heartbeat at the current time in the failure detector and, when
# requested, re-enables a node that was marked unavailable.
#
# detect - when true, check whether an unavailable node has gathered more
#          samples than the sender's recover_sample_size and mark it available
#
# Returns true when the node was just recovered, nil otherwise.
def heartbeat(detect=true)
  @failure.add(Time.now.to_f)

  recovered = detect && !@available && @failure.sample_size > @sender.recover_sample_size
  return nil unless recovered

  @available = true
  @log.warn "recovered forwarding server '#{@name}'", host: @host, port: @port
  true
end
on_read(sock, ri, data)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 858
# Handles one decoded handshake message according to the request's state.
#
# sock - socket used to send the PING reply
# ri   - request info object; its state moves :helo -> :pingpong -> :established
# data - decoded message read from the socket
#
# An invalid HELO or a rejected PONG disables the node.
# Raises RuntimeError when ri.state is neither :helo nor :pingpong.
def on_read(sock, ri, data)
  @log.trace __callee__

  case ri.state
  when :helo
    unless check_helo(ri, data)
      # Fall back to the host when the server has no name, as the pingpong
      # branch does, so the message always identifies the node.
      @log.warn "received invalid helo message from #{@name || @host}"
      disable! # shutdown
      return
    end
    sock.write(generate_ping(ri).to_msgpack)
    ri.state = :pingpong
  when :pingpong
    succeeded, reason = check_pong(ri, data)
    unless succeeded
      @log.warn "connection refused to #{@name || @host}: #{reason}"
      disable! # shutdown
      return
    end
    ri.state = :established
    @log.debug "connection established", host: @host, port: @port
  else
    raise "BUG: unknown session state: #{ri.state}"
  end
end
resolved_host()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 726
# Returns the resolved address for this node, honouring the sender's
# expire_dns_cache setting:
#   0     - cache is disabled; resolve on every call
#   nil   - persistent cache; resolve once and keep the result
#   other - reuse the cached address until that much time has passed since the
#           last resolution (measured with Fluent::Engine.now)
def resolved_host
  ttl = @sender.expire_dns_cache
  case ttl
  when 0 # cache is disabled
    return resolve_dns!
  when nil # persistent cache
    return @resolved_host ||= resolve_dns!
  end

  now = Fluent::Engine.now
  cached = @resolved_host
  expired = !cached || now - @resolved_time >= ttl
  return cached unless expired

  @resolved_host = resolve_dns!
  @resolved_time = now
  @resolved_host
end
send_data(tag, chunk)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 674
# Opens a transfer socket to this node and sends one chunk over it.
#
# tag   - tag passed through to #send_data_actual
# chunk - buffer chunk passed through to #send_data_actual
#
# Returns the still-open socket when the sender requires an ACK response (the
# caller reads the ACK from it); otherwise closes the socket, records a
# heartbeat without recovery detection, and returns nil.
# Any error raised while sending closes the socket and is re-raised.
def send_data(tag, chunk)
  sock = @sender.create_transfer_socket(resolved_host, port, @hostname)

  begin
    send_data_actual(sock, tag, chunk)
  rescue
    # Best-effort close; the original error is the one that matters.
    sock.close rescue nil
    raise
  end

  return sock if @sender.require_ack_response # to read ACK from socket

  sock.close_write rescue nil
  sock.close rescue nil
  heartbeat(false)
  nil
end
send_data_actual(sock, tag, chunk)
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 643
# Performs the handshake when security is configured, then writes one chunk
# to the socket as a three-element message: tag, entries, option.
#
# sock  - connected socket to write to
# tag   - event tag; a UTF-8 tagged copy is sent
# chunk - buffer chunk providing size, unique_id and open(compressed:)
#
# Raises ConnectionClosedError when the node is not available after the
# handshake attempt.
def send_data_actual(sock, tag, chunk)
  initial_state = @sender.security ? :helo : :established
  ri = RequestInfo.new(initial_state)
  establish_connection(sock, ri) unless ri.state == :established

  unless available?
    raise ConnectionClosedError, "failed to establish connection with node #{@name}"
  end

  option = { 'size' => chunk.size, 'compressed' => @compress }
  if @sender.require_ack_response
    option['chunk'] = Base64.encode64(chunk.unique_id)
  end

  # https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#packedforward-mode
  # out_forward always uses str32 type for entries.
  # str16 can store only 64kbytes, and it should be much smaller than buffer chunk size.

  utf8_tag = tag.dup.force_encoding(Encoding::UTF_8)

  sock.write @sender.forward_header                          # array, size=3
  sock.write utf8_tag.to_msgpack                             # 1. tag: String (str)
  chunk.open(compressed: @compress) do |chunk_io|
    # 0xdb followed by a 32-bit big-endian length is the str32 header.
    str32_header = [0xdb, chunk_io.size].pack('CN')
    sock.write str32_header.force_encoding(Encoding::UTF_8)  # 2. entries: String (str32)
    IO.copy_stream(chunk_io, sock)                           #    writeRawBody(packed_es)
  end
  sock.write option.to_msgpack                               # 3. option: Hash(map)

  # TODO: use bin32 for non-utf8 content(entries) when old msgpack-ruby (0.5.x or earlier) not supported
end
send_heartbeat()
click to toggle source
# Heartbeat payload: FORWARD_HEADER followed by a msgpack-encoded empty string
# and a msgpack-encoded empty array.
# NOTE(review): the only visible use, in send_heartbeat, is commented out —
# confirm whether this constant is still needed.
FORWARD_TCP_HEARTBEAT_DATA = FORWARD_HEADER + ''.to_msgpack + [].to_msgpack
# File lib/fluent/plugin/out_forward.rb, line 694
# Sends one heartbeat to this node using the sender's heartbeat_type.
#
# :transport - opens a transfer socket; a successful connection counts as the
#              heartbeat and the result of heartbeat(true) is produced in the block
# :udp       - sends a single "\0" datagram through @usock; returns nil
#
# A SocketError during name resolution is logged and swallowed (returning nil)
# only when no resolution has succeeded yet and the sender sets
# ignore_network_errors_at_startup; otherwise it is re-raised.
# Raises RuntimeError for :none or an unknown heartbeat_type.
def send_heartbeat
  begin
    dest_addr = resolved_host
    @resolved_once = true
  rescue ::SocketError => e
    if !@resolved_once && @sender.ignore_network_errors_at_startup
      @log.warn "failed to resolve node name in heartbeating", server: @name || @host, error: e
      return
    end
    raise
  end

  case @sender.heartbeat_type
  when :transport
    @sender.create_transfer_socket(dest_addr, port, @hostname) do |sock|
      ## don't send any data to not cause a compatibility problem
      # sock.write FORWARD_TCP_HEARTBEAT_DATA

      # successful tcp connection establishment is considered as valid heartbeat.
      # When heartbeat is succeeded after detached, return true. It rebuilds weight array.
      heartbeat(true)
    end
  when :udp
    # Reuse the address resolved above instead of resolving a second time, so
    # the datagram goes to the address that was just validated.
    @usock.send "\0", 0, Socket.pack_sockaddr_in(@port, dest_addr)
    nil
  when :none # :none doesn't use this class
    raise "BUG: heartbeat_type none must not use Node"
  else
    raise "BUG: unknown heartbeat_type '#{@sender.heartbeat_type}'"
  end
end
standby?()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 596
# Reports the @standby value taken from the server configuration.
def standby?
  @standby
end
tick()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 755
# Periodic failure check for this node.
#
# Returns:
#   nil   - the node is already unavailable (the failure detector is cleared
#           if it reports a hard timeout)
#   true  - the node was just detached, either by hard timeout or because phi
#           exceeded the sender's phi_threshold
#   false - the node stays available
def tick
  now = Time.now.to_f
  timed_out = @failure.hard_timeout?(now)

  unless @available
    @failure.clear if timed_out
    return nil
  end

  # Extra log fields describing why the node is detached; nil means keep it.
  detach_fields =
    if timed_out
      { hard_timeout: true }
    elsif @sender.phi_failure_detector
      phi = @failure.phi(now)
      { phi: phi, phi_threshold: @sender.phi_threshold } if phi > @sender.phi_threshold
    end
  return false unless detach_fields

  @log.warn "detached forwarding server '#{@name}'", host: @host, port: @port, **detach_fields
  @available = false
  @resolved_host = nil # expire cached host
  @failure.clear
  true
end
validate_host_resolution!()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 584
# Triggers resolution of this node's host by calling #resolved_host and
# returns its result; any error raised by the resolution propagates.
def validate_host_resolution!
  resolved_host
end
verify_connection()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 600
# Opens a transfer socket to this node and, when security is configured,
# runs the handshake to confirm that a connection can be established.
# The socket is always closed before returning.
#
# Returns nil on success.
# Raises RuntimeError naming the node when the handshake does not reach the
# :established state.
def verify_connection
  sock = @sender.create_transfer_socket(resolved_host, port, @hostname)
  begin
    ri = RequestInfo.new(@sender.security ? :helo : :established)
    if ri.state != :established
      establish_connection(sock, ri)
      # An explicit error is raised here: a bare `raise` would re-raise
      # whatever exception the caller happens to be handling, or else produce
      # an uninformative "unhandled exception".
      if ri.state != :established
        raise "failed to establish connection with node #{@name || @host}"
      end
    end
  ensure
    sock.close
  end
end
Private Instance Methods
resolve_dns!()
click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 747
# Looks up @host with Socket.getaddrinfo and picks one result: a random entry
# when the sender enables dns_round_robin, otherwise the first.
# Stores the packed socket address in @sockaddr and returns the chosen
# address string.
def resolve_dns!
  candidates = Socket.getaddrinfo(@host, @port, nil, Socket::SOCK_STREAM)
  chosen = @sender.dns_round_robin ? candidates.sample : candidates.first
  # Index 1 of a getaddrinfo entry is the port, index 3 the address.
  resolved_port, address = chosen.values_at(1, 3)
  @sockaddr = Socket.pack_sockaddr_in(resolved_port, address) # used by on_heartbeat
  address
end