class Fluent::Counter::MutexHash

Public Class Methods

new(data_store) click to toggle source
# File lib/fluent/counter/mutex_hash.rb, line 22
# Sets up the lock table around +data_store+.
#
# data_store:: object later yielded to blocks by #synchronize and
#              #synchronize_keys, and shared with the cleanup thread.
#
# The cleanup thread receives the same store, per-key mutex table and global
# mutex that this object uses. It is only constructed here, not started.
def initialize(data_store)
  @data_store = data_store
  @mutex_hash = {}            # key => Mutex guarding that key
  @mutex = Mutex.new          # guards @mutex_hash itself
  @thread = nil
  @cleanup_thread = CleanupThread.new(data_store, @mutex_hash, @mutex)
end

Public Instance Methods

start() click to toggle source
# File lib/fluent/counter/mutex_hash.rb, line 30
# Starts the data store first, then the cleanup thread.
# Returns whatever the cleanup thread's #start returns.
def start
  [@data_store, @cleanup_thread].map(&:start).last
end
stop() click to toggle source
# File lib/fluent/counter/mutex_hash.rb, line 35
# Stops the data store first, then the cleanup thread.
# Returns whatever the cleanup thread's #stop returns.
def stop
  [@data_store, @cleanup_thread].map(&:stop).last
end
synchronize(*keys) { |data_store, k| ... } click to toggle source
# File lib/fluent/counter/mutex_hash.rb, line 40
# Locks every key in +keys+ together, then yields each one to the block.
#
# The per-key mutexes are taken with try_lock while the global mutex is held.
# If any one of them is unavailable, the ones already taken are released and
# the whole attempt is retried, so a partial set of locks is never kept
# across a retry.
#
# keys:: keys to lock. Duplicates are collapsed, so each distinct key is
#        yielded once.
#
# Yields the data store and the key, once per key, in argument order. A key's
# mutex is released right after its own yield returns. If the block raises or
# exits early, every mutex still held is released before control leaves.
#
# Returns nil.
def synchronize(*keys)
  return if keys.empty?

  # try_lock returns false for a mutex this thread already holds, so a
  # repeated key could never be acquired and the retry loop would never end.
  keys = keys.uniq

  locks = {}
  loop do
    @mutex.synchronize do
      keys.each do |key|
        mutex = (@mutex_hash[key] ||= Mutex.new)

        if mutex.try_lock
          locks[key] = mutex
        else
          locks.each_value(&:unlock)
          locks = {}          # flush locked keys
          break
        end
      end
    end

    next if locks.empty?      # failed to lock all keys

    begin
      locks.each do |key, mutex|
        yield @data_store, key
        mutex.unlock
      end
    ensure
      # Only reached with held locks when the block raised or exited early.
      locks.each_value { |mutex| mutex.unlock if mutex.owned? }
    end
    break
  end
end
synchronize_keys(*keys) { |data_store, key| ... } click to toggle source
# File lib/fluent/counter/mutex_hash.rb, line 74
# Yields each key in +keys+ while holding only that key's mutex.
#
# Keys are processed one at a time. A key's mutex is created on first use and
# taken with try_lock while the global mutex is held. If the key is currently
# locked, it is moved to the back of the queue and retried after the others.
#
# keys:: keys to process. A repeated key is yielded once per occurrence.
#
# Yields the data store and the key. The global mutex is not held during the
# yield. The key's mutex is released when the block returns, raises or exits
# early.
#
# Returns nil.
def synchronize_keys(*keys)
  return if keys.empty?

  pending = keys.dup

  # Loop on emptiness rather than on the shifted value, so a nil or false key
  # does not end processing early.
  until pending.empty?
    key = pending.shift

    # Look up (or create) the key's mutex and try to take it under the global
    # lock; the result is nil when the key is currently locked.
    mutex = @mutex.synchronize do
      candidate = (@mutex_hash[key] ||= Mutex.new)
      candidate if candidate.try_lock
    end

    if mutex
      begin
        yield @data_store, key
      ensure
        mutex.unlock
      end
    else
      pending.push(key)       # failed lock, retry this key
    end
  end
end