module Fluent::PluginHelper::Server
Constants
- CONNECTION_PROTOCOLS
- PEERADDR_FAILED
Use the string “?” for the port, not an integer or nil. “?” is clearer than -1 or nil in the log.
- PROTOCOLS
- SERVER_TRANSPORT_PARAMS
- ServerInfo
Attributes
_servers[R]
stop : [-] shutdown : detach server event handler from event loop (event_loop) close : close listening sockets terminate: remove all server instances
Public Class Methods
included(mod)
click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 300
# Ruby inclusion hook: whenever this helper is included into +mod+,
# ServerTransportParams is mixed into +mod+ as well.
#
# @param mod [Module] the module or class including this helper
def self.included(mod)
  mod.include(ServerTransportParams)
end
new()
click to toggle source
Calls superclass method
Fluent::PluginHelper::EventLoop.new
# File lib/fluent/plugin_helper/server.rb, line 304
# Sets up the helper's bookkeeping after delegating to the superclass:
# an empty list of registered servers, an empty list of tracked
# connections, and the mutex used elsewhere in this helper to guard them.
def initialize
  super
  @_server_connections = []
  @_servers = []
  @_server_mutex = Mutex.new
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/server.rb, line 311
# Runs the superclass configuration and then, only when a transport
# section is present and its protocol is :tls, validates the certificate
# options through cert_option_server_validate!.
#
# @param conf the configuration object handed to the superclass
def configure(conf)
  super

  tls_transport = @transport_config && @transport_config.protocol == :tls
  cert_option_server_validate!(@transport_config) if tls_transport
end
server_attach(title, proto, port, bind, shared, server)
click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 206
# Records a newly created server in @_servers (wrapped in a ServerInfo)
# and attaches it to the event loop.
#
# @param title  identifier of the server
# @param proto  protocol the server was created for
# @param port   listening port
# @param bind   bind address
# @param shared whether the listening socket is shared
# @param server the server object to attach
def server_attach(title, proto, port, bind, shared, server)
  info = ServerInfo.new(title, proto, port, bind, shared, server)
  @_servers.push(info)
  event_loop_attach(server)
end
server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, tls_options: nil, max_bytes: nil, flags: 0, **socket_options, &callback)
click to toggle source
#server_create(:title, @port) do |data|
# ...
end #server_create(:title, @port) do |data, conn|
# ...
end #server_create(:title, @port, proto: :udp, max_bytes: 2048) do |data, sock|
sock.remote_host sock.remote_port # ...
end
# File lib/fluent/plugin_helper/server.rb, line 122
# Creates a data-oriented server and attaches it to the event loop.
#
# The callback receives the data (arity 1) or the data plus the
# connection/socket (arity 2). When +proto+ is omitted it is :tls if the
# configured transport section uses TLS, otherwise :tcp.
#
# @param title [Symbol] server identifier
# @param port [Integer] port to listen on
# @param proto [Symbol, nil] one of PROTOCOLS
# @param bind [String] bind address
# @param shared [Boolean] passed through to the socket creation helpers
# @param socket an already created socket (UDP only)
# @param backlog listen backlog (rejected for UDP)
# @param tls_options TLS options (TLS only)
# @param max_bytes maximum bytes per read (UDP only, required for UDP)
# @param flags receive flags (UDP only)
# @param socket_options extra socket options, validated unless +socket+ is given
# @raise [ArgumentError] on any invalid combination of arguments
# @raise [RuntimeError] for :unix (not implemented) or an unknown protocol
def server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, tls_options: nil, max_bytes: nil, flags: 0, **socket_options, &callback)
  unless proto
    tls_configured = @transport_config && @transport_config.protocol == :tls
    proto = tls_configured ? :tls : :tcp
  end

  # Basic argument validation; the order of checks decides which error wins.
  raise ArgumentError, "BUG: title must be a symbol" unless title.is_a?(Symbol)
  raise ArgumentError, "BUG: port must be an integer" unless port.is_a?(Integer)
  raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto)

  raise ArgumentError, "BUG: socket option is available only for udp" if socket && proto != :udp
  raise ArgumentError, "BUG: tls_options is available only for tls" if tls_options && proto != :tls

  raise ArgumentError, "BUG: block not specified which handles received data" unless block_given?
  raise ArgumentError, "BUG: block must have 1 or 2 arguments" unless [1, 2].include?(callback.arity)

  # default linger_timeout only for server
  socket_options[:linger_timeout] ||= 0 if [:tcp, :tls].include?(proto)

  # A caller-supplied socket is used as is, so no option setter is built for it.
  socket_option_setter = nil
  unless socket
    socket_option_validate!(proto, **socket_options)
    socket_option_setter = lambda { |sock| socket_option_set(sock, **socket_options) }
  end

  # options to listen/accept connections
  accepts_connections = [:tcp, :tls, :unix].include?(proto)
  raise ArgumentError, "BUG: backlog is available for tcp/tls" if backlog && !accepts_connections

  # UDP options
  unless proto == :udp
    raise ArgumentError, "BUG: max_bytes is available only for udp" if max_bytes
    raise ArgumentError, "BUG: flags is available only for udp" if flags != 0
  end

  server =
    case proto
    when :tcp
      server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter) do |conn|
        conn.data(&callback)
      end
    when :tls
      tls_conf =
        if tls_options
          server_create_transport_section_object(tls_options)
        elsif @transport_config && @transport_config.protocol == :tls
          @transport_config
        else
          raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified"
        end
      server_create_for_tls_connection(shared, bind, port, tls_conf, backlog, socket_option_setter) do |conn|
        conn.data(&callback)
      end
    when :udp
      raise ArgumentError, "BUG: max_bytes must be specified for UDP" unless max_bytes

      # Only a socket created here is flagged for closing by the UDP server.
      owns_socket = !socket
      udp_sock = socket
      if owns_socket
        udp_sock = server_create_udp_socket(shared, bind, port)
        socket_option_setter.call(udp_sock)
      end
      EventHandler::UDPServer.new(udp_sock, max_bytes, flags, owns_socket, @log, @under_plugin_development, &callback)
    when :unix
      raise "not implemented yet"
    else
      raise "BUG: unknown protocol #{proto}"
    end

  server_attach(title, proto, port, bind, shared, server)
end
server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: true, backlog: nil, tls_options: nil, **socket_options, &block)
click to toggle source
#server_create_connection(:title, @port) do |conn|
# on connection source_addr = conn.remote_host source_port = conn.remote_port conn.data do |data| # on data conn.write resp # ... conn.close end
end
# File lib/fluent/plugin_helper/server.rb, line 70
# Creates a connection-oriented server (no UDP) and attaches it to the
# event loop. The block is called with each accepted connection.
#
# When +proto+ is omitted it is :tls if the configured transport section
# uses TLS, otherwise :tcp.
#
# @param title [Symbol] server identifier
# @param port [Integer] port to listen on
# @param proto [Symbol, nil] one of CONNECTION_PROTOCOLS
# @param bind [String] bind address
# @param shared [Boolean] passed through to the socket creation helpers
# @param backlog listen backlog
# @param tls_options TLS options (TLS only)
# @param socket_options extra socket options, validated before use
# @raise [ArgumentError] on any invalid combination of arguments
# @raise [RuntimeError] for :unix (not implemented) or an unknown protocol
def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: true, backlog: nil, tls_options: nil, **socket_options, &block)
  unless proto
    tls_configured = @transport_config && @transport_config.protocol == :tls
    proto = tls_configured ? :tls : :tcp
  end

  # Argument validation; the order of checks decides which error wins.
  raise ArgumentError, "BUG: title must be a symbol" unless title.is_a?(Symbol)
  raise ArgumentError, "BUG: port must be an integer" unless port.is_a?(Integer)
  raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto)
  raise ArgumentError, "BUG: cannot create connection for UDP" unless CONNECTION_PROTOCOLS.include?(proto)

  raise ArgumentError, "BUG: tls_options is available only for tls" if tls_options && proto != :tls

  raise ArgumentError, "BUG: block not specified which handles connection" unless block_given?
  raise ArgumentError, "BUG: block must have just one argument" unless block.arity == 1

  # default linger_timeout only for server
  socket_options[:linger_timeout] ||= 0 if [:tcp, :tls].include?(proto)

  socket_option_validate!(proto, **socket_options)
  socket_option_setter = lambda { |sock| socket_option_set(sock, **socket_options) }

  server =
    case proto
    when :tcp
      server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block)
    when :tls
      tls_conf =
        if tls_options
          server_create_transport_section_object(tls_options)
        elsif @transport_config && @transport_config.protocol == :tls
          @transport_config
        else
          raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified"
        end
      server_create_for_tls_connection(shared, bind, port, tls_conf, backlog, socket_option_setter, &block)
    when :unix
      raise "not implemented yet"
    else
      raise "unknown protocol #{proto}"
    end

  server_attach(title, proto, port, bind, shared, server)
end
server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block)
click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 211
# Builds a Coolio TCP server on a freshly created listening socket.
#
# The socket options are applied to the listening socket, and the setter
# is also handed to the server together with a close callback that removes
# a connection from @_server_connections. The block given to the server
# registers each connection in @_server_connections unless it is already
# closing.
#
# @param shared [Boolean] passed through to server_create_tcp_socket
# @param bind [String] bind address
# @param port [Integer] listening port
# @param backlog listen backlog; server.listen is only called when given
# @param socket_option_setter callable applying socket options to a socket
# @return the created server
def server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block)
  listening_sock = server_create_tcp_socket(shared, bind, port)
  socket_option_setter.call(listening_sock)

  forget_connection = lambda do |conn|
    @_server_mutex.synchronize { @_server_connections.delete(conn) }
  end

  tcp_server = Coolio::TCPServer.new(listening_sock, nil, EventHandler::TCPServer, socket_option_setter, forget_connection, @log, @under_plugin_development, block) do |conn|
    next if conn.closing

    @_server_mutex.synchronize { @_server_connections.push(conn) }
  end

  tcp_server.listen(backlog) if backlog
  tcp_server
end
server_create_for_tls_connection(shared, bind, port, conf, backlog, socket_option_setter, &block)
click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 226
# Builds a Coolio server that uses EventHandler::TLSServer on a freshly
# created TCP listening socket.
#
# The TLS context is created from +conf+ before the socket is opened. As
# in the TCP variant, connections are registered in @_server_connections
# unless already closing, and the close callback removes them again.
#
# @param shared [Boolean] passed through to server_create_tcp_socket
# @param bind [String] bind address
# @param port [Integer] listening port
# @param conf transport configuration providing version, insecure and ciphers
# @param backlog listen backlog; server.listen is only called when given
# @param socket_option_setter callable applying socket options to a socket
# @return the created server
def server_create_for_tls_connection(shared, bind, port, conf, backlog, socket_option_setter, &block)
  tls_context = cert_option_create_context(conf.version, conf.insecure, conf.ciphers, conf)

  listening_sock = server_create_tcp_socket(shared, bind, port)
  socket_option_setter.call(listening_sock)

  forget_connection = lambda do |conn|
    @_server_mutex.synchronize { @_server_connections.delete(conn) }
  end

  tls_server = Coolio::TCPServer.new(listening_sock, nil, EventHandler::TLSServer, tls_context, socket_option_setter, forget_connection, @log, @under_plugin_development, block) do |conn|
    next if conn.closing

    @_server_mutex.synchronize { @_server_connections.push(conn) }
  end

  tls_server.listen(backlog) if backlog
  tls_server
end
server_create_tcp(title, port, **kwargs, &callback)
click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 188
# Convenience wrapper around server_create that supplies proto: :tcp
# ahead of the caller's keyword arguments.
def server_create_tcp(title, port, **kwargs, &callback)
  options = { proto: :tcp, **kwargs }
  server_create(title, port, **options, &callback)
end
server_create_tcp_socket(shared, bind, port)
click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 354
# Opens a TCP listening socket and switches it to non-blocking mode.
#
# @param shared [Boolean] when true the socket is obtained from the socket
#   manager client; otherwise a TCPServer is created directly
# @param bind [String] bind address
# @param port [Integer] listening port
# @return the listening socket
def server_create_tcp_socket(shared, bind, port)
  listener =
    if shared
      server_socket_manager_client.listen_tcp(bind, port)
    else
      # this method call can create sockets for AF_INET6
      TCPServer.new(bind, port)
    end

  # close-on-exec is set by default in Ruby 2.0 or later (and it's unavailable on Windows)
  listener.tap { |s| s.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) } # nonblock
end
server_create_tls(title, port, **kwargs, &callback)
click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 196
# Convenience wrapper around server_create that supplies proto: :tls
# ahead of the caller's keyword arguments.
def server_create_tls(title, port, **kwargs, &callback)
  options = { proto: :tls, **kwargs }
  server_create(title, port, **options, &callback)
end
server_create_transport_section_object(opts)
click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 252
# Creates a :transport configuration section and copies into it every
# parameter listed in SERVER_TRANSPORT_PARAMS that +opts+ provides.
#
# @param opts a hash-like object responding to has_key? and []
# @return the populated transport section
def server_create_transport_section_object(opts)
  section = configured_section_create(:transport)
  SERVER_TRANSPORT_PARAMS.each do |key|
    next unless opts.has_key?(key)

    section[key] = opts[key]
  end
  section
end
server_create_udp(title, port, **kwargs, &callback)
click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 192
# Convenience wrapper around server_create that supplies proto: :udp
# ahead of the caller's keyword arguments.
def server_create_udp(title, port, **kwargs, &callback)
  options = { proto: :udp, **kwargs }
  server_create(title, port, **options, &callback)
end
server_create_udp_socket(shared, bind, port)
click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 365
# Opens a bound UDP socket and switches it to non-blocking mode.
#
# @param shared [Boolean] when true the socket is obtained from the socket
#   manager client; otherwise a UDPSocket is created and bound here, with
#   the address family chosen from the resolved bind address
# @param bind [String] bind address
# @param port [Integer] port to bind
# @return the bound socket
def server_create_udp_socket(shared, bind, port)
  if shared
    udp = server_socket_manager_client.listen_udp(bind, port)
  else
    resolved = IPAddr.new(IPSocket.getaddress(bind))
    family = resolved.ipv4? ? ::Socket::AF_INET : ::Socket::AF_INET6
    udp = UDPSocket.new(family)
    udp.bind(bind, port)
  end

  # close-on-exec is set by default in Ruby 2.0 or later (and it's unavailable on Windows)
  udp.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock
  udp
end
server_create_unix(title, port, **kwargs, &callback)
click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 200
# Convenience wrapper around server_create that supplies proto: :unix
# ahead of the caller's keyword arguments.
def server_create_unix(title, port, **kwargs, &callback)
  options = { proto: :unix, **kwargs }
  server_create(title, port, **options, &callback)
end
server_socket_manager_client()
click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 346
# Builds a ServerEngine socket manager client from the
# SERVERENGINE_SOCKETMANAGER_PATH environment variable. On Windows the
# value is converted with to_i before being passed on.
#
# @return [ServerEngine::SocketManager::Client]
def server_socket_manager_client
  manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
  manager_path = manager_path.to_i if Fluent.windows?
  ServerEngine::SocketManager::Client.new(manager_path)
end
server_wait_until_start()
click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 48
# Intentionally does nothing and returns nil:
# event_loop_wait_until_start works well for this.
def server_wait_until_start
  nil
end
server_wait_until_stop()
click to toggle source
# File lib/fluent/plugin_helper/server.rb, line 52
# Polls every 0.1 seconds until no registered server is attached, then
# closes every server on a best-effort basis (errors raised by close are
# ignored).
#
# @return the @_servers list
def server_wait_until_stop
  sleep(0.1) until @_servers.none? { |info| info.server.attached? }

  @_servers.each do |info|
    begin
      info.server.close
    rescue StandardError
      nil
    end
  end
end
shutdown()
click to toggle source
Calls superclass method
Fluent::PluginHelper::EventLoop#shutdown
# File lib/fluent/plugin_helper/server.rb, line 333
# Closes every tracked connection on a best-effort basis (errors raised by
# close are ignored) and then continues with the superclass shutdown.
#
# The loop runs over a snapshot of @_server_connections. The close
# callbacks built in server_create_for_tcp_connection and
# server_create_for_tls_connection delete the connection from
# @_server_connections; if one fires while conn.close runs, iterating the
# live array would skip the following element and leave it open.
def shutdown
  @_server_connections.dup.each do |conn|
    begin
      conn.close
    rescue StandardError
      nil
    end
  end

  super
end
stop()
click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/server.rb, line 321
# While holding @_server_mutex, detaches every attached server and closes
# its listening socket so that no more connections are accepted (already
# connected sockets are still alive at this point). Errors raised by close
# are ignored. Finally continues with the superclass stop.
def stop
  @_server_mutex.synchronize do
    @_servers.each do |info|
      listener = info.server
      listener.detach if listener.attached?
      # to refuse more connections: (connected sockets are still alive here)
      begin
        listener.close
      rescue StandardError
        nil
      end
    end
  end

  super
end
terminate()
click to toggle source
Calls superclass method
Fluent::PluginHelper::EventLoop#terminate
# File lib/fluent/plugin_helper/server.rb, line 341
# Drops all registered server entries by pointing @_servers at a fresh
# empty array (the previous array object is left untouched), then
# continues with the superclass terminate.
def terminate
  @_servers = Array.new
  super
end