From 6110e7c95f5113f0a483b3654a78eb2557788f3b Mon Sep 17 00:00:00 2001
From: evan tahler
Date: Wed, 4 Apr 2018 23:21:23 -0700
Subject: [PATCH] scheduler clears workers which fail to ping

---
 README.md                   | 30 +++++++++++-
 __tests__/core/scheduler.js | 72 +++++++++++++++++++++++++++-
 __tests__/core/worker.js    | 38 +++++++++++++--
 examples/example.js         |  2 +
 examples/stuckWorker.js     | 96 +++++++++++++++++++++++++++++++++++++
 lib/multiWorker.js          |  2 +
 lib/queue.js                | 15 ++++--
 lib/scheduler.js            | 28 +++++++++++
 lib/worker.js               | 29 +++++++++--
 9 files changed, 295 insertions(+), 17 deletions(-)
 create mode 100644 examples/stuckWorker.js

diff --git a/README.md b/README.md
index aee52aba..44c9a535 100644
--- a/README.md
+++ b/README.md
@@ -15,8 +15,7 @@ I learn best by examples:
 ```javascript
 const path = require('path')
-const NodeResque = require(path.join(__dirname, '..', 'index.js'))
-// In your projects: const NR = require('node-resque');
+const NodeResque = require('node-resque')
 
 async function boot () {
   // ////////////////////////
@@ -84,6 +83,7 @@ async function boot () {
   worker.on('end', () => { console.log('worker ended') })
   worker.on('cleaning_worker', (worker, pid) => { console.log(`cleaning old worker ${worker}`) })
   worker.on('poll', (queue) => { console.log(`worker polling ${queue}`) })
+  worker.on('ping', (time) => { console.log(`worker check in @ ${time}`) })
   worker.on('job', (queue, job) => { console.log(`working job ${queue} ${JSON.stringify(job)}`) })
   worker.on('reEnqueue', (queue, job, plugin) => { console.log(`reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`) })
   worker.on('success', (queue, job, result) => { console.log(`job success ${queue} ${JSON.stringify(job)} >> ${result}`) })
@@ -95,6 +95,7 @@
   scheduler.on('end', () => { console.log('scheduler ended') })
   scheduler.on('poll', () => { console.log('scheduler polling') })
   scheduler.on('master', (state) => { console.log('scheduler became master') })
+  scheduler.on('cleanStuckWorker', (workerName, errorPayload, delta) => { console.log(`failing ${workerName} (stuck for ${delta}s) and failing job ${JSON.stringify(errorPayload)}`) })
   scheduler.on('error', (error) => { console.log(`scheduler error >> ${error}`) })
   scheduler.on('workingTimestamp', (timestamp) => { console.log(`scheduler working timestamp ${timestamp}`) })
   scheduler.on('transferredJob', (timestamp, job) => { console.log(`scheduler enqueuing job ${timestamp} >> ${JSON.stringify(job)}`) })
@@ -301,6 +302,30 @@ We use a try/catch pattern to catch errors in your jobs. If any job throws an un
 
 ## Failed Worker Management
 
+### Automatically
+
+By default, the scheduler will check for workers which haven't pinged redis in 60 minutes. If this happens, we assume the worker's process crashed and remove it from redis. If that worker was working on a job, we place the job in the failed queue for later inspection. Every worker runs a timer which updates a key in redis every `timeout` ms (default: 5 seconds). If your job is slow but async, there should be no problem. However, if your job consumes 100% of the CPU of the process, this timer might not fire.
+
+To modify the 60-minute check, change `stuckWorkerTimeout` when configuring your scheduler, i.e.:
+
+```js
+const scheduler = new NodeResque.Scheduler({
+  stuckWorkerTimeout: (1000 * 60 * 60), // 1 hour, in ms
+  connection: connectionDetails
+})
+```
+
+Set your scheduler's `stuckWorkerTimeout = false` to disable this behavior.
+
+```js
+const scheduler = new NodeResque.Scheduler({
+  stuckWorkerTimeout: false, // will not fail jobs which haven't pinged redis
+  connection: connectionDetails
+})
+```
+
+### Manually
+
 Sometimes a worker crashes in a *severe* way, and it doesn't get the time/chance to notify redis that it is leaving the pool (this happens all the time on PAAS providers like Heroku). When this happens, you will not only need to extract the job from the now-zombie worker's "working on" status, but also remove the stuck worker. To aid you in these edge cases, `await queue.cleanOldWorkers(age)` is available.
 
 Because there are no 'heartbeats' in resque, it is impossible for the application to know if a worker has been working on a long job or if it is dead. You are required to provide an "age" for how long a worker has been "working", and all those older than that age will be removed, and the job they are working on moved to the error queue (where you can then use `queue.retryAndRemoveFailed` to re-enqueue the job).
@@ -436,6 +461,7 @@ multiWorker.on('start', (workerId) => { console
 multiWorker.on('end', (workerId) => { console.log("worker["+workerId+"] ended"); })
 multiWorker.on('cleaning_worker', (workerId, worker, pid) => { console.log("cleaning old worker " + worker); })
 multiWorker.on('poll', (workerId, queue) => { console.log("worker["+workerId+"] polling " + queue); })
+multiWorker.on('ping', (workerId, time) => { console.log("worker["+workerId+"] check in @ " + time); })
 multiWorker.on('job', (workerId, queue, job) => { console.log("worker["+workerId+"] working job " + queue + " " + JSON.stringify(job)); })
 multiWorker.on('reEnqueue', (workerId, queue, job, plugin) => { console.log("worker["+workerId+"] reEnqueue job (" + plugin + ") " + queue + " " + JSON.stringify(job)); })
 multiWorker.on('success', (workerId, queue, job, result) => { console.log("worker["+workerId+"] job success " + queue + " " + JSON.stringify(job) + " >> " + result); })
diff --git a/__tests__/core/scheduler.js b/__tests__/core/scheduler.js
index c63ca1db..e6a04509 100644
--- a/__tests__/core/scheduler.js
+++ b/__tests__/core/scheduler.js
@@ -26,7 +26,7 @@ describe('scheduler', () => {
       namespace: specHelper.connectionDetails.namespace
     }
 
-    scheduler = new NodeResque.Scheduler({connection: connectionDetails, timeout: specHelper.timeout})
+    scheduler = new NodeResque.Scheduler({ connection: connectionDetails, timeout: specHelper.timeout })
 
     scheduler.on('poll', () => { throw new Error('Should not emit poll') })
     scheduler.on('master', () => { throw new Error('Should not emit master') })
@@ -70,7 +70,11 @@ describe('scheduler', () => {
   describe('[with connection]', () => {
     beforeEach(async () => {
       await specHelper.cleanup()
-      scheduler = new NodeResque.Scheduler({connection: specHelper.connectionDetails, timeout: specHelper.timeout})
+      scheduler = new NodeResque.Scheduler({
+        connection: specHelper.connectionDetails,
+        timeout: specHelper.timeout,
+        stuckWorkerTimeout: 1000
+      })
       queue = new NodeResque.Queue({connection: specHelper.connectionDetails, queue: specHelper.queue})
       await scheduler.connect()
       await queue.connect()
@@ -100,6 +104,70 @@ describe('scheduler', () => {
       expect(obj).toBeFalsy()
       await scheduler.end()
     })
+
+    describe('stuck workers', () => {
+      let worker
+      const jobs = {
+        'stuck': {
+          perform: async function () {
+            await new Promise((resolve) => {
+              // stop the worker from checking in, as if the process crashed
+              // don't resolve
+              clearInterval(this.pingTimer)
+            })
+          }
+        }
+      }
+
+      beforeAll(async () => {
+        worker = new NodeResque.Worker({
+          connection: specHelper.connectionDetails,
+          timeout: specHelper.timeout,
+          queues: ['stuckJobs']
+        }, jobs)
+        await worker.connect()
+      })
+
+      afterAll(async () => {
+        await scheduler.end()
+        await worker.end()
+      })
+
+      test('will remove stuck workers and fail their jobs', async () => {
+        await scheduler.connect()
+        await scheduler.start()
+        await worker.start()
+
+        const workers = await queue.allWorkingOn()
+        let h = {}
+        h[worker.name] = 'started'
+        expect(workers).toEqual(h)
+
+        await queue.enqueue('stuckJobs', 'stuck', ['oh no!'])
+
+        await new Promise(async (resolve) => {
+          scheduler.on('cleanStuckWorker', async (workerName, errorPayload, delta) => {
+            // response data should contain failure
+            expect(workerName).toEqual(worker.name)
+            expect(errorPayload.worker).toEqual(worker.name)
+            expect(errorPayload.error).toEqual('Worker Timeout (killed manually)')
+
+            // check the workers list, should be empty now
+            expect(await queue.allWorkingOn()).toEqual({})
+
+            // check the failed list
+            let failed = await specHelper.redis.rpop(specHelper.namespace + ':' + 'failed')
+            failed = JSON.parse(failed)
+            expect(failed.queue).toBe('stuckJobs')
+            expect(failed.exception).toBe('Worker Timeout (killed manually)')
+            expect(failed.error).toBe('Worker Timeout (killed manually)')
+
+            scheduler.removeAllListeners('cleanStuckWorker')
+            resolve()
+          })
+        })
+      })
+    })
   })
 })
})
diff --git a/__tests__/core/worker.js b/__tests__/core/worker.js
index b2759503..3d199a2a 100644
--- a/__tests__/core/worker.js
+++ b/__tests__/core/worker.js
@@ -26,6 +26,12 @@ let jobs = {
       return 'yay'
     }
   },
+  'twoSeconds': {
+    perform: async () => {
+      await new Promise((resolve) => { setTimeout(resolve, 1000 * 2) })
+      return 'slow'
+    }
+  },
   'quickDefine': async () => { return 'ok' }
 }
@@ -150,7 +156,7 @@ describe('worker', () => {
       await new Promise(async (resolve) => {
         worker.start()
 
-        worker.on('success', function (q, job, result) {
+        worker.on('success', (q, job, result) => {
           expect(q).toBe(specHelper.queue)
           expect(job['class']).toBe('add')
           expect(result).toBe(3)
@@ -168,7 +174,7 @@ describe('worker', () => {
       await new Promise(async (resolve) => {
         worker.start()
 
-        worker.on('success', function (q, job, result) {
+        worker.on('success', (q, job, result) => {
           expect(result.a).toBe('starting value')
           expect(worker.result).toBe(result)
 
@@ -184,7 +190,7 @@ describe('worker', () => {
       await new Promise(async (resolve) => {
         worker.start()
 
-        worker.on('success', function (q, job, result) {
+        worker.on('success', (q, job, result) => {
           expect(result).toBe('ok')
           worker.removeAllListeners('success')
           return resolve()
@@ -198,7 +204,7 @@ describe('worker', () => {
       await new Promise(async (resolve) => {
         worker.start()
 
-        worker.on('failure', function (q, job, failure) {
+        worker.on('failure', (q, job, failure) => {
           expect(q).toBe(specHelper.queue)
           expect(String(failure)).toBe('Error: No job defined for class "somethingFake"')
 
@@ -215,6 +221,30 @@ describe('worker', () => {
       expect(data.exception).toBe('Error')
       expect(data.error).toBe('No job defined for class "somethingFake"')
     })
+
+    test('will ping with status even when working a slow job', async () => {
+      const nowInSeconds = Math.round(new Date().getTime() / 1000)
+      await worker.start()
+      await new Promise((resolve) => setTimeout(resolve, (worker.options.timeout * 2)))
+      const pingKey = worker.connection.key('worker', 'ping', worker.name)
+      let firstPayload = JSON.parse(await specHelper.redis.get(pingKey))
+      expect(firstPayload.name).toEqual(worker.name)
+      expect(firstPayload.time).toBeGreaterThanOrEqual(nowInSeconds)
+
+      await queue.enqueue(specHelper.queue, 'twoSeconds')
+
+      await new Promise(async (resolve) => {
+        worker.on('success', (q, job, result) => {
+          expect(result).toBe('slow')
+          worker.removeAllListeners('success')
+          return resolve()
+        })
+      })
+
+      let secondPayload = JSON.parse(await specHelper.redis.get(pingKey))
+      expect(secondPayload.name).toEqual(worker.name)
+      expect(secondPayload.time).toBeGreaterThanOrEqual(firstPayload.time)
+    })
   })
 })
})
diff --git a/examples/example.js b/examples/example.js
index 038d5689..deb99d80 100644
--- a/examples/example.js
+++ b/examples/example.js
@@ -84,6 +84,7 @@ async function boot () {
   worker.on('end', () => { console.log('worker ended') })
   worker.on('cleaning_worker', (worker, pid) => { console.log(`cleaning old worker ${worker}`) })
   worker.on('poll', (queue) => { console.log(`worker polling ${queue}`) })
+  worker.on('ping', (time) => { console.log(`worker check in @ ${time}`) })
   worker.on('job', (queue, job) => { console.log(`working job ${queue} ${JSON.stringify(job)}`) })
   worker.on('reEnqueue', (queue, job, plugin) => { console.log(`reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`) })
   worker.on('success', (queue, job, result) => { console.log(`job success ${queue} ${JSON.stringify(job)} >> ${result}`) })
@@ -96,6 +97,7 @@ async function boot () {
   scheduler.on('poll', () => { console.log('scheduler polling') })
   scheduler.on('master', (state) => { console.log('scheduler became master') })
   scheduler.on('error', (error) => { console.log(`scheduler error >> ${error}`) })
+  scheduler.on('cleanStuckWorker', (workerName, errorPayload, delta) => { console.log(`failing ${workerName} (stuck for ${delta}s) and failing job ${JSON.stringify(errorPayload)}`) })
   scheduler.on('workingTimestamp', (timestamp) => { console.log(`scheduler working timestamp ${timestamp}`) })
   scheduler.on('transferredJob', (timestamp, job) => { console.log(`scheduler enqueuing job ${timestamp} >> ${JSON.stringify(job)}`) })
 
diff --git a/examples/stuckWorker.js b/examples/stuckWorker.js
new file mode 100644
index 00000000..d8deb708
--- /dev/null
+++ b/examples/stuckWorker.js
@@ -0,0 +1,96 @@
+const path = require('path')
+const NodeResque = require(path.join(__dirname, '..', 'index.js'))
+// In your projects: const NodeResque = require('node-resque');
+
+async function boot () {
+  // ////////////////////////
+  // SET UP THE CONNECTION //
+  // ////////////////////////
+
+  const connectionDetails = {
+    pkg: 'ioredis',
+    host: '127.0.0.1',
+    password: null,
+    port: 6379,
+    database: 0
+    // namespace: 'resque',
+    // looping: true,
+    // options: {password: 'abc'},
+  }
+
+  // ///////////////////////////
+  // DEFINE YOUR WORKER TASKS //
+  // ///////////////////////////
+
+  const jobs = {
+    'stuck': {
+      perform: async function () {
+        console.log(`${this.name} is starting stuck job...`)
+        await new Promise((resolve) => {
+          clearInterval(this.pingTimer) // stop the worker from checking in, as if the process crashed
+          setTimeout(resolve, 60 * 60 * 1000) // 1 hour job
+        })
+      }
+    }
+  }
+
+  // /////////////////
+  // START A WORKER //
+  // /////////////////
+
+  const worker = new NodeResque.Worker({connection: connectionDetails, queues: ['stuckJobs']}, jobs)
+  await worker.connect()
+  worker.start()
+
+  // ////////////////////
+  // START A SCHEDULER //
+  // ////////////////////
+
+  const scheduler = new NodeResque.Scheduler({
+    stuckWorkerTimeout: (10 * 1000),
+    connection: connectionDetails
+  })
+
+  await scheduler.connect()
+  scheduler.start()
+
+  // //////////////////////
+  // REGISTER FOR EVENTS //
+  // //////////////////////
+
+  worker.on('start', () => { console.log('worker started') })
+  worker.on('end', () => { console.log('worker ended') })
+  worker.on('cleaning_worker', (worker, pid) => { console.log(`cleaning old worker ${worker}`) })
+  worker.on('poll', (queue) => { console.log(`worker polling ${queue}`) })
+  worker.on('ping', (time) => { console.log(`worker check in @ ${time}`) })
+  worker.on('job', (queue, job) => { console.log(`working job ${queue} ${JSON.stringify(job)}`) })
+  worker.on('reEnqueue', (queue, job, plugin) => { console.log(`reEnqueue job (${plugin}) ${queue} ${JSON.stringify(job)}`) })
+  worker.on('success', (queue, job, result) => { console.log(`job success ${queue} ${JSON.stringify(job)} >> ${result}`) })
+  worker.on('failure', (queue, job, failure) => { console.log(`job failure ${queue} ${JSON.stringify(job)} >> ${failure}`) })
+  worker.on('error', (error, queue, job) => { console.log(`error ${queue} ${JSON.stringify(job)} >> ${error}`) })
+  worker.on('pause', () => { console.log('worker paused') })
+
+  scheduler.on('start', () => { console.log('scheduler started') })
+  scheduler.on('end', () => { console.log('scheduler ended') })
+  scheduler.on('poll', () => { console.log('scheduler polling') })
+  scheduler.on('master', (state) => { console.log('scheduler became master') })
+  scheduler.on('error', (error) => { console.log(`scheduler error >> ${error}`) })
+  scheduler.on('workingTimestamp', (timestamp) => { console.log(`scheduler working timestamp ${timestamp}`) })
+  scheduler.on('transferredJob', (timestamp, job) => { console.log(`scheduler enqueuing job ${timestamp} >> ${JSON.stringify(job)}`) })
+
+  scheduler.on('cleanStuckWorker', (workerName, errorPayload, delta) => {
+    console.log(`failing ${workerName} (stuck for ${delta}s) and failing job: ${JSON.stringify(errorPayload)}`)
+    process.exit()
+  })
+
+  // //////////////////////
+  // CONNECT TO A QUEUE //
+  // //////////////////////
+
+  const queue = new NodeResque.Queue({connection: connectionDetails}, jobs)
+  queue.on('error', function (error) { console.log(error) })
+  await queue.connect()
+  await queue.enqueue('stuckJobs', 'stuck', ['oh no'])
+}
+
+boot()
diff --git a/lib/multiWorker.js b/lib/multiWorker.js
index cc5ca7a4..11e8b4dc 100644
--- a/lib/multiWorker.js
+++ b/lib/multiWorker.js
@@ -67,6 +67,7 @@ class MultiWorker extends EventEmitter {
       worker.on('end', () => { this.emit('end', worker.id) })
       worker.on('cleaning_worker', (worker, pid) => { this.emit('cleaning_worker', worker.id, worker, pid) })
       worker.on('poll', (queue) => { this.emit('poll', worker.id, queue) })
+      worker.on('ping', (time) => { this.emit('ping', worker.id, time) })
       worker.on('job', (queue, job) => { this.emit('job', worker.id, queue, job) })
       worker.on('reEnqueue', (queue, job, plugin) => { this.emit('reEnqueue', worker.id, queue, job, plugin) })
       worker.on('success', (queue, job, result) => { this.emit('success', worker.id, queue, job, result) })
@@ -144,6 +145,7 @@ class MultiWorker extends EventEmitter {
       'end',
       'cleaning_worker',
       'poll',
+      'ping',
       'job',
       'reEnqueue',
       'success',
diff --git a/lib/queue.js b/lib/queue.js
index af53933a..8e6847df 100644
--- a/lib/queue.js
+++ b/lib/queue.js
@@ -1,4 +1,5 @@
 const path = require('path')
+const os = require('os')
 const EventEmitter = require('events').EventEmitter
 const Connection = require(path.join(__dirname, 'connection.js')).Connection
 const PluginRunner = require(path.join(__dirname, 'pluginRunner.js'))
@@ -249,15 +250,20 @@ class Queue extends EventEmitter {
     if (!queues) { throw new Error('worker not found') }
     let workingOn = await this.workingOn(workerName, queues)
+    let message = 'Worker Timeout (killed manually)'
     if (workingOn) {
       workingOn = JSON.parse(workingOn)
       errorPayload = {
         worker: workerName,
         queue: workingOn.queue,
-        payload: workingOn.payload,
-        exception: 'Worker Timeout (killed manually)',
-        error: 'Worker Timeout (killed manually)',
-        backtrace: null,
+        payload: workingOn.payload || [],
+        exception: message,
+        error: message,
+        backtrace: [
+          `killed by ${os.hostname()} at ${new Date()}`,
+          'queue#forceCleanWorker',
+          'node-resque'
+        ],
         failed_at: (new Date()).toString()
       }
 
@@ -268,6 +274,7 @@ class Queue extends EventEmitter {
 
     await this.connection.redis.del(this.connection.key('stat', 'failed', workerName))
     await this.connection.redis.del(this.connection.key('stat', 'processed', workerName))
+    await this.connection.redis.del(this.connection.key('worker', 'ping', workerName))
     await this.connection.redis.del(this.connection.key('worker', workerName))
     await this.connection.redis.srem(this.connection.key('workers'), workerName + ':' + queues)
diff --git a/lib/scheduler.js b/lib/scheduler.js
index c3fdd311..f2eeccd3 100644
--- a/lib/scheduler.js
+++ b/lib/scheduler.js
@@ -13,6 +13,7 @@ class Scheduler extends EventEmitter {
 
     const defaults = {
       timeout: 5000, // in ms
+      stuckWorkerTimeout: 60 * 60 * 1000, // 60 minutes in ms
      masterLockTimeout: 60 * 3, // in seconds
       name: os.hostname() + ':' + process.pid // assumes only one worker per node process
     }
@@ -88,6 +89,8 @@ class Scheduler extends EventEmitter {
       this.emit('master')
     }
 
+    await this.checkStuckWorkers()
+
     this.emit('poll')
     let timestamp = await this.nextDelayedTimestamp()
     if (timestamp) {
@@ -184,6 +187,30 @@ class Scheduler extends EventEmitter {
       await this.connection.redis.zrem(this.connection.key('delayed_queue_schedule'), timestamp)
     }
   }
+
+  async checkStuckWorkers () {
+    if (!this.options.stuckWorkerTimeout) { return }
+
+    const keys = await this.connection.redis.keys(this.connection.key('worker', 'ping', '*'))
+    const payloads = await Promise.all(keys.map(async (k) => {
+      return JSON.parse(await this.connection.redis.get(k))
+    }))
+
+    const nowInSeconds = Math.round(new Date().getTime() / 1000)
+    const stuckWorkerTimeoutInSeconds = Math.round(this.options.stuckWorkerTimeout / 1000)
+    for (const payload of payloads) {
+      const {name, time} = payload
+      const delta = nowInSeconds - time
+      if (delta > stuckWorkerTimeoutInSeconds) {
+        await this.forceCleanWorker(name, delta)
+      }
+    }
+  }
+
+  async forceCleanWorker (workerName, delta) {
+    const errorPayload = await this.queue.forceCleanWorker(workerName)
+    this.emit('cleanStuckWorker', workerName, errorPayload, delta)
+  }
 }
 
 exports.Scheduler = Scheduler
diff --git a/lib/worker.js b/lib/worker.js
index 063b5574..9b383cc8 100644
--- a/lib/worker.js
+++ b/lib/worker.js
@@ -38,6 +38,7 @@ class Worker extends EventEmitter {
     this.running = false
     this.working = false
     this.job = null
+    this.pingTimer = null
 
     this.queueObject = new Queue({connection: options.connection}, this.jobs)
     this.queueObject.on('error', (error) => { this.emit('error', error) })
@@ -63,6 +64,8 @@
       this.connection.key('worker', this.name, this.stringQueues(), 'started'),
       Math.round((new Date()).getTime() / 1000)
     )
+    await this.ping()
+    this.pingTimer = setInterval(this.ping.bind(this), this.options.timeout)
   }
 
   async end () {
@@ -74,7 +77,8 @@
     }
 
     if (this.connection.connected === true || this.connection.connected === undefined || this.connection.connected === null) {
-      await this.untrack(this.name, this.stringQueues())
+      clearInterval(this.pingTimer)
+      await this.untrack()
     }
 
     await this.queueObject.end()
@@ -266,12 +270,27 @@ class Worker extends EventEmitter {
     return this.connection.redis.sadd(this.connection.key('workers'), (this.name + ':' + this.stringQueues()))
   }
 
-  async untrack (name, queues) {
+  async ping () {
+    const name = this.name
+    const nowSeconds = Math.round(new Date().getTime() / 1000)
+    this.emit('ping', nowSeconds)
+    const payload = JSON.stringify({
+      time: nowSeconds,
+      name: name,
+      queues: this.stringQueues()
+    })
+    await this.connection.redis.set(this.connection.key('worker', 'ping', name), payload)
+  }
+
+  async untrack () {
+    const name = this.name
+    const queues = this.stringQueues()
     if (!this.connection || !this.connection.redis) { return }
     await this.connection.redis.srem(this.connection.key('workers'), (name + ':' + queues))
-    await this.connection.redis.del(this.connection.key('worker', name, this.stringQueues()))
-    await this.connection.redis.del(this.connection.key('worker', name, this.stringQueues(), 'started'))
+    await this.connection.redis.del(this.connection.key('worker', 'ping', name))
+    await this.connection.redis.del(this.connection.key('worker', name, queues))
+    await this.connection.redis.del(this.connection.key('worker', name, queues, 'started'))
     await this.connection.redis.del(this.connection.key('stat', 'failed', name))
     await this.connection.redis.del(this.connection.key('stat', 'processed', name))
   }
@@ -288,7 +307,7 @@ class Worker extends EventEmitter {
 
     if ((this.queues[0] === '*' && this.queues.length === 1) || this.queues.length === 0) {
       this.originalQueue = '*'
-      await this.untrack(this.name, this.stringQueues())
+      await this.untrack()
 
       let response = await this.connection.redis.smembers(this.connection.key('queues'))
       this.queues = (response ? response.sort() : [])
       await this.track()
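
For reference, a minimal sketch of how the pieces above fit together in an application. The `stuckWorkerTimeout` option, the `cleanStuckWorker` event, and its `(workerName, errorPayload, delta)` callback signature come from this patch; the connection details, the 10-minute threshold, and the `main` wrapper are illustrative placeholders.

```js
const NodeResque = require('node-resque')

async function main () {
  // placeholder connection details; adjust for your redis
  const connectionDetails = { pkg: 'ioredis', host: '127.0.0.1', port: 6379 }

  const scheduler = new NodeResque.Scheduler({
    connection: connectionDetails,
    stuckWorkerTimeout: (10 * 60 * 1000) // placeholder: treat workers silent for 10 minutes as dead
  })

  // fired after queue#forceCleanWorker has removed the stuck worker and
  // moved its in-flight job (if any) to the failed queue
  scheduler.on('cleanStuckWorker', (workerName, errorPayload, delta) => {
    console.log(`removed ${workerName} after ${delta}s; failed job: ${JSON.stringify(errorPayload)}`)
  })
  scheduler.on('error', (error) => { console.log(`scheduler error >> ${error}`) })

  await scheduler.connect()
  scheduler.start()
}

main()
```

Only the scheduler takes `stuckWorkerTimeout`; workers begin pinging automatically when started, reusing their `timeout` option as the ping interval, so a CPU-bound job that blocks the event loop is the main case that can still miss pings.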