
Fix for takeLock & optionally forcing #1033

Open · wants to merge 1 commit into master
Conversation

rysi3k commented Aug 27, 2018

  1. Fix for the call to job.lockKey in scripts.js (it is a function, but it was being called as a property)
  2. Possibility to take the lock without checking key existence (see the sketch below)

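A minimal, illustrative sketch of the two points, assuming bull's Job API of that time (job.lockKey() returning the Redis lock key and job.takeLock() acquiring the lock); it is not the actual diff from this PR:

// Illustrative only -- not the real patch.
// 1) The bug: scripts.js referenced the getter itself instead of its result.
const lockKey = job.lockKey();   // correct: call the function to obtain the Redis key
                                 // (previously it was referenced as job.lockKey)

// 2) The new possibility: take the lock without checking whether the key exists,
//    i.e. force it, as used later in this thread.
await job.takeLock(true);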
manast (Member) commented Aug 27, 2018

Thanks for the PR. Could you please explain why you would like to force taking a lock? It could potentially have a lot of side effects.

rysi3k (Author) commented Aug 27, 2018

Hello,
My scenario:

  1. The job is processed on a remote server; my code only supervises it (requests its status every 15s)
  2. The supervisor process crashes or is restarted
  3. On process start I need to check whether the job is still being processed on the remote server (status request)
  4. If processing is in progress, I take the lock on the job and resume the standard status requests every 15s (sketched in code below)

With the standard implementation the job becomes stalled in bull, even though it continues on the remote server. I don't want to restart it.

Regards
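
In code form, steps 3–4 amount to roughly the following on supervisor startup (a sketch only; getRememberedJobId and remoteServer.isStillRunning are placeholders for however the supervisor persists the in-flight job id and queries the remote service):

// On supervisor startup: re-attach to a job that kept running remotely.
const jobId = await getRememberedJobId();                  // placeholder: id persisted before the crash
const job = jobId && await queue.getJob(jobId);
if (job && await remoteServer.isStillRunning(job.data)) {  // placeholder for the status request
    await job.takeLock(true);                              // force the lock despite the new queue token
    // ...then keep polling the status every 15s instead of restarting the job
}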

manast (Member) commented Aug 27, 2018

In the scenario above, I do not get why the job is stalled. You wrote that the supervisor process crashes, but not the remote server processing the job, right?

rysi3k (Author) commented Aug 27, 2018

Yes, the supervisor where bull runs crashes. After the restart there is a different queue.token, and that is what causes the job to be marked as stalled.

manast (Member) commented Aug 27, 2018

If the process running bull crashes, the jobs will be picked up later by other bull workers. I still do not fully understand your use case :/

rysi3k (Author) commented Aug 28, 2018 via email

manast (Member) commented Aug 28, 2018

Can you explain point 4 a bit more? ("If processing is in progress, I take the lock on the job and resume the standard status requests every 15s")

rysi3k (Author) commented Aug 29, 2018

Here is my simplified code; it should clear up any confusion.


queue.on('failed', () => {
    worker.setCurrentJobId(0);
}).on('completed', () => {
    worker.setCurrentJobId(0);
});
start();

async function start() {
    // Check whether a job was still in flight when the supervisor went down.
    if (worker.currentJobId) {
        const job = await queue.getJob(worker.currentJobId);
        if (job) {
            // A job was found: resume supervising it and wait until it ends.
            await resumeProcessing(job);
        }
        worker.setCurrentJobId(0);
    }

    // Normal processing path: start the job remotely, then poll its status.
    queue.process(async (job) => {
        await remoteServer.start(job.data);
        worker.setCurrentJobId(job.id);

        return checkStatus(job);
    });
}

async function resumeProcessing(job) {
    await job.takeLock(true); // force the lock even though the queue token changed
    // The process handler needs a unique name since we don't want to use this
    // handler for anything else; the lock key works well for that.
    queue.process(job.name = job.lockKey(), async (job) => {
        return checkStatus(job);
    });
    await queue.processJob(job);
}

async function checkStatus(job) {
    const res = await remoteServer.getStatus(job.data);

    if (!isJobDone(res)) {
        // Not finished yet: poll again in 15s.
        return new Promise((resolve, reject) => {
            setTimeout(() => checkStatus(job).then(resolve).catch(reject), 15000);
        });
    } else {
        return 'done';
    }
}

manast (Member) commented Sep 2, 2018

I have studied the code above and I do not understand why it is needed; it looks as if you have implemented a mechanism similar to what already exists in bull for handling stalled jobs. A stalled job will be picked up by an idle worker; just remember to use a large enough value for the maxStalledCount setting: https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue
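
For reference, maxStalledCount sits in the advanced settings object passed to the Queue constructor, as documented in the linked REFERENCE.md (the value below is just an example):

const Queue = require('bull');

// maxStalledCount: how many times a stalled job may be re-processed
// before it is failed for good (the default is 1).
const queue = new Queue('supervised-jobs', 'redis://127.0.0.1:6379', {
    settings: {
        maxStalledCount: 10
    }
});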

rysi3k (Author) commented Sep 3, 2018

I have a different "process" method for normal processing (start + checkStatus) and for resuming (only checkStatus).
With the built-in stalled-job handling it is not possible to achieve this, or am I missing something?

manast (Member) commented Sep 3, 2018

OK, so your use case is to run a different process function when re-processing a stalled job?

rysi3k (Author) commented Sep 3, 2018

Hmmm, I think yes, it could be described like that.

manast (Member) commented Sep 3, 2018

In that case I think I have a better solution: we can add a "status" flag to the job object that is passed to the processor, so that depending on the flag you can run one code path or another. It should simplify your code a lot. What do you think?
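
A rough sketch of how such a flag might look from the processor's side; the property name job.status and the value 'stalled' are hypothetical, since the flag was only proposed here and never specified:

// Hypothetical: job.status is the flag proposed above, not an existing bull property.
queue.process(async (job) => {
    if (job.status === 'stalled') {
        // Picked up again after a stall: only resume polling the remote server.
        return checkStatus(job);
    }
    // Fresh job: kick off the remote work first, then poll.
    await remoteServer.start(job.data);
    return checkStatus(job);
});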

rysi3k (Author) commented Sep 5, 2018

Your idea sounds good and should solve my problem.
BTW, don't forget to fix the takeLock method; there is a bug in the call to the lockKey method ;)

stale bot commented Jul 12, 2021

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale bot added the wontfix label on Jul 12, 2021
stale bot removed the wontfix label on Jul 16, 2021