class Fluent::Plugin::Buffer::Chunk

Attributes

created_at[R]
metadata[R]
modified_at[R]
state[R]
unique_id[R]

Public Class Methods

new(metadata, compress: :text) click to toggle source

TODO: CompressedPackedMessage of forward protocol?

Calls superclass method
# File lib/fluent/plugin/buffer/chunk.rb, line 51
def initialize(metadata, compress: :text)
  super()
  @unique_id = generate_unique_id
  @metadata = metadata

  # state: unstaged/staged/queued/closed
  @state = :unstaged

  @size = 0
  @created_at = Time.now
  @modified_at = Time.now

  extend Decompressable if compress == :gzip
end

Public Instance Methods

append(data, **kwargs) click to toggle source

data is array of formatted record string

# File lib/fluent/plugin/buffer/chunk.rb, line 69
def append(data, **kwargs)
  raise ArgumentError, '`compress: gzip` can be used for Compressable module' if kwargs[:compress] == :gzip
  adding = ''.b
  data.each do |d|
    adding << d.b
  end
  concat(adding, data.size)
end
bytesize() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 91
def bytesize
  raise NotImplementedError, "Implement this method in child class"
end
close() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 139
def close
  @state = :closed
  self
end
closed?() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 120
def closed?
  @state == :closed
end
commit() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 83
def commit
  raise NotImplementedError, "Implement this method in child class"
end
concat(bulk, records) click to toggle source

for event streams which is packed or zipped (and we want not to unpack/uncompress)

# File lib/fluent/plugin/buffer/chunk.rb, line 79
def concat(bulk, records)
  raise NotImplementedError, "Implement this method in child class"
end
empty?() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 100
def empty?
  size == 0
end
enqueued!() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 134
def enqueued!
  @state = :queued
  self
end
length()
Alias for: size
open(**kwargs, &block) click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 154
def open(**kwargs, &block)
  raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
  raise NotImplementedError, "Implement this method in child class"
end
purge() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 144
def purge
  @state = :closed
  self
end
queued?() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 116
def queued?
  @state == :queued
end
read(**kwargs) click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 149
def read(**kwargs)
  raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
  raise NotImplementedError, "Implement this method in child class"
end
rollback() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 87
def rollback
  raise NotImplementedError, "Implement this method in child class"
end
size() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 95
def size
  raise NotImplementedError, "Implement this method in child class"
end
Also aliased as: length
staged!() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 124
def staged!
  @state = :staged
  self
end
staged?() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 112
def staged?
  @state == :staged
end
unstaged!() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 129
def unstaged!
  @state = :unstaged
  self
end
unstaged?() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 108
def unstaged?
  @state == :unstaged
end
writable?() click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 104
def writable?
  @state == :staged || @state == :unstaged
end
write_to(io, **kwargs) click to toggle source
# File lib/fluent/plugin/buffer/chunk.rb, line 159
def write_to(io, **kwargs)
  raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
  open do |i|
    IO.copy_stream(i, io)
  end
end