class Fluent::Plugin::SecondaryFileOutput

Constants

PLACEHOLDER_REGEX

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::Compat::Output#configure
# File lib/fluent/plugin/out_secondary_file.rb, line 36
# Validates the plugin configuration and derives the settings used when
# writing chunks.
#
# After delegating to the superclass, this checks that the plugin runs as a
# secondary output, that the basename contains no `/`, that the path
# placeholders are compatible with the primary buffer, and that the target
# passes Fluent::FileUtil.writable_p?. It then stores the file suffix that
# matches the compression mode and the directory/file permissions.
#
# @param conf configuration element, forwarded unchanged to the superclass
# @raise [Fluent::ConfigError] when any of the checks above fails
def configure(conf)
  super

  raise Fluent::ConfigError, "This plugin can only be used in the <secondary> section" unless @as_secondary
  raise Fluent::ConfigError, "basename should not include `/`" if @basename.include?("/")

  @path_without_suffix = File.join(@directory, @basename)
  validate_compatible_with_primary_buffer!(@path_without_suffix)

  # Any other compression value leaves the suffix as nil.
  @suffix = case @compress
            when :text then ""
            when :gzip then ".gz"
            end

  unless Fluent::FileUtil.writable_p?(@path_without_suffix)
    raise Fluent::ConfigError, "out_secondary_file: `#{@directory}` should be writable"
  end

  # System-wide permissions win; otherwise fall back to the Fluent defaults.
  @dir_perm = system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION
  @file_perm = system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_secondary_file.rb, line 66
# Always answers true.
# NOTE(review): presumably queried by the Fluentd core to decide whether this
# plugin may run when multiple workers are configured -- confirm against the
# output plugin base class.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_secondary_file.rb, line 70
# Appends the contents of +chunk+ to a file derived from the configured path.
#
# Placeholders in the configured path are expanded with
# +extract_placeholders+, and +generate_path+ yields the concrete file name
# to write to. The parent directory is created on demand, and the file is
# opened in binary-append mode and locked exclusively before writing.
#
# @param chunk [#write_to] buffer chunk; must respond to +write_to(io)+
# @return whatever +generate_path+ returns
def write(chunk)
  path_without_suffix = extract_placeholders(@path_without_suffix, chunk)
  generate_path(path_without_suffix) do |path|
    FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

    case @compress
    when :text
      File.open(path, "ab", @file_perm) {|f|
        f.flock(File::LOCK_EX)
        chunk.write_to(f)
      }
    when :gzip
      File.open(path, "ab", @file_perm) {|f|
        f.flock(File::LOCK_EX)
        # The block form closes the writer even when write_to raises, so the
        # gzip footer is always written and the zlib stream is never left to
        # the finalizer. Closing the writer also closes +f+.
        Zlib::GzipWriter.wrap(f) {|gz| chunk.write_to(gz) }
      }
    end
  end
end

Private Instance Methods

generate_path(path_without_suffix) { |path| ... } click to toggle source
# File lib/fluent/plugin/out_secondary_file.rb, line 116
# Chooses the output file name and yields it inside +synchronize_path+.
#
# In append mode the name is simply the base path plus the suffix. Otherwise
# the first `<base>.<N><suffix>` (N counting up from 0) that does not exist
# yet is used; if the file shows up between the scan and entering the
# synchronized section, the scan starts over.
#
# @param path_without_suffix [String] expanded base path
# @yieldparam path [String] the file name to write to
# @return [String] the file name that was yielded
def generate_path(path_without_suffix)
  if @append
    appended = "#{path_without_suffix}#{@suffix}"
    synchronize_path(appended) { yield appended }
    return appended
  end

  begin
    index = 0
    candidate = "#{path_without_suffix}.#{index}#{@suffix}"
    while File.exist?(candidate)
      index += 1
      candidate = "#{path_without_suffix}.#{index}#{@suffix}"
    end

    synchronize_path(candidate) do
      # Another process or thread may have picked the same name and entered
      # its locked section first; in that case the file exists by now and a
      # fresh name has to be found.
      raise FileAlreadyExist if File.exist?(candidate)
      yield candidate
    end
  rescue FileAlreadyExist
    retry
  end
  candidate
end
has_time_format?(str) click to toggle source
# File lib/fluent/plugin/out_secondary_file.rb, line 112
# Tells whether +str+ changes when it is used as a strftime format, i.e.
# whether formatting the current time with it produces different text.
#
# @param str [String] path or format string to inspect
# @return [Boolean] true when the formatted result differs from +str+
def has_time_format?(str)
  rendered = Time.now.strftime(str)
  str != rendered
end
validate_compatible_with_primary_buffer!(path_without_suffix) click to toggle source
# File lib/fluent/plugin/out_secondary_file.rb, line 94
# Ensures every placeholder used in the path can be resolved from the chunk
# keys configured on the primary buffer.
#
# Three checks run in order: time formats require +@chunk_key_time+, tag
# placeholders require +@chunk_key_tag+, and every remaining placeholder
# other than `chunk_id` must be listed in +@chunk_keys+.
#
# @param path_without_suffix [String] directory joined with basename
# @return [nil]
# @raise [Fluent::ConfigError] on the first incompatible placeholder found
def validate_compatible_with_primary_buffer!(path_without_suffix)
  # Keep only the first capture of each match: the bare placeholder name.
  placeholders = path_without_suffix.scan(PLACEHOLDER_REGEX).flat_map(&:first)

  # Anchored on both ends so that only `tag` / `tag[N]` count as tag
  # placeholders; a variable that merely contains "tag" (e.g. `hostage`)
  # must go through the chunk-key check below instead.
  tag_placeholder = /\Atag(\[\d+\])?\z/

  if !@chunk_key_time && has_time_format?(path_without_suffix)
    raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder, remove time formats, like `%Y%m%d`, from basename or directory"
  end

  if !@chunk_key_tag && (ph = placeholders.find { |placeholder| placeholder.match(tag_placeholder) })
    raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder #{ph}, remove tag placeholder, like `${tag}`, from basename or directory"
  end

  vars = placeholders.reject { |placeholder| placeholder.match(tag_placeholder) || (placeholder == 'chunk_id') }

  ph = vars.find { |v| !@chunk_keys.include?(v) }
  if ph
    raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder #{ph}, remove variable placeholder, like `${varname}`, from basename or directory"
  end
end