class Fluent::Plugin::LocalStorage

Attributes

store[R]

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Plugin::Base::new
# File lib/fluent/plugin/storage_local.rb, line 40
def initialize
  super
  @store = {}
  @multi_workers_available = nil
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::Plugin::Base#configure
# File lib/fluent/plugin/storage_local.rb, line 46
def configure(conf)
  super

  @on_memory = false
  if @path
    if File.exist?(@path) && File.file?(@path)
      @multi_workers_available = false
    elsif File.exist?(@path) && File.directory?(@path)
      @path = File.join(@path, "worker#{fluentd_worker_id}", "storage.json")
      @multi_workers_available = true
    else # path file/directory doesn't exist
      if @path.end_with?('.json') # file
        @multi_workers_available = false
      else # directory
        @path = File.join(@path, "worker#{fluentd_worker_id}", "storage.json")
        @multi_workers_available = true
      end
    end
  elsif root_dir = owner.plugin_root_dir
    basename = (conf.arg && !conf.arg.empty?) ? "storage.#{conf.arg}.json" : "storage.json"
    @path = File.join(root_dir, basename)
    @multi_workers_available = true
  else
    if @persistent
      raise Fluent::ConfigError, "Plugin @id or path for <storage> required when 'persistent' is true"
    else
      if @autosave
        log.warn "both of Plugin @id and path for <storage> are not specified. Using on-memory store."
      else
        log.info "both of Plugin @id and path for <storage> are not specified. Using on-memory store."
      end
      @on_memory = true
      @multi_workers_available = true
    end
  end

  if !@on_memory
    dir = File.dirname(@path)
    FileUtils.mkdir_p(dir, mode: @dir_mode) unless Dir.exist?(dir)
    if File.exist?(@path)
      raise Fluent::ConfigError, "Plugin storage path '#{@path}' is not readable/writable" unless File.readable?(@path) && File.writable?(@path)
      begin
        data = File.open(@path, 'r:utf-8') { |io| io.read }
        if data.empty?
          log.warn "detect empty plugin storage file during startup. Ignored: #{@path}"
          return
        end
        data = Yajl::Parser.parse(data)
        raise Fluent::ConfigError, "Invalid contents (not object) in plugin storage file: '#{@path}'" unless data.is_a?(Hash)
      rescue => e
        log.error "failed to read data from plugin storage file", path: @path, error: e
        raise Fluent::ConfigError, "Unexpected error: failed to read data from plugin storage file: '#{@path}'"
      end
    else
      raise Fluent::ConfigError, "Directory is not writable for plugin storage file '#{@path}'" unless File.stat(dir).writable?
    end
  end
end
delete(key) click to toggle source
# File lib/fluent/plugin/storage_local.rb, line 153
def delete(key)
  @store.delete(key.to_s)
end
fetch(key, defval) click to toggle source
# File lib/fluent/plugin/storage_local.rb, line 145
def fetch(key, defval)
  @store.fetch(key.to_s, defval)
end
get(key) click to toggle source
# File lib/fluent/plugin/storage_local.rb, line 141
def get(key)
  @store[key.to_s]
end
load() click to toggle source
# File lib/fluent/plugin/storage_local.rb, line 112
def load
  return if @on_memory
  return unless File.exist?(@path)
  begin
    json_string = File.open(@path, 'r:utf-8'){ |io| io.read }
    json = Yajl::Parser.parse(json_string)
    unless json.is_a?(Hash)
      log.error "broken content for plugin storage (Hash required: ignored)", type: json.class
      log.debug "broken content", content: json_string
      return
    end
    @store = json
  rescue => e
    log.error "failed to load data for plugin storage from file", path: @path, error: e
  end
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/storage_local.rb, line 105
def multi_workers_ready?
  unless @multi_workers_available
    log.error "local plugin storage with multi workers should be configured to use directory 'path', or system root_dir and plugin id"
  end
  @multi_workers_available
end
put(key, value) click to toggle source
# File lib/fluent/plugin/storage_local.rb, line 149
def put(key, value)
  @store[key.to_s] = value
end
save() click to toggle source
# File lib/fluent/plugin/storage_local.rb, line 129
def save
  return if @on_memory
  tmp_path = @path + '.tmp.' + Fluent::UniqueId.hex(Fluent::UniqueId.generate)
  begin
    json_string = Yajl::Encoder.encode(@store, pretty: @pretty_print)
    File.open(tmp_path, 'w:utf-8', @mode) { |io| io.write json_string; io.fsync }
    File.rename(tmp_path, @path)
  rescue => e
    log.error "failed to save data for plugin storage to file", path: @path, tmp: tmp_path, error: e
  end
end
update(key, &block) click to toggle source
# File lib/fluent/plugin/storage_local.rb, line 157
def update(key, &block)
  @store[key.to_s] = block.call(@store[key.to_s])
end