class Fluent::Plugin::TailInput::TailWatcher
Attributes
ino[R]
line_buffer_timer_flusher[R]
path[R]
pe[R]
unwatched[RW]
watchers[R]
Public Class Methods
new(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics)
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 757
# Sets up a watcher for a single tailed file.
#
# target_info               - responds to #path and #ino; both values are copied.
# pe                        - position entry; a new MemoryPositionEntry is used
#                             when this is nil (or false).
# log                       - logger; also handed to the RotateHandler.
# read_from_head            - stored; consulted by #on_rotate when choosing the
#                             initial read position.
# follow_inodes             - stored; consulted by #on_rotate.
# update_watcher            - callable stored for #on_rotate.
# line_buffer_timer_flusher - stored as given (may be nil).
# io_handler_build          - callable used by #io_handler to build an IO handler.
# metrics                   - stored; #on_rotate increments metrics.rotated.
def initialize(target_info, pe, log, read_from_head, follow_inodes, update_watcher, line_buffer_timer_flusher, io_handler_build, metrics)
  # Identity of the watched file.
  @path, @ino = target_info.path, target_info.ino

  # Fall back to an in-memory position entry when the caller supplies none.
  @pe = pe || MemoryPositionEntry.new

  # Settings and collaborators, stored exactly as given.
  @read_from_head = read_from_head
  @follow_inodes = follow_inodes
  @update_watcher = update_watcher
  @log = log

  # #on_rotate is passed to the RotateHandler as its block.
  @rotate_handler = RotateHandler.new(log, &method(:on_rotate))

  @line_buffer_timer_flusher = line_buffer_timer_flusher

  # No IO handler exists yet; #on_rotate assigns one later.
  @io_handler = nil
  @io_handler_build = io_handler_build

  @metrics = metrics
  @watchers = []
end
Public Instance Methods
close()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 795
# Closes the current IO handler, if there is one, and forgets it.
# Does nothing when no IO handler is set. Always returns nil.
def close
  return unless @io_handler

  @io_handler.close
  @io_handler = nil
end
detach(shutdown_start_time = nil)
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 787
# Prepares the watcher for shutdown.
#
# When an IO handler exists it is told to get ready for shutdown (receiving
# shutdown_start_time) and is then notified once more. Afterwards the line
# buffer timer flusher, when present, is closed for this watcher.
#
# Returns the flusher's #close result, or nil when there is no flusher.
def detach(shutdown_start_time = nil)
  if @io_handler
    @io_handler.ready_to_shutdown(shutdown_start_time)
    @io_handler.on_notify
  end

  return nil if @line_buffer_timer_flusher.nil?

  @line_buffer_timer_flusher.close(self)
end
eof?()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 802
# Returns true when there is no IO handler; otherwise returns whatever the
# IO handler's own #eof? reports.
def eof?
  return true if @io_handler.nil?

  @io_handler.eof?
end
io_handler()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 900
# Builds an IO handler by invoking the stored builder callable with this
# watcher and its path. The result is returned as-is; nothing is cached here.
def io_handler
  builder = @io_handler_build
  builder.call(self, @path)
end
on_notify()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 806
# Stats the watched path and forwards the notification, in this order, to the
# rotate handler (with the stat), the line buffer timer flusher (with this
# watcher) and the IO handler. Each collaborator is skipped when unset.
#
# The instance variables are re-read for every step on purpose: the rotate
# handler is given #on_rotate as its block, and #on_rotate may replace
# @io_handler or clear @rotate_handler.
def on_notify
  stat =
    begin
      Fluent::FileWrapper.stat(@path)
    rescue Errno::ENOENT, Errno::EACCES
      # moved or deleted
      nil
    end

  @rotate_handler.on_notify(stat) if @rotate_handler
  @line_buffer_timer_flusher.on_notify(self) if @line_buffer_timer_flusher
  @io_handler.on_notify if @io_handler
end
on_rotate(stat)
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 819
# Callback handed to RotateHandler as its block in #initialize.
#
# stat - stat-like object responding to #size and #ino, or nil when the file
#        is not present.
#
# Two situations are told apart by whether an IO handler already exists:
#
# * @io_handler is nil ("first time"): pick the position to start reading
#   from, record it in @pe, and build the IO handler via #io_handler. With a
#   nil stat a NullIOHandler is installed instead.
# * @io_handler is set: decide whether the file was truncated, whether this
#   watcher can be reused, or whether the watcher must be replaced through
#   @update_watcher. @metrics.rotated is incremented at the end of this branch.
def on_rotate(stat)
  if @io_handler.nil?
    if stat
      # first time
      fsize = stat.size
      inode = stat.ino

      last_inode = @pe.read_inode
      if inode == last_inode
        # rotated file has the same inode number with the last file.
        # assuming following situation:
        #   a) file was once renamed and backed, or
        #   b) symlink or hardlink to the same file is recreated
        # in either case of a and b, seek to the saved position
        #   c) file was once renamed, truncated and then backed
        # in this case, consider it truncated
        @pe.update(inode, 0) if fsize < @pe.read_pos
      elsif last_inode != 0
        # this is FilePositionEntry and fluentd once started.
        # read data from the head of the rotated file.
        # logs never duplicate because this file is a rotated new file.
        @pe.update(inode, 0)
      else
        # this is MemoryPositionEntry or this is the first time fluentd started.
        # seek to the end of the any files.
        # logs may duplicate without this seek because it's not sure the file is
        # existent file or rotated new file.
        pos = @read_from_head ? 0 : fsize
        @pe.update(inode, pos)
      end
      @io_handler = io_handler
    else
      @io_handler = NullIOHandler.new
    end
  else
    watcher_needs_update = false

    if stat
      inode = stat.ino
      if inode == @pe.read_inode # truncated
        @pe.update_pos(0)
        @io_handler.close
      elsif !@io_handler.opened? # There is no previous file. Reuse TailWatcher
        @pe.update(inode, 0)
      else # file is rotated and new file found
        watcher_needs_update = true
        # Handle the old log file before renewing TailWatcher [fluentd#1055]
        @io_handler.on_notify
      end
    else # file is rotated and new file not found
      # Clear RotateHandler to avoid duplicated file watch in same path.
      @rotate_handler = nil
      watcher_needs_update = true
    end

    if watcher_needs_update
      if @follow_inodes
        # No need to update a watcher if stat is nil (file not present), because moving to inodes will create
        # new watcher, and old watcher will be closed by stop_watcher in refresh_watchers method
        # don't want to swap state because we need latest read offset in pos file even after rotate_wait
        if stat
          target_info = TargetInfo.new(@path, stat.ino)
          @update_watcher.call(target_info, @pe)
        end
      else
        # Permit to handle if stat is nil (file not present).
        # If a file is mv-ed and a new file is created during
        # calling `#refresh_watchers`s, and `#refresh_watchers` won't run `#start_watchers`
        # and `#stop_watchers()` for the path because `target_paths_hash`
        # always contains the path.
        target_info = TargetInfo.new(@path, stat ? stat.ino : nil)
        @update_watcher.call(target_info, swap_state(@pe))
      end
    else
      # Same watcher keeps going: log the rotation and build a fresh IO handler.
      @log.info "detected rotation of #{@path}"
      @io_handler = io_handler
    end
    @metrics.rotated.inc
  end
end
register_watcher(watcher)
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 783
# Appends the given watcher to this watcher's @watchers list and returns
# that list.
def register_watcher(watcher)
  @watchers.push(watcher)
end
swap_state(pe)
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 904
# Replaces this watcher's position entry with a temporary in-memory copy and
# hands the original entry back to the caller.
#
# pe - position entry responding to #read_inode and #read_pos.
#
# Returns the very same pe object that was passed in.
def swap_state(pe)
  # Use MemoryPositionEntry for rotated file temporary
  @pe = MemoryPositionEntry.new.tap do |memory_entry|
    memory_entry.update(pe.read_inode, pe.read_pos)
  end

  # This pe will be updated in on_rotate after TailWatcher is initialized
  pe
end
tag()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 779
# Derives a tag from the watched path and memoizes it in @parsed_tag.
#
# Every '/' becomes '.', runs of '.' are collapsed into one, and a single
# leading '.' is dropped, so "/var/log/app.log" yields "var.log.app.log".
#
# The leading dot is removed with a string-start anchor (\A). A line-start
# anchor (^) would also strip a dot following an embedded newline, dropping a
# path separator and letting distinct paths map to the same tag.
#
# Returns the tag String.
def tag
  @parsed_tag ||= @path.tr('/', '.').squeeze('.').sub(/\A\./, '')
end