class Bunny::ConsumerWorkPool
Thread pool that dispatches consumer deliveries. It is not supposed to be shared between channels or threads.
Every channel gets its own consumer pool.
@private
Attributes
abort_on_exception[R]
size[R]
threads[R]
API
Public Class Methods
new(size = 1, abort_on_exception = false, shutdown_timeout = 60)
click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 20
# Prepares an idle pool. Only state is set up here; no threads are created
# and both the paused and running flags start out false.
#
# @param size the number stored in @size (read by #start and #shutdown)
# @param abort_on_exception flag stored in @abort_on_exception
# @param shutdown_timeout value stored in @shutdown_timeout; #shutdown passes
#   it to ConditionVariable#wait as the wait timeout
def initialize(size = 1, abort_on_exception = false, shutdown_timeout = 60)
  # Work queue and the primitives used to signal shutdown completion.
  @queue = ::Queue.new
  @shutdown_mutex = ::Mutex.new
  @shutdown_conditional = ::ConditionVariable.new

  # Caller-supplied configuration.
  @size = size
  @abort_on_exception = abort_on_exception
  @shutdown_timeout = shutdown_timeout

  # Lifecycle flags.
  @paused = false
  @running = false
end
Public Instance Methods
backlog()
click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 52
# @return [Integer] how many submitted callables are currently sitting in
#   the internal queue
def backlog
  @queue.size
end
busy?()
click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 56
# @return [Boolean] true while at least one callable is still queued,
#   false when the queue is empty
def busy?
  @queue.size > 0
end
join(timeout = nil)
click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 77
# Waits for the worker threads to finish.
#
# When +timeout+ is given it is treated as an overall deadline for the whole
# call rather than being handed to every thread in turn, so the call blocks
# for roughly +timeout+ seconds at most instead of +timeout+ times the
# number of threads.
#
# @param timeout [Numeric, nil] total seconds to wait; nil waits indefinitely
# @return [Array<Thread>] the worker threads (an empty array if none exist)
def join(timeout = nil)
  workers = @threads || []
  return workers.each(&:join) if timeout.nil?

  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  workers.each do |t|
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    # Once the deadline has passed, a zero timeout only polls the thread.
    t.join(remaining > 0 ? remaining : 0)
  end
end
kill()
click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 93
# Clears the running flag and kills every worker thread. Safe to call when
# no threads have been created yet.
#
# @return [Array<Thread>] the threads that were killed (empty if none exist)
def kill
  @running = false
  Array(@threads).each(&:kill)
end
pause()
click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 81
# Flags the pool as paused and not running. Worker threads check the paused
# flag at the top of each loop iteration and stop themselves when it is set.
#
# @return [true]
def pause
  @running = false
  @paused = true
end
resume()
click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 86
# Flags the pool as running again and wakes every live worker thread.
#
# Like #join and #kill, this tolerates a pool whose threads have not been
# created yet. Threads that have already finished are skipped because
# Thread#run raises ThreadError on a dead thread.
#
# @return [Array<Thread>] the worker threads (an empty array if none exist)
def resume
  @running = true
  @paused = false

  (@threads || []).each { |t| t.run if t.alive? }
end
running?()
click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 48
# @return the current value of the running flag: set to true by #start and
#   #resume, and to false by #pause, #kill and #shutdown
def running?
  @running
end
shutdown(wait_for_workers = false)
click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 60
# Stops the pool by queueing one terminating job per worker; each such job
# throws :terminate, which the worker loop catches to exit.
#
# When +wait_for_workers+ is truthy, a shutdown timeout is configured and
# the pool was running, the caller then blocks on the shutdown condition
# variable for at most the configured timeout.
#
# In the waiting case the terminators are queued while the shutdown mutex is
# already held. Workers signal completion under that same mutex, so they
# cannot signal before this thread has started waiting; otherwise a fast
# worker's signal could be missed and the caller would sleep for the whole
# timeout.
#
# @param wait_for_workers [Boolean] whether to block until workers signal
# @return nil when not waiting; otherwise the result of ConditionVariable#wait
def shutdown(wait_for_workers = false)
  was_running = running?
  @running = false

  enqueue_terminators = lambda do
    @size.times do
      submit { |*_args| throw :terminate }
    end
  end

  if wait_for_workers && @shutdown_timeout && was_running
    @shutdown_mutex.synchronize do
      enqueue_terminators.call
      @shutdown_conditional.wait(@shutdown_mutex, @shutdown_timeout)
    end
  else
    enqueue_terminators.call
    nil
  end
end
start()
click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 36
# Spawns the worker threads, each executing #run_loop, and marks the pool
# as running. Any previously stored thread list is replaced.
#
# @return [true]
def start
  @threads = []

  @size.times do
    worker = Thread.new(&method(:run_loop))
    # Only opt in to abort-on-exception; otherwise keep the thread default.
    worker.abort_on_exception = true if abort_on_exception
    @threads.push(worker)
  end

  @running = true
end
submit(callable = nil, &block)
click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 32
# Queues a unit of work. If both a callable and a block are given, the
# callable is the one that gets queued.
#
# @param callable [#call, nil] object to queue
# @param block block to queue when no callable is passed
# @return [Queue] the internal queue
def submit(callable = nil, &block)
  job = callable || block
  @queue << job
end
Protected Instance Methods
run_loop()
click to toggle source
# File lib/bunny/consumer_work_pool.rb, line 101
# Body of each worker thread. Pops callables off the queue and invokes them
# until one of them throws :terminate. A StandardError raised by a callable
# is reported on $stderr (class name, then message) and the loop carries on.
# After the loop exits, the shutdown condition variable is signalled if the
# queue is empty at that point.
def run_loop
  catch(:terminate) do
    loop do
      # Park this thread while the paused flag is set.
      Thread.stop if @paused
      job = @queue.pop

      begin
        job.call
      rescue ::StandardError => error
        # TODO: use connection logger
        $stderr.puts error.class.name
        $stderr.puts error.message
      end
    end
  end

  @shutdown_mutex.synchronize do
    @shutdown_conditional.signal if @queue.empty?
  end
end