R
R
Roman Mirilaczvili2021-03-27 15:14:55
ruby
Roman Mirilaczvili, 2021-03-27 15:14:55

How to prevent deadlock using Async?

I wrote a simple producer-consumer demo using Thread(s) and Queue and it works.
Then I rewrote it with Async , however it doesn't work due to deadlock. Tried using Async::Reactor instead of while loop and that didn't help either.

I, [2021-03-27T14:36:31.695540 #30532]  INFO -- : Consumer
Traceback (most recent call last):
  2: from /home/user/Apps/rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/async-1.28.9/lib/async/task.rb:265:in `block in make_fiber'
  1: from /home/user/Devel/Ruby/async_queue_test.rb:25:in `block (4 levels) in run'
/home/user/Devel/Ruby/async_queue_test.rb:25:in `pop': No live threads left. Deadlock? (fatal)
1 threads, 1 sleeps current:0x0000555f03693840 main thread:0x0000555f03693840
* #<Thread:0x0000555f036f4b60 sleep_forever>
   rb_thread_t:0x0000555f03693840 native:0x00007f9440267740 int:0
   /home/user/Devel/Ruby/async_queue_test.rb:25:in `pop'
   /home/user/Devel/Ruby/async_queue_test.rb:25:in `block (4 levels) in run'
   /home/user/Apps/rbenv/versions/2.5.5/lib/ruby/gems/2.5.0/gems/async-1.28.9/lib/async/task.rb:265:in `block in make_fiber'


code with Thread - working
require 'logger'

def has_jobs(item)
    !item.nil?
end

def run(log)
    delay = 0.2
    times = 3

    q = Queue.new

    # consumer
    t1 = Thread.new do
        log.info('Consumer')
        while (has_jobs(job = q.deq))
            log.info("consume #{job}")
        end
        log.info('Consumer exited')
    end

    # producer
    t2 = Thread.new do
        log.info('Producer')
        (1..times).each do |i|
            log.info("produce #{i}")
            q.enq(i)
            sleep(delay)
        end
        q.enq(nil)
        q.close
        log.info('Producer exited')
    end

    # Ensure we wait for all tasks to complete before continuing:
    [t1, t2].each(&:join)
end

log = Logger.new(STDOUT)

t = Time.now
run(log)
puts(Time.now-t)

log.info('Done')

code with async is not working
require 'async'
require 'async/barrier'
require 'async/semaphore'
require 'logger'

def has_jobs(item)
    !item.nil?
end

def run(log)
    delay = 0.2
    times = 3

    q = Queue.new

    Async do
        barrier = Async::Barrier.new
        semaphore = Async::Semaphore.new(2, parent: barrier)

        # consumer
        semaphore.async do
            Async do |task|
                log.info('Consumer')
                while (has_jobs(job = q.deq))
                    log.info("consume #{job}")
                end
                # Async::Reactor.run do
                #     job = q.deq
                #     if has_jobs(job)
                #         log.info("consume #{job}")
                #     end
                # end
                log.info('Consumer exited')
            end
        end

        # producer
        semaphore.async do
            Async do |task|
                log.info('Producer')
                (1..times).each do |i|
                    log.info("produce #{i}")
                    q.enq(i)
                    task.sleep(delay)
                end
                q.enq(nil)
                q.close
                log.info('Producer exited')
            end
        end
    
        # Ensure we wait for all tasks to complete before continuing:
        barrier.wait
    end
end

log = Logger.new(STDOUT)

t = Time.now
run(log)
puts(Time.now-t)

log.info('Done')

How to fix, do not tell me?

Added
The issue does not appear to be reproducible for Ruby 3.0

Answer the question

In order to leave comments, you need to log in

1 answer(s)
R
Roman Mirilaczvili, 2021-03-30
@2ord

To quote the author of gem async:

Ruby 3.0 exposes hooks for blocking operations and Async can handle them.
If you want to write something which is based on async, use Async::Queue. Otherwise, if you want to use thread primtives, use them. But if you mix it, and expect concurrency, you better use Ruby 3.
In other words, if you use thread primitives with async on Ruby 2, you will block the reactor.

Didn't find what you were looking for?

Ask your question

Ask a Question

731 491 924 answers to any question