diff --git a/packages/nexrender-worker/readme.md b/packages/nexrender-worker/readme.md index 6ca91b87..25cda380 100644 --- a/packages/nexrender-worker/readme.md +++ b/packages/nexrender-worker/readme.md @@ -96,4 +96,4 @@ Available settings (almost same as for `nexrender-core`): * `actions` - an object with keys corresponding to the `module` field when defining an action, value should be a function matching expected signature of an action. Used for defining actions programmatically without needing to package the action as a separate package * `cache` - boolean or string. Set the cache folder used by HTTP assets. If `true` will use the default path of `${workpath}/http-cache`, if set to a string it will be interpreted as a filesystem path to the cache folder. * `name` - string. An unique name (or not) to the `nexrender-worker`, and it will be identified in the `nexrender-server`. It can be used as an executor name on picked job(s) as well. - +* `handleInterruption` - boolean, if set to true, enables handling of interruption signals (SIGINT, SIGTERM). When an interruption signal is received, the worker will attempt to update the current job's state to 'queued' before shutting down. (false by default) diff --git a/packages/nexrender-worker/src/bin.js b/packages/nexrender-worker/src/bin.js index 61a010d7..3e893aab 100644 --- a/packages/nexrender-worker/src/bin.js +++ b/packages/nexrender-worker/src/bin.js @@ -49,6 +49,8 @@ const args = arg({ '--aerender-parameter': [String], '--language': String, + '--handle-interruption': Boolean, + // Aliases '-v': '--version', '-t': '--tag-selector', @@ -193,6 +195,7 @@ if (args['--help']) { {bold $} NEXRENDER_API_POLLING=1000 {cyan nexrender-worker} `); + process.exit(2); } @@ -245,6 +248,7 @@ opt('wslMap', '--wsl-map'); opt('aeParams', '--aerender-parameter'); opt('tagSelector', '--tag-selector'); opt('language', '--language'); +opt('handleInterruption', '--handle-interruption'); if(args['--cache-path']){ opt('cache', '--cache-path'); diff --git a/packages/nexrender-worker/src/instance.js b/packages/nexrender-worker/src/instance.js index 47e3cc83..cec9b8b3 100644 --- a/packages/nexrender-worker/src/instance.js +++ b/packages/nexrender-worker/src/instance.js @@ -13,13 +13,32 @@ const createWorker = () => { let active = false; let settingsRef = null; let stop_datetime = null; + let currentJob = null; + let client = null; + + // New function to handle interruption + const handleInterruption = async () => { + if (currentJob) { + settingsRef.logger.log(`[${currentJob.uid}] Interruption signal received. Updating job state to 'queued'...`); + currentJob.onRenderProgress = null; + currentJob.state = 'queued'; + try { + await client.updateJob(currentJob.uid, getRenderingStatus(currentJob)); + settingsRef.logger.log(`[${currentJob.uid}] Job state updated to 'queued' successfully.`); + } catch (err) { + settingsRef.logger.error(`[${currentJob.uid}] Failed to update job state: ${err.message}`); + } + } + active = false; + process.exit(0); + }; const nextJob = async (client, settings) => { do { try { if (stop_datetime !== null && new Date() > stop_datetime) { active = false; - return + return null } let job = await (settings.tagSelector ? @@ -63,6 +82,7 @@ const createWorker = () => { process: 'nexrender-worker', stopOnError: false, logger: console, + handleInterruption: false, }, settings)) settingsRef = settings; @@ -85,7 +105,7 @@ const createWorker = () => { headers = headers || {}; headers['user-agent'] = ('nexrender-worker/' + pkg.version + ' ' + (headers['user-agent'] || '')).trim(); - const client = createClient({ host, secret, headers, name: settings.name }); + client = createClient({ host, secret, headers, name: settings.name }); settings.track('Worker Started', { worker_tags_set: !!settings.tagSelector, @@ -114,92 +134,98 @@ const createWorker = () => { } } - do { + // Set up interruption handlers if enabled + if (settings.handleInterruption) { + process.on('SIGINT', handleInterruption); + process.on('SIGTERM', handleInterruption); + settingsRef.logger.log('Interruption handling enabled.'); + } - let job = await nextJob(client, settings); + do { + currentJob = await nextJob(client, settings); // if the worker has been deactivated, exit this loop if (!active) break; settings.track('Worker Job Started', { - job_id: job.uid, // anonymized internally + job_id: currentJob.uid, // anonymized internally }) - job.state = 'started'; - job.startedAt = new Date() + currentJob.state = 'started'; + currentJob.startedAt = new Date() try { - await client.updateJob(job.uid, job) + await client.updateJob(currentJob.uid, currentJob) } catch (err) { - console.log(`[${job.uid}] error while updating job state to ${job.state}. Job abandoned.`) - console.log(`[${job.uid}] error stack: ${err.stack}`) + console.log(`[${currentJob.uid}] error while updating job state to ${currentJob.state}. Job abandoned.`) + console.log(`[${currentJob.uid}] error stack: ${err.stack}`) continue; } try { - job.onRenderProgress = (job) => { + currentJob.onRenderProgress = (currentJob) => { try { /* send render progress to our server */ - client.updateJob(job.uid, getRenderingStatus(job)); + client.updateJob(currentJob.uid, getRenderingStatus(currentJob)); if (settings.onRenderProgress) { - settings.onRenderProgress(job); + settings.onRenderProgress(currentJob); } } catch (err) { if (settings.stopOnError) { throw err; } else { - console.log(`[${job.uid}] error occurred: ${err.stack}`) - console.log(`[${job.uid}] render proccess stopped with error...`) - console.log(`[${job.uid}] continue listening next job...`) + console.log(`[${currentJob.uid}] error occurred: ${err.stack}`) + console.log(`[${currentJob.uid}] render proccess stopped with error...`) + console.log(`[${currentJob.uid}] continue listening next job...`) } } } - job.onRenderError = (job, err /* on render error */) => { - job.error = [].concat(job.error || [], [err.toString()]); + currentJob.onRenderError = (currentJob, err /* on render error */) => { + currentJob.error = [].concat(currentJob.error || [], [err.toString()]); if (settings.onRenderError) { - settings.onRenderError(job, err); + settings.onRenderError(currentJob, err); } } - job = await render(job, settings); { - job.state = 'finished'; - job.finishedAt = new Date(); + currentJob = await render(currentJob, settings); { + currentJob.state = 'finished'; + currentJob.finishedAt = new Date(); if (settings.onFinished) { - settings.onFinished(job); + settings.onFinished(currentJob); } } - settings.track('Worker Job Finished', { job_id: job.uid }) + settings.track('Worker Job Finished', { job_id: currentJob.uid }) - await client.updateJob(job.uid, getRenderingStatus(job)) + await client.updateJob(currentJob.uid, getRenderingStatus(currentJob)) } catch (err) { - job.error = [].concat(job.error || [], [err.toString()]); - job.errorAt = new Date(); - job.state = 'error'; + currentJob.error = [].concat(currentJob.error || [], [err.toString()]); + currentJob.errorAt = new Date(); + currentJob.state = 'error'; - settings.track('Worker Job Error', { job_id: job.uid }); + settings.track('Worker Job Error', { job_id: currentJob.uid }); if (settings.onError) { - settings.onError(job, err); + settings.onError(currentJob, err); } try { - await client.updateJob(job.uid, getRenderingStatus(job)) + await client.updateJob(currentJob.uid, getRenderingStatus(currentJob)) } catch (e) { - console.log(`[${job.uid}] error while updating job state to ${job.state}. Job abandoned.`) - console.log(`[${job.uid}] error stack: ${e.stack}`) + console.log(`[${currentJob.uid}] error while updating job state to ${currentJob.state}. Job abandoned.`) + console.log(`[${currentJob.uid}] error stack: ${e.stack}`) } if (settings.stopOnError) { throw err; } else { - console.log(`[${job.uid}] error occurred: ${err.stack}`) - console.log(`[${job.uid}] render proccess stopped with error...`) - console.log(`[${job.uid}] continue listening next job...`) + console.log(`[${currentJob.uid}] error occurred: ${err.stack}`) + console.log(`[${currentJob.uid}] render proccess stopped with error...`) + console.log(`[${currentJob.uid}] continue listening next job...`) } } @@ -207,6 +233,12 @@ const createWorker = () => { await delay(settings.waitBetweenJobs); } } while (active) + + // Clean up interruption handlers + if (settings.handleInterruption) { + process.removeListener('SIGINT', handleInterruption); + process.removeListener('SIGTERM', handleInterruption); + } } /**