Answer the question
In order to leave comments, you need to log in
Why are messages from the queue being processed without waiting for them to complete, despite await?
Built a TypeScript microservice application to learn Node.js. This is an AMQP queue message handler.
I'm trying to figure out why messages continue to be fetched and processed despite await. I expect the current one to be processed first before the next message is retrieved. Instead, I see that messages continue to be retrieved. Is there something wrong in the Consume function?
export type OnMessageCallback = (msg: unknown) => Promise<void>;
///...
async Consume(queue: string, callback: OnMessageCallback): Promise<unknown> {
logger.log('info', 'DeclareQueue: ', queue);
const queueOk = await this.channel.assertQueue(queue, { durable: false });
if (!queueOk) {
return Promise.reject();
}
return this.channel
.consume(
queue,
async (msg: lib.ConsumeMessage) => {
if (msg === null) {
return;
}
const body = msg.content.toString();
logger.log('info', ` [x] Received ${body}`);
try {
await callback(body);
} catch (error) {
logger.log('error', error);
return this.channel.nack(msg);
}
return this.channel.ack(msg);
},
{ noAck: false },
)
.then(() => {
logger.log('info', ' [*] Waiting for messages. To exit press CTRL+C');
});
}
///...
// в главной функции
await mq.Consume('foo', async (body: string) => {
// const obj = JSON.parse(body);
// const event = obj as FooMessage;
logger.log('info', 'Processing body:', body);
await sleep(1000);
logger.log('info', 'End processing body:', body);
return Promise.resolve();
});
Код проектаasync Consume(queue: string, callback: OnMessageCallback): Promise<unknown> {
logger.log('info', 'DeclareQueue: ', queue);
const queueOk = await this.channel.assertQueue(queue, { durable: false });
if (!queueOk) {
return Promise.reject();
}
logger.log('info', ' [*] Waiting for messages. To exit press CTRL+C');
return this.channel.consume(
queue,
async (msg: lib.ConsumeMessage) => {
if (msg === null) {
return;
}
const body = msg.content.toString();
logger.log('info', ` [x] Received ${body}`);
try {
await callback(body);
} catch (error) {
logger.log('error', error);
return this.channel.nack(msg);
}
return this.channel.ack(msg);
},
{ noAck: false },
);
}
Answer the question
In order to leave comments, you need to log in
After creating a channel, you must specify
await this.channel.prefetch(1); // или другое кол-во, которое хотим обрабатывать одновременно.
Didn't find what you were looking for?
Ask your questionAsk a Question
731 491 924 answers to any question