class Fluent::Plugin::Buffer::FileChunk

Constants

BUFFER_HEADER

Attributes

path[R]

state: b/q - ‘b’(on stage, compatible with v0.12), ‘q’(enqueued) path_prefix: path prefix string, ended with ‘.’ path_suffix: path suffix string, like ‘.log’ (or any other user specified)

permission[R]

state: b/q - ‘b’(on stage, compatible with v0.12), ‘q’(enqueued) path_prefix: path prefix string, ended with ‘.’ path_suffix: path suffix string, like ‘.log’ (or any other user specified)

Public Class Methods

assume_chunk_state(path) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 176
def self.assume_chunk_state(path)
  if /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n =~ path # //n switch means explicit 'ASCII-8BIT' pattern
    $1 == 'b' ? :staged : :queued
  else
    # files which matches to glob of buffer file pattern
    # it includes files which are created by out_file
    :unknown
  end
end
generate_queued_chunk_path(path, unique_id) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 198
def self.generate_queued_chunk_path(path, unique_id)
  chunk_id = Fluent::UniqueId.hex(unique_id)
  if path.index(".b#{chunk_id}.")
    path.sub(".b#{chunk_id}.", ".q#{chunk_id}.")
  else # for unexpected cases (ex: users rename files while opened by fluentd)
    path + ".q#{chunk_id}.chunk"
  end
end
generate_stage_chunk_path(path, unique_id) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 186
def self.generate_stage_chunk_path(path, unique_id)
  pos = path.index('.*.')
  raise "BUG: buffer chunk path on stage MUST have '.*.'" unless pos

  prefix = path[0...pos]
  suffix = path[(pos+3)..-1]

  chunk_id = Fluent::UniqueId.hex(unique_id)
  state = 'b'
  "#{prefix}.#{state}#{chunk_id}.#{suffix}"
end
new(metadata, path, mode, perm: nil, compress: :text) click to toggle source
Calls superclass method Fluent::Plugin::Buffer::Chunk::new
# File lib/fluent/plugin/buffer/file_chunk.rb, line 42
def initialize(metadata, path, mode, perm: nil, compress: :text)
  super(metadata, compress: compress)
  perm ||= Fluent::DEFAULT_FILE_PERMISSION
  @permission = perm.is_a?(String) ? perm.to_i(8) : perm
  @bytesize = @size = @adding_bytes = @adding_size = 0
  @meta = nil

  case mode
  when :create then create_new_chunk(path, @permission)
  when :staged then load_existing_staged_chunk(path)
  when :queued then load_existing_enqueued_chunk(path)
  else
    raise ArgumentError, "Invalid file chunk mode: #{mode}"
  end
end
unique_id_from_path(path) click to toggle source

used only for queued v0.12 buffer path or broken files

# File lib/fluent/plugin/buffer/file_chunk.rb, line 208
def self.unique_id_from_path(path)
  if /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n =~ path # //n switch means explicit 'ASCII-8BIT' pattern
    return $2.scan(/../).map{|x| x.to_i(16) }.pack('C*')
  end
  nil
end

Public Instance Methods

bytesize() click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 90
def bytesize
  @bytesize + @adding_bytes
end
close() click to toggle source
Calls superclass method Fluent::Plugin::Buffer::Chunk#close
# File lib/fluent/plugin/buffer/file_chunk.rb, line 146
def close
  super
  size = @chunk.size
  @chunk.close
  @meta.close if @meta # meta may be missing if chunk is queued at first
  if size == 0
    File.unlink(@path, @meta_path)
  end
end
commit() click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 68
def commit
  write_metadata # this should be at first: of course, this operation may fail

  @commit_position = @chunk.pos
  @size += @adding_size
  @bytesize += @adding_bytes
  @adding_bytes = @adding_size = 0
  @modified_at = Fluent::Clock.real_now
  @modified_at_object = nil

  true
end
concat(bulk, bulk_size) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 58
def concat(bulk, bulk_size)
  raise "BUG: concatenating to unwritable chunk, now '#{self.state}'" unless self.writable?

  bulk.force_encoding(Encoding::ASCII_8BIT)
  @chunk.write bulk
  @adding_bytes += bulk.bytesize
  @adding_size += bulk_size
  true
end
create_new_chunk(path, perm) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 278
def create_new_chunk(path, perm)
  @path = self.class.generate_stage_chunk_path(path, @unique_id)
  @meta_path = @path + '.meta'
  begin
    @chunk = File.open(@path, 'wb+', perm)
    @chunk.set_encoding(Encoding::ASCII_8BIT)
    @chunk.sync = true
    @chunk.binmode
  rescue => e
    # Here assumes "Too many open files" like recoverable error so raising BufferOverflowError.
    # If other cases are possible, we will change erorr handling with proper classes.
    raise BufferOverflowError, "can't create buffer file for #{path}. Stop creating buffer files: error = #{e}"
  end
  begin
    @meta = File.open(@meta_path, 'wb', perm)
    @meta.set_encoding(Encoding::ASCII_8BIT)
    @meta.sync = true
    @meta.binmode
    write_metadata(update: false)
  rescue => e
    # This case is easier than enqueued!. Just removing pre-create buffer file
    @chunk.close rescue nil
    File.unlink(@path) rescue nil

    if @meta
      # ensure to unlink when #write_metadata fails
      @meta.close rescue nil
      File.unlink(@meta_path) rescue nil
    end

    # Same as @chunk case. See above
    raise BufferOverflowError, "can't create buffer metadata for #{path}. Stop creating buffer files: error = #{e}"
  end

  @state = :unstaged
  @bytesize = 0
  @commit_position = @chunk.pos # must be 0
  @adding_bytes = 0
  @adding_size = 0
end
empty?() click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 98
def empty?
  @bytesize == 0
end
enqueued!() click to toggle source
Calls superclass method Fluent::Plugin::Buffer::Chunk#enqueued!
# File lib/fluent/plugin/buffer/file_chunk.rb, line 102
def enqueued!
  return unless self.staged?

  new_chunk_path = self.class.generate_queued_chunk_path(@path, @unique_id)
  new_meta_path = new_chunk_path + '.meta'

  write_metadata(update: false) # re-write metadata w/ finalized records

  begin
    file_rename(@chunk, @path, new_chunk_path, ->(new_io) { @chunk = new_io })
  rescue => e
    begin
      file_rename(@chunk, new_chunk_path, @path, ->(new_io) { @chunk = new_io }) if File.exist?(new_chunk_path)
    rescue => re
      # In this point, restore buffer state is hard because previous `file_rename` failed by resource problem.
      # Retry is one possible approach but it may cause livelock under limited resources or high load environment.
      # So we ignore such errors for now and log better message instead.
      # "Too many open files" should be fixed by proper buffer configuration and system setting.
      raise "can't enqueue buffer file and failed to restore. This may causes inconsistent state: path = #{@path}, error = '#{e}', retry error = '#{re}'"
    else
      raise "can't enqueue buffer file: path = #{@path}, error = '#{e}'"
    end
  end

  begin
    file_rename(@meta, @meta_path, new_meta_path, ->(new_io) { @meta = new_io })
  rescue => e
    begin
      file_rename(@chunk, new_chunk_path, @path, ->(new_io) { @chunk = new_io }) if File.exist?(new_chunk_path)
      file_rename(@meta, new_meta_path, @meta_path, ->(new_io) { @meta = new_io }) if File.exist?(new_meta_path)
    rescue => re
      # See above
      raise "can't enqueue buffer metadata and failed to restore. This may causes inconsistent state: path = #{@meta_path}, error = '#{e}', retry error = '#{re}'"
    else
      raise "can't enqueue buffer metadata: path = #{@meta_path}, error = '#{e}'"
    end
  end

  @path = new_chunk_path
  @meta_path = new_meta_path

  super
end
file_rename(file, old_path, new_path, callback=nil) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 261
def file_rename(file, old_path, new_path, callback=nil)
  pos = file.pos
  if Fluent.windows?
    file.close
    File.rename(old_path, new_path)
    file = File.open(new_path, 'rb', @permission)
  else
    File.rename(old_path, new_path)
    file.reopen(new_path, 'rb')
  end
  file.set_encoding(Encoding::ASCII_8BIT)
  file.sync = true
  file.binmode
  file.pos = pos
  callback.call(file) if callback
end
load_existing_enqueued_chunk(path) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 369
def load_existing_enqueued_chunk(path)
  @path = path
  raise FileChunkError, "enqueued file chunk is empty" if File.size(@path).zero?

  @chunk = File.open(@path, 'rb')
  @chunk.set_encoding(Encoding::ASCII_8BIT)
  @chunk.binmode
  @chunk.seek(0, IO::SEEK_SET)
  @bytesize = @chunk.size
  @commit_position = @chunk.size

  @meta_path = @path + '.meta'
  if File.readable?(@meta_path)
    begin
      restore_metadata(File.open(@meta_path){|f| f.set_encoding(Encoding::ASCII_8BIT); f.binmode; f.read })
    rescue => e
      @chunk.close
      raise FileChunkError, "enqueued meta file is broken. #{e.message}"
    end
  else
    restore_metadata_partially(@chunk)
  end
  @state = :queued
end
load_existing_staged_chunk(path) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 319
def load_existing_staged_chunk(path)
  @path = path
  @meta_path = @path + '.meta'

  @meta = nil
  # staging buffer chunk without metadata is classic buffer chunk file
  # and it should be enqueued immediately
  if File.exist?(@meta_path)
    raise FileChunkError, "staged file chunk is empty" if File.size(@path).zero?

    @chunk = File.open(@path, 'rb+')
    @chunk.set_encoding(Encoding::ASCII_8BIT)
    @chunk.sync = true
    @chunk.seek(0, IO::SEEK_END)
    @chunk.binmode

    @meta = File.open(@meta_path, 'rb+')
    @meta.set_encoding(Encoding::ASCII_8BIT)
    @meta.sync = true
    @meta.binmode
    begin
      restore_metadata(@meta.read)
    rescue => e
      @chunk.close
      @meta.close
      raise FileChunkError, "staged meta file is broken. #{e.message}"
    end
    @meta.seek(0, IO::SEEK_SET)

    @state = :staged
    @bytesize = @chunk.size
    @commit_position = @chunk.pos
    @adding_bytes = 0
    @adding_size = 0
  else
    # classic buffer chunk - read only chunk
    @chunk = File.open(@path, 'rb')
    @chunk.set_encoding(Encoding::ASCII_8BIT)
    @chunk.binmode
    @chunk.seek(0, IO::SEEK_SET)
    @state = :queued
    @bytesize = @chunk.size

    restore_metadata_partially(@chunk)

    @commit_position = @chunk.size
    @unique_id = self.class.unique_id_from_path(@path) || @unique_id
  end
end
open(**kwargs) { |chunk| ... } click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 169
def open(**kwargs, &block)
  @chunk.seek(0, IO::SEEK_SET)
  val = yield @chunk
  @chunk.seek(0, IO::SEEK_END) if self.staged?
  val
end
purge() click to toggle source
Calls superclass method Fluent::Plugin::Buffer::Chunk#purge
# File lib/fluent/plugin/buffer/file_chunk.rb, line 156
def purge
  super
  @chunk.close
  @meta.close if @meta
  @bytesize = @size = @adding_bytes = @adding_size = 0
  File.unlink(@path, @meta_path)
end
read(**kwargs) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 164
def read(**kwargs)
  @chunk.seek(0, IO::SEEK_SET)
  @chunk.read
end
restore_metadata(bindata) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 215
def restore_metadata(bindata)
  data = restore_metadata_with_new_format(bindata)

  unless data
    # old type of restore
    data = Fluent::MessagePackFactory.msgpack_unpacker(symbolize_keys: true).feed(bindata).read rescue {}
  end

  now = Fluent::Clock.real_now

  @unique_id = data[:id] || self.class.unique_id_from_path(@path) || @unique_id
  @size = data[:s] || 0
  @created_at = data.fetch(:c, now.to_i)
  @modified_at = data.fetch(:m, now.to_i)

  @metadata.timekey = data[:timekey]
  @metadata.tag = data[:tag]
  @metadata.variables = data[:variables]
  @metadata.seq = data[:seq] || 0
end
restore_metadata_partially(chunk) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 236
def restore_metadata_partially(chunk)
  @unique_id = self.class.unique_id_from_path(chunk.path) || @unique_id
  @size = 0
  @created_at = chunk.ctime.to_i # birthtime isn't supported on Windows (and Travis?)
  @modified_at = chunk.mtime.to_i

  @metadata.timekey = nil
  @metadata.tag = nil
  @metadata.variables = nil
  @metadata.seq = 0
end
rollback() click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 81
def rollback
  if @chunk.pos != @commit_position
    @chunk.seek(@commit_position, IO::SEEK_SET)
    @chunk.truncate(@commit_position)
  end
  @adding_bytes = @adding_size = 0
  true
end
size() click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 94
def size
  @size + @adding_size
end
write_metadata(update: true) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 248
def write_metadata(update: true)
  data = @metadata.to_h.merge({
      id: @unique_id,
      s: (update ? @size + @adding_size : @size),
      c: @created_at,
      m: (update ? Fluent::Clock.real_now : @modified_at),
  })
  bin = Fluent::MessagePackFactory.thread_local_msgpack_packer.pack(data).full_pack
  size = [bin.bytesize].pack('N')
  @meta.seek(0, IO::SEEK_SET)
  @meta.write(BUFFER_HEADER + size + bin)
end

Private Instance Methods

restore_metadata_with_new_format(chunk) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 396
def restore_metadata_with_new_format(chunk)
  if chunk.size <= 6 # size of BUFFER_HEADER (2) + size of data size(4)
    return nil
  end

  if chunk.slice(0, 2) == BUFFER_HEADER
    size = chunk.slice(2, 4).unpack('N').first
    if size
      return Fluent::MessagePackFactory.msgpack_unpacker(symbolize_keys: true).feed(chunk.slice(6, size)).read rescue nil
    end
  end

  nil
end