class Bunny::ConsumerWorkPool

Thread pool that dispatches consumer deliveries. Not supposed to be shared between channels or threads.

Every channel has its own consumer pool.

@private

Attributes

abort_on_exception[R]
size[R]
threads[R]

API

Public Class Methods

new(size = 1, abort_on_exception = false, shutdown_timeout = 60) click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 20
# Stores the pool configuration and creates the queue and the
# synchronization primitives. No worker threads are created here.
#
# @param size [Integer] how many worker threads #start will spawn
# @param abort_on_exception [Boolean] when truthy, #start sets
#   abort_on_exception on every worker thread it creates
# @param shutdown_timeout [Numeric, nil] timeout handed to the condition
#   variable wait in #shutdown; a falsy value makes #shutdown skip the wait
def initialize(size = 1, abort_on_exception = false, shutdown_timeout = 60)
  # Caller-supplied configuration.
  @shutdown_timeout = shutdown_timeout
  @abort_on_exception = abort_on_exception
  @size = size

  # Submitted callables wait here until a worker pops them.
  @queue = ::Queue.new

  # Used by #shutdown to wait for a worker's termination signal.
  @shutdown_conditional = ::ConditionVariable.new
  @shutdown_mutex = ::Mutex.new

  # Lifecycle flags: a fresh pool is neither paused nor running.
  @paused = false
  @running = false
end

Public Instance Methods

backlog() click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 52
# Reports how many submitted callables are still sitting in the queue.
#
# @return [Integer] current size of the internal work queue
def backlog
  @queue.size
end
busy?() click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 56
# Tells whether any submitted work is still queued.
#
# Only the queue is inspected: a callable that a worker has already popped
# is not counted.
#
# @return [Boolean] true when the queue holds at least one item
def busy?
  @queue.size > 0
end
join(timeout = nil) click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 77
# Joins every worker thread in turn. Does nothing when no threads have
# been created yet.
#
# @param timeout [Numeric, nil] passed to Thread#join for each thread
#   individually, so it limits the wait per thread rather than in total
# @return [Array<Thread>] the list of threads that were joined
def join(timeout = nil)
  Array(@threads).each do |worker|
    worker.join(timeout)
  end
end
kill() click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 93
# Marks the pool as not running and kills every worker thread outright.
# Does nothing to threads when none have been created yet.
#
# @return [Array<Thread>] the list of threads that were killed
def kill
  @running = false
  Array(@threads).each(&:kill)
end
pause() click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 81
# Flags the pool as paused and no longer running. Worker threads check
# the paused flag at the top of each loop iteration in #run_loop.
#
# @return [true]
def pause
  @running, @paused = false, true
  @paused
end
resume() click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 86
# Flags the pool as running again and wakes every worker thread.
#
# Safe to call before #start: when no threads exist yet only the flags
# are updated, matching the nil handling in #join and #kill.
#
# @return [Array<Thread>] the threads that were woken (empty when none exist)
def resume
  @running = true
  @paused = false

  # @threads is only assigned by #start, so guard against nil here.
  (@threads || []).each { |t| t.run }
end
running?() click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 48
# Reports the pool's running flag, which is set by #start and #resume and
# cleared by #pause, #kill and #shutdown.
#
# @return [Boolean] the current value of @running
def running?
  @running
end
shutdown(wait_for_workers = false) click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 60
# Asks every worker to stop by queueing one terminating callable per
# worker, optionally waiting for a termination signal.
#
# The terminating callables go to the back of the queue, so work that was
# submitted earlier is popped first.
#
# @param wait_for_workers [Boolean] when truthy (and the pool was running
#   and a shutdown timeout is configured) block on the shutdown condition
#   variable for up to @shutdown_timeout seconds
# @return [nil] when no waiting was requested or possible; otherwise
#   whatever the condition variable wait returns
def shutdown(wait_for_workers = false)
  was_running = running?
  @running = false

  # One sentinel per worker: calling it unwinds that worker's catch block.
  terminator = proc { |*_args| throw :terminate }
  @size.times { submit(&terminator) }

  should_wait = wait_for_workers && @shutdown_timeout && was_running
  return unless should_wait

  # NOTE(review): a worker may signal before this thread starts waiting,
  # in which case the wait appears to run until the timeout — confirm
  # whether that is acceptable.
  @shutdown_mutex.synchronize do
    @shutdown_conditional.wait(@shutdown_mutex, @shutdown_timeout)
  end
end
start() click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 36
# Spawns the worker threads and marks the pool as running.
#
# Any previously stored thread list is replaced with a fresh one. Each
# thread executes #run_loop; when the pool was configured with
# abort_on_exception, that flag is enabled on every thread.
#
# @return [true]
def start
  @threads = []

  @size.times do
    worker = Thread.new { run_loop }
    worker.abort_on_exception = true if abort_on_exception
    @threads << worker
  end

  @running = true
end
submit(callable = nil, &block) click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 32
# Queues a unit of work for the worker threads.
#
# The work may be given either as an object responding to #call or as a
# block; when both are supplied the explicit callable wins.
#
# @param callable [#call, nil] work to run on a worker thread
# @yield alternative way to supply the work
# @return [Queue] the internal work queue
def submit(callable = nil, &block)
  @queue << (callable || block)
end

Protected Instance Methods

run_loop() click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 101
# Body of each worker thread: keeps popping callables off the queue and
# invoking them until one of them throws :terminate.
#
# A StandardError raised by a callable is reported on $stderr (class name,
# then message) and the loop carries on with the next item. Once the loop
# has been terminated, the shutdown condition variable is signalled unless
# the queue still holds work.
def run_loop
  catch(:terminate) do
    loop do
      # Park this thread while the pool is paused; #resume wakes it again.
      Thread.stop if @paused
      job = @queue.pop

      begin
        job.call
      rescue ::StandardError => error
        # TODO: use connection logger
        [error.class.name, error.message].each { |line| $stderr.puts line }
      end
    end
  end

  # Let a caller blocked in #shutdown know this worker has stopped.
  @shutdown_mutex.synchronize do
    @shutdown_conditional.signal unless busy?
  end
end