C
C
coderisimo2020-06-10 14:12:28
Redis
coderisimo, 2020-06-10 14:12:28

What is the best way to organize work with queues on Bull, how to track the end of processing the current queue?

Hello. I'm trying to master queues in node.js. Got the Bull.
I run a separate worker. 3 processes can work per unit of time.

const Queue = require('bull');
const tasksQueue = new Queue('my_queue', {redis, defaultJobOptions, settings, limiter});
tasksQueue.process('my_process', 3, `${__dirname}/processor.js`);
tasksQueue.on('active', function (job, result) {
        console.log(`Start ${job.data.interval}`);
    }
);

tasksQueue.on('completed', function (job, result) {
        console.log(`Job ${job.data.interval} completed! Result: ${result}`);
        job.remove();
        tasksQueue.getJobCounts().then((count) => {
            if (!count.waiting && !count.active)
                console.log('ЭНТО КОНЕЦ!'');
        });
    }
);

processor.js looks like this. there will be a lot of these functions (100 pieces). They will work on assignment in parallel.
module.exports =  function (job, done) {
    return new Promise((resolve, reject) => {
        setTimeout(function () {
            resolve(job.data.interval);
        }, job.data.interval);
    }).then(response => done(null, response));
}


Well, I add new tasks -
const tasksQueue = new Queue('my_queue', {redis, defaultJobOptions, settings, limiter});
return Promise.all(
    [tasksQueue.add('my_process', {interval: 7001}),
        tasksQueue.add('my_process', {interval: 7001}),
        tasksQueue.add('my_process', {interval: 7001}),
        tasksQueue.add('my_process', {interval: 7001}),
        tasksQueue.add('my_process', {interval: 7001})]
).then(() => {
    process.exit();
});

the log looks like this:
Start 7001
Start 7001
Start 7001
Job 7001 completed! Result: 7001
Job 7001 completed! Result: 7001
Job 7001 completed! Result: 7001
Start 7001
Start 7001
Job 7001 completed! Result: 7001
Job 7001 completed! Result: 7001
'ЭНТО КОНЕЦ!'
'ЭНТО КОНЕЦ!'


i.e. I cannot unequivocally catch the moment when all tasks are completed. On the idea of ​​'ANTO THE END!' must be issued once. Hence the questions:
1) Tell me, is there a way to solve this problem - maybe there are built-in tools for understanding when everything worked out (and you can start drinking beer), and I invent a bicycle PED? (.on('drained' didn't help either.)
2) When looking at the stats while tasks are running, I see that waiting and active reflect the real state of things, but completed is always 0 . Should I manually call moveToCompleted on the completed task?

So far I have drowned in the documentation (((.
Thank you.

Answer the question

In order to leave comments, you need to log in

1 answer(s)
C
coderisimo, 2020-07-08
@coderisimo

1) here it works:

let resultsReturned = false;

    let finishJobs = function (tasksQueue) {
        tasksQueue.getJobCounts().then((count) => {
            if (!count.waiting && !count.active) {
                if (!resultsReturned) {
                    resultsReturned = true;
                    results.forEach(i => console.table(i));
                    setTimeout(() => {
                        resultsReturned = false;
                        results = [];
                    }, 500);
                }
            }
        });
    };




  tasksQueue.on('completed', function (job, result) {
            results.push(result);
            job.remove();
            finishJobs(tasksQueue);
        }
    );

Didn't find what you were looking for?

Ask your question

Ask a Question

731 491 924 answers to any question