
Commit

Merge pull request #239 from taskrabbit/worker-status
Worker Cleanup by Scheduler
evantahler authored Apr 18, 2018
2 parents 7da9f3a + 22a13c8 commit 1b85096
Showing 13 changed files with 295 additions and 98 deletions.
31 changes: 28 additions & 3 deletions README.md
@@ -15,8 +15,7 @@ I learn best by examples:

```javascript
const path = require('path')
const NodeResque = require(path.join(__dirname, '..', 'index.js'))
// In your projects: const NR = require('node-resque');
const NodeResque = require('node-resque')

async function boot () {
// ////////////////////////
@@ -66,7 +65,6 @@ async function boot () {

const worker = new NodeResque.Worker({connection: connectionDetails, queues: ['math', 'otherQueue']}, jobs)
await worker.connect()
await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host
worker.start()

// ////////////////////
@@ -85,6 +83,7 @@ async function boot () {
worker.on('end', () => { console.log('worker ended') })
worker.on('cleaning_worker', (worker, pid) => { console.log(`cleaning old worker ${worker}`) })
worker.on('poll', (queue) => { console.log(`worker polling ${queue}`) })
worker.on('ping', (time) => { console.log(`worker check in @ ${time}`) })
worker.on('job', (queue, job) => { console.log(`working job ${queue} ${JSON.stringify(job)}`) })
worker.on('reEnqueue', (queue, job, plugin) => { console.log(`reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`) })
worker.on('success', (queue, job, result) => { console.log(`job success ${queue} ${JSON.stringify(job)} >> ${result}`) })
@@ -96,6 +95,7 @@ async function boot () {
scheduler.on('end', () => { console.log('scheduler ended') })
scheduler.on('poll', () => { console.log('scheduler polling') })
scheduler.on('master', (state) => { console.log('scheduler became master') })
scheduler.on('cleanStuckWorker', (workerName, errorPayload, delta) => { console.log(`failing ${workerName} (stuck for ${delta}s) and failing job ${errorPayload}`) })
scheduler.on('error', (error) => { console.log(`scheduler error >> ${error}`) })
scheduler.on('workingTimestamp', (timestamp) => { console.log(`scheduler working timestamp ${timestamp}`) })
scheduler.on('transferredJob', (timestamp, job) => { console.log(`scheduler enqueuing job ${timestamp} >> ${JSON.stringify(job)}`) })
@@ -302,6 +302,30 @@ We use a try/catch pattern to catch errors in your jobs. If any job throws an un

## Failed Worker Management

### Automatically

By default, the scheduler will check for workers which haven't pinged redis in the last 60 minutes. If a worker misses this window, we assume its process crashed and remove the worker from redis. If that worker was working on a job, the job is placed in the failed queue for later inspection. Every worker runs a timer that updates a key in redis every `timeout` (default: 5 seconds). If your job is slow but asynchronous, there should be no problem; however, if your job consumes 100% of the process's CPU, this timer might not fire.
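
As a rough illustration of the worker side of this mechanism, here is a minimal sketch assuming the `connectionDetails` and `jobs` objects from the example at the top of this README:

```js
const worker = new NodeResque.Worker({
  connection: connectionDetails,
  queues: ['math'],
  timeout: 5000 // poll/ping interval in ms (5 seconds is the default described above)
}, jobs)

await worker.connect()
worker.on('ping', (time) => { console.log(`worker checked in @ ${time}`) })
worker.start()
```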

To modify the 60 minute check, change `stuckWorkerTimeout` when configuring your scheduler, e.g.:

```js
const scheduler = new NodeResque.Scheduler({
  stuckWorkerTimeout: (1000 * 60 * 60), // 1 hour, in ms
  connection: connectionDetails
})
```

Set your scheduler's `stuckWorkerTimeout = false` to disable this behavior.

```js
const scheduler = new NodeResque.Scheduler({
  stuckWorkerTimeout: false, // will not fail the jobs of workers which haven't pinged redis
  connection: connectionDetails
})
```

### Manually

Sometimes a worker crashes in a *severe* way, and it doesn't get the time/chance to notify redis that it is leaving the pool (this happens all the time on PaaS providers like Heroku). When this happens, you will not only need to extract the job from the now-zombie worker's "working on" status, but also remove the stuck worker. To aid you in these edge cases, `await queue.cleanOldWorkers(age)` is available.

Because there are no 'heartbeats' in resque, it is impossible for the application to know whether a worker has been working on a long-running job or is dead. You are required to provide an "age" for how long a worker has been "working"; all workers older than that age will be removed, and the jobs they were working on will be moved to the error queue (where you can then use `queue.retryAndRemoveFailed` to re-enqueue them).
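
For illustration, a minimal sketch of a manual cleanup pass; the millisecond unit for `age` and the `failedCount`/`failed` helpers used to inspect the failed queue are assumptions here:

```js
// assuming `queue` is a connected NodeResque.Queue
// remove workers that have reported "working" for more than an hour (age assumed to be in ms)
await queue.cleanOldWorkers(1000 * 60 * 60)

// the jobs those workers held are now in the failed queue; inspect and re-enqueue them
const failedCount = await queue.failedCount() // assumed helper
const failedJobs = await queue.failed(0, failedCount - 1) // assumed helper
for (const failedJob of failedJobs) {
  await queue.retryAndRemoveFailed(failedJob)
}
```
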
@@ -437,6 +461,7 @@ multiWorker.on('start', (workerId) => { console
multiWorker.on('end', (workerId) => { console.log("worker["+workerId+"] ended"); })
multiWorker.on('cleaning_worker', (workerId, worker, pid) => { console.log("cleaning old worker " + worker); })
multiWorker.on('poll', (workerId, queue) => { console.log("worker["+workerId+"] polling " + queue); })
multiWorker.on('ping', (workerId, time) => { console.log("worker["+workerId+"] check in @ " + time); })
multiWorker.on('job', (workerId, queue, job) => { console.log("worker["+workerId+"] working job " + queue + " " + JSON.stringify(job)); })
multiWorker.on('reEnqueue', (workerId, queue, job, plugin) => { console.log("worker["+workerId+"] reEnqueue job (" + plugin + ") " + queue + " " + JSON.stringify(job)); })
multiWorker.on('success', (workerId, queue, job, result) => { console.log("worker["+workerId+"] job success " + queue + " " + JSON.stringify(job) + " >> " + result); })
72 changes: 70 additions & 2 deletions __tests__/core/scheduler.js
@@ -26,7 +26,7 @@ describe('scheduler', () => {
namespace: specHelper.connectionDetails.namespace
}

scheduler = new NodeResque.Scheduler({connection: connectionDetails, timeout: specHelper.timeout})
scheduler = new NodeResque.Scheduler({ connection: connectionDetails, timeout: specHelper.timeout })

scheduler.on('poll', () => { throw new Error('Should not emit poll') })
scheduler.on('master', () => { throw new Error('Should not emit master') })
@@ -70,7 +70,11 @@ describe('scheduler', () => {
describe('[with connection]', () => {
beforeEach(async () => {
await specHelper.cleanup()
scheduler = new NodeResque.Scheduler({connection: specHelper.connectionDetails, timeout: specHelper.timeout})
scheduler = new NodeResque.Scheduler({
connection: specHelper.connectionDetails,
timeout: specHelper.timeout,
stuckWorkerTimeout: 1000
})
queue = new NodeResque.Queue({connection: specHelper.connectionDetails, queue: specHelper.queue})
await scheduler.connect()
await queue.connect()
@@ -100,6 +104,70 @@ describe('scheduler', () => {
expect(obj).toBeFalsy()
await scheduler.end()
})

describe('stuck workers', () => {
let worker
const jobs = {
'stuck': {
perform: async function () {
await new Promise((resolve) => {
// stop the worker from checking in, as if the process crashed
// don't resolve
clearTimeout(this.pingTimer)
})
}
}
}

beforeAll(async () => {
worker = new NodeResque.Worker({
connection: specHelper.connectionDetails,
timeout: specHelper.timeout,
queues: ['stuckJobs']
}, jobs)
await worker.connect()
})

afterAll(async () => {
await scheduler.end()
await worker.end()
})

test('will remove stuck workers and fail their jobs', async () => {
await scheduler.connect()
await scheduler.start()
await worker.start()

const workers = await queue.allWorkingOn()
let h = {}
h[worker.name] = 'started'
expect(workers).toEqual(h)

await queue.enqueue('stuckJobs', 'stuck', ['oh no!'])

await new Promise(async (resolve) => {
scheduler.on('cleanStuckWorker', async (workerName, errorPayload, delta) => {
// response data should contain failure
expect(workerName).toEqual(worker.name)
expect(errorPayload.worker).toEqual(worker.name)
expect(errorPayload.error).toEqual('Worker Timeout (killed manually)')

// check the workers list, should be empty now
expect(await queue.allWorkingOn()).toEqual({})

// check the failed list
let failed = await specHelper.redis.rpop(specHelper.namespace + ':' + 'failed')
failed = JSON.parse(failed)
expect(failed.queue).toBe('stuckJobs')
expect(failed.exception).toBe('Worker Timeout (killed manually)')
expect(failed.error).toBe('Worker Timeout (killed manually)')

scheduler.removeAllListeners('cleanStuckWorker')
resolve()
})
})
})
})
})
})
})
66 changes: 34 additions & 32 deletions __tests__/core/worker.js
@@ -1,6 +1,5 @@
const path = require('path')
const specHelper = require(path.join(__dirname, '..', 'utils', 'specHelper.js'))
const os = require('os')
const NodeResque = require(path.join(__dirname, '..', '..', 'index.js'))

let jobs = {
@@ -27,6 +26,12 @@ let jobs = {
return 'yay'
}
},
'twoSeconds': {
perform: async () => {
await new Promise((resolve) => { setTimeout(resolve, 1000 * 2) })
return 'slow'
}
},
'quickDefine': async () => { return 'ok' }
}

@@ -108,33 +113,6 @@ describe('worker', () => {
await worker.end()
})

describe('crashing workers', () => {
test('can clear previously crashed workers from the same host', async () => {
let name1 = os.hostname() + ':' + '0' // fake pid
let name2 = os.hostname() + ':' + process.pid // real pid
let worker1 = new NodeResque.Worker({connection: specHelper.connectionDetails, timeout: specHelper.timeout, name: name1}, jobs)

await worker1.connect()
await worker1.init()
worker1.running = false

await new Promise((resolve) => { setTimeout(resolve, 500) })

let worker2 = new NodeResque.Worker({connection: specHelper.connectionDetails, timeout: specHelper.timeout, name: name2}, jobs)
await worker2.connect()

await new Promise((resolve) => {
worker2.workerCleanup()

worker2.on('cleaning_worker', (worker, pid) => {
expect(worker).toMatch(new RegExp(name1))
expect(pid).toBe(0)
return resolve()
})
})
})
})

test('will determine the proper queue names', async () => {
let worker = new NodeResque.Worker({connection: specHelper.connectionDetails, timeout: specHelper.timeout}, jobs)
await worker.connect()
@@ -178,7 +156,7 @@ describe('worker', () => {
await new Promise(async (resolve) => {
worker.start()

worker.on('success', function (q, job, result) {
worker.on('success', (q, job, result) => {
expect(q).toBe(specHelper.queue)
expect(job['class']).toBe('add')
expect(result).toBe(3)
@@ -196,7 +174,7 @@ describe('worker', () => {
await new Promise(async (resolve) => {
worker.start()

worker.on('success', function (q, job, result) {
worker.on('success', (q, job, result) => {
expect(result.a).toBe('starting value')
expect(worker.result).toBe(result)

@@ -212,7 +190,7 @@ describe('worker', () => {
await new Promise(async (resolve) => {
worker.start()

worker.on('success', function (q, job, result) {
worker.on('success', (q, job, result) => {
expect(result).toBe('ok')
worker.removeAllListeners('success')
return resolve()
@@ -226,7 +204,7 @@ describe('worker', () => {
await new Promise(async (resolve) => {
worker.start()

worker.on('failure', function (q, job, failure) {
worker.on('failure', (q, job, failure) => {
expect(q).toBe(specHelper.queue)
expect(String(failure)).toBe('Error: No job defined for class "somethingFake"')

@@ -243,6 +221,30 @@ describe('worker', () => {
expect(data.exception).toBe('Error')
expect(data.error).toBe('No job defined for class "somethingFake"')
})

test('will ping with status even when working a slow job', async () => {
const nowInSeconds = Math.round(new Date().getTime() / 1000)
await worker.start()
await new Promise((resolve) => setTimeout(resolve, (worker.options.timeout * 2)))
const pingKey = worker.connection.key('worker', 'ping', worker.name)
let firstPayload = JSON.parse(await specHelper.redis.get(pingKey))
expect(firstPayload.name).toEqual(worker.name)
expect(firstPayload.time).toBeGreaterThanOrEqual(nowInSeconds)

await queue.enqueue(specHelper.queue, 'twoSeconds')

await new Promise(async (resolve) => {
worker.on('success', (q, job, result) => {
expect(result).toBe('slow')
worker.removeAllListeners('success')
return resolve()
})
})

let secondPayload = JSON.parse(await specHelper.redis.get(pingKey))
expect(secondPayload.name).toEqual(worker.name)
expect(secondPayload.time).toBeGreaterThanOrEqual(firstPayload.time)
})
})
})
})
1 change: 0 additions & 1 deletion examples/customPluginExample.js
@@ -63,7 +63,6 @@ async function boot () {

const worker = new NodeResque.Worker({connection: connectionDetails, queues: ['default']}, jobs)
await worker.connect()
await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host
worker.start()

// //////////////////////
1 change: 0 additions & 1 deletion examples/errorExample.js
@@ -52,7 +52,6 @@ async function boot () {

const worker = new NodeResque.Worker({connection: connectionDetails, queues: ['default']}, jobs)
await worker.connect()
await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host
worker.start()

// //////////////////////
3 changes: 2 additions & 1 deletion examples/example.js
@@ -66,7 +66,6 @@ async function boot () {

const worker = new NodeResque.Worker({connection: connectionDetails, queues: ['math', 'otherQueue']}, jobs)
await worker.connect()
await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host
worker.start()

// ////////////////////
@@ -85,6 +84,7 @@ async function boot () {
worker.on('end', () => { console.log('worker ended') })
worker.on('cleaning_worker', (worker, pid) => { console.log(`cleaning old worker ${worker}`) })
worker.on('poll', (queue) => { console.log(`worker polling ${queue}`) })
worker.on('ping', (time) => { console.log(`worker check in @ ${time}`) })
worker.on('job', (queue, job) => { console.log(`working job ${queue} ${JSON.stringify(job)}`) })
worker.on('reEnqueue', (queue, job, plugin) => { console.log(`reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`) })
worker.on('success', (queue, job, result) => { console.log(`job success ${queue} ${JSON.stringify(job)} >> ${result}`) })
@@ -97,6 +97,7 @@ async function boot () {
scheduler.on('poll', () => { console.log('scheduler polling') })
scheduler.on('master', (state) => { console.log('scheduler became master') })
scheduler.on('error', (error) => { console.log(`scheduler error >> ${error}`) })
scheduler.on('cleanStuckWorker', (workerName, errorPayload, delta) => { console.log(`failing ${workerName} (stuck for ${delta}s) and failing job ${errorPayload}`) })
scheduler.on('workingTimestamp', (timestamp) => { console.log(`scheduler working timestamp ${timestamp}`) })
scheduler.on('transferredJob', (timestamp, job) => { console.log(`scheduler enqueuing job ${timestamp} >> ${JSON.stringify(job)}`) })

1 change: 0 additions & 1 deletion examples/retry.js
@@ -48,7 +48,6 @@ async function boot () {

const worker = new NodeResque.Worker({connection: connectionDetails, queues: ['math']}, jobs)
await worker.connect()
await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host
worker.start()

// ////////////////////
1 change: 0 additions & 1 deletion examples/scheduledJobs.js
@@ -39,7 +39,6 @@ async function boot () {

const worker = new NodeResque.Worker({connection: connectionDetails, queues: ['time']}, jobs)
await worker.connect()
await worker.workerCleanup() // optional: cleanup any previous improperly shutdown workers on this host
worker.start()

// ////////////////////