Answer the question
In order to leave comments, you need to log in
What is the best way to organize work with queues in Bull, and how can I detect when the current queue has finished processing?
Hello. I'm trying to master queues in Node.js and chose Bull.
I run a separate worker that can process up to 3 jobs concurrently.
// Worker side: open the queue and register the processor for its jobs.
const Queue = require('bull');

// redis, defaultJobOptions, settings and limiter are not declared in this
// snippet; they are passed through unchanged as the queue options.
const tasksQueue = new Queue('my_queue', {
  redis,
  defaultJobOptions,
  settings,
  limiter,
});

// 'my_process' jobs are handled by processor.js, located next to this file.
// The 3 is the number of jobs the worker may handle at the same time.
tasksQueue.process(
  'my_process',
  3,
  `${__dirname}/processor.js`
);
// Log the interval of each job at the moment the queue reports it as active.
tasksQueue.on('active', (job) => {
  const { interval } = job.data;
  console.log(`Start ${interval}`);
});
/**
 * 'completed' listener: logs the finished job, removes it from the queue and
 * then asks the queue whether any work is left.
 *
 * NOTE: the "queue is empty" check runs once per completed job. When several
 * jobs finish close together, more than one invocation can observe zero
 * waiting and zero active jobs, so the final message may be logged more than
 * once.
 *
 * @param {object} job - the completed job; `job.data.interval` is logged.
 * @param {*} result - the value the processor reported for this job.
 */
tasksQueue.on('completed', function (job, result) {
  console.log(`Job ${job.data.interval} completed! Result: ${result}`);

  // Report a failed removal instead of leaving the rejection unhandled.
  job.remove().catch((err) => {
    console.error('Failed to remove completed job:', err);
  });

  tasksQueue
    .getJobCounts()
    .then((count) => {
      // Nothing waiting and nothing running: treat the queue as drained.
      if (!count.waiting && !count.active) {
        console.log("'ЭНТО КОНЕЦ!'");
      }
    })
    .catch((err) => {
      console.error('Failed to read job counts:', err);
    });
});
module.exports = function (job, done) {
return new Promise((resolve, reject) => {
setTimeout(function () {
resolve(job.data.interval);
}, job.data.interval);
}).then(response => done(null, response));
}
const tasksQueue = new Queue('my_queue', {redis, defaultJobOptions, settings, limiter});
return Promise.all(
[tasksQueue.add('my_process', {interval: 7001}),
tasksQueue.add('my_process', {interval: 7001}),
tasksQueue.add('my_process', {interval: 7001}),
tasksQueue.add('my_process', {interval: 7001}),
tasksQueue.add('my_process', {interval: 7001})]
).then(() => {
process.exit();
});
Start 7001
Start 7001
Start 7001
Job 7001 completed! Result: 7001
Job 7001 completed! Result: 7001
Job 7001 completed! Result: 7001
Start 7001
Start 7001
Job 7001 completed! Result: 7001
Job 7001 completed! Result: 7001
'ЭНТО КОНЕЦ!'
'ЭНТО КОНЕЦ!'
Answer the question
In order to leave comments, you need to log in
1) This approach works:
// True while the current batch of results has already been printed; keeps
// later 'completed' events from printing the same batch again.
let resultsReturned = false;

/**
 * Prints the collected results once the queue reports neither waiting nor
 * active jobs.
 *
 * `results` is not declared in this snippet; it has to be a reassignable
 * array defined elsewhere.
 *
 * @param {object} tasksQueue - queue whose getJobCounts() resolves to an
 *   object with `waiting` and `active` counts.
 */
const finishJobs = (tasksQueue) => {
  tasksQueue.getJobCounts().then(({ waiting, active }) => {
    const queueIsBusy = waiting || active;
    if (queueIsBusy || resultsReturned) {
      return;
    }

    resultsReturned = true;
    results.forEach((row) => console.table(row));

    // After 500 ms, re-arm the guard and discard the printed results.
    setTimeout(() => {
      resultsReturned = false;
      results = [];
    }, 500);
  });
};
// For every completed job: keep its result, remove the job from the queue,
// then let finishJobs decide whether the whole batch is done.
tasksQueue.on('completed', (job, result) => {
  results.push(result);
  job.remove();
  finishJobs(tasksQueue);
});
Didn't find what you were looking for?
Ask a Question
731 491 924 answers to any question