Skip to content

Commit

Permalink
Merge pull request #297 from taskrabbit/worker-started-clear
Browse files Browse the repository at this point in the history
ensure that forceCleanWorker removes the 'started' key
  • Loading branch information
evantahler authored Sep 16, 2019
2 parents 1455529 + e9a9b90 commit d065057
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 2 deletions.
46 changes: 44 additions & 2 deletions __tests__/core/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -525,26 +525,68 @@ describe('queue', () => {
})

test('will not remove stuck jobs within the time limit', async () => {
var age = 999
const age = 999
queue.enqueue(specHelper.queue, 'slowJob')
workerA.start()

await new Promise((resolve) => {
// hijack a worker in the middle of working on a job
workerA.on('job', async () => {
workerA.removeAllListeners('job')

const data = await queue.cleanOldWorkers(age)
expect(Object.keys(data).length).toBe(0)

const workingOn = await queue.allWorkingOn()
var paylaod = workingOn.workerA.payload
const paylaod = workingOn.workerA.payload
expect(paylaod.queue).toBe('test_queue')
expect(paylaod.class).toBe('slowJob')

return resolve()
})
})
})

test('can forceClean a worker, returning the error payload', async () => {
queue.enqueue(specHelper.queue, 'slowJob')
workerA.start()

await new Promise((resolve) => {
// hijack a worker in the middle of working on a job
workerA.on('job', async () => {
workerA.removeAllListeners('job')

const errorPayload = await queue.forceCleanWorker(workerA.name)

expect(errorPayload.worker).toBe('workerA')
expect(errorPayload.queue).toBe('test_queue')
expect(errorPayload.payload.class).toBe('slowJob')
expect(errorPayload.exception).toBe('Worker Timeout (killed manually)')
expect(errorPayload.backtrace[0]).toMatch(/killed by/)
expect(errorPayload.backtrace[1]).toBe('queue#forceCleanWorker')
expect(errorPayload.backtrace[2]).toBe('node-resque')
expect(errorPayload.failed_at).toBeTruthy()

return resolve()
})
})
})

test('can forceClean a worker, returning the error payload and removing all keys it had set in redis', async () => {
queue.enqueue(specHelper.queue, 'slowJob')
workerA.start()

// wait for the job to complete
await new Promise((resolve) => { setTimeout(resolve, 501 + 100) })

const errorPayload = await queue.forceCleanWorker(workerA.name)
expect(errorPayload).toBeFalsy() // no job should have been running after the wait

const keys = await specHelper.redis.keys(specHelper.namespace + '*')
keys.map(key => {
expect(key).not.toMatch(/workerA/)
})
})
})
})
})
1 change: 1 addition & 0 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ class Queue extends EventEmitter {
await this.connection.redis.del(this.connection.key('stat', 'failed', workerName))
await this.connection.redis.del(this.connection.key('stat', 'processed', workerName))
await this.connection.redis.del(this.connection.key('worker', 'ping', workerName))
await this.connection.redis.del(this.connection.key('worker', workerName, queues, 'started'))
await this.connection.redis.del(this.connection.key('worker', workerName))
await this.connection.redis.srem(this.connection.key('workers'), workerName + ':' + queues)

Expand Down

0 comments on commit d065057

Please sign in to comment.