class Fluent::Plugin::TailInput
Constants
- FILE_PERMISSION
Attributes
Public Class Methods
# File lib/fluent/plugin/in_tail.rb, line 49
# Creates the plugin with empty bookkeeping state. Nothing is opened or
# watched here; the fields are filled in later by #configure and #start.
def initialize
  super

  # path patterns to tail, and entries already reported as unreadable
  @paths, @ignore_list = [], []
  # watched path => watcher object
  @tails = {}
  # position file handle and its parsed representation (set in #start)
  @pf_file = @pf = nil
end
Public Instance Methods
# File lib/fluent/plugin/in_tail.rb, line 207
# Final teardown step: runs the parent #close first, then releases every
# watcher that is still registered in @tails.
def close
  super
  # File handles are closed only after all threads have stopped, which
  # happens inside the parent's #close (thread plugin helper).
  close_watcher_handles
end
# File lib/fluent/plugin/in_tail.rb, line 324
# Removes every entry from @tails and closes the watcher it held.
# Iterates over a snapshot of the keys so the hash can be mutated safely.
def close_watcher_handles
  @tails.keys.each do |watched_path|
    watcher = @tails.delete(watched_path)
    watcher.close if watcher
  end
end
# File lib/fluent/plugin/in_tail.rb, line 104
# Validates the configuration and prepares parser, tag and encoding state.
#
# conf - the plugin configuration element; must contain a <parse> section
#        with an "@type".
#
# Raises Fluent::ConfigError when the <parse> section or its @type is
# missing, when both watch mechanisms are disabled, when no path is given,
# or when another in_tail instance already registered the same pos_file.
def configure(conf)
  compat_parameters_convert(conf, :parser)
  parser_config = conf.elements('parse').first
  unless parser_config
    raise Fluent::ConfigError, "<parse> section is required."
  end
  unless parser_config["@type"]
    raise Fluent::ConfigError, "parse/@type is required."
  end

  # Copy top-level formatN parameters into the <parse> section.
  (1..Fluent::Plugin::MultilineParser::FORMAT_MAX_NUM).each do |n|
    parser_config["format#{n}"] = conf["format#{n}"] if conf["format#{n}"]
  end

  super

  if !@enable_watch_timer && !@enable_stat_watcher
    raise Fluent::ConfigError, "either of enable_watch_timer or enable_stat_watcher must be true"
  end

  @paths = @path.split(',').map { |path| path.strip }
  if @paths.empty?
    raise Fluent::ConfigError, "tail: 'path' parameter is required on tail input"
  end

  # TODO: Use plugin_root_dir and storage plugin to store positions if available
  if @pos_file
    if @@pos_file_paths.has_key?(@pos_file) && !called_in_test?
      plugin_id_using_this_path = @@pos_file_paths[@pos_file]
      raise Fluent::ConfigError, "Other 'in_tail' plugin already use same pos_file path: plugin_id = #{plugin_id_using_this_path}, pos_file path = #{@pos_file}"
    end
    @@pos_file_paths[@pos_file] = self.plugin_id
  else
    # Use the plugin logger (not the global $log) like the rest of this plugin.
    log.warn "'pos_file PATH' parameter is not set to a 'tail' source."
    log.warn "this parameter is highly recommended to save the position to resume tailing."
  end

  configure_tag
  configure_encoding

  # Truthy when the parser type name contains "multiline"; selects the line handler.
  @multiline_mode = parser_config["@type"] =~ /multiline/
  @receive_handler = if @multiline_mode
                       method(:parse_multilines)
                     else
                       method(:parse_singleline)
                     end
  @file_perm = system_config.file_permission || FILE_PERMISSION
  @parser = parser_create(conf: parser_config)
end
# File lib/fluent/plugin/in_tail.rb, line 165
# Resolves the configured encoding names through parse_encoding_param.
#
# Raises Fluent::ConfigError when 'from_encoding' is set without 'encoding'.
def configure_encoding
  if @from_encoding && !@encoding
    raise Fluent::ConfigError, "tail: 'from_encoding' parameter must be specified with 'encoding' parameter."
  end

  @encoding = parse_encoding_param(@encoding) if @encoding
  @from_encoding = parse_encoding_param(@from_encoding) if @from_encoding
end
# File lib/fluent/plugin/in_tail.rb, line 154
# Splits @tag around '*' into @tag_prefix / @tag_suffix.
# Without a '*' both are set to nil; with one, a missing side becomes ''.
def configure_tag
  unless @tag.include?('*')
    @tag_prefix = @tag_suffix = nil
    return
  end

  head, tail = @tag.split('*')
  @tag_prefix = head || ''
  @tag_suffix = tail || ''
end
# File lib/fluent/plugin/in_tail.rb, line 411
# Parses one line and appends the resulting event to +es+.
#
# line         - the raw line; its trailing newline is stripped in place.
# es           - event stream receiving (time, record) pairs via #add.
# tail_watcher - watcher the line came from; its path fills @path_key.
#
# Unparseable lines are logged, and additionally emitted as
# {'unmatched_line' => line} when @emit_unmatched_lines is set.
# Any StandardError is logged and swallowed.
def convert_line_to_event(line, es, tail_watcher)
  line.chomp! # remove \n
  @parser.parse(line) do |time, record|
    if time && record
      record[@path_key] ||= tail_watcher.path unless @path_key.nil?
      es.add(time, record)
    else
      if @emit_unmatched_lines
        unmatched = { 'unmatched_line' => line }
        unmatched[@path_key] ||= tail_watcher.path unless @path_key.nil?
        es.add(Fluent::EventTime.now, unmatched)
      end
      log.warn "pattern not match: #{line.inspect}"
    end
  end
rescue => e
  log.warn line.dump, error: e.to_s
  log.debug_backtrace(e.backtrace)
end
Fluent::Plugin::TailInput::TailWatcher#close is called by another thread during the shutdown phase. This causes a 'can't modify string; temporarily locked' error in IOHandler, so the close_io argument was added to avoid the problem. At shutdown, IOHandler's io is released automatically after the watcher has been detached from the event loop.
# File lib/fluent/plugin/in_tail.rb, line 350
# Detaches a watcher's triggers from the event loop and flushes whatever
# is left in its line buffer.
#
# tw       - the watcher to detach.
# close_io - when true (default) the watcher is also closed; callers pass
#            false to leave closing to a later step.
#
# If the watcher is flagged unwatched and a position file is in use, its
# position entry is marked with PositionFile::UNWATCHED_POSITION.
def detach_watcher(tw, close_io = true)
  tw.detach do |watcher|
    timer = watcher.timer_trigger
    event_loop_detach(timer) if timer
    stat = watcher.stat_trigger
    event_loop_detach(stat) if stat
  end
  tw.close if close_io
  flush_buffer(tw)

  return unless tw.unwatched && @pf

  @pf[tw.path].update_pos(PositionFile::UNWATCHED_POSITION)
end
# File lib/fluent/plugin/in_tail.rb, line 362
# Schedules a one-shot timer that detaches +tw+ once @rotate_wait has elapsed.
def detach_watcher_after_rotate_wait(tw)
  # Attaching/detaching a timer on the event loop is expensive for such a
  # short-lived object. If that becomes a problem with a large number of
  # files, use @_event_loop directly instead of timer_execute.
  timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) { detach_watcher(tw) }
end
# File lib/fluent/plugin/in_tail.rb, line 213
# Expands the configured path patterns into the list of paths to tail.
#
# Each entry of @paths is first passed through Time#strftime. Entries that
# contain '*' are globbed and filtered down to readable non-directories
# (optionally only those modified within @limit_recently_modified seconds).
# Entries without '*' are added as-is. Paths matching @exclude_path are
# removed from the result.
#
# Returns an Array of path strings.
def expand_paths
  date = Time.now
  paths = []
  @paths.each { |path|
    path = date.strftime(path)
    unless path.include?('*')
      # When file is not created yet, Dir.glob returns an empty array. So just add when path is static.
      paths << path
      next
    end

    paths += Dir.glob(path).select { |p|
      begin
        is_file = !File.directory?(p)
        if File.readable?(p) && is_file
          !(@limit_recently_modified && File.mtime(p) < (date - @limit_recently_modified))
        else
          # Track unreadable entries per file, not per glob pattern, so one
          # unreadable file does not silence warnings for the others.
          if is_file && !@ignore_list.include?(p)
            log.warn "#{p} unreadable. It is excluded and would be examined next time."
            @ignore_list << p if @ignore_repeated_permission_error
          end
          false
        end
      rescue Errno::ENOENT
        # The file vanished between the glob and the checks above.
        false
      end
    }
  }
  excluded = @exclude_path.flat_map { |pattern|
    pattern = date.strftime(pattern)
    pattern.include?('*') ? Dir.glob(pattern) : pattern
  }.uniq
  paths - excluded
end
# File lib/fluent/plugin/in_tail.rb, line 370
# Emits whatever is still sitting in a watcher's line buffer.
#
# tw - watcher whose line_buffer should be parsed and emitted. Nothing
#      happens when the buffer is nil.
#
# A buffer that does not parse into (time, record) is only logged.
def flush_buffer(tw)
  pending = tw.line_buffer
  return unless pending

  pending.chomp!
  @parser.parse(pending) do |time, record|
    if time && record
      # With a wildcard tag, both prefix and suffix are set by configure_tag.
      tag = (@tag_prefix || @tag_suffix) ? @tag_prefix + tw.tag + @tag_suffix : @tag
      record[@path_key] ||= tw.path unless @path_key.nil?
      router.emit(tag, time, record)
    else
      log.warn "got incomplete line at shutdown from #{tw.path}: #{pending.inspect}"
    end
  end
end
# File lib/fluent/plugin/in_tail.rb, line 176
# Looks up an Encoding by name.
#
# encoding_name - name understood by Encoding.find, or nil.
#
# Returns the Encoding, or nil when no name was given.
# Raises Fluent::ConfigError when the name is unknown.
def parse_encoding_param(encoding_name)
  return nil unless encoding_name

  Encoding.find(encoding_name)
rescue ArgumentError => e
  raise Fluent::ConfigError, e.message
end
# File lib/fluent/plugin/in_tail.rb, line 441
# Assembles multi-line records from freshly read lines.
#
# lines        - lines just read from the file.
# tail_watcher - watcher owning the carry-over line buffer.
#
# Lines are accumulated in the watcher's line buffer until a record is
# complete; completed records are converted into events. The remaining
# buffer is stored back on the watcher.
#
# Returns the event stream holding the completed records.
def parse_multilines(lines, tail_watcher)
  buffered = tail_watcher.line_buffer
  stream = Fluent::MultiEventStream.new

  if @parser.has_firstline?
    # A record ends when the next "first line" shows up.
    tail_watcher.line_buffer_timer_flusher.reset_timer if tail_watcher.line_buffer_timer_flusher
    lines.each do |line|
      if @parser.firstline?(line)
        convert_line_to_event(buffered, stream, tail_watcher) if buffered
        buffered = line
      elsif buffered.nil?
        # Continuation line seen before any first line.
        convert_line_to_event(line, stream, tail_watcher) if @emit_unmatched_lines
        log.warn "got incomplete line before first line from #{tail_watcher.path}: #{line.inspect}"
      else
        buffered << line
      end
    end
  else
    # No first-line marker: keep appending until the buffer parses.
    buffered ||= ''
    lines.each do |line|
      buffered << line
      @parser.parse(buffered) do |time, record|
        if time && record
          convert_line_to_event(buffered, stream, tail_watcher)
          buffered = ''
        end
      end
    end
  end

  tail_watcher.line_buffer = buffered
  stream
end
# File lib/fluent/plugin/in_tail.rb, line 433
# Converts every line into an event of its own.
#
# Returns a new event stream filled via convert_line_to_event.
def parse_singleline(lines, tail_watcher)
  lines.each_with_object(Fluent::MultiEventStream.new) do |line, stream|
    convert_line_to_event(line, stream, tail_watcher)
  end
end
@return true if the emit succeeded or failed with an unrecoverable error; false if a BufferOverflowError was raised.
# File lib/fluent/plugin/in_tail.rb, line 390
# Turns freshly read lines into events and emits them.
#
# lines        - lines just read from the file.
# tail_watcher - watcher the lines came from; supplies the tag part
#                substituted for '*'.
#
# Returns false only when the emit raised BufferOverflowError; true in
# every other case (nothing to emit, success, or any other emit error).
def receive_lines(lines, tail_watcher)
  stream = @receive_handler.call(lines, tail_watcher)
  return true if stream.empty?

  tag = (@tag_prefix || @tag_suffix) ? @tag_prefix + tail_watcher.tag + @tag_suffix : @tag
  begin
    router.emit_stream(tag, stream)
  rescue Fluent::Plugin::Buffer::BufferOverflowError
    return false
  rescue
    # Errors other than a buffer overflow are ignored because in_tail
    # can't recover from them. Engine shows logs and backtraces.
    return true
  end
  true
end
in_tail with a '*' path does not check whether a rotated file is the same file during the refresh phase. You should therefore not use a '*' path when your logs are rotated by another tool, because this causes log duplication after the watched files are updated. In that case, keep rotated logs in a separate directory and specify two paths in the path parameter, e.g. path /path/to/dir/*,/path/to/rotated_logs/target_file
# File lib/fluent/plugin/in_tail.rb, line 256
# Reconciles the watcher registry with the paths that currently match:
# watchers for paths that no longer match are stopped (after the rotate
# wait, flagged as unwatched) and watchers for new paths are started.
def refresh_watchers
  wanted = expand_paths
  watched = @tails.keys

  stale = watched - wanted
  fresh = wanted - watched

  stop_watchers(stale, immediate: false, unwatched: true) unless stale.empty?
  start_watchers(fresh) unless fresh.empty?
end
# File lib/fluent/plugin/in_tail.rb, line 267
# Builds a TailWatcher for +path+ and attaches its triggers to the event loop.
#
# path - file path to watch.
# pe   - position entry for the path (nil when no position file is used).
#
# Returns the attached watcher. If anything fails after the watcher was
# created, it is detached and closed before the error is re-raised.
def setup_watcher(path, pe)
  # A timed flusher is only used for multiline parsing with a flush interval.
  line_buffer_timer_flusher = nil
  if @multiline_mode && @multiline_flush_interval
    line_buffer_timer_flusher = TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer))
  end

  tw = TailWatcher.new(
    path, @rotate_wait, pe, log, @read_from_head,
    @enable_watch_timer, @enable_stat_watcher, @read_lines_limit,
    method(:update_watcher), line_buffer_timer_flusher,
    @from_encoding, @encoding, open_on_every_update,
    &method(:receive_lines)
  )
  tw.attach { |watcher|
    event_loop_attach(watcher.timer_trigger) if watcher.timer_trigger
    event_loop_attach(watcher.stat_trigger) if watcher.stat_trigger
  }
  tw
rescue => e
  if tw
    tw.detach do |watcher|
      event_loop_detach(watcher.timer_trigger) if watcher.timer_trigger
      event_loop_detach(watcher.stat_trigger) if watcher.stat_trigger
    end
    tw.close
  end
  raise e
end
# File lib/fluent/plugin/in_tail.rb, line 199
# Stops all watchers immediately and closes the position file handle,
# then hands over to the parent #shutdown.
def shutdown
  # Watchers stay registered in @tails and their io is not closed here;
  # that is done in #close once all threads have stopped.
  stop_watchers(@tails.keys, immediate: true, remove_watcher: false)
  @pf_file.close if @pf_file

  super
end
# File lib/fluent/plugin/in_tail.rb, line 184
# Opens the position file (when configured), starts the initial set of
# watchers unless @skip_refresh_on_startup is set, and schedules
# refresh_watchers to run every @refresh_interval.
def start
  super

  if @pos_file
    dir = File.dirname(@pos_file)
    FileUtils.mkdir_p(dir) unless Dir.exist?(dir)

    open_flags = File::RDWR|File::CREAT|File::BINARY
    @pf_file = File.open(@pos_file, open_flags, @file_perm)
    @pf_file.sync = true
    @pf = PositionFile.parse(@pf_file)
  end

  refresh_watchers unless @skip_refresh_on_startup
  timer_execute(:in_tail_refresh_watchers, @refresh_interval, &method(:refresh_watchers))
end
# File lib/fluent/plugin/in_tail.rb, line 286
# Creates a watcher for each given path and registers it in @tails.
#
# paths - Array of path strings to start watching.
#
# When a position file is in use and @read_from_head is set, a position
# entry whose inode is zero is initialised to the file's current inode at
# offset 0. A path whose watcher setup fails with WatcherSetupError is
# skipped with a warning.
def start_watchers(paths)
  paths.each do |path|
    pe = nil
    if @pf
      pe = @pf[path]
      if @read_from_head && pe.read_inode.zero?
        begin
          pe.update(Fluent::FileWrapper.stat(path).ino, 0)
        rescue Errno::ENOENT
          # Use the plugin logger (not the global $log), as the setup-error
          # warning below does.
          log.warn "#{path} not found. Continuing without tailing it."
        end
      end
    end

    begin
      tw = setup_watcher(path, pe)
    rescue WatcherSetupError => e
      log.warn "Skip #{path} because unexpected setup error happens: #{e}"
      next
    end
    @tails[path] = tw
  end
end
# File lib/fluent/plugin/in_tail.rb, line 310
# Stops the watchers registered for the given paths.
#
# paths          - Array of path strings.
# immediate      - detach right away (without closing the watcher) instead
#                  of after the rotate wait.
# unwatched      - value stored on each watcher's +unwatched+ flag.
# remove_watcher - when true the watcher is also removed from @tails.
#
# Paths without a registered watcher are ignored.
def stop_watchers(paths, immediate: false, unwatched: false, remove_watcher: true)
  paths.each do |path|
    tw = remove_watcher ? @tails.delete(path) : @tails[path]
    next unless tw

    tw.unwatched = unwatched
    if immediate
      detach_watcher(tw, false)
    else
      detach_watcher_after_rotate_wait(tw)
    end
  end
end
#refresh_watchers reads @tails.keys, so for safety this method does not use a stop_watchers -> start_watchers sequence.
# File lib/fluent/plugin/in_tail.rb, line 334
# Replaces the watcher for +path+ with a fresh one.
#
# path - the watched path.
# pe   - position entry to hand to the new watcher.
#
# When a position file is in use and its entry for the path no longer has
# the same inode as +pe+, the update is skipped. The previous watcher, if
# any, is detached after the rotate wait.
def update_watcher(path, pe)
  if @pf && pe.read_inode != @pf[path].read_inode
    log.trace "Skip update_watcher because watcher has been already updated by other inotify event"
    return
  end

  previous = @tails[path]
  @tails[path] = setup_watcher(path, pe)
  detach_watcher_after_rotate_wait(previous) if previous
end