Node.js
Roman Mirilaczvili, 2021-11-13 16:11:55

Why are queue messages fetched and processed concurrently instead of one at a time, despite await?

I built a TypeScript microservice application to learn Node.js. The part in question is an AMQP queue message handler.
I'm trying to figure out why messages keep being fetched and processed despite the await. I expect the current message to finish processing before the next one is retrieved. Instead, new messages are delivered while earlier ones are still being handled. Is something wrong in the Consume function?

The code
// The handler receives the message body as a string (see msg.content.toString() below).
export type OnMessageCallback = (msg: string) => Promise<void>;

///...

async Consume(queue: string, callback: OnMessageCallback): Promise<unknown> {
    logger.log('info', 'DeclareQueue: ', queue);
    const queueOk = await this.channel.assertQueue(queue, { durable: false });
    if (!queueOk) {
        return Promise.reject(new Error(`Failed to declare queue "${queue}"`));
    }

    return this.channel
        .consume(
            queue,
            async (msg: lib.ConsumeMessage | null) => {
                if (msg === null) {
                    return;
                }

                const body = msg.content.toString();
                logger.log('info', ` [x] Received ${body}`);

                try {
                    await callback(body);
                } catch (error) {
                    logger.log('error', error);
                    return this.channel.nack(msg);
                }
                return this.channel.ack(msg);
            },
            { noAck: false },
        )
        .then(() => {
            logger.log('info', ' [*] Waiting for messages. To exit press CTRL+C');
        });
}

///...

// in the main function
await mq.Consume('foo', async (body: string) => {
    // const obj = JSON.parse(body);
    // const event = obj as FooMessage;

    logger.log('info', 'Processing body:', body);
    await sleep(1000);
    logger.log('info', 'End processing body:', body);

    return Promise.resolve();
});
Project code


Update:
I revised the Consume code, but the problem remains the same.
async Consume(queue: string, callback: OnMessageCallback): Promise<unknown> {
    logger.log('info', 'DeclareQueue: ', queue);
    const queueOk = await this.channel.assertQueue(queue, { durable: false });
    if (!queueOk) {
      return Promise.reject(new Error(`Failed to declare queue "${queue}"`));
    }

    logger.log('info', ' [*] Waiting for messages. To exit press CTRL+C');
    return this.channel.consume(
      queue,
      async (msg: lib.ConsumeMessage | null) => {
        if (msg === null) {
          return;
        }

        const body = msg.content.toString();
        logger.log('info', ` [x] Received ${body}`);

        try {
          await callback(body);
        } catch (error) {
          logger.log('error', error);
          return this.channel.nack(msg);
        }
        return this.channel.ack(msg);
      },
      { noAck: false },
    );
  }

1 answer(s)
Roman Mirilaczvili, 2021-11-14
@2ord

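Without a prefetch limit, the broker pushes messages to the consumer as fast as it can, and the consume callback is invoked for each delivery without waiting for the promise it returns. The await inside the handler therefore only delays the ack of that one message; it does not hold back the next delivery. After creating the channel, and before calling consume, set a prefetch limit:

await this.channel.prefetch(1); // or however many messages should be processed concurrently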
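
As a rough sketch of where this call fits into the Consume function from the question: the version below sets the prefetch limit right before subscribing. The names `ChannelLike`, `MessageLike` and `consumeOneAtATime` are hypothetical and introduced only for illustration. The interface is a hand-written subset of the channel methods already used in the question (assertQueue, prefetch, consume, ack, nack), declared locally so the example compiles on its own; it is assumed, not guaranteed, to match the channel object of the AMQP client in use.

```typescript
// Hypothetical minimal message shape: only what this sketch reads.
interface MessageLike {
  content: { toString(): string };
}

// Hypothetical subset of the channel API used in the question.
interface ChannelLike {
  assertQueue(queue: string, options?: { durable?: boolean }): Promise<unknown>;
  prefetch(count: number): Promise<unknown>;
  consume(
    queue: string,
    onMessage: (msg: MessageLike | null) => void,
    options?: { noAck?: boolean },
  ): Promise<unknown>;
  ack(msg: MessageLike): void;
  nack(msg: MessageLike): void;
}

type OnMessageCallback = (body: string) => Promise<void>;

async function consumeOneAtATime(
  channel: ChannelLike,
  queue: string,
  callback: OnMessageCallback,
): Promise<void> {
  await channel.assertQueue(queue, { durable: false });

  // Allow at most one unacknowledged message at a time.
  // This has to happen before consume() so that it applies to this consumer.
  await channel.prefetch(1);

  await channel.consume(
    queue,
    (msg) => {
      if (msg === null) {
        return; // the consumer was cancelled
      }
      // The next delivery arrives only after this message is acked or nacked.
      callback(msg.content.toString()).then(
        () => channel.ack(msg),
        () => channel.nack(msg),
      );
    },
    { noAck: false },
  );
}
```

With a real connection, `channel` would be the already created channel object (`this.channel` in the question). Note that the limit only has an effect with `noAck: false`, since it counts unacknowledged messages.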
