class Fluent::Plugin::ElasticsearchOutputDataStream

Constants

INVALID_CHARACTERS
INVALID_START_CHRACTERS

Public Instance Methods

append_record_to_messages(op, meta, header, record, msgs) click to toggle source
# File lib/fluent/plugin/out_elasticsearch_data_stream.rb, line 268
# Appends one bulk action (header line followed by document line) to +msgs+.
#
# +meta+ is stored under CREATE_OP in +header+ (mutating +header+), then the
# header and the record are each serialized with @dump_proc and followed by
# BODY_DELIMITER.
#
# @param op      operation name; accepted but not consulted by this method
# @param meta    metadata placed under CREATE_OP in +header+
# @param header  action header hash, modified in place
# @param record  document to serialize after the header
# @param msgs    buffer that receives the serialized lines via <<
# @return +msgs+
def append_record_to_messages(op, meta, header, record, msgs)
  header[CREATE_OP] = meta
  [header, record].each do |payload|
    msgs << @dump_proc.call(payload)
    msgs << BODY_DELIMITER
  end
  msgs
end
client_library_version() click to toggle source
# File lib/fluent/plugin/out_elasticsearch_data_stream.rb, line 208
# Reports the version of the loaded Elasticsearch client library.
#
# @return the value of the Elasticsearch::VERSION constant
def client_library_version
  Elasticsearch::VERSION
end
configure(conf) click to toggle source
# File lib/fluent/plugin/out_elasticsearch_data_stream.rb, line 23
# Configures the plugin: loads the Elasticsearch API libraries, fills in the
# default ILM policy/template names, validates the data stream parameters and,
# when the data stream name contains no placeholder, creates the ILM policy,
# index template and data stream up front.
#
# When the data stream name contains a placeholder, @use_placeholder is set and
# creation is left to #write.
#
# @param conf configuration passed through to the parent implementation
# @raise [Fluent::ConfigError] if a required library cannot be loaded, a
#   parameter is invalid, or the data stream resources cannot be created
def configure(conf)
  super

  # Clients older than 8.0.0 additionally need the separate xpack library.
  legacy_client = Gem::Version.new(TRANSPORT_CLASS::VERSION) < Gem::Version.new("8.0.0")
  begin
    require 'elasticsearch/api'
    require 'elasticsearch/xpack' if legacy_client
  rescue LoadError
    if legacy_client
      raise Fluent::ConfigError, "'elasticsearch/api', 'elasticsearch/xpack' are required for <@elasticsearch_data_stream>."
    else
      raise Fluent::ConfigError, "'elasticsearch/api' is required for <@elasticsearch_data_stream>."
    end
  end

  # Derive defaults from the data stream name when not configured explicitly.
  @data_stream_ilm_name = "#{@data_stream_name}_policy" if @data_stream_ilm_name.nil?
  @data_stream_template_name = "#{@data_stream_name}_template" if @data_stream_template_name.nil?
  @data_stream_ilm_policy = File.read(File.join(File.dirname(__FILE__), "default-ilm-policy.json")) if @data_stream_ilm_policy.nil?

  # ref. https://www.elastic.co/guide/en/elasticsearch/reference/master/indices-create-data-stream.html
  if placeholder?(:data_stream_name_placeholder, @data_stream_name)
    # Names are only known per chunk; validation and creation cannot happen here.
    @use_placeholder = true
    @data_stream_names = []
  else
    validate_data_stream_parameters
  end

  return if @use_placeholder

  begin
    @data_stream_names = [@data_stream_name]
    retry_operate(@max_retry_putting_template,
                  @fail_on_putting_template_retry_exceed,
                  @catch_transport_exception_on_retry) do
      create_ilm_policy(@data_stream_name, @data_stream_template_name, @data_stream_ilm_name)
      create_index_template(@data_stream_name, @data_stream_template_name, @data_stream_ilm_name)
      create_data_stream(@data_stream_name)
    end
  rescue => e
    raise Fluent::ConfigError, "Failed to create data stream: <#{@data_stream_name}> #{e.message}"
  end
end
create_data_stream(datastream_name, host = nil) click to toggle source
# File lib/fluent/plugin/out_elasticsearch_data_stream.rb, line 148
# Creates the named data stream unless #data_stream_exist? reports it present.
#
# The create request runs inside retry_operate using the template retry
# settings held in instance variables.
#
# @param datastream_name name of the data stream to create
# @param host            optional host passed to #client and #data_stream_exist?
# @return nil when the data stream already exists, otherwise whatever
#   retry_operate returns
def create_data_stream(datastream_name, host = nil)
  return if data_stream_exist?(datastream_name, host)

  request = { name: datastream_name }
  retry_operate(
    @max_retry_putting_template,
    @fail_on_putting_template_retry_exceed,
    @catch_transport_exception_on_retry
  ) { client(host).indices.create_data_stream(request) }
end
create_ilm_policy(datastream_name, template_name, ilm_name, host = nil) click to toggle source
# File lib/fluent/plugin/out_elasticsearch_data_stream.rb, line 94
# Registers the ILM (index lifecycle management) policy used by the data stream.
#
# Unless @data_stream_ilm_policy_overwrite is set, nothing is sent when the
# data stream, the index template or the ILM policy already exists.
#
# With client 8.0.0 or newer the policy is written through the ILM namespace
# (+ilm.put_lifecycle+, keyed by +:policy+). The enrich namespace manages
# enrich policies, which are a different resource, so it must not be used
# here. Older clients go through +xpack.ilm.put_policy+ (keyed by +:policy_id+).
#
# @param datastream_name data stream whose existence short-circuits creation
# @param template_name   index template whose existence short-circuits creation
# @param ilm_name        name of the ILM policy to create
# @param host            optional host passed to #client and the existence checks
# @return nil when skipped, otherwise whatever retry_operate returns
def create_ilm_policy(datastream_name, template_name, ilm_name, host = nil)
  unless @data_stream_ilm_policy_overwrite
    return if data_stream_exist?(datastream_name, host) ||
              template_exists?(template_name, host) ||
              ilm_policy_exists?(ilm_name, host)
  end

  params = {
    body: @data_stream_ilm_policy
  }
  retry_operate(@max_retry_putting_template,
                @fail_on_putting_template_retry_exceed,
                @catch_transport_exception_on_retry) do
    if Gem::Version.new(TRANSPORT_CLASS::VERSION) >= Gem::Version.new("8.0.0")
      client(host).ilm.put_lifecycle(params.merge(policy: ilm_name))
    else
      client(host).xpack.ilm.put_policy(params.merge(policy_id: ilm_name))
    end
  end
end
create_index_template(datastream_name, template_name, ilm_name, host = nil) click to toggle source
# File lib/fluent/plugin/out_elasticsearch_data_stream.rb, line 113
# Installs the index template that backs the data stream.
#
# Skipped when either the data stream or the template already exists. The
# template matches "<datastream_name>*", declares a data stream, and sets
# "index.lifecycle.name" to +ilm_name+.
#
# @param datastream_name data stream name used for the index pattern
# @param template_name   name under which the template is stored
# @param ilm_name        ILM policy name written into the template settings
# @param host            optional host passed to #client and the existence checks
# @return nil when skipped, otherwise whatever retry_operate returns
def create_index_template(datastream_name, template_name, ilm_name, host = nil)
  already_present = data_stream_exist?(datastream_name, host) || template_exists?(template_name, host)
  return if already_present

  lifecycle_settings = { "index.lifecycle.name" => "#{ilm_name}" }
  template_body = {
    "index_patterns" => ["#{datastream_name}*"],
    "data_stream" => {},
    "template" => { "settings" => lifecycle_settings }
  }
  request = { name: template_name, body: template_body }

  retry_operate(
    @max_retry_putting_template,
    @fail_on_putting_template_retry_exceed,
    @catch_transport_exception_on_retry
  ) { client(host).indices.put_index_template(request) }
end
data_stream_exist?(datastream_name, host = nil) click to toggle source
# File lib/fluent/plugin/out_elasticsearch_data_stream.rb, line 135
# Checks whether the named data stream exists by requesting it.
#
# A NotFound transport error, whether raised or returned as the response
# object, means the data stream is absent. The raised case is logged at info
# level.
#
# @param datastream_name name of the data stream to look up
# @param host            optional host passed to #client
# @return [Boolean] true when the lookup succeeded, false on NotFound
def data_stream_exist?(datastream_name, host = nil)
  lookup = { name: datastream_name }
  response = client(host).indices.get_data_stream(lookup)
  !response.is_a?(TRANSPORT_CLASS::Transport::Errors::NotFound)
rescue TRANSPORT_CLASS::Transport::Errors::NotFound => e
  log.info "Specified data stream does not exist. Will be created: <#{e}>"
  false
end
ilm_policy_exists?(policy_id, host = nil) click to toggle source
# File lib/fluent/plugin/out_elasticsearch_data_stream.rb, line 160
# Checks whether an ILM (index lifecycle management) policy exists.
#
# With client 8.0.0 or newer the lookup goes through +ilm.get_lifecycle+
# (keyed by +:policy+). The enrich namespace addresses enrich policies, a
# different resource, so it cannot answer this question. Older clients use
# +ilm.get_policy+ (keyed by +:policy_id+).
#
# Any StandardError raised by the lookup is treated as "does not exist";
# this best-effort behaviour is kept from the original implementation.
#
# @param policy_id name of the ILM policy
# @param host      optional host passed to #client
# @return [Boolean] true when the lookup succeeded, false when it raised
def ilm_policy_exists?(policy_id, host = nil)
  if Gem::Version.new(TRANSPORT_CLASS::VERSION) >= Gem::Version.new("8.0.0")
    client(host).ilm.get_lifecycle(policy: policy_id)
  else
    client(host).ilm.get_policy(policy_id: policy_id)
  end
  true
rescue
  false
end
lowercase_only?(data_stream_parameter) click to toggle source
# File lib/fluent/plugin/out_elasticsearch_data_stream.rb, line 192
# Tells whether the value is unchanged by downcasing, i.e. contains no
# uppercase letters.
#
# @param data_stream_parameter value to check
# @return [Boolean]
def lowercase_only?(data_stream_parameter)
  downcased = data_stream_parameter.downcase
  downcased == data_stream_parameter
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_elasticsearch_data_stream.rb, line 212
# Always answers true.
# NOTE(review): presumably consulted by Fluentd to decide whether this plugin
# may run under multiple workers; confirm against the Fluentd plugin API.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end
not_dots?(data_stream_parameter) click to toggle source
# File lib/fluent/plugin/out_elasticsearch_data_stream.rb, line 204
# Tells whether the value is something other than the reserved names "." and "..".
#
# @param data_stream_parameter value to check
# @return [Boolean] false only for "." or ".."
def not_dots?(data_stream_parameter)
  data_stream_parameter != "." && data_stream_parameter != ".."
end
retry_stream_retryable?() click to toggle source
# File lib/fluent/plugin/out_elasticsearch_data_stream.rb, line 275
# Delegates to the buffer: the answer is whatever @buffer.storable? returns.
#
# @return the result of @buffer.storable?
def retry_stream_retryable?
  @buffer.storable?
end
start_with_valid_characters?(data_stream_parameter) click to toggle source
# File lib/fluent/plugin/out_elasticsearch_data_stream.rb, line 200
# Tells whether the value begins with none of the prefixes listed in
# INVALID_START_CHRACTERS.
#
# @param data_stream_parameter value to check
# @return [Boolean]
def start_with_valid_characters?(data_stream_parameter)
  INVALID_START_CHRACTERS.none? { |prefix| data_stream_parameter.start_with?(prefix) }
end
template_exists?(name, host = nil) click to toggle source
# File lib/fluent/plugin/out_elasticsearch_data_stream.rb, line 173
# Checks whether an index template with the given name exists.
#
# Uses the legacy template lookup when @use_legacy_template is set, otherwise
# the index template lookup. A NotFound transport error means "absent"; other
# errors propagate.
#
# @param name template name
# @param host optional host passed to #client
# @return [Boolean] true when the lookup succeeded, false on NotFound
def template_exists?(name, host = nil)
  indices = client(host).indices
  if @use_legacy_template
    indices.get_template(name: name)
  else
    indices.get_index_template(name: name)
  end
  true
rescue TRANSPORT_CLASS::Transport::Errors::NotFound
  false
end
valid_characters?(data_stream_parameter) click to toggle source
# File lib/fluent/plugin/out_elasticsearch_data_stream.rb, line 196
# Tells whether the value contains none of the substrings listed in
# INVALID_CHARACTERS.
#
# @param data_stream_parameter value to check
# @return [Boolean]
def valid_characters?(data_stream_parameter)
  INVALID_CHARACTERS.none? { |forbidden| data_stream_parameter.include?(forbidden) }
end
valid_data_stream_parameters?(data_stream_parameter) click to toggle source
# File lib/fluent/plugin/out_elasticsearch_data_stream.rb, line 184
# Tells whether a data stream related name passes every naming check:
# lowercase only, no invalid characters, no invalid leading character, not
# "." or "..", and at most 255 bytes long.
#
# @param data_stream_parameter value to check
# @return [Boolean]
def valid_data_stream_parameters?(data_stream_parameter)
  lowercase_only?(data_stream_parameter) &&
    valid_characters?(data_stream_parameter) &&
    start_with_valid_characters?(data_stream_parameter) &&
    not_dots?(data_stream_parameter) &&
    data_stream_parameter.bytesize <= 255
end
validate_data_stream_parameters() click to toggle source
# File lib/fluent/plugin/out_elasticsearch_data_stream.rb, line 69
# Validates the data stream name, template name and ILM policy name held in
# instance variables, raising on the first rule a value breaks.
#
# Checks are reported in this order: invalid leading character (or the value
# being "." / ".."), invalid characters, uppercase letters, length over 255
# bytes.
#
# @raise [Fluent::ConfigError] describing the offending parameter and value
# @return the hash of checked parameters when everything is valid
def validate_data_stream_parameters
  candidates = {
    "data_stream_name" => @data_stream_name,
    "data_stream_template_name" => @data_stream_template_name,
    "data_stream_ilm_name" => @data_stream_ilm_name
  }

  candidates.each do |parameter, value|
    next if valid_data_stream_parameters?(value)

    # "." and ".." begin with an invalid start character, so they are
    # reported from within this branch.
    unless start_with_valid_characters?(value)
      unless not_dots?(value)
        raise Fluent::ConfigError, "'#{parameter}' must not be . or ..: <#{value}>"
      end
      raise Fluent::ConfigError, "'#{parameter}' must not start with #{INVALID_START_CHRACTERS.join(",")}: <#{value}>"
    end

    unless valid_characters?(value)
      raise Fluent::ConfigError, "'#{parameter}' must not contain invalid characters #{INVALID_CHARACTERS.join(",")}: <#{value}>"
    end

    unless lowercase_only?(value)
      raise Fluent::ConfigError, "'#{parameter}' must be lowercase only: <#{value}>"
    end

    if value.bytes.size > 255
      raise Fluent::ConfigError, "'#{parameter}' must not be longer than 255 bytes: <#{value}>"
    end
  end
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_elasticsearch_data_stream.rb, line 216
# Sends the records of one buffer chunk to the data stream with a single bulk
# request.
#
# When placeholders are in use, the host and the data stream, template and
# ILM policy names are resolved from the chunk. The first time a resolved data
# stream name is seen, its ILM policy, index template and data stream are
# created on that same resolved host.
#
# Each Hash record gets an "@timestamp" field derived from the event time and
# is appended to the bulk body; non-Hash records are skipped, and records that
# fail to serialize are routed to the error stream.
#
# @param chunk buffer chunk providing +metadata.tag+ and +msgpack_each+
# @raise [Fluent::ConfigError] if creating the data stream resources fails
# @raise [RecoverableRequestFailure] if the bulk request raises
def write(chunk)
  data_stream_name = @data_stream_name
  data_stream_template_name = @data_stream_template_name
  data_stream_ilm_name = @data_stream_ilm_name
  host = nil
  if @use_placeholder
    host = extract_placeholders(@host, chunk)
    data_stream_name = extract_placeholders(@data_stream_name, chunk)
    data_stream_template_name = extract_placeholders(@data_stream_template_name, chunk)
    data_stream_ilm_name = extract_placeholders(@data_stream_ilm_name, chunk)
    unless @data_stream_names.include?(data_stream_name)
      begin
        create_ilm_policy(data_stream_name, data_stream_template_name, data_stream_ilm_name, host)
        create_index_template(data_stream_name, data_stream_template_name, data_stream_ilm_name, host)
        # The data stream must be created on the same host that receives the
        # policy, the template and the bulk request below.
        create_data_stream(data_stream_name, host)
        @data_stream_names << data_stream_name
      rescue => e
        raise Fluent::ConfigError, "Failed to create data stream: <#{data_stream_name}> #{e.message}"
      end
    end
  end

  bulk_message = ""
  headers = {
    CREATE_OP => {}
  }
  tag = chunk.metadata.tag
  chunk.msgpack_each do |time, record|
    next unless record.is_a? Hash

    begin
      record.merge!({"@timestamp" => Time.at(time).iso8601(@time_precision)})
      bulk_message = append_record_to_messages(CREATE_OP, {}, headers, record, bulk_message)
    rescue => e
      # A single bad record must not fail the whole chunk.
      router.emit_error_event(tag, time, record, e)
    end
  end

  params = {
    index: data_stream_name,
    body: bulk_message
  }
  begin
    response = client(host).bulk(params)
    if response['errors']
      # Item-level failures are only logged; the chunk is not retried for them.
      log.error "Could not bulk insert to Data Stream: #{data_stream_name} #{response}"
    end
  rescue => e
    raise RecoverableRequestFailure, "could not push logs to Elasticsearch cluster (#{data_stream_name}): #{e.message}"
  end
end