diff --git a/__tests__/core/queue.js b/__tests__/core/queue.js index 38a7f148..73f8ac9a 100644 --- a/__tests__/core/queue.js +++ b/__tests__/core/queue.js @@ -525,11 +525,12 @@ describe('queue', () => { }) test('will not remove stuck jobs within the time limit', async () => { - var age = 999 + const age = 999 queue.enqueue(specHelper.queue, 'slowJob') workerA.start() await new Promise((resolve) => { + // hijack a worker in the middle of working on a job workerA.on('job', async () => { workerA.removeAllListeners('job') @@ -537,7 +538,7 @@ describe('queue', () => { expect(Object.keys(data).length).toBe(0) const workingOn = await queue.allWorkingOn() - var paylaod = workingOn.workerA.payload + const paylaod = workingOn.workerA.payload expect(paylaod.queue).toBe('test_queue') expect(paylaod.class).toBe('slowJob') @@ -545,6 +546,47 @@ describe('queue', () => { }) }) }) + + test('can forceClean a worker, returning the error payload', async () => { + queue.enqueue(specHelper.queue, 'slowJob') + workerA.start() + + await new Promise((resolve) => { + // hijack a worker in the middle of working on a job + workerA.on('job', async () => { + workerA.removeAllListeners('job') + + const errorPayload = await queue.forceCleanWorker(workerA.name) + + expect(errorPayload.worker).toBe('workerA') + expect(errorPayload.queue).toBe('test_queue') + expect(errorPayload.payload.class).toBe('slowJob') + expect(errorPayload.exception).toBe('Worker Timeout (killed manually)') + expect(errorPayload.backtrace[0]).toMatch(/killed by/) + expect(errorPayload.backtrace[1]).toBe('queue#forceCleanWorker') + expect(errorPayload.backtrace[2]).toBe('node-resque') + expect(errorPayload.failed_at).toBeTruthy() + + return resolve() + }) + }) + }) + + test('can forceClean a worker, returning the error payload and removing all keys it had set in redis', async () => { + queue.enqueue(specHelper.queue, 'slowJob') + workerA.start() + + // wait for the job to complete + await new Promise((resolve) => { setTimeout(resolve, 501 + 100) }) + + const errorPayload = await queue.forceCleanWorker(workerA.name) + expect(errorPayload).toBeFalsy() // no job should have been running after the wait + + const keys = await specHelper.redis.keys(specHelper.namespace + '*') + keys.map(key => { + expect(key).not.toMatch(/workerA/) + }) + }) }) }) }) diff --git a/lib/queue.js b/lib/queue.js index abba87b3..20283f7c 100644 --- a/lib/queue.js +++ b/lib/queue.js @@ -279,6 +279,7 @@ class Queue extends EventEmitter { await this.connection.redis.del(this.connection.key('stat', 'failed', workerName)) await this.connection.redis.del(this.connection.key('stat', 'processed', workerName)) await this.connection.redis.del(this.connection.key('worker', 'ping', workerName)) + await this.connection.redis.del(this.connection.key('worker', workerName, queues, 'started')) await this.connection.redis.del(this.connection.key('worker', workerName)) await this.connection.redis.srem(this.connection.key('workers'), workerName + ':' + queues)