class Fluent::Plugin::TailInput::TailWatcher

Attributes

enable_stat_watcher[R]
enable_watch_timer[R]
encoding[R]
from_encoding[R]
line_buffer[RW]
line_buffer_timer_flusher[RW]
log[R]
open_on_every_update[R]
path[R]
pe[R]
read_lines_limit[R]
stat_trigger[R]
timer_trigger[RW]
unwatched[RW]

Public Class Methods

new(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, enable_stat_watcher, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 480
# Builds a watcher for one tailed path.
#
# path                      - path of the file to watch (stat'ed in on_notify).
# rotate_wait               - value reported as "seconds" in the rotation log message.
# pe                        - position entry; a fresh MemoryPositionEntry is used when nil.
# log                       - logger; also handed to the TimerTrigger.
# read_from_head            - when no previous inode is recorded, start at offset 0
#                             instead of the current end of file (see on_rotate).
# enable_watch_timer        - build a TimerTrigger that calls on_notify.
# enable_stat_watcher       - build a StatWatcher that calls on_notify.
# read_lines_limit          - stored and exposed through its reader; not used in this method.
# update_watcher            - callable invoked as call(path, position_entry) from on_rotate.
# line_buffer_timer_flusher - optional object that receives on_notify(self).
# from_encoding, encoding,
# open_on_every_update      - stored and exposed through their readers; not used in this method.
# receive_lines (block)     - called with (lines, self) by wrap_receive_lines.
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, enable_stat_watcher, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines)
  # Settings and collaborators supplied by the caller.
  @path = path
  @pe = pe || MemoryPositionEntry.new
  @rotate_wait = rotate_wait
  @read_from_head = read_from_head
  @read_lines_limit = read_lines_limit
  @enable_stat_watcher = enable_stat_watcher
  @enable_watch_timer = enable_watch_timer
  @update_watcher = update_watcher
  @receive_lines = receive_lines

  # Each trigger is optional and stays nil when its flag is off.
  # NOTE(review): the literal 1 passed to TimerTrigger is presumably an
  # interval in seconds -- confirm against TimerTrigger.
  @stat_trigger =
    if @enable_stat_watcher
      StatWatcher.new(self, &method(:on_notify))
    end
  @timer_trigger =
    if @enable_watch_timer
      TimerTrigger.new(1, log, &method(:on_notify))
    end

  # No IO handler exists until the rotate handler first calls on_rotate.
  @rotate_handler = RotateHandler.new(self, &method(:on_rotate))
  @io_handler = nil
  @log = log

  @line_buffer_timer_flusher = line_buffer_timer_flusher
  @from_encoding = from_encoding
  @encoding = encoding
  @open_on_every_update = open_on_every_update
end

Public Instance Methods

attach() { |self| ... } click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 520
# Runs one on_notify pass and then yields this watcher to the given block.
# The block's value is the return value.
# NOTE(review): the block is presumably where the caller registers this
# watcher's triggers -- confirm against the call site.
def attach
  on_notify
  yield self
end
close() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 530
# Closes the current IO handler, if there is one, and forgets it.
# Calling this again without a handler is a no-op. Always returns nil.
def close
  return unless @io_handler

  @io_handler.close
  @io_handler = nil
end
detach() { |self| ... } click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 525
# Yields this watcher to the given block and afterwards, if an IO handler
# exists, calls its on_notify once more.
# NOTE(review): the trailing on_notify presumably reads whatever is still
# pending in the file after the triggers are detached -- confirm.
def detach
  yield self
  @io_handler.on_notify if @io_handler
end
on_notify() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 537
# Entry point for the stat/timer triggers: stats the watched path and
# forwards the notification to the collaborators that are currently set.
#
# The rotate handler goes first and receives the stat (nil when the path
# does not exist). It may install or replace @io_handler through on_rotate,
# which is why @io_handler is only consulted at the very end.
def on_notify
  stat =
    begin
      Fluent::FileWrapper.stat(@path)
    rescue Errno::ENOENT
      # The file was moved or deleted.
      nil
    end

  @rotate_handler.on_notify(stat) if @rotate_handler
  @line_buffer_timer_flusher.on_notify(self) if @line_buffer_timer_flusher
  @io_handler.on_notify if @io_handler
end
on_rotate(stat) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 550
# Callback given to RotateHandler. +stat+ is the stat of @path, or nil when
# the path does not currently exist.
#
# Without an IO handler this is the first notification: the position entry
# is initialised and an IOHandler (or NullIOHandler when there is no file)
# is installed. With an IO handler it reacts to a truncation or rotation,
# either reopening the file in place or handing the path to @update_watcher.
def on_rotate(stat)
  if @io_handler.nil?
    # First notification and no file to read yet.
    return @io_handler = NullIOHandler.new unless stat

    size_now = stat.size
    inode_now = stat.ino
    saved_inode = @pe.read_inode

    if inode_now == saved_inode
      # Same inode as recorded: the file was renamed and put back, or a
      # symlink/hardlink to it was recreated, so the saved position is kept.
      # A file shorter than the saved position is considered truncated and
      # is read from the head.
      @pe.update(inode_now, 0) if size_now < @pe.read_pos
    else
      # A non-zero saved inode means a position file from an earlier run
      # recorded a different file, so this one is a rotated new file and is
      # read from its head without duplicating logs.
      # A zero saved inode means a MemoryPositionEntry or a first start; the
      # file may be an existing one, so start at its end unless
      # read_from_head is set.
      start_at = (saved_inode != 0 || @read_from_head) ? 0 : size_now
      @pe.update(inode_now, start_at)
    end

    return @io_handler = IOHandler.new(self, &method(:wrap_receive_lines))
  end

  # An IO handler exists. Unless one of the in-place cases below applies,
  # the path is handed over to @update_watcher.
  hand_over = true

  if stat
    inode_now = stat.ino
    if inode_now == @pe.read_inode
      # Same inode: truncated. Rewind and reopen in place.
      @pe.update_pos(0)
      @io_handler.close
      hand_over = false
    elsif !@io_handler.opened?
      # There was no previous file, so this TailWatcher can be reused.
      @pe.update(inode_now, 0)
      hand_over = false
    else
      # Rotated and a new file exists: let the old handler process the old
      # log file before the watcher is renewed [fluentd#1055].
      @io_handler.on_notify
    end
  else
    # Rotated away and no new file yet. Drop the RotateHandler so the same
    # path is not watched twice.
    @rotate_handler = nil
  end

  # The wait is only mentioned when the previous file is handed over.
  wait_note = hand_over ? "; waiting #{@rotate_wait} seconds" : ""
  @log.info "detected rotation of #{@path}#{wait_note}"

  if hand_over
    @update_watcher.call(@path, swap_state(@pe))
  else
    @io_handler = IOHandler.new(self, &method(:wrap_receive_lines))
  end
end
swap_state(pe) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 617
# Replaces this watcher's position entry with a temporary
# MemoryPositionEntry that carries the same inode and position, and returns
# the original entry untouched.
#
# The returned entry is passed on to @update_watcher by on_rotate, so it
# will be updated there once the new TailWatcher is initialized.
def swap_state(pe)
  @pe = MemoryPositionEntry.new.tap do |temporary|
    temporary.update(pe.read_inode, pe.read_pos)
  end
  pe
end
tag() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 512
# Tag derived from the watched path: every '/' becomes '.', runs of dots
# are collapsed to one, and a dot at the start of a line is dropped.
# The result is computed once and cached in @parsed_tag.
def tag
  return @parsed_tag if @parsed_tag

  dotted = @path.tr('/', '.').squeeze('.')
  @parsed_tag = dotted.gsub(/^\./, '')
end
wrap_receive_lines(lines) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 516
# Forwards +lines+ to the block given to the constructor, adding this
# watcher as the second argument. Returns whatever that block returns.
# on_rotate passes this method as the callback of each IOHandler it builds.
def wrap_receive_lines(lines)
  @receive_lines.call(lines, self)
end