I
I
ineveraskdfrths2021-08-03 15:26:29
Python
ineveraskdfrths, 2021-08-03 15:26:29

How to make aio-pika not wait for process execution?

I took an RPC implementation example from the aio-pika documentation, everything would be fine, but it is not applicable to long tasks, the function that transfers the task for execution is blocking and waits until the transferred task is completed, so the program cannot send the task from the queue to the next worker, but only sends upon completion of the previous one.

How can I make the submit task function not block its loop?

Worker code:

async def receive_task_from_queue():
    connection = await connect_robust(
        "amqp://guest:[email protected]/",
        client_properties={"connection_name": "callee"},
    )
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)
    rpc = await RPC.create(channel)

    await rpc.register("main", main, auto_delete=True)
    return connection

Producer code:
async def run_rabbitmq_queue():
    connection = await connect_robust(
        "amqp://guest:[email protected]/",
        client_properties={"connection_name": "caller"},
    )
    async with connection:
        channel = await connection.channel()
        rpc = await RPC.create(channel)
        await rpc.proxy.main()

Answer the question

In order to leave comments, you need to log in

1 answer(s)
I
ineveraskdfrths, 2021-08-03
@ineveraskdfrths

Boys, don't do bullshit in short, use workers in this case Worker
code

async def worker(*, user_id):
    print('start')
    await asyncio.sleep(10)
    print(user_id)


async def main():
    connection = await connect_robust("amqp://guest:[email protected]/")

    # Creating channel
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)

    master = Master(channel)
    await master.create_worker("my_task_name", worker, auto_delete=True)

    return connection

Producer Code
async def run_rabbitmq_queue(user_id):
    connection = await connect_robust("amqp://guest:[email protected]/")

    async with connection:
        # Creating channel
        channel = await connection.channel()

        master = Master(channel)

        # Creates tasks by proxy object

        await master.proxy.my_task_name(user_id=user_id)

        # Or using create_task method
        await master.create_task(
            "my_task_name", kwargs=dict(user_id=user_id)
        )
        print('!')

Didn't find what you were looking for?

Ask your question

Ask a Question

731 491 924 answers to any question