Skip to content

Commit

Permalink
fix: Healthcheck endpoint does not raise error when skipping too much…
Browse files Browse the repository at this point in the history
… jobs (closes #280)
  • Loading branch information
claustres committed Sep 25, 2024
1 parent 1ce695b commit fd39524
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 116 deletions.
2 changes: 2 additions & 0 deletions healthcheck.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,7 @@ try {
await healthcheck(program)
process.exit(0)
} catch (error) {
// Healthcheck is already writing to stderr
// console.error(error)
process.exit(1)
}
21 changes: 14 additions & 7 deletions lib/cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export async function createApp (job, options = {}) {
// Add a healthcheck for cron jobs
app.get(apiPrefix + '/healthcheck', (req, res, next) => {
if (Healthcheck.error) {
res.status(500).json(_.pick(Healthcheck, ['jobId', 'error.code', 'error.message']))
res.status(500).json(_.pick(Healthcheck, ['jobId', 'nbSkippedJobs', 'isRunning', 'error.code', 'error.message']))
} else {
res.status(200).json(_.omit(Healthcheck, ['error']))
}
Expand Down Expand Up @@ -152,9 +152,7 @@ export async function runJob (job, options = {}) {
const nbTotalTasks = (Healthcheck.nbSuccessfulTasks + Healthcheck.nbFailedTasks)
// Job with 0 tasks is always succesful
Healthcheck.successRate = (nbTotalTasks > 0 ? Healthcheck.nbSuccessfulTasks / nbTotalTasks : 1)
// Check if run has to be considered as successful
await healthcheck(options)
return Promise.resolve(tasks)
return tasks
} catch (error) {
console.error(error.message)
Healthcheck.isRunning = false
Expand All @@ -166,15 +164,24 @@ export async function runJob (job, options = {}) {
let cronJob
if (options.cron) {
console.log('Scheduling job with cron pattern ' + options.cron)
Healthcheck.nbSkippedJobs = 0
cronJob = new CronJob(options.cron, async () => {
// If last job has not yet finished skip this call as we are late
if (!Healthcheck.isRunning) {
Healthcheck.nbSkippedJobs = 0
await runJobWithOptions()
Healthcheck.nbSkippedJobs = 0
} else {
console.log('Skipping scheduled job as previous one is not yet finished')
Healthcheck.nbSkippedJobs++
}
try {
// Check if run has to be considered as successful
await healthcheck(options)
} catch (error) {
// Healthcheck is already writing to stderr
//console.error(error)
Healthcheck.error = error
}
})
// In case the server is forced to exit stop the job as well
server.on('close', () => {
Expand All @@ -195,13 +202,13 @@ export async function runJob (job, options = {}) {
}

export async function run (job, options = {}) {
await createApp(job, options)
const server = await createApp(job, options)
const tasks = await runJob(job, options)
if (!options.cron) {
// When not running job continuously stop the server as well
await server.close()
}
return tasks
return (tasks ? tasks : server)
}

export async function processOptions () {
Expand Down
36 changes: 21 additions & 15 deletions lib/healthcheck.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@ export const Healthcheck = {
nbSkippedJobs: 0, // Number of times the scheduled job has been skipped due to on-going one
error: null // Indicating error if job has erroned for cron jobs
/* Undefined by default
nbSkippedJobs: 0, // Number of time last job was not yet finished when cron ran
nbFailedTasks: 0, // Number of failed/success tasks of last run for fault-tolerant jobs
nbSuccessfulTasks: 0,
successRate: 0 // Ratio of previous values
*/
}

export class HealthcheckError extends Error {}
export class HealthcheckError extends Error {
constructor(data) {
super(data.message)
this.code = data.code
}
}

function readFromLog () {
try {
Expand All @@ -45,7 +51,7 @@ function writeToLog (data) {
}
}

function publishToConsole (data, compilers, pretext, stream = 'error') {
export function publishToConsole (data, compilers, pretext, stream = 'error') {
try {
if (stream === 'error') console.error(pretext, compilers.message(data))
else console.log(pretext, compilers.message(data))
Expand All @@ -55,7 +61,7 @@ function publishToConsole (data, compilers, pretext, stream = 'error') {
}
}

async function publishToSlack (slackWebhook, data, compilers, posttext = '', color = 'danger') {
export async function publishToSlack (slackWebhook, data, compilers, posttext = '', color = 'danger') {
if (!slackWebhook) return
try {
const message = compilers.message(data)
Expand Down Expand Up @@ -102,20 +108,17 @@ export async function healthcheck (options) {
const response = await utils.promisify(request.get)(endpoint)
const data = JSON.parse(response.body)
if (options.debug) {
console.log('Current healthcheck status from service', response.statusCode)
console.log('Current healthcheck output read from service', data)
console.log('Previous healthcheck output read from log', previousHealthcheck)
}
if (response.statusCode === 200) {
// Fault-tolerant jobs always return 200, we use more criteria to check for health status
if (_.has(data, 'successRate') && (data.successRate < options.successRate)) {
data.error = { code: 'HEALTHCHECK_SUCCESS_RATE', message: `Insufficient success rate (${data.successRate.toFixed(2)})` }
}
if (data.nbSkippedJobs >= options.nbSkippedJobs) {
data.error = { code: 'HEALTHCHECK_SKIPPED_JOBS', message: `Too much skipped jobs (${data.nbSkippedJobs})` }
}
if ((options.maxDuration > 0) && (data.duration > options.maxDuration)) {
data.error = { code: 'HEALTHCHECK_DURATION', message: `Too much slow execution (${data.duration}s)` }
}
// Fault-tolerant jobs always return 200, we use more criteria to check for health status
if (_.has(data, 'successRate') && (data.successRate < options.successRate)) {
data.error = { code: 'HEALTHCHECK_SUCCESS_RATE', message: `Insufficient success rate (${data.successRate.toFixed(2)})` }
} else if (data.nbSkippedJobs >= options.nbSkippedJobs) {
data.error = { code: 'HEALTHCHECK_SKIPPED_JOBS', message: `Too much skipped jobs (${data.nbSkippedJobs})` }
} else if ((options.maxDuration > 0) && (data.duration > options.maxDuration)) {
data.error = { code: 'HEALTHCHECK_DURATION', message: `Too much slow execution (${data.duration}s)` }
}
writeToLog(data)
// Add env available for templates
Expand All @@ -126,7 +129,7 @@ export async function healthcheck (options) {
publishToConsole(data, compilers, '[ALERT]', 'error')
await publishToSlack(options.slackWebhook, data, compilers, '', 'danger')
}
throw new HealthcheckError(data.error.message)
throw new HealthcheckError(data.error)
} else {
// Only notify on closing errors
if (previousError) {
Expand All @@ -136,6 +139,9 @@ export async function healthcheck (options) {
}
}
} catch (error) {
if (options.debug) {
console.log('Current healthcheck raised error', error)
}
// Give feedback for any error raised by the healthcheck process
if (!(error instanceof HealthcheckError)) {
// Set jobId variable/error available in context so that templates will not fail
Expand Down
178 changes: 84 additions & 94 deletions test/cli.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ describe('krawler:cli', () => {
expect(fs.existsSync(path.join(outputPath, 'RJTT-30-18000-2-1.tif'))).beFalse()
expect(fs.existsSync(path.join(outputPath, 'RJTT-30-18000-2-1.tif.csv'))).beTrue()
} catch (error) {
console.log(error)
assert.fail('Healthcheck should not fail')
}
})
Expand Down Expand Up @@ -101,110 +102,99 @@ describe('krawler:cli', () => {
// Let enough time to process
.timeout(15000)

it('runs as CRON using CLI with continuous healthcheck', (done) => {
it('runs as CRON using CLI with continuous healthcheck', async () => {
// Clean previous test output
fs.removeSync(path.join(outputPath, 'RJTT-30-18000-2-1.tif.csv'))
// Setup the app
cli(jobfile, {
appServer = await cli(jobfile, {
mode: 'setup',
sync: 'mongodb://127.0.0.1:27017/krawler-test',
port: 3030,
cron: '*/10 * * * * *',
messageTemplate: process.env.MESSAGE_TEMPLATE,
debug: true,
debug: false,
slackWebhook: process.env.SLACK_WEBHOOK_URL
})
.then(server => {
appServer = server
// Clean any previous healthcheck log
fs.removeSync(path.join(__dirname, '..', 'healthcheck.log'))
const app = getApp()
// Add hook to know how many times the job will run
const jobService = app.service('jobs')
let runCount = 0
jobService.hooks({
after: {
create: (hook) => {
runCount++
// First run is fine, second one raises an error
if (runCount === 1) return hook
else throw new Error('Test Error')
}
}
})
// Check for event emission
let eventCount = 0
app.on('krawler', event => {
if ((event.name === 'task-done') || (event.name === 'job-done')) eventCount++
})
// Only run as we already setup the app
cli(jobfile, { mode: 'runJob', port: 3030, cron: '*/10 * * * * *', run: true, messageTemplate: process.env.MESSAGE_TEMPLATE, debug: true, slackWebhook: process.env.SLACK_WEBHOOK_URL })
.then(async () => {
// As it runs every 10 seconds wait until it should have ran at least once again
const seconds = Math.floor(moment().seconds())
const remainingSecondsForNextRun = 11 - seconds % 10
setTimeout(async () => {
try {
expect(runCount).to.equal(2) // 2 runs
const response = await utils.promisify(request.get)('http://localhost:3030/healthcheck')
console.log(response.body)
expect(response.statusCode).to.equal(500)
const { error, stdout, stderr } = await runCommand('node ' + path.join(__dirname, '..', 'healthcheck.js'))
expect(error).toExist()
expect(stdout).to.equal('')
expect(stderr.includes('[ALERT]')).beTrue()
const healthcheckLog = fs.readJsonSync(path.join(__dirname, '..', 'healthcheck.log'))
const healthcheck = JSON.parse(response.body)
console.log(healthcheck)
expect(healthcheck).to.deep.equal(healthcheckLog)
expect(healthcheck.isRunning).beUndefined()
expect(healthcheck.duration).beUndefined()
expect(healthcheck.nbSkippedJobs).beUndefined()
expect(healthcheck.nbFailedTasks).beUndefined()
expect(healthcheck.nbSuccessfulTasks).beUndefined()
expect(healthcheck.successRate).beUndefined()
expect(healthcheck.error).toExist()
expect(healthcheck.error.message).toExist()
expect(healthcheck.error.message).to.equal('Test Error')
expect(eventCount).to.equal(4) // 4 events
collection = client.db.collection('krawler-events')
const taskEvents = await collection.find({ event: 'task-done' }).toArray()
expect(taskEvents.length).to.equal(2)
const jobEvents = await collection.find({ event: 'job-done' }).toArray()
expect(jobEvents.length).to.equal(2)
server.close()
appServer = null
done()
} catch (error) {
console.log(error)
done(error)
}
}, remainingSecondsForNextRun * 1000)
expect(runCount).to.equal(1) // First run
const response = await utils.promisify(request.get)('http://localhost:3030/healthcheck')
expect(response.statusCode).to.equal(200)
const { error, stdout, stderr } = await runCommand('node ' + path.join(__dirname, '..', 'healthcheck.js'))
expect(error).to.equal(null)
expect(stdout).to.equal('')
expect(stderr).to.equal('')
const healthcheckLog = fs.readJsonSync(path.join(__dirname, '..', 'healthcheck.log'))
const healthcheck = JSON.parse(response.body)
expect(healthcheck).to.deep.equal(healthcheckLog)
expect(healthcheck.isRunning).toExist()
expect(healthcheck.nbSkippedJobs).toExist()
expect(healthcheck.error).beUndefined()
expect(healthcheck.nbFailedTasks).to.equal(0)
expect(healthcheck.nbSuccessfulTasks).to.equal(1)
expect(healthcheck.successRate).to.equal(1)
expect(healthcheck.state).toExist()
expect(eventCount).to.equal(2) // 2 events
collection = client.db.collection('krawler-events')
const taskEvents = await collection.find({ event: 'task-done' }).toArray()
expect(taskEvents.length).to.equal(1)
const jobEvents = await collection.find({ event: 'job-done' }).toArray()
expect(jobEvents.length).to.equal(1)
})
})
// Clean any previous healthcheck log
fs.removeSync(path.join(__dirname, '..', 'healthcheck.log'))
const app = getApp()
// Add hook to know how many times the job will run
const jobService = app.service('jobs')
let runCount = 0
jobService.hooks({
after: {
create: (hook) => {
runCount++
// First run is fine, second one raises an error
if (runCount === 1) return hook
else throw new Error('Test Error')
}
}
})
// Check for event emission
let eventCount = 0
app.on('krawler', event => {
if ((event.name === 'task-done') || (event.name === 'job-done')) eventCount++
})
// Only run as we already setup the app
await cli(jobfile, { mode: 'runJob', port: 3030, cron: '*/10 * * * * *', run: true, messageTemplate: process.env.MESSAGE_TEMPLATE, debug: false, slackWebhook: process.env.SLACK_WEBHOOK_URL })
expect(runCount).to.equal(1) // First run
const response = await utils.promisify(request.get)('http://localhost:3030/healthcheck')
// console.log(response.body)
expect(response.statusCode).to.equal(200)
const healthcheck = JSON.parse(response.body)
// console.log(healthcheck)
const { error } = await runCommand('node ' + path.join(__dirname, '..', 'healthcheck.js'))
expect(error).beNull()
const healthcheckLog = fs.readJsonSync(path.join(__dirname, '..', 'healthcheck.log'))
expect(healthcheck).to.deep.equal(healthcheckLog)
expect(healthcheck.isRunning).to.equal(false)
expect(healthcheck.nbSkippedJobs).to.equal(0)
expect(healthcheck.error).beUndefined()
expect(healthcheck.nbFailedTasks).to.equal(0)
expect(healthcheck.nbSuccessfulTasks).to.equal(1)
expect(healthcheck.successRate).to.equal(1)
expect(healthcheck.state).toExist()
expect(eventCount).to.equal(2) // 2 events
collection = client.db.collection('krawler-events')
const taskEvents = await collection.find({ event: 'task-done' }).toArray()
expect(taskEvents.length).to.equal(1)
const jobEvents = await collection.find({ event: 'job-done' }).toArray()
expect(jobEvents.length).to.equal(1)
// As it runs every 10 seconds wait until it should have ran at least once again
const seconds = Math.floor(moment().seconds())
const remainingSecondsForNextRun = 10 - seconds % 10
await utils.promisify(setTimeout)((1 + remainingSecondsForNextRun) * 1000)
try {
expect(runCount).to.be.at.least(2) // 2 runs
const response = await utils.promisify(request.get)('http://localhost:3030/healthcheck')
// console.log(response.body)
expect(response.statusCode).to.equal(500)
const healthcheck = JSON.parse(response.body)
// console.log(healthcheck)
const { error } = await runCommand('node ' + path.join(__dirname, '..', 'healthcheck.js'))
expect(error).toExist()
const healthcheckLog = fs.readJsonSync(path.join(__dirname, '..', 'healthcheck.log'))
expect(healthcheck).to.deep.equal(healthcheckLog)
expect(healthcheck.isRunning).to.equal(false)
expect(healthcheck.duration).beUndefined()
expect(healthcheck.nbSkippedJobs).to.equal(0)
expect(healthcheck.nbFailedTasks).beUndefined()
expect(healthcheck.nbSuccessfulTasks).beUndefined()
expect(healthcheck.successRate).beUndefined()
expect(healthcheck.error).toExist()
expect(healthcheck.error.message).toExist()
expect(healthcheck.error.message).to.equal('Test Error')
expect(eventCount).to.be.at.least(4) // 4 events
collection = client.db.collection('krawler-events')
const taskEvents = await collection.find({ event: 'task-done' }).toArray()
expect(taskEvents.length).to.be.at.least(2)
const jobEvents = await collection.find({ event: 'job-done' }).toArray()
expect(jobEvents.length).to.be.at.least(2)
} catch (error) {
console.log(error)
throw error
}
})
// Let enough time to process
.timeout(15000)
Expand Down
Loading

0 comments on commit fd39524

Please sign in to comment.