How to prevent deadlock using Async?
I wrote a simple producer-consumer demo using Thread(s) and Queue and it works.
Then I rewrote it with Async, but it fails with a deadlock. I also tried Async::Reactor instead of the while loop, and that didn't help either.
I, [2021-03-27T14:36:31.695540 #30532] INFO -- : Consumer
Traceback (most recent call last):
2: from /home/user/Apps/rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/async-1.28.9/lib/async/task.rb:265:in `block in make_fiber'
1: from /home/user/Devel/Ruby/async_queue_test.rb:25:in `block (4 levels) in run'
/home/user/Devel/Ruby/async_queue_test.rb:25:in `pop': No live threads left. Deadlock? (fatal)
1 threads, 1 sleeps current:0x0000555f03693840 main thread:0x0000555f03693840
* #<Thread:0x0000555f036f4b60 sleep_forever>
rb_thread_t:0x0000555f03693840 native:0x00007f9440267740 int:0
/home/user/Devel/Ruby/async_queue_test.rb:25:in `pop'
/home/user/Devel/Ruby/async_queue_test.rb:25:in `block (4 levels) in run'
/home/user/Apps/rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/async-1.28.9/lib/async/task.rb:265:in `block in make_fiber'
require 'logger'
def has_jobs(item)
  !item.nil?
end
def run(log)
  delay = 0.2
  times = 3
  q = Queue.new
  # consumer
  t1 = Thread.new do
    log.info('Consumer')
    while (has_jobs(job = q.deq))
      log.info("consume #{job}")
    end
    log.info('Consumer exited')
  end
  # producer
  t2 = Thread.new do
    log.info('Producer')
    (1..times).each do |i|
      log.info("produce #{i}")
      q.enq(i)
      sleep(delay)
    end
    q.enq(nil)
    q.close
    log.info('Producer exited')
  end
  # Ensure we wait for all tasks to complete before continuing:
  [t1, t2].each(&:join)
end
log = Logger.new(STDOUT)
t = Time.now
run(log)
puts(Time.now-t)
log.info('Done')
require 'async'
require 'async/barrier'
require 'async/semaphore'
require 'logger'
def has_jobs(item)
  !item.nil?
end
def run(log)
  delay = 0.2
  times = 3
  q = Queue.new
  Async do
    barrier = Async::Barrier.new
    semaphore = Async::Semaphore.new(2, parent: barrier)
    # consumer
    semaphore.async do
      Async do |task|
        log.info('Consumer')
        while (has_jobs(job = q.deq))
          log.info("consume #{job}")
        end
        # Async::Reactor.run do
        #   job = q.deq
        #   if has_jobs(job)
        #     log.info("consume #{job}")
        #   end
        # end
        log.info('Consumer exited')
      end
    end
    # producer
    semaphore.async do
      Async do |task|
        log.info('Producer')
        (1..times).each do |i|
          log.info("produce #{i}")
          q.enq(i)
          task.sleep(delay)
        end
        q.enq(nil)
        q.close
        log.info('Producer exited')
      end
    end
    # Ensure we wait for all tasks to complete before continuing:
    barrier.wait
  end
end
log = Logger.new(STDOUT)
t = Time.now
run(log)
puts(Time.now-t)
log.info('Done')
To quote the author of the async gem:
Ruby 3.0 exposes hooks for blocking operations and Async can handle them.
If you want to write something which is based on async, use Async::Queue. Otherwise, if you want to use thread primitives, use them. But if you mix it, and expect concurrency, you better use Ruby 3.
In other words, if you use thread primitives with async on Ruby 2, you will block the reactor.