Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement new 'points' job #1028

Merged
merged 9 commits into from
Sep 24, 2024
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,6 @@ WRLS_LOG_LEVEL=debug
WRLS_CRON_NALD='15 23 * * *'
WRLS_CRON_LICENCES='15 3 * * 1,2,3,4,5'
WRLS_CRON_RETURN_VERSIONS='15 7 * * 1,2,3,4,5'
WRLS_CRON_POINTS='45 7 * * 1,2,3,4,5'
WRLS_CRON_MOD_LOGS='30 7 * * 1,2,3,4,5'
WRLS_CRON_TRACKER='0 10 * * 1,2,3,4,5'
3 changes: 3 additions & 0 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ module.exports = {
modLogs: {
schedule: process.env.WRLS_CRON_MOD_LOGS || '30 7 * * 1,2,3,4,5'
},
points: {
schedule: process.env.WRLS_CRON_MOD_LOGS || '45 7 * * 1,2,3,4,5'
},
returnVersions: {
schedule: process.env.WRLS_CRON_RETURN_VERSIONS || '15 7 * * 1,2,3,4,5'
},
Expand Down
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const plugins = [
require('./src/modules/licence-import/plugin'),
require('./src/modules/charging-import/plugin'),
require('./src/modules/mod-logs/plugin'),
require('./src/modules/points/plugin'),
require('./src/modules/return-versions/plugin.js'),
require('./src/modules/nald-import/plugin'),
require('./src/modules/bill-runs-import/plugin'),
Expand Down
14 changes: 14 additions & 0 deletions src/modules/points/controller.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
'use strict'

const PointsJob = require('./jobs/points.js')

async function points (request, h) {
await request.messageQueue.deleteQueue(PointsJob.JOB_NAME)
await request.messageQueue.publish(PointsJob.createMessage())

return h.response().code(204)
}

module.exports = {
points
}
67 changes: 67 additions & 0 deletions src/modules/points/jobs/licences.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
'use strict'

const { pool } = require('../../../lib/connectors/db.js')
const ReturnsJob = require('./returns.js')

const JOB_NAME = 'points.licences'

function createMessage () {
return {
name: JOB_NAME,
options: {
singletonKey: JOB_NAME
}
}
}

async function handler () {
try {
global.GlobalNotifier.omg(`${JOB_NAME}: started`)

// Import licence version purpose points
await pool.query(_query())
} catch (error) {
global.GlobalNotifier.omfg(`${JOB_NAME}: errored`, error)
throw error
}
}

async function onComplete (messageQueue, job) {
if (!job.failed) {
await messageQueue.publish(ReturnsJob.createMessage())

global.GlobalNotifier.omg(`${JOB_NAME}: finished`)
} else {
global.GlobalNotifier.omg(`${JOB_NAME}: failed`)
}
}

function _query () {
return `
INSERT INTO water.licence_version_purpose_points (
licence_version_purpose_id,
external_id,
point_id
)
SELECT
lvp.licence_version_purpose_id,
(concat_ws(':', napp."FGAC_REGION_CODE", napp."AABP_ID", napp."AAIP_ID")) AS external_id,
p.id AS point_id
FROM
"import"."NALD_ABS_PURP_POINTS" napp
INNER JOIN water.licence_version_purposes lvp
ON napp."FGAC_REGION_CODE" = split_part(lvp.external_id, ':', 1) AND napp."AABP_ID" = split_part(lvp.external_id, ':', 2)
INNER JOIN water.points p
ON napp."FGAC_REGION_CODE"=split_part(p.external_id, ':',1) AND napp."AAIP_ID"=split_part(p.external_id, ':',2)
ON CONFLICT(external_id) DO
UPDATE SET
point_id = excluded.point_id;
`
}

module.exports = {
JOB_NAME,
createMessage,
handler,
onComplete
}
118 changes: 118 additions & 0 deletions src/modules/points/jobs/points.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
'use strict'

const { pool } = require('../../../lib/connectors/db.js')
const LicencesJob = require('./licences.js')

const JOB_NAME = 'points.points'

function createMessage () {
return {
name: JOB_NAME,
options: {
singletonKey: JOB_NAME
}
}
}

async function handler () {
try {
global.GlobalNotifier.omg(`${JOB_NAME}: started`)

// Import points
await pool.query(_query())
} catch (error) {
global.GlobalNotifier.omfg(`${JOB_NAME}: errored`, error)
throw error
}
}

async function onComplete (messageQueue, job) {
if (!job.failed) {
await messageQueue.publish(LicencesJob.createMessage())

global.GlobalNotifier.omg(`${JOB_NAME}: finished`)
} else {
global.GlobalNotifier.omg(`${JOB_NAME}: failed`)
}
}

function _query () {
return `
INSERT INTO water.points (
description,
ngr_1,
ngr_2,
ngr_3,
ngr_4,
source_id,
category,
primary_type,
secondary_type,
note,
location_note,
"depth",
bgs_reference,
well_reference,
hydro_reference,
hydro_intercept_distance,
hydro_offset_distance,
external_id
)
SELECT
np."LOCAL_NAME" AS description,
concat_ws(' ', np."NGR1_SHEET", np."NGR1_EAST", np."NGR1_NORTH") AS ngr_1,
(CASE np."NGR2_SHEET" WHEN 'null' THEN null ELSE concat_ws(' ', np."NGR2_SHEET", np."NGR2_EAST", np."NGR2_NORTH") END) AS ngr_2,
(CASE np."NGR3_SHEET" WHEN 'null' THEN null ELSE concat_ws(' ', np."NGR3_SHEET", np."NGR3_EAST", np."NGR3_NORTH") END) AS ngr_3,
(CASE np."NGR4_SHEET" WHEN 'null' THEN null ELSE concat_ws(' ', np."NGR4_SHEET", np."NGR4_EAST", np."NGR4_NORTH") END) AS ngr_4,
s.id AS source_id,
npc."DESCR" AS category,
nptp."DESCR" AS primary_type,
npts."DESCR" AS secondary_type,
(CASE np."NOTES" WHEN 'null' THEN NULL ELSE np."NOTES" END) AS note,
(CASE np."LOCATION_TEXT" WHEN 'null' THEN NULL ELSE np."LOCATION_TEXT" END) AS location_note,
(CASE np."DEPTH" WHEN 'null' THEN 0 ELSE np."DEPTH"::decimal END) AS "depth",
(CASE np."BGS_NO" WHEN 'null' THEN NULL ELSE np."BGS_NO" END) AS "bgs_reference",
(CASE np."REG_WELL_INDEX_REF" WHEN 'null' THEN NULL ELSE np."REG_WELL_INDEX_REF" END) AS well_reference,
(CASE np."HYDRO_REF" WHEN 'null' THEN NULL ELSE np."HYDRO_REF" END) AS hydro_reference,
(CASE np."HYDRO_INTERCEPT_DIST" WHEN 'null' THEN 0 ELSE np."HYDRO_INTERCEPT_DIST"::decimal END) AS hydro_intercept_distance,
(CASE np."HYDRO_GW_OFFSET_DIST" WHEN 'null' THEN 0 ELSE np."HYDRO_GW_OFFSET_DIST"::decimal END) AS hydro_offset_distance,
concat_ws(':', np."FGAC_REGION_CODE", np."ID") AS external_id
FROM
"import"."NALD_POINTS" np
INNER JOIN
water.sources s ON s.legacy_id = np."ASRC_CODE"
LEFT JOIN
"import"."NALD_POINT_CATEGORIES" npc ON npc."CODE" = np."AAPC_CODE"
LEFT JOIN
"import"."NALD_POINT_TYPE_PRIMS" nptp ON nptp."CODE" = np."AAPT_APTP_CODE"
LEFT JOIN
"import"."NALD_POINT_TYPE_SECS" npts ON npts."CODE" = np."AAPT_APTS_CODE"
ON CONFLICT(external_id)
DO UPDATE
SET
description = excluded.description,
ngr_1 = excluded.ngr_1,
ngr_2 = excluded.ngr_2,
ngr_3 = excluded.ngr_3,
ngr_4 = excluded.ngr_4,
source_id = excluded.source_id,
category = excluded.category,
primary_type = excluded.primary_type,
secondary_type = excluded.secondary_type,
note = excluded.note,
location_note = excluded.location_note,
"depth" = excluded."depth",
bgs_reference = excluded.bgs_reference,
well_reference = excluded.well_reference,
hydro_reference = excluded.hydro_reference,
hydro_intercept_distance = excluded.hydro_intercept_distance,
hydro_offset_distance = excluded.hydro_offset_distance;
`
}

module.exports = {
JOB_NAME,
createMessage,
handler,
onComplete
}
67 changes: 67 additions & 0 deletions src/modules/points/jobs/returns.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
'use strict'

const { pool } = require('../../../lib/connectors/db.js')

const JOB_NAME = 'points.returns'

function createMessage () {
return {
name: JOB_NAME,
options: {
singletonKey: JOB_NAME
}
}
}

async function handler () {
try {
global.GlobalNotifier.omg(`${JOB_NAME}: started`)

// Import licence version purpose points
await pool.query(_query())
} catch (error) {
global.GlobalNotifier.omfg(`${JOB_NAME}: errored`, error)
throw error
}
}

async function onComplete (job) {
if (!job.failed) {
global.GlobalNotifier.omg(`${JOB_NAME}: finished`)
} else {
global.GlobalNotifier.omg(`${JOB_NAME}: failed`)
}
}

function _query () {
return `
INSERT INTO water.return_requirement_points (
return_requirement_id,
external_id,
point_id
)
SELECT
rr.return_requirement_id,
concat_ws(':', nrfp."FGAC_REGION_CODE", nrfp."ARTY_ID", nrfp."AAIP_ID") AS external_id,
p.id AS point_id
FROM
"import"."NALD_RET_FMT_POINTS" nrfp
INNER JOIN
water.return_requirements rr
ON nrfp."FGAC_REGION_CODE"=split_part(rr.external_id, ':',1)
AND nrfp."ARTY_ID"=split_part(rr.external_id, ':',2)
INNER JOIN
water.points p
ON nrfp."FGAC_REGION_CODE"=split_part(p.external_id, ':',1)
AND nrfp."AAIP_ID"=split_part(p.external_id, ':',2)
ON CONFLICT(external_id) DO UPDATE SET
point_id = excluded.point_id;
`
}

module.exports = {
JOB_NAME,
createMessage,
handler,
onComplete
}
42 changes: 42 additions & 0 deletions src/modules/points/plugin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
'use strict'

const cron = require('node-cron')

const LicencesJob = require('./jobs/licences.js')
const PointsJob = require('./jobs/points.js')
const ReturnsJob = require('./jobs/returns.js')

const config = require('../../../config.js')

async function register (server, _options) {
// Register points job
await server.messageQueue.subscribe(PointsJob.JOB_NAME, PointsJob.handler)
await server.messageQueue.onComplete(PointsJob.JOB_NAME, (executedJob) => {
return PointsJob.onComplete(server.messageQueue, executedJob)
})

// Register licences job (licence_version_purpose_points)
await server.messageQueue.subscribe(LicencesJob.JOB_NAME, LicencesJob.handler)
await server.messageQueue.onComplete(LicencesJob.JOB_NAME, (executedJob) => {
return LicencesJob.onComplete(server.messageQueue, executedJob)
})

// Register returns job (return_requirement_points)
await server.messageQueue.subscribe(ReturnsJob.JOB_NAME, ReturnsJob.handler)
await server.messageQueue.onComplete(ReturnsJob.JOB_NAME, (executedJob) => {
return ReturnsJob.onComplete(executedJob)
})

// Schedule points job using cron. The points job will then queue the licences job in its onComplete
cron.schedule(config.import.points.schedule, async () => {
await server.messageQueue.publish(PointsJob.createMessage())
})
}

module.exports = {
plugin: {
name: 'importPoints',
dependencies: ['pgBoss'],
register
}
}
13 changes: 13 additions & 0 deletions src/modules/points/routes.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
'use strict'

const controller = require('./controller')

const routes = [
{
method: 'post',
handler: controller.points,
path: '/import/points'
}
]

module.exports = routes
1 change: 0 additions & 1 deletion src/modules/return-versions/jobs/import.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ async function handler () {

await pool.query(Queries.importReturnVersions)
await pool.query(Queries.importReturnRequirements)
await pool.query(Queries.importReturnRequirementPoints)
await pool.query(Queries.importReturnRequirementPurposes)
await pool.query(Queries.importReturnVersionsMultipleUpload)
await pool.query(Queries.importReturnVersionsCreateNotesFromDescriptions)
Expand Down
Loading
Loading