class Writer

Constants

RetryLimitError

Public Class Methods

new(tag, connector, time_as_integer: false, retry_limit: 5) click to toggle source
Calls superclass method
# File lib/fluent/command/cat.rb, line 140
def initialize(tag, connector, time_as_integer: false, retry_limit: 5)
  @tag = tag
  @connector = connector
  @socket = false

  @socket_time = Time.now.to_i
  @socket_ttl = 10  # TODO
  @error_history = []

  @pending = []
  @pending_limit = 1024  # TODO
  @retry_wait = 1
  @retry_limit = retry_limit
  @time_as_integer = time_as_integer

  super()
end

Public Instance Methods

close() click to toggle source
# File lib/fluent/command/cat.rb, line 199
def close
  @socket.close
  @socket = nil
end
on_timer() click to toggle source
# File lib/fluent/command/cat.rb, line 180
def on_timer
  now = Time.now.to_i

  synchronize {
    unless @pending.empty?
      # flush pending records
      if write_impl(@pending)
        # write succeeded
        @pending.clear
      end
    end

    if @socket && @socket_time + @socket_ttl < now
      # socket is not used @socket_ttl seconds
      close
    end
  }
end
shutdown() click to toggle source
# File lib/fluent/command/cat.rb, line 210
def shutdown
  @timer.shutdown
end
start() click to toggle source
# File lib/fluent/command/cat.rb, line 204
def start
  @timer = TimerThread.new(self)
  @timer.start
  self
end
write(record) click to toggle source
# File lib/fluent/command/cat.rb, line 158
def write(record)
  if record.class != Hash
    raise ArgumentError, "Input must be a map (got #{record.class})"
  end

  time = Fluent::EventTime.now
  time = time.to_i if @time_as_integer
  entry = [time, record]
  synchronize {
    unless write_impl([entry])
      # write failed
      @pending.push(entry)

      while @pending.size > @pending_limit
        # exceeds pending limit; trash oldest record
        time, record = @pending.shift
        abort_message(time, record)
      end
    end
  }
end

Private Instance Methods

abort_message(time, record) click to toggle source
# File lib/fluent/command/cat.rb, line 283
def abort_message(time, record)
  $stdout.puts "!#{time}:#{Yajl.dump(record)}"
end
get_socket() click to toggle source
# File lib/fluent/command/cat.rb, line 234
def get_socket
  unless @socket
    unless try_connect
      return nil
    end
  end

  @socket_time = Time.now.to_i
  return @socket
end
try_connect() click to toggle source
# File lib/fluent/command/cat.rb, line 245
def try_connect
  begin
    now = Time.now.to_i

    unless @error_history.empty?
      # wait before re-connecting
      wait = 1 #@retry_wait * (2 ** (@error_history.size-1))
      if now <= @socket_time + wait
        sleep(wait)
        try_connect
      end
    end

    @socket = @connector.call
    @error_history.clear
    return true

  rescue RetryLimitError => ex
    raise ex
  rescue
    $stderr.puts "connect failed: #{$!}"
    @error_history << $!
    @socket_time = now

    if @retry_limit < @error_history.size
      # abort all pending records
      @pending.each {|(time, record)|
        abort_message(time, record)
      }
      @pending.clear
      @error_history.clear
      raise RetryLimitError, "exceed retry limit"
    else
      retry
    end
  end
end
write_impl(array) click to toggle source
# File lib/fluent/command/cat.rb, line 215
def write_impl(array)
  socket = get_socket
  unless socket
    return false
  end

  begin
    packer = Fluent::MessagePackFactory.packer
    socket.write packer.pack([@tag, array])
    socket.flush
  rescue
    $stderr.puts "write failed: #{$!}"
    close
    return false
  end

  return true
end