Implement new 'points' job (#1028)
https://eaflood.atlassian.net/browse/WATER-4645

In [Start importing licence version points to 'water'](#1009), we added a step to the **licence-import** job to start importing licence points to a new table we'd created: `water.licence_version_purpose_points`.

However, we've since encountered two issues with that original approach.

- We've found that the legacy UI also displays details about sources, which are connected to points. Our original solution didn't cater for this, so we cannot switch over to it wholly
- Updating the `licence_version_purpose_points` table happens as the last step of the **licence-import** job, but we've found that the job often fails because of bad licence data, which means our new step never gets called

So, in this change, we are creating a whole new job to handle points. As part of the change, NALD points will now sit in their own table, `water.points`; `return_requirement_points` and `licence_version_purpose_points` will link to it rather than holding their own copies of the point data. The job will be responsible for updating all three tables.
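For orientation, this is roughly how the three tables are expected to relate once the job has run. The column names are taken from the insert queries in this commit; the query itself is only an illustration (the migrations that create `water.points` are not part of this change):

```sql
-- Sketch only: each NALD point is held once in water.points, and the two
-- linking tables reference it by point_id instead of carrying their own copies.
SELECT
  p.description,
  p.ngr_1,
  lvpp.licence_version_purpose_id,
  rrp.return_requirement_id
FROM water.points p
LEFT JOIN water.licence_version_purpose_points lvpp
  ON lvpp.point_id = p.id
LEFT JOIN water.return_requirement_points rrp
  ON rrp.point_id = p.id;
```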
Cruikshanks authored Sep 24, 2024
1 parent c112a2d commit 6e2e300
Showing 12 changed files with 329 additions and 33 deletions.
1 change: 1 addition & 0 deletions .env.example
@@ -49,5 +49,6 @@ WRLS_LOG_LEVEL=debug
WRLS_CRON_NALD='15 23 * * *'
WRLS_CRON_LICENCES='15 3 * * 1,2,3,4,5'
WRLS_CRON_RETURN_VERSIONS='15 7 * * 1,2,3,4,5'
WRLS_CRON_POINTS='45 7 * * 1,2,3,4,5'
WRLS_CRON_MOD_LOGS='30 7 * * 1,2,3,4,5'
WRLS_CRON_TRACKER='0 10 * * 1,2,3,4,5'
3 changes: 3 additions & 0 deletions config.js
@@ -106,6 +106,9 @@ module.exports = {
modLogs: {
schedule: process.env.WRLS_CRON_MOD_LOGS || '30 7 * * 1,2,3,4,5'
},
points: {
schedule: process.env.WRLS_CRON_POINTS || '45 7 * * 1,2,3,4,5'
},
returnVersions: {
schedule: process.env.WRLS_CRON_RETURN_VERSIONS || '15 7 * * 1,2,3,4,5'
},
1 change: 1 addition & 0 deletions index.js
@@ -29,6 +29,7 @@ const plugins = [
require('./src/modules/licence-import/plugin'),
require('./src/modules/charging-import/plugin'),
require('./src/modules/mod-logs/plugin'),
require('./src/modules/points/plugin'),
require('./src/modules/return-versions/plugin.js'),
require('./src/modules/nald-import/plugin'),
require('./src/modules/bill-runs-import/plugin'),
14 changes: 14 additions & 0 deletions src/modules/points/controller.js
@@ -0,0 +1,14 @@
'use strict'

const PointsJob = require('./jobs/points.js')

async function points (request, h) {
await request.messageQueue.deleteQueue(PointsJob.JOB_NAME)
await request.messageQueue.publish(PointsJob.createMessage())

return h.response().code(204)
}

module.exports = {
points
}
67 changes: 67 additions & 0 deletions src/modules/points/jobs/licences.js
@@ -0,0 +1,67 @@
'use strict'

const { pool } = require('../../../lib/connectors/db.js')
const ReturnsJob = require('./returns.js')

const JOB_NAME = 'points.licences'

function createMessage () {
return {
name: JOB_NAME,
options: {
singletonKey: JOB_NAME
}
}
}

async function handler () {
try {
global.GlobalNotifier.omg(`${JOB_NAME}: started`)

// Import licence version purpose points
await pool.query(_query())
} catch (error) {
global.GlobalNotifier.omfg(`${JOB_NAME}: errored`, error)
throw error
}
}

async function onComplete (messageQueue, job) {
if (!job.failed) {
await messageQueue.publish(ReturnsJob.createMessage())

global.GlobalNotifier.omg(`${JOB_NAME}: finished`)
} else {
global.GlobalNotifier.omg(`${JOB_NAME}: failed`)
}
}

function _query () {
return `
INSERT INTO water.licence_version_purpose_points (
licence_version_purpose_id,
external_id,
point_id
)
SELECT
lvp.licence_version_purpose_id,
(concat_ws(':', napp."FGAC_REGION_CODE", napp."AABP_ID", napp."AAIP_ID")) AS external_id,
p.id AS point_id
FROM
"import"."NALD_ABS_PURP_POINTS" napp
INNER JOIN water.licence_version_purposes lvp
ON napp."FGAC_REGION_CODE" = split_part(lvp.external_id, ':', 1) AND napp."AABP_ID" = split_part(lvp.external_id, ':', 2)
INNER JOIN water.points p
ON napp."FGAC_REGION_CODE"=split_part(p.external_id, ':',1) AND napp."AAIP_ID"=split_part(p.external_id, ':',2)
ON CONFLICT(external_id) DO
UPDATE SET
point_id = excluded.point_id;
`
}

module.exports = {
JOB_NAME,
createMessage,
handler,
onComplete
}
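Note on the joins above (the returns job below uses the same pattern): NALD rows are matched to the `water` tables through composite `external_id` values of the form `<FGAC_REGION_CODE>:<ID>`, built with `concat_ws(':', ...)` on one side and pulled apart with `split_part()` on the other. A minimal illustration, using a made-up key:

```sql
-- Illustration only (the key is invented): split_part() recovers the halves of
-- the composite external_id that concat_ws(':', ...) builds elsewhere.
SELECT
  split_part('1:10012345', ':', 1) AS region_code, -- '1'
  split_part('1:10012345', ':', 2) AS nald_id;     -- '10012345'
```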
118 changes: 118 additions & 0 deletions src/modules/points/jobs/points.js
@@ -0,0 +1,118 @@
'use strict'

const { pool } = require('../../../lib/connectors/db.js')
const LicencesJob = require('./licences.js')

const JOB_NAME = 'points.points'

function createMessage () {
return {
name: JOB_NAME,
options: {
singletonKey: JOB_NAME
}
}
}

async function handler () {
try {
global.GlobalNotifier.omg(`${JOB_NAME}: started`)

// Import points
await pool.query(_query())
} catch (error) {
global.GlobalNotifier.omfg(`${JOB_NAME}: errored`, error)
throw error
}
}

async function onComplete (messageQueue, job) {
if (!job.failed) {
await messageQueue.publish(LicencesJob.createMessage())

global.GlobalNotifier.omg(`${JOB_NAME}: finished`)
} else {
global.GlobalNotifier.omg(`${JOB_NAME}: failed`)
}
}

function _query () {
return `
INSERT INTO water.points (
description,
ngr_1,
ngr_2,
ngr_3,
ngr_4,
source_id,
category,
primary_type,
secondary_type,
note,
location_note,
"depth",
bgs_reference,
well_reference,
hydro_reference,
hydro_intercept_distance,
hydro_offset_distance,
external_id
)
SELECT
np."LOCAL_NAME" AS description,
concat_ws(' ', np."NGR1_SHEET", np."NGR1_EAST", np."NGR1_NORTH") AS ngr_1,
(CASE np."NGR2_SHEET" WHEN 'null' THEN null ELSE concat_ws(' ', np."NGR2_SHEET", np."NGR2_EAST", np."NGR2_NORTH") END) AS ngr_2,
(CASE np."NGR3_SHEET" WHEN 'null' THEN null ELSE concat_ws(' ', np."NGR3_SHEET", np."NGR3_EAST", np."NGR3_NORTH") END) AS ngr_3,
(CASE np."NGR4_SHEET" WHEN 'null' THEN null ELSE concat_ws(' ', np."NGR4_SHEET", np."NGR4_EAST", np."NGR4_NORTH") END) AS ngr_4,
s.id AS source_id,
npc."DESCR" AS category,
nptp."DESCR" AS primary_type,
npts."DESCR" AS secondary_type,
(CASE np."NOTES" WHEN 'null' THEN NULL ELSE np."NOTES" END) AS note,
(CASE np."LOCATION_TEXT" WHEN 'null' THEN NULL ELSE np."LOCATION_TEXT" END) AS location_note,
(CASE np."DEPTH" WHEN 'null' THEN 0 ELSE np."DEPTH"::decimal END) AS "depth",
(CASE np."BGS_NO" WHEN 'null' THEN NULL ELSE np."BGS_NO" END) AS "bgs_reference",
(CASE np."REG_WELL_INDEX_REF" WHEN 'null' THEN NULL ELSE np."REG_WELL_INDEX_REF" END) AS well_reference,
(CASE np."HYDRO_REF" WHEN 'null' THEN NULL ELSE np."HYDRO_REF" END) AS hydro_reference,
(CASE np."HYDRO_INTERCEPT_DIST" WHEN 'null' THEN 0 ELSE np."HYDRO_INTERCEPT_DIST"::decimal END) AS hydro_intercept_distance,
(CASE np."HYDRO_GW_OFFSET_DIST" WHEN 'null' THEN 0 ELSE np."HYDRO_GW_OFFSET_DIST"::decimal END) AS hydro_offset_distance,
concat_ws(':', np."FGAC_REGION_CODE", np."ID") AS external_id
FROM
"import"."NALD_POINTS" np
INNER JOIN
water.sources s ON s.legacy_id = np."ASRC_CODE"
LEFT JOIN
"import"."NALD_POINT_CATEGORIES" npc ON npc."CODE" = np."AAPC_CODE"
LEFT JOIN
"import"."NALD_POINT_TYPE_PRIMS" nptp ON nptp."CODE" = np."AAPT_APTP_CODE"
LEFT JOIN
"import"."NALD_POINT_TYPE_SECS" npts ON npts."CODE" = np."AAPT_APTS_CODE"
ON CONFLICT(external_id)
DO UPDATE
SET
description = excluded.description,
ngr_1 = excluded.ngr_1,
ngr_2 = excluded.ngr_2,
ngr_3 = excluded.ngr_3,
ngr_4 = excluded.ngr_4,
source_id = excluded.source_id,
category = excluded.category,
primary_type = excluded.primary_type,
secondary_type = excluded.secondary_type,
note = excluded.note,
location_note = excluded.location_note,
"depth" = excluded."depth",
bgs_reference = excluded.bgs_reference,
well_reference = excluded.well_reference,
hydro_reference = excluded.hydro_reference,
hydro_intercept_distance = excluded.hydro_intercept_distance,
hydro_offset_distance = excluded.hydro_offset_distance;
`
}

module.exports = {
JOB_NAME,
createMessage,
handler,
onComplete
}
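One NALD quirk worth calling out in the query above: the extract stores missing values as the literal string 'null', so each optional column is wrapped in a CASE that maps that sentinel to a real NULL (or to 0 for the depth and hydro distance columns before casting to decimal). Shown in isolation, purely as an illustration:

```sql
-- Illustration only: the shape every CASE expression in the points query
-- follows, turning NALD's literal 'null' string into SQL NULL.
SELECT (CASE np."NOTES" WHEN 'null' THEN NULL ELSE np."NOTES" END) AS note
FROM "import"."NALD_POINTS" np;
```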
67 changes: 67 additions & 0 deletions src/modules/points/jobs/returns.js
@@ -0,0 +1,67 @@
'use strict'

const { pool } = require('../../../lib/connectors/db.js')

const JOB_NAME = 'points.returns'

function createMessage () {
return {
name: JOB_NAME,
options: {
singletonKey: JOB_NAME
}
}
}

async function handler () {
try {
global.GlobalNotifier.omg(`${JOB_NAME}: started`)

// Import return requirement points
await pool.query(_query())
} catch (error) {
global.GlobalNotifier.omfg(`${JOB_NAME}: errored`, error)
throw error
}
}

async function onComplete (job) {
if (!job.failed) {
global.GlobalNotifier.omg(`${JOB_NAME}: finished`)
} else {
global.GlobalNotifier.omg(`${JOB_NAME}: failed`)
}
}

function _query () {
return `
INSERT INTO water.return_requirement_points (
return_requirement_id,
external_id,
point_id
)
SELECT
rr.return_requirement_id,
concat_ws(':', nrfp."FGAC_REGION_CODE", nrfp."ARTY_ID", nrfp."AAIP_ID") AS external_id,
p.id AS point_id
FROM
"import"."NALD_RET_FMT_POINTS" nrfp
INNER JOIN
water.return_requirements rr
ON nrfp."FGAC_REGION_CODE"=split_part(rr.external_id, ':',1)
AND nrfp."ARTY_ID"=split_part(rr.external_id, ':',2)
INNER JOIN
water.points p
ON nrfp."FGAC_REGION_CODE"=split_part(p.external_id, ':',1)
AND nrfp."AAIP_ID"=split_part(p.external_id, ':',2)
ON CONFLICT(external_id) DO UPDATE SET
point_id = excluded.point_id;
`
}

module.exports = {
JOB_NAME,
createMessage,
handler,
onComplete
}
42 changes: 42 additions & 0 deletions src/modules/points/plugin.js
@@ -0,0 +1,42 @@
'use strict'

const cron = require('node-cron')

const LicencesJob = require('./jobs/licences.js')
const PointsJob = require('./jobs/points.js')
const ReturnsJob = require('./jobs/returns.js')

const config = require('../../../config.js')

async function register (server, _options) {
// Register points job
await server.messageQueue.subscribe(PointsJob.JOB_NAME, PointsJob.handler)
await server.messageQueue.onComplete(PointsJob.JOB_NAME, (executedJob) => {
return PointsJob.onComplete(server.messageQueue, executedJob)
})

// Register licences job (licence_version_purpose_points)
await server.messageQueue.subscribe(LicencesJob.JOB_NAME, LicencesJob.handler)
await server.messageQueue.onComplete(LicencesJob.JOB_NAME, (executedJob) => {
return LicencesJob.onComplete(server.messageQueue, executedJob)
})

// Register returns job (return_requirement_points)
await server.messageQueue.subscribe(ReturnsJob.JOB_NAME, ReturnsJob.handler)
await server.messageQueue.onComplete(ReturnsJob.JOB_NAME, (executedJob) => {
return ReturnsJob.onComplete(executedJob)
})

// Schedule points job using cron. The points job will then queue the licences job in its onComplete
cron.schedule(config.import.points.schedule, async () => {
await server.messageQueue.publish(PointsJob.createMessage())
})
}

module.exports = {
plugin: {
name: 'importPoints',
dependencies: ['pgBoss'],
register
}
}
13 changes: 13 additions & 0 deletions src/modules/points/routes.js
@@ -0,0 +1,13 @@
'use strict'

const controller = require('./controller')

const routes = [
{
method: 'post',
handler: controller.points,
path: '/import/points'
}
]

module.exports = routes
1 change: 0 additions & 1 deletion src/modules/return-versions/jobs/import.js
@@ -20,7 +20,6 @@ async function handler () {

await pool.query(Queries.importReturnVersions)
await pool.query(Queries.importReturnRequirements)
await pool.query(Queries.importReturnRequirementPoints)
await pool.query(Queries.importReturnRequirementPurposes)
await pool.query(Queries.importReturnVersionsMultipleUpload)
await pool.query(Queries.importReturnVersionsCreateNotesFromDescriptions)