class Fluent::Plugin::Buffer::Chunk

Attributes

metadata[R]
state[R]
unique_id[R]

Public Class Methods

new(metadata, compress: :text) click to toggle source

TODO: CompressedPackedMessage of forward protocol?

Calls superclass method
# File lib/fluent/plugin/buffer/chunk.rb, line 51
def initialize(metadata, compress: :text)
  super()
  @unique_id = generate_unique_id
  @metadata = metadata

  # state: unstaged/staged/queued/closed
  @state = :unstaged

  @size = 0
  @created_at = Fluent::Clock.real_now
  @modified_at = Fluent::Clock.real_now

  extend Decompressable if compress == :gzip
end

Public Instance Methods

append(data, **kwargs) click to toggle source

data is array of formatted record string

# File lib/fluent/plugin/buffer/chunk.rb, line 87
def append(data, **kwargs)
  raise ArgumentError, '`compress: gzip` can be used for Compressable module' if kwargs[:compress] == :gzip
  adding = ''.b
  data.each do |d|
    adding << d.b
  end
  concat(adding, data.size)
end
bytesize() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 109
def bytesize
  raise NotImplementedError, "Implement this method in child class"
end
close() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 157
def close
  @state = :closed
  self
end
closed?() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 138
def closed?
  @state == :closed
end
commit() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 101
def commit
  raise NotImplementedError, "Implement this method in child class"
end
concat(bulk, records) click to toggle source

for event streams which is packed or zipped (and we want not to unpack/uncompress)

# File lib/fluent/plugin/buffer/chunk.rb, line 97
def concat(bulk, records)
  raise NotImplementedError, "Implement this method in child class"
end
created_at() click to toggle source

for compatibility

# File lib/fluent/plugin/buffer/chunk.rb, line 77
def created_at
  @created_at_object ||= Time.at(@created_at)
end
empty?() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 118
def empty?
  size == 0
end
enqueued!() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 152
def enqueued!
  @state = :queued
  self
end
length()
Alias for: size
modified_at() click to toggle source

for compatibility

# File lib/fluent/plugin/buffer/chunk.rb, line 82
def modified_at
  @modified_at_object ||= Time.at(@modified_at)
end
open(**kwargs, &block) click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 172
def open(**kwargs, &block)
  raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
  raise NotImplementedError, "Implement this method in child class"
end
purge() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 162
def purge
  @state = :closed
  self
end
queued?() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 134
def queued?
  @state == :queued
end
raw_create_at() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 68
def raw_create_at
  @created_at
end
raw_modified_at() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 72
def raw_modified_at
  @modified_at
end
read(**kwargs) click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 167
def read(**kwargs)
  raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
  raise NotImplementedError, "Implement this method in child class"
end
rollback() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 105
def rollback
  raise NotImplementedError, "Implement this method in child class"
end
size() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 113
def size
  raise NotImplementedError, "Implement this method in child class"
end
Also aliased as: length
staged!() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 142
def staged!
  @state = :staged
  self
end
staged?() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 130
def staged?
  @state == :staged
end
unstaged!() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 147
def unstaged!
  @state = :unstaged
  self
end
unstaged?() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 126
def unstaged?
  @state == :unstaged
end
writable?() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 122
def writable?
  @state == :staged || @state == :unstaged
end
write_to(io, **kwargs) click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 177
def write_to(io, **kwargs)
  raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
  open do |i|
    IO.copy_stream(i, io)
  end
end