class Fluent::StaticConfigAnalysis

Static analysis means analysing all plugins and Fluent::Element without invoking Plugin#configure.

Constants

Result

Public Class Methods

call(conf, workers: 1) click to toggle source

@param workers [Integer] Number of workers
@return [Fluent::StaticConfigAnalysis::Result]

# File lib/fluent/static_config_analysis.rb, line 39
# Convenience entry point: builds an analyser for +workers+ workers and runs
# it against +conf+ in one step.
#
# @param conf the configuration handed unchanged to the instance-level #call
# @param workers [Integer] Number of workers
# @return [Fluent::StaticConfigAnalysis::Result]
def self.call(conf, workers: 1)
  new(workers).call(conf)
end
new(workers) click to toggle source
# File lib/fluent/static_config_analysis.rb, line 43
# @param workers [Integer] Number of workers; kept in @workers, which
#   parse_worker_id and static_worker_analyse use to validate <worker> ids
def initialize(workers)
  @workers = workers

  # Start with empty plugin and label collections.
  reset
end

Public Instance Methods

call(config) click to toggle source
# File lib/fluent/static_config_analysis.rb, line 49
# Runs the static analysis over +config+ and gathers everything it finds.
#
# Collections left over from a previous run are cleared first. The analysers
# run in a fixed order (workers, labels, filters/outputs, inputs), and the
# resulting tree lists their nodes in that same order.
#
# @param config the configuration element to analyse
# @return [Result] the flattened node tree together with the collected
#   outputs, inputs, filters and label nodes
def call(config)
  reset

  worker_nodes = static_worker_analyse(config)
  label_nodes = static_label_analyse(config)
  filter_and_output_nodes = static_filter_and_output_analyse(config)
  input_nodes = static_input_analyse(config)
  tree = [worker_nodes, label_nodes, filter_and_output_nodes, input_nodes].flatten

  Result.new(tree, @outputs, @inputs, @filters, @labels.values)
end

Private Instance Methods

parse_worker_id(conf) click to toggle source
# File lib/fluent/static_config_analysis.rb, line 102
# Parses the argument of a <worker> directive into worker ids.
#
# The argument is either one id (`<worker 0>`) or two ids joined by a hyphen
# (`<worker 0-2>`).
#
# @param conf the <worker> element; its +arg+ holds the id text
# @return [Array<Integer>] +[id]+ for a single id, +[first, last]+ for a pair
# @raise [Fluent::ConfigError] if the argument is empty, is not made of
#   integers, names an id below 0 or not below @workers, or gives a first id
#   greater than the last one
def parse_worker_id(conf)
  raw = conf.arg
  raise Fluent::ConfigError, 'Missing worker id on <worker> directive' if raw.empty?

  first_id, last_id =
    begin
      raw.split('-', 2).map { |part| Integer(part) }
    rescue TypeError, ArgumentError
      raise Fluent::ConfigError, "worker id should be integer: #{raw}"
    end

  # Shared bounds check for both ends of the specification.
  ensure_in_range = lambda do |id|
    next if id >= 0 && id < @workers

    raise Fluent::ConfigError, "worker id #{id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{@workers - 1}"
  end

  ensure_in_range.call(first_id)

  # A lone id such as `<worker 0>` needs no further checks.
  return [first_id] if last_id.nil?

  ensure_in_range.call(last_id)

  if first_id > last_id
    raise Fluent::ConfigError, "greater first_worker_id<#{first_id}> than last_worker_id<#{last_id}> specified by <worker> directive is not allowed. Available multi worker assign syntax is <smaller_worker_id>-<greater_worker_id>"
  end

  [first_id, last_id]
end
reset() click to toggle source
# File lib/fluent/static_config_analysis.rb, line 64
# Replaces the collections that the static_*_analyse methods append to with
# fresh empty ones, so each analysis run starts from a clean state.
def reset
  @outputs = []
  @inputs = []
  @filters = []
  @labels = {} # label name => label node; also used to spot duplicate labels
end
static_filter_and_output_analyse(conf) click to toggle source
# File lib/fluent/static_config_analysis.rb, line 156
# Instantiates the plugin for every <filter> and <match> section directly
# under +conf+, in the order the sections are returned by +conf.elements+.
#
# Each filter node is also appended to @filters and each output node to
# @outputs.
#
# @param conf the element whose <filter>/<match> children are analysed
# @return [Array<Elem::Filter, Elem::Output>] one node per section
# @raise [Fluent::ConfigError] if a section has a nil or empty '@type'
def static_filter_and_output_analyse(conf)
  conf.elements('filter', 'match').map do |section|
    plugin_type = section['@type']
    if plugin_type.nil? || plugin_type.empty?
      raise Fluent::ConfigError, "Missing '@type' parameter on <#{section.name}> directive"
    end

    if section.name == 'filter'
      Elem::Filter.new(Fluent::Plugin.new_filter(plugin_type), section).tap { |node| @filters << node }
    else
      Elem::Output.new(Fluent::Plugin.new_output(plugin_type), section).tap { |node| @outputs << node }
    end
  end
end
static_input_analyse(conf) click to toggle source
# File lib/fluent/static_config_analysis.rb, line 178
# Instantiates the input plugin for every <source> section directly under
# +conf+ and appends each resulting node to @inputs.
#
# @param conf the element whose <source> children are analysed
# @return [Array<Elem::Input>] one node per <source> section
# @raise [Fluent::ConfigError] if a section has a nil or empty '@type'
def static_input_analyse(conf)
  conf.elements(name: 'source').map do |section|
    plugin_type = section['@type']
    if plugin_type.nil? || plugin_type.empty?
      raise Fluent::ConfigError, "Missing '@type' parameter on <#{section.name}> directive"
    end

    input = Elem::Input.new(Fluent::Plugin.new_input(plugin_type), section)
    @inputs << input
    input
  end
end
static_label_analyse(conf) click to toggle source
# File lib/fluent/static_config_analysis.rb, line 136
# Analyses every <label> section directly under +conf+.
#
# For each label, the <filter>/<match> sections inside it are analysed via
# static_filter_and_output_analyse, and the resulting node is recorded in
# @labels under the label's name.
#
# @param conf the element whose <label> children are analysed
# @return [Array<Elem::Label>] one node per <label> section
# @raise [Fluent::ConfigError] if a label has an empty argument, or if a
#   label name is already present in @labels
def static_label_analyse(conf)
  conf.elements(name: 'label').map do |section|
    name = section.arg

    # The error class is fully qualified, as in the sibling analysers, so it
    # resolves no matter how the enclosing class was opened.
    raise Fluent::ConfigError, 'Missing symbol argument on <label> directive' if name.empty?
    raise Fluent::ConfigError, "Section <label #{name}> appears twice" if @labels.key?(name)

    label = Elem::Label.new(name, section, static_filter_and_output_analyse(section))
    @labels[name] = label
    label
  end
end
static_worker_analyse(conf) click to toggle source
# File lib/fluent/static_config_analysis.rb, line 71
# Analyses every <worker> section directly under +conf+.
#
# For each section this checks that no worker id is claimed twice, checks
# that the section only contains <source>, <match>, <filter> or <label>
# children, and then analyses those children.
#
# @param conf the element whose <worker> children are analysed
# @return [Array<Elem::Worker>] one node per <worker> section, carrying the
#   ids exactly as parse_worker_id returned them
# @raise [Fluent::ConfigError] if a worker id is claimed by more than one
#   section, or if a section contains a directive that is not allowed
def static_worker_analyse(conf)
  available_worker_ids = [*0...@workers]
  allowed_directives = %w[source match filter label]

  conf.elements(name: 'worker').map do |config|
    ids = parse_worker_id(config)

    # parse_worker_id yields either [id] or the [first, last] bounds of a
    # range. Walk the whole span so ids strictly between the bounds are
    # claimed too, and so `<worker N-N>` claims N only once.
    (ids.first..ids.last).each do |id|
      # Array#delete returns nil when the id was already claimed.
      next if available_worker_ids.delete(id)

      raise Fluent::ConfigError, "specified worker_id<#{id}> collisions is detected on <worker> directive. Available worker id(s): #{available_worker_ids}"
    end

    config.elements.each do |elem|
      next if allowed_directives.include?(elem.name)

      raise Fluent::ConfigError, "<worker> section cannot have <#{elem.name}> directive"
    end

    nodes = [
      static_label_analyse(config),
      static_filter_and_output_analyse(config),
      static_input_analyse(config),
    ].flatten
    Elem::Worker.new(ids, config, nodes)
  end
end