diff --git a/README.md b/README.md
index 826ff4db..44c9a535 100644
--- a/README.md
+++ b/README.md
@@ -15,8 +15,7 @@ I learn best by examples:
 
 ```javascript
 const path = require('path')
-const NodeResque = require(path.join(__dirname, '..', 'index.js'))
-// In your projects: const NR = require('node-resque');
+const NodeResque = require('node-resque')
 
 async function boot () {
   // ////////////////////////
@@ -66,7 +65,6 @@ async function boot () {
 
   const worker = new NodeResque.Worker({connection: connectionDetails, queues: ['math', 'otherQueue']}, jobs)
   await worker.connect()
-  await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host
   worker.start()
 
   // ////////////////////
@@ -85,6 +83,7 @@ async function boot () {
   worker.on('end', () => { console.log('worker ended') })
   worker.on('cleaning_worker', (worker, pid) => { console.log(`cleaning old worker ${worker}`) })
   worker.on('poll', (queue) => { console.log(`worker polling ${queue}`) })
+  worker.on('ping', (time) => { console.log(`worker check in @ ${time}`) })
   worker.on('job', (queue, job) => { console.log(`working job ${queue} ${JSON.stringify(job)}`) })
   worker.on('reEnqueue', (queue, job, plugin) => { console.log(`reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`) })
   worker.on('success', (queue, job, result) => { console.log(`job success ${queue} ${JSON.stringify(job)} >> ${result}`) })
@@ -96,6 +95,7 @@ async function boot () {
   scheduler.on('end', () => { console.log('scheduler ended') })
   scheduler.on('poll', () => { console.log('scheduler polling') })
   scheduler.on('master', (state) => { console.log('scheduler became master') })
+  scheduler.on('cleanStuckWorker', (workerName, errorPayload, delta) => { console.log(`failing ${workerName} (stuck for ${delta}s) and failing job ${errorPayload}`) })
   scheduler.on('error', (error) => { console.log(`scheduler error >> ${error}`) })
   scheduler.on('workingTimestamp', (timestamp) => { console.log(`scheduler working timestamp ${timestamp}`) })
   scheduler.on('transferredJob', (timestamp, job) => { console.log(`scheduler enquing job ${timestamp} >> ${JSON.stringify(job)}`) })
@@ -302,6 +302,30 @@ We use a try/catch pattern to catch errors in your jobs. If any job throws an un
 
 ## Failed Worker Management
 
+### Automatically
+
+By default, the scheduler will check for workers which haven't pinged redis within the last 60 minutes. If a worker has been silent for that long, we assume its process crashed and remove it from redis. If that worker was working on a job, the job is placed in the failed queue for later inspection. Every worker runs a timer which updates a key in redis every `timeout` (default: 5 seconds). If your job is slow but async, there should be no problem. However, if your job consumes 100% of the CPU of the process, this timer might not fire.
+
+To modify the 60 minute check, change `stuckWorkerTimeout` when configuring your scheduler, for example:
+
+```js
+const scheduler = new NodeResque.Scheduler({
+  stuckWorkerTimeout: (1000 * 60 * 60), // 1 hour, in ms
+  connection: connectionDetails
+})
+```
+
+Set your scheduler's `stuckWorkerTimeout` to `false` to disable this behavior.
+
+```js
+const scheduler = new NodeResque.Scheduler({
+  stuckWorkerTimeout: false, // will not fail jobs which haven't pinged redis
+  connection: connectionDetails
+})
+```
+
+### Manually
+
 Sometimes a worker crashes in a *severe* way, and it doesn't get the time/chance to notify redis that it is leaving the pool (this happens all the time on PAAS providers like Heroku).
When this happens, you will not only need to extract the job from the now-zombie worker's "working on" status, but also remove the stuck worker. To aid you in these edge cases, `await queue.cleanOldWorkers(age)` is available. Because there are no 'heartbeats' in resque, it is impossible for the application to know whether a worker has been working on a long job or whether it has died. You are required to provide an "age" for how long a worker has been "working"; all workers older than that age will be removed, and the jobs they were working on will be moved to the error queue (where you can then use `queue.retryAndRemoveFailed` to re-enqueue the job).
 
@@ -437,6 +461,7 @@ multiWorker.on('start', (workerId) => { console
 multiWorker.on('end', (workerId) => { console.log("worker["+workerId+"] ended"); })
 multiWorker.on('cleaning_worker', (workerId, worker, pid) => { console.log("cleaning old worker " + worker); })
 multiWorker.on('poll', (workerId, queue) => { console.log("worker["+workerId+"] polling " + queue); })
+multiWorker.on('ping', (workerId, time) => { console.log("worker["+workerId+"] check in @ " + time); })
 multiWorker.on('job', (workerId, queue, job) => { console.log("worker["+workerId+"] working job " + queue + " " + JSON.stringify(job)); })
 multiWorker.on('reEnqueue', (workerId, queue, job, plugin) => { console.log("worker["+workerId+"] reEnqueue job (" + plugin + ") " + queue + " " + JSON.stringify(job)); })
 multiWorker.on('success', (workerId, queue, job, result) => { console.log("worker["+workerId+"] job success " + queue + " " + JSON.stringify(job) + " >> " + result); })
diff --git a/__tests__/core/scheduler.js b/__tests__/core/scheduler.js
index c63ca1db..e6a04509 100644
--- a/__tests__/core/scheduler.js
+++ b/__tests__/core/scheduler.js
@@ -26,7 +26,7 @@ describe('scheduler', () => {
       namespace: specHelper.connectionDetails.namespace
     }
 
-    scheduler = new NodeResque.Scheduler({connection: connectionDetails, timeout: specHelper.timeout})
+    scheduler = new NodeResque.Scheduler({ connection: connectionDetails, timeout: specHelper.timeout })
 
     scheduler.on('poll', () => { throw new Error('Should not emit poll') })
     scheduler.on('master', () => { throw new Error('Should not emit master') })
@@ -70,7 +70,11 @@ describe('scheduler', () => {
   describe('[with connection]', () => {
     beforeEach(async () => {
       await specHelper.cleanup()
-      scheduler = new NodeResque.Scheduler({connection: specHelper.connectionDetails, timeout: specHelper.timeout})
+      scheduler = new NodeResque.Scheduler({
+        connection: specHelper.connectionDetails,
+        timeout: specHelper.timeout,
+        stuckWorkerTimeout: 1000
+      })
       queue = new NodeResque.Queue({connection: specHelper.connectionDetails, queue: specHelper.queue})
       await scheduler.connect()
       await queue.connect()
@@ -100,6 +104,70 @@ describe('scheduler', () => {
       expect(obj).toBeFalsy()
       await scheduler.end()
     })
+
+    describe('stuck workers', () => {
+      let worker
+      const jobs = {
+        'stuck': {
+          perform: async function () {
+            await new Promise((resolve) => {
+              // stop the worker from checking in, as if the process crashed
+              // don't resolve
+              clearTimeout(this.pingTimer)
+            })
+          }
+        }
+      }
+
+      beforeAll(async () => {
+        worker = new NodeResque.Worker({
+          connection: specHelper.connectionDetails,
+          timeout: specHelper.timeout,
+          queues: ['stuckJobs']
+        }, jobs)
+        await worker.connect()
+      })
+
+      afterAll(async () => {
+        await scheduler.end()
+        await worker.end()
+      })
+
+      test('will remove stuck workers and fail their jobs', async () => {
+        await scheduler.connect()
+        await scheduler.start()
+        await 
worker.start() + + const workers = await queue.allWorkingOn() + let h = {} + h[worker.name] = 'started' + expect(workers).toEqual(h) + + await queue.enqueue('stuckJobs', 'stuck', ['oh no!']) + + await new Promise(async (resolve) => { + scheduler.on('cleanStuckWorker', async (workerName, errorPayload, delta) => { + // response data should contain failure + expect(workerName).toEqual(worker.name) + expect(errorPayload.worker).toEqual(worker.name) + expect(errorPayload.error).toEqual('Worker Timeout (killed manually)') + + // check the workers list, should be empty now + expect(await queue.allWorkingOn()).toEqual({}) + + // check the failed list + let failed = await specHelper.redis.rpop(specHelper.namespace + ':' + 'failed') + failed = JSON.parse(failed) + expect(failed.queue).toBe('stuckJobs') + expect(failed.exception).toBe('Worker Timeout (killed manually)') + expect(failed.error).toBe('Worker Timeout (killed manually)') + + scheduler.removeAllListeners('cleanStuckWorker') + resolve() + }) + }) + }) + }) }) }) }) diff --git a/__tests__/core/worker.js b/__tests__/core/worker.js index 93c50acc..3d199a2a 100644 --- a/__tests__/core/worker.js +++ b/__tests__/core/worker.js @@ -1,6 +1,5 @@ const path = require('path') const specHelper = require(path.join(__dirname, '..', 'utils', 'specHelper.js')) -const os = require('os') const NodeResque = require(path.join(__dirname, '..', '..', 'index.js')) let jobs = { @@ -27,6 +26,12 @@ let jobs = { return 'yay' } }, + 'twoSeconds': { + perform: async () => { + await new Promise((resolve) => { setTimeout(resolve, 1000 * 2) }) + return 'slow' + } + }, 'quickDefine': async () => { return 'ok' } } @@ -108,33 +113,6 @@ describe('worker', () => { await worker.end() }) - describe('crashing workers', () => { - test('can clear previously crashed workers from the same host', async () => { - let name1 = os.hostname() + ':' + '0' // fake pid - let name2 = os.hostname() + ':' + process.pid // real pid - let worker1 = new NodeResque.Worker({connection: specHelper.connectionDetails, timeout: specHelper.timeout, name: name1}, jobs) - - await worker1.connect() - await worker1.init() - worker1.running = false - - await new Promise((resolve) => { setTimeout(resolve, 500) }) - - let worker2 = new NodeResque.Worker({connection: specHelper.connectionDetails, timeout: specHelper.timeout, name: name2}, jobs) - await worker2.connect() - - await new Promise((resolve) => { - worker2.workerCleanup() - - worker2.on('cleaning_worker', (worker, pid) => { - expect(worker).toMatch(new RegExp(name1)) - expect(pid).toBe(0) - return resolve() - }) - }) - }) - }) - test('will determine the proper queue names', async () => { let worker = new NodeResque.Worker({connection: specHelper.connectionDetails, timeout: specHelper.timeout}, jobs) await worker.connect() @@ -178,7 +156,7 @@ describe('worker', () => { await new Promise(async (resolve) => { worker.start() - worker.on('success', function (q, job, result) { + worker.on('success', (q, job, result) => { expect(q).toBe(specHelper.queue) expect(job['class']).toBe('add') expect(result).toBe(3) @@ -196,7 +174,7 @@ describe('worker', () => { await new Promise(async (resolve) => { worker.start() - worker.on('success', function (q, job, result) { + worker.on('success', (q, job, result) => { expect(result.a).toBe('starting value') expect(worker.result).toBe(result) @@ -212,7 +190,7 @@ describe('worker', () => { await new Promise(async (resolve) => { worker.start() - worker.on('success', function (q, job, result) { + worker.on('success', (q, job, 
result) => { expect(result).toBe('ok') worker.removeAllListeners('success') return resolve() @@ -226,7 +204,7 @@ describe('worker', () => { await new Promise(async (resolve) => { worker.start() - worker.on('failure', function (q, job, failure) { + worker.on('failure', (q, job, failure) => { expect(q).toBe(specHelper.queue) expect(String(failure)).toBe('Error: No job defined for class "somethingFake"') @@ -243,6 +221,30 @@ describe('worker', () => { expect(data.exception).toBe('Error') expect(data.error).toBe('No job defined for class "somethingFake"') }) + + test('will ping with status even when working a slow job', async () => { + const nowInSeconds = Math.round(new Date().getTime() / 1000) + await worker.start() + await new Promise((resolve) => setTimeout(resolve, (worker.options.timeout * 2))) + const pingKey = worker.connection.key('worker', 'ping', worker.name) + let firstPayload = JSON.parse(await specHelper.redis.get(pingKey)) + expect(firstPayload.name).toEqual(worker.name) + expect(firstPayload.time).toBeGreaterThanOrEqual(nowInSeconds) + + await queue.enqueue(specHelper.queue, 'twoSeconds') + + await new Promise(async (resolve) => { + worker.on('success', (q, job, result) => { + expect(result).toBe('slow') + worker.removeAllListeners('success') + return resolve() + }) + }) + + let secondPayload = JSON.parse(await specHelper.redis.get(pingKey)) + expect(secondPayload.name).toEqual(worker.name) + expect(secondPayload.time).toBeGreaterThanOrEqual(firstPayload.time) + }) }) }) }) diff --git a/examples/customPluginExample.js b/examples/customPluginExample.js index 65d903b8..e62ba446 100644 --- a/examples/customPluginExample.js +++ b/examples/customPluginExample.js @@ -63,7 +63,6 @@ async function boot () { const worker = new NodeResque.Worker({connection: connectionDetails, queues: ['default']}, jobs) await worker.connect() - await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host worker.start() // ////////////////////// diff --git a/examples/errorExample.js b/examples/errorExample.js index f769fdef..73a860c7 100644 --- a/examples/errorExample.js +++ b/examples/errorExample.js @@ -52,7 +52,6 @@ async function boot () { const worker = new NodeResque.Worker({connection: connectionDetails, queues: ['default']}, jobs) await worker.connect() - await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host worker.start() // ////////////////////// diff --git a/examples/example.js b/examples/example.js index 5f084b48..deb99d80 100644 --- a/examples/example.js +++ b/examples/example.js @@ -66,7 +66,6 @@ async function boot () { const worker = new NodeResque.Worker({connection: connectionDetails, queues: ['math', 'otherQueue']}, jobs) await worker.connect() - await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host worker.start() // //////////////////// @@ -85,6 +84,7 @@ async function boot () { worker.on('end', () => { console.log('worker ended') }) worker.on('cleaning_worker', (worker, pid) => { console.log(`cleaning old worker ${worker}`) }) worker.on('poll', (queue) => { console.log(`worker polling ${queue}`) }) + worker.on('ping', (time) => { console.log(`worker check in @ ${time}`) }) worker.on('job', (queue, job) => { console.log(`working job ${queue} ${JSON.stringify(job)}`) }) worker.on('reEnqueue', (queue, job, plugin) => { console.log(`reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`) }) worker.on('success', (queue, job, result) => { 
console.log(`job success ${queue} ${JSON.stringify(job)} >> ${result}`) })
@@ -97,6 +97,7 @@ async function boot () {
   scheduler.on('end', () => { console.log('scheduler ended') })
   scheduler.on('poll', () => { console.log('scheduler polling') })
   scheduler.on('master', (state) => { console.log('scheduler became master') })
   scheduler.on('error', (error) => { console.log(`scheduler error >> ${error}`) })
+  scheduler.on('cleanStuckWorker', (workerName, errorPayload, delta) => { console.log(`failing ${workerName} (stuck for ${delta}s) and failing job ${errorPayload}`) })
   scheduler.on('workingTimestamp', (timestamp) => { console.log(`scheduler working timestamp ${timestamp}`) })
   scheduler.on('transferredJob', (timestamp, job) => { console.log(`scheduler enquing job ${timestamp} >> ${JSON.stringify(job)}`) })
diff --git a/examples/retry.js b/examples/retry.js
index 7eb4168d..510b4de8 100644
--- a/examples/retry.js
+++ b/examples/retry.js
@@ -48,7 +48,6 @@ async function boot () {
 
   const worker = new NodeResque.Worker({connection: connectionDetails, queues: ['math']}, jobs)
   await worker.connect()
-  await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host
   worker.start()
 
   // ////////////////////
diff --git a/examples/scheduledJobs.js b/examples/scheduledJobs.js
index 0811f7ff..303774b7 100644
--- a/examples/scheduledJobs.js
+++ b/examples/scheduledJobs.js
@@ -39,7 +39,6 @@ async function boot () {
 
   const worker = new NodeResque.Worker({connection: connectionDetails, queues: ['time']}, jobs)
   await worker.connect()
-  await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host
   worker.start()
 
   // ////////////////////
diff --git a/examples/stuckWorker.js b/examples/stuckWorker.js
new file mode 100644
index 00000000..d8deb708
--- /dev/null
+++ b/examples/stuckWorker.js
@@ -0,0 +1,96 @@
+const path = require('path')
+const NodeResque = require(path.join(__dirname, '..', 'index.js'))
+// In your projects: const NodeResque = require('node-resque');
+
+async function boot () {
+  // ////////////////////////
+  // SET UP THE CONNECTION //
+  // ////////////////////////
+
+  const connectionDetails = {
+    pkg: 'ioredis',
+    host: '127.0.0.1',
+    password: null,
+    port: 6379,
+    database: 0
+    // namespace: 'resque',
+    // looping: true,
+    // options: {password: 'abc'},
+  }
+
+  // ///////////////////////////
+  // DEFINE YOUR WORKER TASKS //
+  // ///////////////////////////
+
+  const jobs = {
+    'stuck': {
+      perform: async function () {
+        console.log(`${this.name} is starting stuck job...`)
+        await new Promise((resolve) => {
+          clearTimeout(this.pingTimer) // stop the worker from checking in, as if the process crashed
+          setTimeout(resolve, 60 * 60 * 1000) // 1 hour job
+        })
+      }
+    }
+  }
+
+  // /////////////////
+  // START A WORKER //
+  // /////////////////
+
+  const worker = new NodeResque.Worker({connection: connectionDetails, queues: ['stuckJobs']}, jobs)
+  await worker.connect()
+  worker.start()
+
+  // ////////////////////
+  // START A SCHEDULER //
+  // ////////////////////
+
+  const scheduler = new NodeResque.Scheduler({
+    stuckWorkerTimeout: (10 * 1000),
+    connection: connectionDetails
+  })
+
+  await scheduler.connect()
+  scheduler.start()
+
+  // //////////////////////
+  // REGISTER FOR EVENTS //
+  // //////////////////////
+
+  worker.on('start', () => { console.log('worker started') })
+  worker.on('end', () => { console.log('worker ended') })
+  worker.on('cleaning_worker', (worker, pid) => { console.log(`cleaning old worker ${worker}`) })
+  worker.on('poll', (queue) => { console.log(`worker polling 
${queue}`) }) + worker.on('ping', (time) => { console.log(`worker check in @ ${time}`) }) + worker.on('job', (queue, job) => { console.log(`working job ${queue} ${JSON.stringify(job)}`) }) + worker.on('reEnqueue', (queue, job, plugin) => { console.log(`reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`) }) + worker.on('success', (queue, job, result) => { console.log(`job success ${queue} ${JSON.stringify(job)} >> ${result}`) }) + worker.on('failure', (queue, job, failure) => { console.log(`job failure ${queue} ${JSON.stringify(job)} >> ${failure}`) }) + worker.on('error', (error, queue, job) => { console.log(`error ${queue} ${JSON.stringify(job)} >> ${error}`) }) + worker.on('pause', () => { console.log('worker paused') }) + + scheduler.on('start', () => { console.log('scheduler started') }) + scheduler.on('end', () => { console.log('scheduler ended') }) + scheduler.on('poll', () => { console.log('scheduler polling') }) + scheduler.on('master', (state) => { console.log('scheduler became master') }) + scheduler.on('error', (error) => { console.log(`scheduler error >> ${error}`) }) + scheduler.on('workingTimestamp', (timestamp) => { console.log(`scheduler working timestamp ${timestamp}`) }) + scheduler.on('transferredJob', (timestamp, job) => { console.log(`scheduler enquing job ${timestamp} >> ${JSON.stringify(job)}`) }) + + scheduler.on('cleanStuckWorker', (workerName, errorPayload, delta) => { + console.log(`failing ${workerName} (stuck for ${delta}s) and failing job: ${JSON.stringify(errorPayload)}`) + process.exit() + }) + + // ////////////////////// + // CONNECT TO A QUEUE // + // ////////////////////// + + const queue = new NodeResque.Queue({connection: connectionDetails}, jobs) + queue.on('error', function (error) { console.log(error) }) + await queue.connect() + await queue.enqueue('stuckJobs', 'stuck', ['oh no']) +} + +boot() diff --git a/lib/multiWorker.js b/lib/multiWorker.js index f049cd60..11e8b4dc 100644 --- a/lib/multiWorker.js +++ b/lib/multiWorker.js @@ -67,6 +67,7 @@ class MultiWorker extends EventEmitter { worker.on('end', () => { this.emit('end', worker.id) }) worker.on('cleaning_worker', (worker, pid) => { this.emit('cleaning_worker', worker.id, worker, pid) }) worker.on('poll', (queue) => { this.emit('poll', worker.id, queue) }) + worker.on('ping', (time) => { this.emit('ping', worker.id, time) }) worker.on('job', (queue, job) => { this.emit('job', worker.id, queue, job) }) worker.on('reEnqueue', (queue, job, plugin) => { this.emit('reEnqueue', worker.id, queue, job, plugin) }) worker.on('success', (queue, job, result) => { this.emit('success', worker.id, queue, job, result) }) @@ -77,7 +78,6 @@ class MultiWorker extends EventEmitter { this.workers.push(worker) await worker.connect() - await worker.workerCleanup() await worker.start() } @@ -145,6 +145,7 @@ class MultiWorker extends EventEmitter { 'end', 'cleaning_worker', 'poll', + 'ping', 'job', 'reEnqueue', 'success', diff --git a/lib/queue.js b/lib/queue.js index af53933a..8e6847df 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -1,4 +1,5 @@ const path = require('path') +const os = require('os') const EventEmitter = require('events').EventEmitter const Connection = require(path.join(__dirname, 'connection.js')).Connection const PluginRunner = require(path.join(__dirname, 'pluginRunner.js')) @@ -249,15 +250,20 @@ class Queue extends EventEmitter { if (!queues) { throw new Error('worker not found') } let workingOn = await this.workingOn(workerName, queues) + let message = 'Worker Timeout (killed manually)' if 
(workingOn) {
       workingOn = JSON.parse(workingOn)
       errorPayload = {
         worker: workerName,
         queue: workingOn.queue,
-        payload: workingOn.payload,
-        exception: 'Worker Timeout (killed manually)',
-        error: 'Worker Timeout (killed manually)',
-        backtrace: null,
+        payload: workingOn.payload || [],
+        exception: message,
+        error: message,
+        backtrace: [
+          `killed by ${os.hostname()} at ${new Date()}`,
+          'queue#forceCleanWorker',
+          'node-resque'
+        ],
         failed_at: (new Date()).toString()
       }
 
@@ -268,6 +274,7 @@ class Queue extends EventEmitter {
 
     await this.connection.redis.del(this.connection.key('stat', 'failed', workerName))
     await this.connection.redis.del(this.connection.key('stat', 'processed', workerName))
+    await this.connection.redis.del(this.connection.key('worker', 'ping', workerName))
     await this.connection.redis.del(this.connection.key('worker', workerName))
     await this.connection.redis.srem(this.connection.key('workers'), workerName + ':' + queues)
 
diff --git a/lib/scheduler.js b/lib/scheduler.js
index c3fdd311..05af7131 100644
--- a/lib/scheduler.js
+++ b/lib/scheduler.js
@@ -13,6 +13,7 @@ class Scheduler extends EventEmitter {
 
     const defaults = {
       timeout: 5000, // in ms
+      stuckWorkerTimeout: 60 * 60 * 1000, // 60 minutes in ms
      masterLockTimeout: 60 * 3, // in seconds
       name: os.hostname() + ':' + process.pid // assumes only one worker per node process
     }
@@ -88,6 +89,8 @@
         this.emit('master')
       }
 
+      await this.checkStuckWorkers()
+
       this.emit('poll')
       let timestamp = await this.nextDelayedTimestamp()
       if (timestamp) {
@@ -184,6 +187,31 @@
       await this.connection.redis.zrem(this.connection.key('delayed_queue_schedule'), timestamp)
     }
   }
+
+  async checkStuckWorkers () {
+    if (!this.options.stuckWorkerTimeout) { return }
+
+    const keys = await this.connection.redis.keys(this.connection.key('worker', 'ping', '*'))
+    const payloads = await Promise.all(keys.map(async (k) => {
+      return JSON.parse(await this.connection.redis.get(k))
+    }))
+
+    const nowInSeconds = Math.round(new Date().getTime() / 1000)
+    const stuckWorkerTimeoutInSeconds = Math.round(this.options.stuckWorkerTimeout / 1000)
+    for (let i in payloads) {
+      const {name, time} = payloads[i]
+      const delta = nowInSeconds - time
+      if (delta > stuckWorkerTimeoutInSeconds) {
+        await this.forceCleanWorker(name, delta)
+      }
+      i++
+    }
+  }
+
+  async forceCleanWorker (workerName, delta) {
+    const errorPayload = await this.queue.forceCleanWorker(workerName)
+    this.emit('cleanStuckWorker', workerName, errorPayload, delta)
+  }
 }
 
 exports.Scheduler = Scheduler
diff --git a/lib/worker.js b/lib/worker.js
index cbf06c9f..9b383cc8 100644
--- a/lib/worker.js
+++ b/lib/worker.js
@@ -1,6 +1,5 @@
 const os = require('os')
 const path = require('path')
-const exec = require('child_process').exec
 const EventEmitter = require('events').EventEmitter
 const Queue = require(path.join(__dirname, 'queue.js')).Queue
 const PluginRunner = require(path.join(__dirname, 'pluginRunner.js'))
@@ -39,6 +38,7 @@ class Worker extends EventEmitter {
     this.running = false
     this.working = false
     this.job = null
+    this.pingTimer = null
 
     this.queueObject = new Queue({connection: options.connection}, this.jobs)
     this.queueObject.on('error', (error) => { this.emit('error', error) })
@@ -64,6 +64,8 @@
       this.connection.key('worker', this.name, this.stringQueues(), 'started'),
       Math.round((new Date()).getTime() / 1000)
     )
+    await this.ping()
+    this.pingTimer = setInterval(this.ping.bind(this), 
this.options.timeout) } async end () { @@ -75,7 +77,8 @@ class Worker extends EventEmitter { } if (this.connection.connected === true || this.connection.connected === undefined || this.connection.connected === null) { - await this.untrack(this.name, this.stringQueues()) + clearInterval(this.pingTimer) + await this.untrack() } await this.queueObject.end() @@ -267,61 +270,31 @@ class Worker extends EventEmitter { return this.connection.redis.sadd(this.connection.key('workers'), (this.name + ':' + this.stringQueues())) } - async untrack (name, queues) { + async ping () { + const name = this.name + const nowSeconds = Math.round(new Date().getTime() / 1000) + this.emit('ping', nowSeconds) + const payload = JSON.stringify({ + time: nowSeconds, + name: name, + queues: this.stringQueues() + }) + await this.connection.redis.set(this.connection.key('worker', 'ping', name), payload) + } + + async untrack () { + const name = this.name + const queues = this.stringQueues() if (!this.connection || !this.connection.redis) { return } await this.connection.redis.srem(this.connection.key('workers'), (name + ':' + queues)) - await this.connection.redis.del(this.connection.key('worker', name, this.stringQueues())) - await this.connection.redis.del(this.connection.key('worker', name, this.stringQueues(), 'started')) + await this.connection.redis.del(this.connection.key('worker', 'ping', name)) + await this.connection.redis.del(this.connection.key('worker', name, queues)) + await this.connection.redis.del(this.connection.key('worker', name, queues, 'started')) await this.connection.redis.del(this.connection.key('stat', 'failed', name)) await this.connection.redis.del(this.connection.key('stat', 'processed', name)) } - async workerCleanup () { - let pids = await this.getPids() - let workers = await this.connection.redis.smembers(this.connection.key('workers')) - - for (let i in workers) { - let worker = workers[i] - let parts = worker.split(':') - let host = parts[0] - let pid = parseInt(parts[1]) - let queues = parts.splice(-1, 1) - let pureName = parts.join(':') - if (host === os.hostname() && pids.indexOf(pid) < 0) { - this.emit('cleaning_worker', worker, pid) - await this.untrack(pureName, queues) - } - } - } - - async getPids () { - let cmd - if (process.platform === 'win32') { - cmd = 'powershell.exe -command "Get-Process | select Id"' - } else { - cmd = 'ps -eo pid=' - } - - return new Promise((resolve, reject) => { - exec(cmd, (error, stdout, stderr) => { - if (!error && stderr) { error = new Error(stderr) } - if (error) { return reject(error) } - - let pids = [] - stdout.split('\n').forEach((line) => { - line = line.trim() - if (line.length > 0) { - var pid = parseInt(line.split(' ')[0]) - if (!isNaN(pid)) { pids.push(pid) } - } - }) - - return resolve(pids) - }) - }) - } - async checkQueues () { if (typeof this.queues === 'string') { this.queues = this.queues.split(',') @@ -334,7 +307,7 @@ class Worker extends EventEmitter { if ((this.queues[0] === '*' && this.queues.length === 1) || this.queues.length === 0) { this.originalQueue = '*' - await this.untrack(this.name, this.stringQueues()) + await this.untrack() let response = await this.connection.redis.smembers(this.connection.key('queues')) this.queues = (response ? response.sort() : []) await this.track()
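
The README's new "Manually" section above documents `queue.cleanOldWorkers(age)` but does not show it in use. Below is a minimal sketch of that manual path, reusing the `connectionDetails` shape from the examples in this diff; the `manualCleanup` function name and the one-hour `age` are illustrative, and the age is assumed to be in milliseconds like the other timeouts here.

```js
const NodeResque = require('node-resque')

async function manualCleanup () {
  const connectionDetails = { pkg: 'ioredis', host: '127.0.0.1', port: 6379, database: 0 }

  const queue = new NodeResque.Queue({ connection: connectionDetails })
  queue.on('error', (error) => { console.log(error) })
  await queue.connect()

  // remove every worker that has been "working" for more than an hour;
  // whatever each one was working on is moved to the failed queue
  const age = 1000 * 60 * 60 // assumed to be ms
  const cleaned = await queue.cleanOldWorkers(age)
  console.log('cleaned workers:', cleaned)

  await queue.end()
}

manualCleanup()
```

Jobs failed this way can then be re-enqueued with `queue.retryAndRemoveFailed`, as the README notes.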