Skip to content

Commit

Permalink
scheduler clears workers which fail to ping
Browse files Browse the repository at this point in the history
  • Loading branch information
evantahler committed Apr 5, 2018
1 parent 3bc017b commit 6110e7c
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 17 deletions.
30 changes: 28 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ I learn best by examples:

```javascript
const path = require('path')
const NodeResque = require(path.join(__dirname, '..', 'index.js'))
// In your projects: const NR = require('node-resque');
const NodeResque = require('node-resque')

async function boot () {
// ////////////////////////
Expand Down Expand Up @@ -84,6 +83,7 @@ async function boot () {
worker.on('end', () => { console.log('worker ended') })
worker.on('cleaning_worker', (worker, pid) => { console.log(`cleaning old worker ${worker}`) })
worker.on('poll', (queue) => { console.log(`worker polling ${queue}`) })
worker.on('ping', (time) => { console.log(`worker check in @ ${time}`) })
worker.on('job', (queue, job) => { console.log(`working job ${queue} ${JSON.stringify(job)}`) })
worker.on('reEnqueue', (queue, job, plugin) => { console.log(`reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`) })
worker.on('success', (queue, job, result) => { console.log(`job success ${queue} ${JSON.stringify(job)} >> ${result}`) })
Expand All @@ -95,6 +95,7 @@ async function boot () {
scheduler.on('end', () => { console.log('scheduler ended') })
scheduler.on('poll', () => { console.log('scheduler polling') })
scheduler.on('master', (state) => { console.log('scheduler became master') })
scheduler.on('cleanStuckWorker', (workerName, errorPayload, delta) => { console.log(`failing ${workerName} (stuck for ${delta}s) and failing job ${errorPayload}`) })
scheduler.on('error', (error) => { console.log(`scheduler error >> ${error}`) })
scheduler.on('workingTimestamp', (timestamp) => { console.log(`scheduler working timestamp ${timestamp}`) })
scheduler.on('transferredJob', (timestamp, job) => { console.log(`scheduler enquing job ${timestamp} >> ${JSON.stringify(job)}`) })
Expand Down Expand Up @@ -301,6 +302,30 @@ We use a try/catch pattern to catch errors in your jobs. If any job throws an un

## Failed Worker Management

### Automatically

By default, the scheduler will check for workers which haven't pinged redis in 60 minutes. If this happens, we will assume the process crashed, and remove it from redis. If this worker was working on a job, we will place that job in the failed queue for later inspection. Every worker runs a timer which updates a key in redis every `timeout` (default: 5 seconds). If your job is slow but async, there should be no problem. However, if your job consumes 100% of the process's CPU, this timer might not fire.

To modify the 60 minute check, change `stuckWorkerTimeout` when configuring your scheduler, i.e.:

```js
const scheduler = new NodeResque.Scheduler({
  stuckWorkerTimeout: (1000 * 60 * 60), // 1 hour, in ms
connection: connectionDetails
})
```

Set your scheduler's `stuckWorkerTimeout = false` to disable this behavior.

```js
const scheduler = new NodeResque.Scheduler({
  stuckWorkerTimeout: false, // will not fail jobs which haven't pinged redis
connection: connectionDetails
})
```

### Manually

Sometimes a worker crashes is a *severe* way, and it doesn't get the time/chance to notify redis that it is leaving the pool (this happens all the time on PAAS providers like Heroku). When this happens, you will not only need to extract the job from the now-zombie worker's "working on" status, but also remove the stuck worker. To aid you in these edge cases, `await queue.cleanOldWorkers(age)` is available.

Because there are no 'heartbeats' in resque, it is impossible for the application to know whether a worker has been working on a long job or whether it is dead. You are required to provide an "age" for how long a worker has been "working", and all those older than that age will be removed, and the job they are working on moved to the error queue (where you can then use `queue.retryAndRemoveFailed` to re-enqueue the job).
Expand Down Expand Up @@ -436,6 +461,7 @@ multiWorker.on('start', (workerId) => { console
multiWorker.on('end', (workerId) => { console.log("worker["+workerId+"] ended"); })
multiWorker.on('cleaning_worker', (workerId, worker, pid) => { console.log("cleaning old worker " + worker); })
multiWorker.on('poll', (workerId, queue) => { console.log("worker["+workerId+"] polling " + queue); })
multiWorker.on('ping', (workerId, time) => { console.log("worker["+workerId+"] check in @ " + time); })
multiWorker.on('job', (workerId, queue, job) => { console.log("worker["+workerId+"] working job " + queue + " " + JSON.stringify(job)); })
multiWorker.on('reEnqueue', (workerId, queue, job, plugin) => { console.log("worker["+workerId+"] reEnqueue job (" + plugin + ") " + queue + " " + JSON.stringify(job)); })
multiWorker.on('success', (workerId, queue, job, result) => { console.log("worker["+workerId+"] job success " + queue + " " + JSON.stringify(job) + " >> " + result); })
Expand Down
72 changes: 70 additions & 2 deletions __tests__/core/scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ describe('scheduler', () => {
namespace: specHelper.connectionDetails.namespace
}

scheduler = new NodeResque.Scheduler({connection: connectionDetails, timeout: specHelper.timeout})
scheduler = new NodeResque.Scheduler({ connection: connectionDetails, timeout: specHelper.timeout })

scheduler.on('poll', () => { throw new Error('Should not emit poll') })
scheduler.on('master', () => { throw new Error('Should not emit master') })
Expand Down Expand Up @@ -70,7 +70,11 @@ describe('scheduler', () => {
describe('[with connection]', () => {
beforeEach(async () => {
await specHelper.cleanup()
scheduler = new NodeResque.Scheduler({connection: specHelper.connectionDetails, timeout: specHelper.timeout})
scheduler = new NodeResque.Scheduler({
connection: specHelper.connectionDetails,
timeout: specHelper.timeout,
stuckWorkerTimeout: 1000
})
queue = new NodeResque.Queue({connection: specHelper.connectionDetails, queue: specHelper.queue})
await scheduler.connect()
await queue.connect()
Expand Down Expand Up @@ -100,6 +104,70 @@ describe('scheduler', () => {
expect(obj).toBeFalsy()
await scheduler.end()
})

describe('stuck workers', () => {
let worker
const jobs = {
'stuck': {
perform: async function () {
await new Promise((resolve) => {
// stop the worker from checkin in, like the process crashed
// don't resolve
clearTimeout(this.pingTimer)
})
}
}
}

beforeAll(async () => {
worker = new NodeResque.Worker({
connection: specHelper.connectionDetails,
timeout: specHelper.timeout,
queues: ['stuckJobs']
}, jobs)
await worker.connect()
})

afterAll(async () => {
await scheduler.end()
await worker.end()
})

test('will remove stuck workers and fail thier jobs', async () => {
await scheduler.connect()
await scheduler.start()
await worker.start()

const workers = await queue.allWorkingOn()
let h = {}
h[worker.name] = 'started'
expect(workers).toEqual(h)

await queue.enqueue('stuckJobs', 'stuck', ['oh no!'])

await new Promise(async (resolve) => {
scheduler.on('cleanStuckWorker', async (workerName, errorPayload, delta) => {
// response data should contain failure
expect(workerName).toEqual(worker.name)
expect(errorPayload.worker).toEqual(worker.name)
expect(errorPayload.error).toEqual('Worker Timeout (killed manually)')

// check the workers list, should be empty now
expect(await queue.allWorkingOn()).toEqual({})

// check the failed list
let failed = await specHelper.redis.rpop(specHelper.namespace + ':' + 'failed')
failed = JSON.parse(failed)
expect(failed.queue).toBe('stuckJobs')
expect(failed.exception).toBe('Worker Timeout (killed manually)')
expect(failed.error).toBe('Worker Timeout (killed manually)')

scheduler.removeAllListeners('cleanStuckWorker')
resolve()
})
})
})
})
})
})
})
38 changes: 34 additions & 4 deletions __tests__/core/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ let jobs = {
return 'yay'
}
},
'twoSeconds': {
perform: async () => {
await new Promise((resolve) => { setTimeout(resolve, 1000 * 2) })
return 'slow'
}
},
'quickDefine': async () => { return 'ok' }
}

Expand Down Expand Up @@ -150,7 +156,7 @@ describe('worker', () => {
await new Promise(async (resolve) => {
worker.start()

worker.on('success', function (q, job, result) {
worker.on('success', (q, job, result) => {
expect(q).toBe(specHelper.queue)
expect(job['class']).toBe('add')
expect(result).toBe(3)
Expand All @@ -168,7 +174,7 @@ describe('worker', () => {
await new Promise(async (resolve) => {
worker.start()

worker.on('success', function (q, job, result) {
worker.on('success', (q, job, result) => {
expect(result.a).toBe('starting value')
expect(worker.result).toBe(result)

Expand All @@ -184,7 +190,7 @@ describe('worker', () => {
await new Promise(async (resolve) => {
worker.start()

worker.on('success', function (q, job, result) {
worker.on('success', (q, job, result) => {
expect(result).toBe('ok')
worker.removeAllListeners('success')
return resolve()
Expand All @@ -198,7 +204,7 @@ describe('worker', () => {
await new Promise(async (resolve) => {
worker.start()

worker.on('failure', function (q, job, failure) {
worker.on('failure', (q, job, failure) => {
expect(q).toBe(specHelper.queue)
expect(String(failure)).toBe('Error: No job defined for class "somethingFake"')

Expand All @@ -215,6 +221,30 @@ describe('worker', () => {
expect(data.exception).toBe('Error')
expect(data.error).toBe('No job defined for class "somethingFake"')
})

test('will ping with status even when working a slow job', async () => {
const nowInSeconds = Math.round(new Date().getTime() / 1000)
await worker.start()
await new Promise((resolve) => setTimeout(resolve, (worker.options.timeout * 2)))
const pingKey = worker.connection.key('worker', 'ping', worker.name)
let firstPayload = JSON.parse(await specHelper.redis.get(pingKey))
expect(firstPayload.name).toEqual(worker.name)
expect(firstPayload.time).toBeGreaterThanOrEqual(nowInSeconds)

await queue.enqueue(specHelper.queue, 'twoSeconds')

await new Promise(async (resolve) => {
worker.on('success', (q, job, result) => {
expect(result).toBe('slow')
worker.removeAllListeners('success')
return resolve()
})
})

let secondPayload = JSON.parse(await specHelper.redis.get(pingKey))
expect(secondPayload.name).toEqual(worker.name)
expect(secondPayload.time).toBeGreaterThanOrEqual(firstPayload.time)
})
})
})
})
2 changes: 2 additions & 0 deletions examples/example.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ async function boot () {
worker.on('end', () => { console.log('worker ended') })
worker.on('cleaning_worker', (worker, pid) => { console.log(`cleaning old worker ${worker}`) })
worker.on('poll', (queue) => { console.log(`worker polling ${queue}`) })
worker.on('ping', (time) => { console.log(`worker check in @ ${time}`) })
worker.on('job', (queue, job) => { console.log(`working job ${queue} ${JSON.stringify(job)}`) })
worker.on('reEnqueue', (queue, job, plugin) => { console.log(`reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`) })
worker.on('success', (queue, job, result) => { console.log(`job success ${queue} ${JSON.stringify(job)} >> ${result}`) })
Expand All @@ -96,6 +97,7 @@ async function boot () {
scheduler.on('poll', () => { console.log('scheduler polling') })
scheduler.on('master', (state) => { console.log('scheduler became master') })
scheduler.on('error', (error) => { console.log(`scheduler error >> ${error}`) })
scheduler.on('cleanStuckWorker', (workerName, errorPayload, delta) => { console.log(`failing ${workerName} (stuck for ${delta}s) and failing job ${errorPayload}`) })
scheduler.on('workingTimestamp', (timestamp) => { console.log(`scheduler working timestamp ${timestamp}`) })
scheduler.on('transferredJob', (timestamp, job) => { console.log(`scheduler enquing job ${timestamp} >> ${JSON.stringify(job)}`) })

Expand Down
96 changes: 96 additions & 0 deletions examples/stuckWorker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
const path = require('path')
const NodeResque = require(path.join(__dirname, '..', 'index.js'))
// In your projects: const NodeResque = require('node-resque');

// Example: demonstrates the scheduler's stuck-worker cleanup. The 'stuck'
// job disables the worker's ping timer (simulating a crashed process); the
// scheduler then notices the missing ping, removes the worker, and fails
// its job, emitting the 'cleanStuckWorker' event.
async function boot () {
  // ////////////////////////
  // SET UP THE CONNECTION //
  // ////////////////////////

  const connectionDetails = {
    pkg: 'ioredis',
    host: '127.0.0.1',
    password: null,
    port: 6379,
    database: 0
    // namespace: 'resque',
    // looping: true,
    // options: {password: 'abc'},
  }

  // ///////////////////////////
  // DEFINE YOUR WORKER TASKS //
  // ///////////////////////////

  const jobs = {
    'stuck': {
      perform: async function () {
        console.log(`${this.name} is starting stuck job...`)
        await new Promise((resolve) => {
          clearTimeout(this.pingTimer) // stop the worker from checking in, as if the process crashed
          setTimeout(resolve, 60 * 60 * 1000) // 1 hour job
        })
      }
    }
  }

  // /////////////////
  // START A WORKER //
  // /////////////////

  const worker = new NodeResque.Worker({connection: connectionDetails, queues: ['stuckJobs']}, jobs)
  await worker.connect()
  worker.start()

  // ////////////////////
  // START A SCHEDULER //
  // ////////////////////

  // stuckWorkerTimeout is set very low (10s) so the cleanup fires quickly in this demo
  const scheduler = new NodeResque.Scheduler({
    stuckWorkerTimeout: (10 * 1000),
    connection: connectionDetails
  })

  await scheduler.connect()
  scheduler.start()

  // //////////////////////
  // REGISTER FOR EVENTS //
  // //////////////////////

  worker.on('start', () => { console.log('worker started') })
  worker.on('end', () => { console.log('worker ended') })
  worker.on('cleaning_worker', (worker, pid) => { console.log(`cleaning old worker ${worker}`) })
  worker.on('poll', (queue) => { console.log(`worker polling ${queue}`) })
  worker.on('ping', (time) => { console.log(`worker check in @ ${time}`) })
  worker.on('job', (queue, job) => { console.log(`working job ${queue} ${JSON.stringify(job)}`) })
  worker.on('reEnqueue', (queue, job, plugin) => { console.log(`reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`) })
  worker.on('success', (queue, job, result) => { console.log(`job success ${queue} ${JSON.stringify(job)} >> ${result}`) })
  worker.on('failure', (queue, job, failure) => { console.log(`job failure ${queue} ${JSON.stringify(job)} >> ${failure}`) })
  worker.on('error', (error, queue, job) => { console.log(`error ${queue} ${JSON.stringify(job)} >> ${error}`) })
  worker.on('pause', () => { console.log('worker paused') })

  scheduler.on('start', () => { console.log('scheduler started') })
  scheduler.on('end', () => { console.log('scheduler ended') })
  scheduler.on('poll', () => { console.log('scheduler polling') })
  scheduler.on('master', (state) => { console.log('scheduler became master') })
  scheduler.on('error', (error) => { console.log(`scheduler error >> ${error}`) })
  scheduler.on('workingTimestamp', (timestamp) => { console.log(`scheduler working timestamp ${timestamp}`) })
  scheduler.on('transferredJob', (timestamp, job) => { console.log(`scheduler enquing job ${timestamp} >> ${JSON.stringify(job)}`) })

  // the event this example exists to demonstrate: exit once the stuck worker is cleaned
  scheduler.on('cleanStuckWorker', (workerName, errorPayload, delta) => {
    console.log(`failing ${workerName} (stuck for ${delta}s) and failing job: ${JSON.stringify(errorPayload)}`)
    process.exit()
  })

  // //////////////////////
  // CONNECT TO A QUEUE //
  // //////////////////////

  const queue = new NodeResque.Queue({connection: connectionDetails}, jobs)
  queue.on('error', function (error) { console.log(error) })
  await queue.connect()
  await queue.enqueue('stuckJobs', 'stuck', ['oh no'])
}

boot()
2 changes: 2 additions & 0 deletions lib/multiWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class MultiWorker extends EventEmitter {
worker.on('end', () => { this.emit('end', worker.id) })
worker.on('cleaning_worker', (worker, pid) => { this.emit('cleaning_worker', worker.id, worker, pid) })
worker.on('poll', (queue) => { this.emit('poll', worker.id, queue) })
worker.on('ping', (time) => { this.emit('ping', worker.id, time) })
worker.on('job', (queue, job) => { this.emit('job', worker.id, queue, job) })
worker.on('reEnqueue', (queue, job, plugin) => { this.emit('reEnqueue', worker.id, queue, job, plugin) })
worker.on('success', (queue, job, result) => { this.emit('success', worker.id, queue, job, result) })
Expand Down Expand Up @@ -144,6 +145,7 @@ class MultiWorker extends EventEmitter {
'end',
'cleaning_worker',
'poll',
'ping',
'job',
'reEnqueue',
'success',
Expand Down
Loading

0 comments on commit 6110e7c

Please sign in to comment.