From d01b318a504e7f04b05e97934402a55848186834 Mon Sep 17 00:00:00 2001 From: Alan Cruikshanks Date: Tue, 17 Sep 2024 22:48:50 +0100 Subject: [PATCH] Remove system jobs from licence-import (#1026) https://eaflood.atlassian.net/browse/WATER-4670 We've been making changes to the import service to support our work migrating return version management to WRLS, and replacing the current licence import with [one ready for ReSP](https://eaflood.atlassian.net/browse/WATER-4535). We've also attempted to clarify what is actually happening during the various 'jobs' and to arrange them in our environments in a more logical order. But the latest version that shipped is failing to complete. We believe the issues are: - Each 'job' is taking longer to complete than expected - Based on current timings, the 'jobs' are overlapping - Our attempts to connect the new ReSP import with the existing one are adding to the burden - Based on investigating the logs, we think downstream jobs are no longer being triggered We're [working on simplifying the project](https://github.com/DEFRA/water-abstraction-import/pull/1023) and removing elements that we think decrease performance, such as the use of [pg-boss](https://github.com/timgit/pg-boss). But in the meantime, we need to get the import back up and running. This change removes the extra jobs we put in for connecting to the [water-abstraction-system](https://github.com/DEFRA/water-abstraction-system) until we can develop a better alternative. We also update the default schedule to allow each part of the import more time to complete to avoid overlapping. --- .env.example | 8 +- config.js | 8 +- .../licence-import/jobs/import-company.js | 4 +- .../jobs/import-licence-system.js | 60 ------ .../jobs/queue-licences-system.js | 53 ------ src/modules/licence-import/plugin.js | 12 -- .../jobs/import-company.test.js | 4 +- .../jobs/queue-licences-system.test.js | 177 ------------------ test/modules/licence-import/plugin.test.js | 49 +---- 9 files changed, 15 insertions(+), 360 deletions(-) delete mode 100644 src/modules/licence-import/jobs/import-licence-system.js delete mode 100644 src/modules/licence-import/jobs/queue-licences-system.js delete mode 100644 test/modules/licence-import/jobs/queue-licences-system.test.js diff --git a/.env.example b/.env.example index 9a17dbe8..15525094 100644 --- a/.env.example +++ b/.env.example @@ -46,8 +46,8 @@ IMPORT_LICENCE_AGREEMENTS=false WRLS_LOG_LEVEL=debug # Use Cron type syntax to set timings for these background processes -WRLS_CRON_NALD='0 1 * * *' -WRLS_CRON_LICENCES='0 4 * * 1,2,3,4,5' -WRLS_CRON_RETURN_VERSIONS='0 6 * * 1,2,3,4,5' -WRLS_CRON_MOD_LOGS='0 7 * * 1,2,3,4,5' +WRLS_CRON_NALD='15 23 * * *' +WRLS_CRON_LICENCES='15 3 * * 1,2,3,4,5' +WRLS_CRON_RETURN_VERSIONS='15 7 * * 1,2,3,4,5' +WRLS_CRON_MOD_LOGS='30 7 * * 1,2,3,4,5' WRLS_CRON_TRACKER='0 10 * * 1,2,3,4,5' diff --git a/config.js b/config.js index 40b3cc2d..7f66ecd9 100644 --- a/config.js +++ b/config.js @@ -84,10 +84,10 @@ module.exports = { nald: { zipPassword: process.env.NALD_ZIP_PASSWORD, path: process.env.S3_NALD_IMPORT_PATH || 'wal_nald_data_release', - schedule: process.env.WRLS_CRON_NALD || '0 1 * * *' + schedule: process.env.WRLS_CRON_NALD || '15 23 * * *' }, licences: { - schedule: process.env.WRLS_CRON_LICENCES || '0 4 * * 1,2,3,4,5', + schedule: process.env.WRLS_CRON_LICENCES || '15 3 * * 1,2,3,4,', // Note: these 2 flags need to be set to false for charging go-live // to suspend the import of invoice accounts and licence agreements // Update: I've changed those values to false ahead of the v2.0 charging @@ -104,10 +104,10 @@ module.exports = { isBillingDocumentRoleImportEnabled: false }, modLogs: { - schedule: process.env.WRLS_CRON_MOD_LOGS || '0 7 * * 1,2,3,4,5' + schedule: process.env.WRLS_CRON_MOD_LOGS || '30 7 * * 1,2,3,4,5' }, returnVersions: { - schedule: process.env.WRLS_CRON_RETURN_VERSIONS || '0 6 * * 1,2,3,4,5' + schedule: process.env.WRLS_CRON_RETURN_VERSIONS || '15 7 * * 1,2,3,4,5' }, tracker: { schedule: process.env.WRLS_CRON_TRACKER || '0 10 * * 1,2,3,4,5' diff --git a/src/modules/licence-import/jobs/import-company.js b/src/modules/licence-import/jobs/import-company.js index 76b09c55..75a32186 100644 --- a/src/modules/licence-import/jobs/import-company.js +++ b/src/modules/licence-import/jobs/import-company.js @@ -1,6 +1,6 @@ 'use strict' -const QueueLicencesSystemJob = require('./queue-licences-system') +const QueueLicencesJob = require('./queue-licences.js') const extract = require('../extract') const importCompanies = require('../connectors/import-companies') const load = require('../load') @@ -81,7 +81,7 @@ async function onComplete (messageQueue) { if (count === 0) { await messageQueue.deleteQueue('__state__completed__licence-import.import-company') - await messageQueue.publish(QueueLicencesSystemJob.createMessage()) + await messageQueue.publish(QueueLicencesJob.createMessage()) global.GlobalNotifier.omg(`${JOB_NAME}: finished`) } diff --git a/src/modules/licence-import/jobs/import-licence-system.js b/src/modules/licence-import/jobs/import-licence-system.js deleted file mode 100644 index 821a3fa3..00000000 --- a/src/modules/licence-import/jobs/import-licence-system.js +++ /dev/null @@ -1,60 +0,0 @@ -'use strict' - -const WaterSystemService = require('../../../lib/services/water-system-service.js') -const QueueLicencesJob = require('./queue-licences.js') - -const JOB_NAME = 'licence-import.import-licence-system' -const STATUS_NO_CONTENT = 204 - -const options = { - teamSize: 75, teamConcurrency: 1 -} - -function createMessage (data) { - return { - name: JOB_NAME, - data, - options: { - singletonKey: `${JOB_NAME}.${data.licenceNumber}` - } - } -} - -async function handler (job) { - try { - if (job.data.jobNumber === 1) { - global.GlobalNotifier.omg(`${JOB_NAME}: started`) - } - - const { licenceNumber } = job.data - - const response = await WaterSystemService.postImportLicence({ - licenceRef: licenceNumber - }) - - if (response.statusCode !== STATUS_NO_CONTENT) { - throw new Error(`Licence ${licenceNumber} failed with status code - ${response.statusCode}`) - } - } catch (error) { - global.GlobalNotifier.omfg(`${JOB_NAME}: errored`, error) - throw error - } -} - -async function onComplete (messageQueue, job) { - try { - const { data } = job.data.request - - if (data.jobNumber === data.numberOfJobs) { - await messageQueue.publish(QueueLicencesJob.createMessage()) - global.GlobalNotifier.omg(`${JOB_NAME}: finished`, { numberOfJobs: job.data.request.data.numberOfJobs }) - } - } catch (error) { - global.GlobalNotifier.omfg(`${JOB_NAME}: errored`, error) - throw error - } -} - -module.exports = { - createMessage, handler, name: JOB_NAME, options, onComplete -} diff --git a/src/modules/licence-import/jobs/queue-licences-system.js b/src/modules/licence-import/jobs/queue-licences-system.js deleted file mode 100644 index cfca07fd..00000000 --- a/src/modules/licence-import/jobs/queue-licences-system.js +++ /dev/null @@ -1,53 +0,0 @@ -'use strict' - -const extract = require('../extract') -const ImportLicenceSystemJob = require('./import-licence-system.js') - -const JOB_NAME = 'licence-import.queue-licences-system' - -function createMessage () { - return { - name: JOB_NAME, - options: { - singletonKey: JOB_NAME, expireIn: '1 hours' - } - } -} - -async function handler () { - try { - global.GlobalNotifier.omg(`${JOB_NAME}: started`) - - const rows = await extract.getAllLicenceNumbers() - - return rows - } catch (error) { - global.GlobalNotifier.omfg(`${JOB_NAME}: errored`, error) - throw error - } -} - -async function onComplete (messageQueue, job) { - if (!job.failed) { - const { value: licences } = job.data.response - const numberOfJobs = licences.length - - for (const [index, licence] of licences.entries()) { - // This information is to help us log when the import licence jobs start and finish. See - // src/modules/licence-import/jobs/import-licence.js for more details - const data = { - licenceNumber: licence.LIC_NO, - jobNumber: index + 1, - numberOfJobs - } - - await messageQueue.publish(ImportLicenceSystemJob.createMessage(data)) - } - } - - global.GlobalNotifier.omg(`${JOB_NAME}: finished`) -} - -module.exports = { - createMessage, handler, onComplete, name: JOB_NAME -} diff --git a/src/modules/licence-import/plugin.js b/src/modules/licence-import/plugin.js index 11bf33d5..020fafa7 100644 --- a/src/modules/licence-import/plugin.js +++ b/src/modules/licence-import/plugin.js @@ -5,12 +5,10 @@ const cron = require('node-cron') const DeleteRemovedDocumentsJob = require('./jobs/delete-removed-documents.js') const ImportCompanyJob = require('./jobs/import-company.js') const ImportLicenceJob = require('./jobs/import-licence.js') -const ImportLicenceSystemJob = require('./jobs/import-licence-system.js') const ImportPointsJob = require('./jobs/import-points.js') const ImportPurposeConditionTypesJob = require('./jobs/import-purpose-condition-types.js') const QueueCompaniesJob = require('./jobs/queue-companies.js') const QueueLicencesJob = require('./jobs/queue-licences.js') -const QueueLicencesSystemJob = require('./jobs/queue-licences-system.js') const config = require('../../../config') @@ -39,16 +37,6 @@ async function register (server, _options) { return ImportCompanyJob.onComplete(server.messageQueue) }) - await server.messageQueue.subscribe(QueueLicencesSystemJob.name, QueueLicencesSystemJob.handler) - await server.messageQueue.onComplete(QueueLicencesSystemJob.name, (executedJob) => { - return QueueLicencesSystemJob.onComplete(server.messageQueue, executedJob) - }) - - await server.messageQueue.subscribe(ImportLicenceSystemJob.name, ImportLicenceSystemJob.options, ImportLicenceSystemJob.handler) - await server.messageQueue.onComplete(ImportLicenceSystemJob.name, (executedJob) => { - return ImportLicenceSystemJob.onComplete(server.messageQueue, executedJob) - }) - await server.messageQueue.subscribe(QueueLicencesJob.name, QueueLicencesJob.handler) await server.messageQueue.onComplete(QueueLicencesJob.name, (executedJob) => { return QueueLicencesJob.onComplete(server.messageQueue, executedJob) diff --git a/test/modules/licence-import/jobs/import-company.test.js b/test/modules/licence-import/jobs/import-company.test.js index eb84f605..4c10dc5e 100644 --- a/test/modules/licence-import/jobs/import-company.test.js +++ b/test/modules/licence-import/jobs/import-company.test.js @@ -183,12 +183,12 @@ experiment('Licence Import: Import Company job', () => { expect(queueName).to.equal('__state__completed__licence-import.import-company') }) - test('the import licences job is published to the queue', async () => { + test('the queue licences job is published to the queue', async () => { await ImportCompanyJob.onComplete(messageQueue) const jobMessage = messageQueue.publish.lastCall.args[0] - expect(jobMessage.name).to.equal('licence-import.queue-licences-system') + expect(jobMessage.name).to.equal('licence-import.queue-licences') }) }) diff --git a/test/modules/licence-import/jobs/queue-licences-system.test.js b/test/modules/licence-import/jobs/queue-licences-system.test.js deleted file mode 100644 index 076da056..00000000 --- a/test/modules/licence-import/jobs/queue-licences-system.test.js +++ /dev/null @@ -1,177 +0,0 @@ -'use strict' - -// Test framework dependencies -const Lab = require('@hapi/lab') -const Code = require('@hapi/code') -const Sinon = require('sinon') - -const { experiment, test, beforeEach, afterEach } = exports.lab = Lab.script() -const { expect } = Code - -// Things we need to stub -const extract = require('../../../../src/modules/licence-import/extract/index.js') - -// Thing under test -const QueueLicencesSystemJob = require('../../../../src/modules/licence-import/jobs/queue-licences-system.js') - -experiment('Licence Import: Queue Licences System job', () => { - let notifierStub - - beforeEach(async () => { - Sinon.stub(extract, 'getAllLicenceNumbers').resolves([ - { LIC_NO: '01/123' }, - { LIC_NO: '01/124' } - ]) - - // RequestLib depends on the GlobalNotifier to have been set. This happens in app/plugins/global-notifier.plugin.js - // when the app starts up and the plugin is registered. As we're not creating an instance of Hapi server in this - // test we recreate the condition by setting it directly with our own stub - notifierStub = { omg: Sinon.stub(), omfg: Sinon.stub() } - global.GlobalNotifier = notifierStub - }) - - afterEach(async () => { - Sinon.restore() - delete global.GlobalNotifier - }) - - experiment('.createMessage', () => { - test('formats a message for PG boss', async () => { - const message = QueueLicencesSystemJob.createMessage() - - expect(message).to.equal({ - name: 'licence-import.queue-licences-system', - options: { - singletonKey: 'licence-import.queue-licences-system', - expireIn: '1 hours' - } - }) - }) - }) - - experiment('.handler', () => { - experiment('when the job is successful', () => { - test('a message is logged', async () => { - await QueueLicencesSystemJob.handler() - - const [message] = notifierStub.omg.lastCall.args - - expect(message).to.equal('licence-import.queue-licences-system: started') - }) - - test('retrieves the licences to import', async () => { - await QueueLicencesSystemJob.handler() - - expect(extract.getAllLicenceNumbers.called).to.equal(true) - }) - - test('resolves with an array of licences to import', async () => { - const result = await QueueLicencesSystemJob.handler() - - expect(result).to.equal([ - { LIC_NO: '01/123' }, - { LIC_NO: '01/124' } - ]) - }) - }) - - experiment('when the job fails', () => { - const err = new Error('Oops!') - - beforeEach(async () => { - extract.getAllLicenceNumbers.throws(err) - }) - - test('logs an error message', async () => { - await expect(QueueLicencesSystemJob.handler()).to.reject() - - expect(notifierStub.omfg.calledWith( - 'licence-import.queue-licences-system: errored', err - )).to.equal(true) - }) - - test('rethrows the error', async () => { - const err = await expect(QueueLicencesSystemJob.handler()).to.reject() - - expect(err.message).to.equal('Oops!') - }) - }) - }) - - experiment('.onComplete', () => { - let job - let messageQueue - - beforeEach(async () => { - messageQueue = { - publish: Sinon.stub() - } - }) - - experiment('when the job succeeds', () => { - beforeEach(async () => { - job = { - failed: false, - data: { - response: { - value: [ - { LIC_NO: '01/123' }, - { LIC_NO: '01/124' } - ] - } - } - } - }) - - test('a message is logged', async () => { - await QueueLicencesSystemJob.onComplete(messageQueue, job) - - const [message] = notifierStub.omg.lastCall.args - - expect(message).to.equal('licence-import.queue-licences-system: finished') - }) - - test('the import licence job is published to the queue for the first licence', async () => { - await QueueLicencesSystemJob.onComplete(messageQueue, job) - - const jobMessage = messageQueue.publish.firstCall.args[0] - - expect(jobMessage.data).to.equal({ licenceNumber: '01/123', jobNumber: 1, numberOfJobs: 2 }) - }) - - test('the import licence job is published to the queue for the second licence', async () => { - await QueueLicencesSystemJob.onComplete(messageQueue, job) - - const jobMessage = messageQueue.publish.lastCall.args[0] - - expect(jobMessage.data).to.equal({ licenceNumber: '01/124', jobNumber: 2, numberOfJobs: 2 }) - }) - - experiment('but an error is thrown', () => { - const err = new Error('oops') - - beforeEach(async () => { - messageQueue.publish.rejects(err) - }) - - test('rethrows the error', async () => { - const error = await expect(QueueLicencesSystemJob.onComplete(messageQueue, job)).to.reject() - - expect(error).to.equal(error) - }) - }) - }) - - experiment('when the job fails', () => { - beforeEach(async () => { - job = { failed: true } - }) - - test('no further jobs are published', async () => { - await QueueLicencesSystemJob.onComplete(messageQueue, job) - - expect(messageQueue.publish.called).to.be.false() - }) - }) - }) -}) diff --git a/test/modules/licence-import/plugin.test.js b/test/modules/licence-import/plugin.test.js index f3b3b7ba..a3c1b414 100644 --- a/test/modules/licence-import/plugin.test.js +++ b/test/modules/licence-import/plugin.test.js @@ -13,11 +13,9 @@ const config = require('../../../config.js') const DeleteRemovedDocumentsJob = require('../../../src/modules/licence-import/jobs/delete-removed-documents.js') const ImportCompanyJob = require('../../../src/modules/licence-import/jobs/import-company.js') const ImportLicenceJob = require('../../../src/modules/licence-import/jobs/import-licence.js') -const ImportLicenceSystemJob = require('../../../src/modules/licence-import/jobs/import-licence-system.js') const ImportPurposeConditionTypesJob = require('../../../src/modules/licence-import/jobs/import-purpose-condition-types.js') const QueueCompaniesJob = require('../../../src/modules/licence-import/jobs/queue-companies.js') const QueueLicencesJob = require('../../../src/modules/licence-import/jobs/queue-licences.js') -const QueueLicencesSystemJob = require('../../../src/modules/licence-import/jobs/queue-licences-system.js') // Things we need to stub const cron = require('node-cron') @@ -139,52 +137,11 @@ experiment('modules/licence-import/plugin.js', () => { }) }) - experiment('for Queue Licences System', () => { - test('subscribes its handler to the job queue', async () => { - await LicenceImportPlugin.plugin.register(server) - - const subscribeArgs = server.messageQueue.subscribe.getCall(4).args - - expect(subscribeArgs[0]).to.equal(QueueLicencesSystemJob.name) - expect(subscribeArgs[1]).to.equal(QueueLicencesSystemJob.handler) - }) - - test('registers its onComplete for the job', async () => { - await LicenceImportPlugin.plugin.register(server) - - const onCompleteArgs = server.messageQueue.onComplete.getCall(4).args - - expect(onCompleteArgs[0]).to.equal(QueueLicencesSystemJob.name) - expect(onCompleteArgs[1]).to.be.a.function() - }) - }) - - experiment('for Import Licence System', () => { - test('subscribes its handler and options to the job queue', async () => { - await LicenceImportPlugin.plugin.register(server) - - const subscribeArgs = server.messageQueue.subscribe.getCall(5).args - - expect(subscribeArgs[0]).to.equal(ImportLicenceSystemJob.name) - expect(subscribeArgs[1]).to.equal(ImportLicenceSystemJob.options) - expect(subscribeArgs[2]).to.equal(ImportLicenceSystemJob.handler) - }) - - test('registers its onComplete for the job', async () => { - await LicenceImportPlugin.plugin.register(server) - - const onCompleteArgs = server.messageQueue.onComplete.getCall(5).args - - expect(onCompleteArgs[0]).to.equal(ImportLicenceSystemJob.name) - expect(onCompleteArgs[1]).to.be.a.function() - }) - }) - experiment('for Queue Licences', () => { test('subscribes its handler to the job queue', async () => { await LicenceImportPlugin.plugin.register(server) - const subscribeArgs = server.messageQueue.subscribe.getCall(6).args + const subscribeArgs = server.messageQueue.subscribe.getCall(4).args expect(subscribeArgs[0]).to.equal(QueueLicencesJob.name) expect(subscribeArgs[1]).to.equal(QueueLicencesJob.handler) @@ -193,7 +150,7 @@ experiment('modules/licence-import/plugin.js', () => { test('registers its onComplete for the job', async () => { await LicenceImportPlugin.plugin.register(server) - const onCompleteArgs = server.messageQueue.onComplete.getCall(6).args + const onCompleteArgs = server.messageQueue.onComplete.getCall(4).args expect(onCompleteArgs[0]).to.equal(QueueLicencesJob.name) expect(onCompleteArgs[1]).to.be.a.function() @@ -204,7 +161,7 @@ experiment('modules/licence-import/plugin.js', () => { test('subscribes its handler and options to the job queue', async () => { await LicenceImportPlugin.plugin.register(server) - const subscribeArgs = server.messageQueue.subscribe.getCall(7).args + const subscribeArgs = server.messageQueue.subscribe.getCall(5).args expect(subscribeArgs[0]).to.equal(ImportLicenceJob.name) expect(subscribeArgs[1]).to.equal(ImportLicenceJob.options)