class Fluent::Plugin::HttpInput::Handler
Constants
- RES_200_STATUS
- RES_403_STATUS
Attributes
content_type[R]
Public Class Methods
new(io, km, callback, body_size_limit, format_name, log, cors_allow_origins, cors_allow_credentials, add_query_params)
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 378 def initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins, cors_allow_credentials, add_query_params) @io = io @km = km @callback = callback @body_size_limit = body_size_limit @next_close = false @format_name = format_name @log = log @cors_allow_origins = cors_allow_origins @cors_allow_credentials = cors_allow_credentials @idle = 0 @add_query_params = add_query_params @km.add(self) @remote_port, @remote_addr = io.remote_port, io.remote_addr @parser = Http::Parser.new(self) end
Public Instance Methods
close()
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 619 def close @io.close end
closing?()
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 632 def closing? @next_close end
handle_get_request()
click to toggle source
Azure App Service sends GET requests for health checking purpose. Respond with ‘200 OK` to accommodate it.
# File lib/fluent/plugin/in_http.rb, line 487 def handle_get_request return send_response_and_close(RES_200_STATUS, {}, "") end
handle_options_request()
click to toggle source
Web browsers can send an OPTIONS request before performing POST to check if cross-origin requests are supported.
# File lib/fluent/plugin/in_http.rb, line 493 def handle_options_request # Is CORS enabled in the first place? if @cors_allow_origins.nil? return send_response_and_close(RES_403_STATUS, {}, "") end # in_http does not support HTTP methods except POST if @access_control_request_method != 'POST' return send_response_and_close(RES_403_STATUS, {}, "") end header = { "Access-Control-Allow-Methods" => "POST", "Access-Control-Allow-Headers" => @access_control_request_headers || "", } # Check the origin and send back a CORS response if @cors_allow_origins.include?('*') header["Access-Control-Allow-Origin"] = "*" send_response_and_close(RES_200_STATUS, header, "") elsif include_cors_allow_origin header["Access-Control-Allow-Origin"] = @origin if @cors_allow_credentials header["Access-Control-Allow-Credentials"] = true end send_response_and_close(RES_200_STATUS, header, "") else send_response_and_close(RES_403_STATUS, {}, "") end end
include_cors_allow_origin()
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 659 def include_cors_allow_origin if @origin.nil? return false end if @cors_allow_origins.include?(@origin) return true end filtered_cors_allow_origins = @cors_allow_origins.select {|origin| origin != ""} r = filtered_cors_allow_origins.find do |origin| (start_str, end_str) = origin.split("*", 2) @origin.start_with?(start_str) && @origin.end_with?(end_str) end !r.nil? end
on_body(chunk)
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 472 def on_body(chunk) if @body.bytesize + chunk.bytesize > @body_size_limit unless closing? send_response_and_close("413 Request Entity Too Large", {}, "Too large") end return end @body << chunk end
on_close()
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 401 def on_close @km.delete(self) end
on_headers_complete(headers)
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 418 def on_headers_complete(headers) expect = nil size = nil if @parser.http_version == [1, 1] @keep_alive = true else @keep_alive = false end @env = {} @content_type = "" @content_encoding = "" headers.each_pair {|k,v| @env["HTTP_#{k.tr('-','_').upcase}"] = v case k when /\AExpect\z/i expect = v when /\AContent-Length\Z/i size = v.to_i when /\AContent-Type\Z/i @content_type = v when /\AContent-Encoding\Z/i @content_encoding = v when /\AConnection\Z/i if /close/i.match?(v) @keep_alive = false elsif /Keep-alive/i.match?(v) @keep_alive = true end when /\AOrigin\Z/i @origin = v when /\AX-Forwarded-For\Z/i # For multiple X-Forwarded-For headers. Use first header value. v = v.first if v.is_a?(Array) @remote_addr = v.split(",").first when /\AAccess-Control-Request-Method\Z/i @access_control_request_method = v when /\AAccess-Control-Request-Headers\Z/i @access_control_request_headers = v end } if expect if expect == '100-continue'.freeze if !size || size < @body_size_limit send_response_nobody("100 Continue", {}) else send_response_and_close("413 Request Entity Too Large", {}, "Too large") end else send_response_and_close("417 Expectation Failed", {}, "") end end end
on_message_begin()
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 414 def on_message_begin @body = '' end
on_message_complete()
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 524 def on_message_complete return if closing? if @parser.http_method == 'GET'.freeze return handle_get_request() end if @parser.http_method == 'OPTIONS'.freeze return handle_options_request() end # CORS check # ========== # For every incoming request, we check if we have some CORS # restrictions and allow listed origins through @cors_allow_origins. unless @cors_allow_origins.nil? unless @cors_allow_origins.include?('*') || include_cors_allow_origin send_response_and_close(RES_403_STATUS, {'Connection' => 'close'}, "") return end end # Content Encoding # ================= # Decode payload according to the "Content-Encoding" header. # For now, we only support 'gzip' and 'deflate'. begin if @content_encoding == 'gzip'.freeze @body = Zlib::GzipReader.new(StringIO.new(@body)).read elsif @content_encoding == 'deflate'.freeze @body = Zlib::Inflate.inflate(@body) end rescue @log.warn 'fails to decode payload', error: $!.to_s send_response_and_close(RES_400_STATUS, {}, "") return end @env['REMOTE_ADDR'] = @remote_addr if @remote_addr uri = URI.parse(@parser.request_url) params = WEBrick::HTTPUtils.parse_query(uri.query) if @format_name != 'default' params[EVENT_RECORD_PARAMETER] = @body elsif /^application\/x-www-form-urlencoded/.match?(@content_type) params.update WEBrick::HTTPUtils.parse_query(@body) elsif @content_type =~ /^multipart\/form-data; boundary=(.+)/ boundary = WEBrick::HTTPUtils.dequote($1) params.update WEBrick::HTTPUtils.parse_form_data(@body, boundary) elsif /^application\/json/.match?(@content_type) params['json'] = @body elsif /^application\/msgpack/.match?(@content_type) params['msgpack'] = @body elsif /^application\/x-ndjson/.match?(@content_type) params['ndjson'] = @body end path_info = uri.path if (@add_query_params) query_params = WEBrick::HTTPUtils.parse_query(uri.query) query_params.each_pair {|k,v| params["QUERY_#{k.tr('-','_').upcase}"] = v } end params.merge!(@env) @env.clear code, header, body = @callback.call(path_info, params) body = body.to_s header = header.dup if header.frozen? unless @cors_allow_origins.nil? if @cors_allow_origins.include?('*') header['Access-Control-Allow-Origin'] = '*' elsif include_cors_allow_origin header['Access-Control-Allow-Origin'] = @origin if @cors_allow_credentials header["Access-Control-Allow-Credentials"] = true end end end if @keep_alive header['Connection'] = 'Keep-Alive'.freeze send_response(code, header, body) else send_response_and_close(code, header, body) end end
on_read(data)
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 405 def on_read(data) @idle = 0 @parser << data rescue @log.warn "unexpected error", error: $!.to_s @log.warn_backtrace @io.close end
on_write_complete()
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 623 def on_write_complete @io.close if @next_close end
send_response(code, header, body)
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 636 def send_response(code, header, body) header['Content-Length'] ||= body.bytesize header['Content-Type'] ||= 'text/plain'.freeze data = %[HTTP/1.1 #{code}\r\n] header.each_pair {|k,v| data << "#{k}: #{v}\r\n" } data << "\r\n".freeze @io.write(data) @io.write(body) end
send_response_and_close(code, header, body)
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 627 def send_response_and_close(code, header, body) send_response(code, header, body) @next_close = true end
send_response_nobody(code, header)
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 650 def send_response_nobody(code, header) data = %[HTTP/1.1 #{code}\r\n] header.each_pair {|k,v| data << "#{k}: #{v}\r\n" } data << "\r\n".freeze @io.write(data) end
step_idle()
click to toggle source
# File lib/fluent/plugin/in_http.rb, line 397 def step_idle @idle += 1 end