class Writer

Constants

RetryLimitError

Public Class Methods

new(tag, connector, time_as_integer: false, retry_limit: 5, event_time: nil) click to toggle source
Calls superclass method
# File lib/fluent/command/cat.rb, line 142
# Sets up the writer's state and then invokes the superclass initializer.
#
# tag::             tag sent along with every batch of entries
# connector::       object whose #call result is used as the socket
# time_as_integer:: when true, entry times are converted with #to_i
# retry_limit::     number of connect failures tolerated before giving up
# event_time::      optional time string parsed for every entry instead of
#                   using the current time
def initialize(tag, connector, time_as_integer: false, retry_limit: 5, event_time: nil)
  # where to send and how to stamp records
  @tag = tag
  @connector = connector
  @time_as_integer = time_as_integer
  @event_time = event_time

  # connection state; no socket is opened until the first write
  @socket = false
  @socket_ttl = 10  # TODO
  @socket_time = Time.now.to_i

  # reconnect bookkeeping
  @retry_limit = retry_limit
  @retry_wait = 1
  @error_history = []

  # entries that could not be written yet
  @pending_limit = 1024  # TODO
  @pending = []

  super()
end

Public Instance Methods

close() click to toggle source
# File lib/fluent/command/cat.rb, line 222
# Closes the current socket, if there is one, and forgets it.
#
# Safe to call when no socket is open (@socket is nil or false), for
# example after on_timer has already closed an idle socket. @socket is
# reset to nil even when the underlying #close raises, so a broken socket
# is never reused.
#
# Returns nil.
def close
  @socket.close if @socket
  nil
ensure
  @socket = nil
end
on_timer() click to toggle source
# File lib/fluent/command/cat.rb, line 203
# Periodic housekeeping: retries buffered entries and drops an idle socket.
#
# Inside the monitor it first tries to write everything in @pending (which
# is emptied only when write_impl reports success), then closes the socket
# if it has not been used for more than @socket_ttl seconds.
def on_timer
  current = Time.now.to_i

  synchronize do
    # flush pending records; keep them when the write fails
    @pending.clear if !@pending.empty? && write_impl(@pending)

    # socket is not used @socket_ttl seconds
    close if @socket && current > @socket_time + @socket_ttl
  end
end
secondary_record?(record) click to toggle source
# File lib/fluent/command/cat.rb, line 161
# Tells whether +record+ has the secondary-file shape: a two-element Array
# holding a Fluent::EventTime followed by a Hash.
#
# Anything that is not an Array (nil, String, Integer, Hash, ...) simply
# yields false instead of raising NoMethodError, so that #write can reject
# it with its own ArgumentError.
#
# Returns true or false.
def secondary_record?(record)
  record.is_a?(Array) &&
    record.size == 2 &&
    record.first.instance_of?(Fluent::EventTime) &&
    record.last.instance_of?(Hash)
end
shutdown() click to toggle source
# File lib/fluent/command/cat.rb, line 233
# Stops the timer created by #start.
#
# Does nothing (and returns nil) when #start has not been called, so that
# shutting down a writer that never started does not raise NoMethodError.
# Otherwise returns whatever the timer's #shutdown returns.
def shutdown
  @timer.shutdown if @timer
end
start() click to toggle source
# File lib/fluent/command/cat.rb, line 227
# Creates a TimerThread bound to this writer, remembers it in @timer and
# starts it.
#
# Returns self so the call can be chained.
def start
  timer = TimerThread.new(self)
  @timer = timer
  timer.start
  self
end
write(record) click to toggle source
# File lib/fluent/command/cat.rb, line 168
# Sends one record, buffering it in @pending when the write fails.
#
# record:: either a Hash, or a secondary-style pair accepted by
#          secondary_record? (only the trailing Hash of the pair is sent)
#
# The entry time comes from @event_time when set, otherwise from
# Fluent::EventTime.now, and is converted with #to_i when @time_as_integer
# is set. When @pending grows beyond @pending_limit, the oldest entries are
# dropped and reported through abort_message.
#
# Raises ArgumentError when +record+ is neither a Hash nor a secondary pair.
def write(record)
  secondary = secondary_record?(record)
  unless secondary || record.class == Hash
    raise ArgumentError, "Input must be a map (got #{record.class})"
  end

  stamp = @event_time ? Fluent::EventTime.parse(@event_time) : Fluent::EventTime.now
  stamp = stamp.to_i if @time_as_integer

  # Even though secondary contains Fluent::EventTime in record,
  # fluent-cat just ignore it and set Fluent::EventTime.now instead.
  # This specification is adopted to keep consistency.
  payload = secondary ? record.last : record
  entry = [stamp, payload]

  synchronize do
    unless write_impl([entry])
      # write failed; keep the entry for the next attempt
      @pending.push(entry)

      # exceeds pending limit; trash oldest records
      while @pending.size > @pending_limit
        dropped_time, dropped_record = @pending.shift
        abort_message(dropped_time, dropped_record)
      end
    end
  end
end

Private Instance Methods

abort_message(time, record) click to toggle source
# File lib/fluent/command/cat.rb, line 306
# Reports a record that is being given up on by printing it to $stdout as
# "!<time>:<record dumped by Yajl>".
def abort_message(time, record)
  encoded = Yajl.dump(record)
  $stdout.puts "!#{time}:#{encoded}"
end
get_socket() click to toggle source
# File lib/fluent/command/cat.rb, line 257
# Returns the current socket, connecting via try_connect when none is set.
#
# On success @socket_time is refreshed to the current time. Returns nil
# (leaving @socket_time untouched) when try_connect reports failure.
def get_socket
  connected = @socket || try_connect
  return nil unless connected

  @socket_time = Time.now.to_i
  @socket
end
try_connect() click to toggle source
# File lib/fluent/command/cat.rb, line 268
# Opens a socket through @connector, retrying after failures.
#
# After a failed attempt the method waits (one sleep of +wait+ seconds when
# the last failure happened within that window) and then connects exactly
# once per attempt. It deliberately does not call itself recursively: doing
# so made every recursion level fall through to @connector.call again,
# opening extra connections and overwriting @socket with each one.
#
# Each failure is printed to $stderr and recorded in @error_history. Once
# the number of recorded failures exceeds @retry_limit, all pending records
# are reported through abort_message, @pending and @error_history are
# cleared, and RetryLimitError is raised.
#
# Returns true once connected (@socket set, @error_history cleared).
# Raises RetryLimitError when the retry limit is exceeded.
def try_connect
  now = Time.now.to_i

  unless @error_history.empty?
    # wait before re-connecting
    wait = 1 #@retry_wait * (2 ** (@error_history.size-1))
    sleep(wait) if now <= @socket_time + wait
  end

  @socket = @connector.call
  @error_history.clear
  true
rescue RetryLimitError
  # never count a retry-limit failure as an ordinary connect error
  raise
rescue => e
  $stderr.puts "connect failed: #{e}"
  @error_history << e
  @socket_time = now

  if @retry_limit < @error_history.size
    # abort all pending records
    @pending.each { |(time, record)| abort_message(time, record) }
    @pending.clear
    @error_history.clear
    raise RetryLimitError, "exceed retry limit"
  end

  # bounded by @retry_limit via @error_history
  retry
end
write_impl(array) click to toggle source
# File lib/fluent/command/cat.rb, line 238
# Packs [@tag, array] with a Fluent::MessagePackFactory packer and writes
# it to the socket obtained from get_socket.
#
# array:: list of entries to send
#
# Returns true when the data was written and flushed; false when no socket
# is available or when writing fails (the failure is printed to $stderr and
# the socket is closed).
def write_impl(array)
  sock = get_socket
  return false unless sock

  begin
    msgpack = Fluent::MessagePackFactory.packer
    sock.write msgpack.pack([@tag, array])
    sock.flush
    true
  rescue => err
    $stderr.puts "write failed: #{err}"
    close
    false
  end
end