Skip to content

Commit

Permalink
added interruption handling suport
Browse files Browse the repository at this point in the history
  • Loading branch information
inlife committed Oct 12, 2024
1 parent 4de43c7 commit ea11fd3
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 37 deletions.
2 changes: 1 addition & 1 deletion packages/nexrender-worker/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,4 @@ Available settings (almost same as for `nexrender-core`):
* `actions` - an object with keys corresponding to the `module` field when defining an action, value should be a function matching expected signature of an action. Used for defining actions programmatically without needing to package the action as a separate package
* `cache` - boolean or string. Set the cache folder used by HTTP assets. If `true` will use the default path of `${workpath}/http-cache`, if set to a string it will be interpreted as a filesystem path to the cache folder.
* `name` - string. An unique name (or not) to the `nexrender-worker`, and it will be identified in the `nexrender-server`. It can be used as an executor name on picked job(s) as well.
* `handleInterruption` - boolean, if set to true, enables handling of interruption signals (SIGINT, SIGTERM). When an interruption signal is received, the worker will attempt to update the current job's state to 'queued' before shutting down. (false by default)
4 changes: 4 additions & 0 deletions packages/nexrender-worker/src/bin.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const args = arg({
'--aerender-parameter': [String],
'--language': String,

'--handle-interruption': Boolean,

// Aliases
'-v': '--version',
'-t': '--tag-selector',
Expand Down Expand Up @@ -193,6 +195,7 @@ if (args['--help']) {
{bold $} NEXRENDER_API_POLLING=1000 {cyan nexrender-worker}
`);

process.exit(2);
}

Expand Down Expand Up @@ -245,6 +248,7 @@ opt('wslMap', '--wsl-map');
opt('aeParams', '--aerender-parameter');
opt('tagSelector', '--tag-selector');
opt('language', '--language');
opt('handleInterruption', '--handle-interruption');

if(args['--cache-path']){
opt('cache', '--cache-path');
Expand Down
104 changes: 68 additions & 36 deletions packages/nexrender-worker/src/instance.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,32 @@ const createWorker = () => {
let active = false;
let settingsRef = null;
let stop_datetime = null;
let currentJob = null;
let client = null;

// New function to handle interruption
const handleInterruption = async () => {
if (currentJob) {
settingsRef.logger.log(`[${currentJob.uid}] Interruption signal received. Updating job state to 'queued'...`);
currentJob.onRenderProgress = null;
currentJob.state = 'queued';
try {
await client.updateJob(currentJob.uid, getRenderingStatus(currentJob));
settingsRef.logger.log(`[${currentJob.uid}] Job state updated to 'queued' successfully.`);
} catch (err) {
settingsRef.logger.error(`[${currentJob.uid}] Failed to update job state: ${err.message}`);
}
}
active = false;
process.exit(0);
};

const nextJob = async (client, settings) => {
do {
try {
if (stop_datetime !== null && new Date() > stop_datetime) {
active = false;
return
return null
}

let job = await (settings.tagSelector ?
Expand Down Expand Up @@ -63,6 +82,7 @@ const createWorker = () => {
process: 'nexrender-worker',
stopOnError: false,
logger: console,
handleInterruption: false,
}, settings))

settingsRef = settings;
Expand All @@ -85,7 +105,7 @@ const createWorker = () => {
headers = headers || {};
headers['user-agent'] = ('nexrender-worker/' + pkg.version + ' ' + (headers['user-agent'] || '')).trim();

const client = createClient({ host, secret, headers, name: settings.name });
client = createClient({ host, secret, headers, name: settings.name });

settings.track('Worker Started', {
worker_tags_set: !!settings.tagSelector,
Expand Down Expand Up @@ -114,99 +134,111 @@ const createWorker = () => {
}
}

do {
// Set up interruption handlers if enabled
if (settings.handleInterruption) {
process.on('SIGINT', handleInterruption);
process.on('SIGTERM', handleInterruption);
settingsRef.logger.log('Interruption handling enabled.');
}

let job = await nextJob(client, settings);
do {
currentJob = await nextJob(client, settings);

// if the worker has been deactivated, exit this loop
if (!active) break;

settings.track('Worker Job Started', {
job_id: job.uid, // anonymized internally
job_id: currentJob.uid, // anonymized internally
})

job.state = 'started';
job.startedAt = new Date()
currentJob.state = 'started';
currentJob.startedAt = new Date()

try {
await client.updateJob(job.uid, job)
await client.updateJob(currentJob.uid, currentJob)
} catch (err) {
console.log(`[${job.uid}] error while updating job state to ${job.state}. Job abandoned.`)
console.log(`[${job.uid}] error stack: ${err.stack}`)
console.log(`[${currentJob.uid}] error while updating job state to ${currentJob.state}. Job abandoned.`)
console.log(`[${currentJob.uid}] error stack: ${err.stack}`)
continue;
}

try {
job.onRenderProgress = (job) => {
currentJob.onRenderProgress = (currentJob) => {

Check failure on line 166 in packages/nexrender-worker/src/instance.js

View workflow job for this annotation

GitHub Actions / build (18.x)

Function declared in a loop contains unsafe references to variable(s) 'client'
try {
/* send render progress to our server */
client.updateJob(job.uid, getRenderingStatus(job));
client.updateJob(currentJob.uid, getRenderingStatus(currentJob));

if (settings.onRenderProgress) {
settings.onRenderProgress(job);
settings.onRenderProgress(currentJob);
}
} catch (err) {
if (settings.stopOnError) {
throw err;
} else {
console.log(`[${job.uid}] error occurred: ${err.stack}`)
console.log(`[${job.uid}] render proccess stopped with error...`)
console.log(`[${job.uid}] continue listening next job...`)
console.log(`[${currentJob.uid}] error occurred: ${err.stack}`)
console.log(`[${currentJob.uid}] render proccess stopped with error...`)
console.log(`[${currentJob.uid}] continue listening next job...`)
}
}
}

job.onRenderError = (job, err /* on render error */) => {
job.error = [].concat(job.error || [], [err.toString()]);
currentJob.onRenderError = (currentJob, err /* on render error */) => {
currentJob.error = [].concat(currentJob.error || [], [err.toString()]);

if (settings.onRenderError) {
settings.onRenderError(job, err);
settings.onRenderError(currentJob, err);
}
}

job = await render(job, settings); {
job.state = 'finished';
job.finishedAt = new Date();
currentJob = await render(currentJob, settings); {
currentJob.state = 'finished';
currentJob.finishedAt = new Date();
if (settings.onFinished) {
settings.onFinished(job);
settings.onFinished(currentJob);
}
}

settings.track('Worker Job Finished', { job_id: job.uid })
settings.track('Worker Job Finished', { job_id: currentJob.uid })

await client.updateJob(job.uid, getRenderingStatus(job))
await client.updateJob(currentJob.uid, getRenderingStatus(currentJob))
} catch (err) {
job.error = [].concat(job.error || [], [err.toString()]);
job.errorAt = new Date();
job.state = 'error';
currentJob.error = [].concat(currentJob.error || [], [err.toString()]);
currentJob.errorAt = new Date();
currentJob.state = 'error';

settings.track('Worker Job Error', { job_id: job.uid });
settings.track('Worker Job Error', { job_id: currentJob.uid });

if (settings.onError) {
settings.onError(job, err);
settings.onError(currentJob, err);
}

try {
await client.updateJob(job.uid, getRenderingStatus(job))
await client.updateJob(currentJob.uid, getRenderingStatus(currentJob))
}
catch (e) {
console.log(`[${job.uid}] error while updating job state to ${job.state}. Job abandoned.`)
console.log(`[${job.uid}] error stack: ${e.stack}`)
console.log(`[${currentJob.uid}] error while updating job state to ${currentJob.state}. Job abandoned.`)
console.log(`[${currentJob.uid}] error stack: ${e.stack}`)
}

if (settings.stopOnError) {
throw err;
} else {
console.log(`[${job.uid}] error occurred: ${err.stack}`)
console.log(`[${job.uid}] render proccess stopped with error...`)
console.log(`[${job.uid}] continue listening next job...`)
console.log(`[${currentJob.uid}] error occurred: ${err.stack}`)
console.log(`[${currentJob.uid}] render proccess stopped with error...`)
console.log(`[${currentJob.uid}] continue listening next job...`)
}
}

if (settings.waitBetweenJobs) {
await delay(settings.waitBetweenJobs);
}
} while (active)

// Clean up interruption handlers
if (settings.handleInterruption) {
process.removeListener('SIGINT', handleInterruption);
process.removeListener('SIGTERM', handleInterruption);
}
}

/**
Expand Down

0 comments on commit ea11fd3

Please sign in to comment.