diff --git a/.env.example b/.env.example index 15525094..1787e3ac 100644 --- a/.env.example +++ b/.env.example @@ -49,5 +49,6 @@ WRLS_LOG_LEVEL=debug WRLS_CRON_NALD='15 23 * * *' WRLS_CRON_LICENCES='15 3 * * 1,2,3,4,5' WRLS_CRON_RETURN_VERSIONS='15 7 * * 1,2,3,4,5' +WRLS_CRON_POINTS='45 7 * * 1,2,3,4,5' WRLS_CRON_MOD_LOGS='30 7 * * 1,2,3,4,5' WRLS_CRON_TRACKER='0 10 * * 1,2,3,4,5' diff --git a/config.js b/config.js index 7f66ecd9..29525509 100644 --- a/config.js +++ b/config.js @@ -106,6 +106,9 @@ module.exports = { modLogs: { schedule: process.env.WRLS_CRON_MOD_LOGS || '30 7 * * 1,2,3,4,5' }, + points: { + schedule: process.env.WRLS_CRON_MOD_LOGS || '45 7 * * 1,2,3,4,5' + }, returnVersions: { schedule: process.env.WRLS_CRON_RETURN_VERSIONS || '15 7 * * 1,2,3,4,5' }, diff --git a/index.js b/index.js index 5acb3779..1e0b6e54 100644 --- a/index.js +++ b/index.js @@ -29,6 +29,7 @@ const plugins = [ require('./src/modules/licence-import/plugin'), require('./src/modules/charging-import/plugin'), require('./src/modules/mod-logs/plugin'), + require('./src/modules/points/plugin'), require('./src/modules/return-versions/plugin.js'), require('./src/modules/nald-import/plugin'), require('./src/modules/bill-runs-import/plugin'), diff --git a/src/modules/points/controller.js b/src/modules/points/controller.js new file mode 100644 index 00000000..7bba36c9 --- /dev/null +++ b/src/modules/points/controller.js @@ -0,0 +1,14 @@ +'use strict' + +const PointsJob = require('./jobs/points.js') + +async function points (request, h) { + await request.messageQueue.deleteQueue(PointsJob.JOB_NAME) + await request.messageQueue.publish(PointsJob.createMessage()) + + return h.response().code(204) +} + +module.exports = { + points +} diff --git a/src/modules/points/jobs/licences.js b/src/modules/points/jobs/licences.js new file mode 100644 index 00000000..4c200e2f --- /dev/null +++ b/src/modules/points/jobs/licences.js @@ -0,0 +1,67 @@ +'use strict' + +const { pool } = require('../../../lib/connectors/db.js') +const ReturnsJob = require('./returns.js') + +const JOB_NAME = 'points.licences' + +function createMessage () { + return { + name: JOB_NAME, + options: { + singletonKey: JOB_NAME + } + } +} + +async function handler () { + try { + global.GlobalNotifier.omg(`${JOB_NAME}: started`) + + // Import licence version purpose points + await pool.query(_query()) + } catch (error) { + global.GlobalNotifier.omfg(`${JOB_NAME}: errored`, error) + throw error + } +} + +async function onComplete (messageQueue, job) { + if (!job.failed) { + await messageQueue.publish(ReturnsJob.createMessage()) + + global.GlobalNotifier.omg(`${JOB_NAME}: finished`) + } else { + global.GlobalNotifier.omg(`${JOB_NAME}: failed`) + } +} + +function _query () { + return ` + INSERT INTO water.licence_version_purpose_points ( + licence_version_purpose_id, + external_id, + point_id + ) + SELECT + lvp.licence_version_purpose_id, + (concat_ws(':', napp."FGAC_REGION_CODE", napp."AABP_ID", napp."AAIP_ID")) AS external_id, + p.id AS point_id + FROM + "import"."NALD_ABS_PURP_POINTS" napp + INNER JOIN water.licence_version_purposes lvp + ON napp."FGAC_REGION_CODE" = split_part(lvp.external_id, ':', 1) AND napp."AABP_ID" = split_part(lvp.external_id, ':', 2) + INNER JOIN water.points p + ON napp."FGAC_REGION_CODE"=split_part(p.external_id, ':',1) AND napp."AAIP_ID"=split_part(p.external_id, ':',2) + ON CONFLICT(external_id) DO + UPDATE SET + point_id = excluded.point_id; + ` +} + +module.exports = { + JOB_NAME, + createMessage, + handler, + onComplete +} diff --git a/src/modules/points/jobs/points.js b/src/modules/points/jobs/points.js new file mode 100644 index 00000000..d2cd926a --- /dev/null +++ b/src/modules/points/jobs/points.js @@ -0,0 +1,118 @@ +'use strict' + +const { pool } = require('../../../lib/connectors/db.js') +const LicencesJob = require('./licences.js') + +const JOB_NAME = 'points.points' + +function createMessage () { + return { + name: JOB_NAME, + options: { + singletonKey: JOB_NAME + } + } +} + +async function handler () { + try { + global.GlobalNotifier.omg(`${JOB_NAME}: started`) + + // Import points + await pool.query(_query()) + } catch (error) { + global.GlobalNotifier.omfg(`${JOB_NAME}: errored`, error) + throw error + } +} + +async function onComplete (messageQueue, job) { + if (!job.failed) { + await messageQueue.publish(LicencesJob.createMessage()) + + global.GlobalNotifier.omg(`${JOB_NAME}: finished`) + } else { + global.GlobalNotifier.omg(`${JOB_NAME}: failed`) + } +} + +function _query () { + return ` + INSERT INTO water.points ( + description, + ngr_1, + ngr_2, + ngr_3, + ngr_4, + source_id, + category, + primary_type, + secondary_type, + note, + location_note, + "depth", + bgs_reference, + well_reference, + hydro_reference, + hydro_intercept_distance, + hydro_offset_distance, + external_id + ) + SELECT + np."LOCAL_NAME" AS description, + concat_ws(' ', np."NGR1_SHEET", np."NGR1_EAST", np."NGR1_NORTH") AS ngr_1, + (CASE np."NGR2_SHEET" WHEN 'null' THEN null ELSE concat_ws(' ', np."NGR2_SHEET", np."NGR2_EAST", np."NGR2_NORTH") END) AS ngr_2, + (CASE np."NGR3_SHEET" WHEN 'null' THEN null ELSE concat_ws(' ', np."NGR3_SHEET", np."NGR3_EAST", np."NGR3_NORTH") END) AS ngr_3, + (CASE np."NGR4_SHEET" WHEN 'null' THEN null ELSE concat_ws(' ', np."NGR4_SHEET", np."NGR4_EAST", np."NGR4_NORTH") END) AS ngr_4, + s.id AS source_id, + npc."DESCR" AS category, + nptp."DESCR" AS primary_type, + npts."DESCR" AS secondary_type, + (CASE np."NOTES" WHEN 'null' THEN NULL ELSE np."NOTES" END) AS note, + (CASE np."LOCATION_TEXT" WHEN 'null' THEN NULL ELSE np."LOCATION_TEXT" END) AS location_note, + (CASE np."DEPTH" WHEN 'null' THEN 0 ELSE np."DEPTH"::decimal END) AS "depth", + (CASE np."BGS_NO" WHEN 'null' THEN NULL ELSE np."BGS_NO" END) AS "bgs_reference", + (CASE np."REG_WELL_INDEX_REF" WHEN 'null' THEN NULL ELSE np."REG_WELL_INDEX_REF" END) AS well_reference, + (CASE np."HYDRO_REF" WHEN 'null' THEN NULL ELSE np."HYDRO_REF" END) AS hydro_reference, + (CASE np."HYDRO_INTERCEPT_DIST" WHEN 'null' THEN 0 ELSE np."HYDRO_INTERCEPT_DIST"::decimal END) AS hydro_intercept_distance, + (CASE np."HYDRO_GW_OFFSET_DIST" WHEN 'null' THEN 0 ELSE np."HYDRO_GW_OFFSET_DIST"::decimal END) AS hydro_offset_distance, + concat_ws(':', np."FGAC_REGION_CODE", np."ID") AS external_id + FROM + "import"."NALD_POINTS" np + INNER JOIN + water.sources s ON s.legacy_id = np."ASRC_CODE" + LEFT JOIN + "import"."NALD_POINT_CATEGORIES" npc ON npc."CODE" = np."AAPC_CODE" + LEFT JOIN + "import"."NALD_POINT_TYPE_PRIMS" nptp ON nptp."CODE" = np."AAPT_APTP_CODE" + LEFT JOIN + "import"."NALD_POINT_TYPE_SECS" npts ON npts."CODE" = np."AAPT_APTS_CODE" + ON CONFLICT(external_id) + DO UPDATE + SET + description = excluded.description, + ngr_1 = excluded.ngr_1, + ngr_2 = excluded.ngr_2, + ngr_3 = excluded.ngr_3, + ngr_4 = excluded.ngr_4, + source_id = excluded.source_id, + category = excluded.category, + primary_type = excluded.primary_type, + secondary_type = excluded.secondary_type, + note = excluded.note, + location_note = excluded.location_note, + "depth" = excluded."depth", + bgs_reference = excluded.bgs_reference, + well_reference = excluded.well_reference, + hydro_reference = excluded.hydro_reference, + hydro_intercept_distance = excluded.hydro_intercept_distance, + hydro_offset_distance = excluded.hydro_offset_distance; + ` +} + +module.exports = { + JOB_NAME, + createMessage, + handler, + onComplete +} diff --git a/src/modules/points/jobs/returns.js b/src/modules/points/jobs/returns.js new file mode 100644 index 00000000..f208a59b --- /dev/null +++ b/src/modules/points/jobs/returns.js @@ -0,0 +1,67 @@ +'use strict' + +const { pool } = require('../../../lib/connectors/db.js') + +const JOB_NAME = 'points.returns' + +function createMessage () { + return { + name: JOB_NAME, + options: { + singletonKey: JOB_NAME + } + } +} + +async function handler () { + try { + global.GlobalNotifier.omg(`${JOB_NAME}: started`) + + // Import licence version purpose points + await pool.query(_query()) + } catch (error) { + global.GlobalNotifier.omfg(`${JOB_NAME}: errored`, error) + throw error + } +} + +async function onComplete (job) { + if (!job.failed) { + global.GlobalNotifier.omg(`${JOB_NAME}: finished`) + } else { + global.GlobalNotifier.omg(`${JOB_NAME}: failed`) + } +} + +function _query () { + return ` + INSERT INTO water.return_requirement_points ( + return_requirement_id, + external_id, + point_id + ) + SELECT + rr.return_requirement_id, + concat_ws(':', nrfp."FGAC_REGION_CODE", nrfp."ARTY_ID", nrfp."AAIP_ID") AS external_id, + p.id AS point_id + FROM + "import"."NALD_RET_FMT_POINTS" nrfp + INNER JOIN + water.return_requirements rr + ON nrfp."FGAC_REGION_CODE"=split_part(rr.external_id, ':',1) + AND nrfp."ARTY_ID"=split_part(rr.external_id, ':',2) + INNER JOIN + water.points p + ON nrfp."FGAC_REGION_CODE"=split_part(p.external_id, ':',1) + AND nrfp."AAIP_ID"=split_part(p.external_id, ':',2) + ON CONFLICT(external_id) DO UPDATE SET + point_id = excluded.point_id; + ` +} + +module.exports = { + JOB_NAME, + createMessage, + handler, + onComplete +} diff --git a/src/modules/points/plugin.js b/src/modules/points/plugin.js new file mode 100644 index 00000000..06c25235 --- /dev/null +++ b/src/modules/points/plugin.js @@ -0,0 +1,42 @@ +'use strict' + +const cron = require('node-cron') + +const LicencesJob = require('./jobs/licences.js') +const PointsJob = require('./jobs/points.js') +const ReturnsJob = require('./jobs/returns.js') + +const config = require('../../../config.js') + +async function register (server, _options) { + // Register points job + await server.messageQueue.subscribe(PointsJob.JOB_NAME, PointsJob.handler) + await server.messageQueue.onComplete(PointsJob.JOB_NAME, (executedJob) => { + return PointsJob.onComplete(server.messageQueue, executedJob) + }) + + // Register licences job (licence_version_purpose_points) + await server.messageQueue.subscribe(LicencesJob.JOB_NAME, LicencesJob.handler) + await server.messageQueue.onComplete(LicencesJob.JOB_NAME, (executedJob) => { + return LicencesJob.onComplete(server.messageQueue, executedJob) + }) + + // Register returns job (return_requirement_points) + await server.messageQueue.subscribe(ReturnsJob.JOB_NAME, ReturnsJob.handler) + await server.messageQueue.onComplete(ReturnsJob.JOB_NAME, (executedJob) => { + return ReturnsJob.onComplete(executedJob) + }) + + // Schedule points job using cron. The points job will then queue the licences job in its onComplete + cron.schedule(config.import.points.schedule, async () => { + await server.messageQueue.publish(PointsJob.createMessage()) + }) +} + +module.exports = { + plugin: { + name: 'importPoints', + dependencies: ['pgBoss'], + register + } +} diff --git a/src/modules/points/routes.js b/src/modules/points/routes.js new file mode 100644 index 00000000..2ac89bb6 --- /dev/null +++ b/src/modules/points/routes.js @@ -0,0 +1,13 @@ +'use strict' + +const controller = require('./controller') + +const routes = [ + { + method: 'post', + handler: controller.points, + path: '/import/points' + } +] + +module.exports = routes diff --git a/src/modules/return-versions/jobs/import.js b/src/modules/return-versions/jobs/import.js index 5f0db107..96b19603 100644 --- a/src/modules/return-versions/jobs/import.js +++ b/src/modules/return-versions/jobs/import.js @@ -20,7 +20,6 @@ async function handler () { await pool.query(Queries.importReturnVersions) await pool.query(Queries.importReturnRequirements) - await pool.query(Queries.importReturnRequirementPoints) await pool.query(Queries.importReturnRequirementPurposes) await pool.query(Queries.importReturnVersionsMultipleUpload) await pool.query(Queries.importReturnVersionsCreateNotesFromDescriptions) diff --git a/src/modules/return-versions/lib/import-queries.js b/src/modules/return-versions/lib/import-queries.js index 4685fcfd..8f14a646 100644 --- a/src/modules/return-versions/lib/import-queries.js +++ b/src/modules/return-versions/lib/import-queries.js @@ -117,36 +117,6 @@ join water.purposes_uses u on nrp."APUR_APUS_CODE"=u.legacy_id join water.return_requirements r on r.external_id = concat_ws(':', nrp."FGAC_REGION_CODE", nrp."ARTY_ID") on conflict(external_id) do update set purpose_alias=excluded.purpose_alias, date_updated=excluded.date_updated; ` -const importReturnRequirementPoints = `insert into water.return_requirement_points ( - return_requirement_id, - description, - ngr_1, - ngr_2, - ngr_3, - ngr_4, - external_id, - nald_point_id - ) - select - rr.return_requirement_id, - np."LOCAL_NAME" as description, - concat_ws(' ', np."NGR1_SHEET", np."NGR1_EAST", np."NGR1_NORTH") AS ngr_1, - case np."NGR2_SHEET" when 'null' then null else concat_ws(' ', np."NGR2_SHEET", np."NGR2_EAST", np."NGR2_NORTH") end AS ngr_2, - case np."NGR3_SHEET" when 'null' then null else concat_ws(' ', np."NGR3_SHEET", np."NGR3_EAST", np."NGR3_NORTH") end AS ngr_3, - case np."NGR4_SHEET" when 'null' then null else concat_ws(' ', np."NGR4_SHEET", np."NGR4_EAST", np."NGR4_NORTH") end AS ngr_4, - concat_ws(':', nrfp."FGAC_REGION_CODE", nrfp."ARTY_ID", nrfp."AAIP_ID") as external_id, - nrfp."AAIP_ID"::integer as nald_point_id - from import."NALD_RET_FMT_POINTS" nrfp - join water.return_requirements rr on nrfp."FGAC_REGION_CODE"=split_part(rr.external_id, ':',1) and nrfp."ARTY_ID"=split_part(rr.external_id, ':',2) - join import."NALD_POINTS" np on np."ID"=nrfp."AAIP_ID" and np."FGAC_REGION_CODE"=nrfp."FGAC_REGION_CODE" - on conflict(external_id) do update set - description=excluded.description, - ngr_1=excluded.ngr_1, - ngr_2=excluded.ngr_2, - ngr_3=excluded.ngr_3, - ngr_4=excluded.ngr_4; -` - const importReturnVersionsMultipleUpload = `update water.return_versions set multiple_upload = distinctReturnRequirements.is_upload from ( @@ -234,7 +204,6 @@ WHERE rv.return_version_id = bq.return_version_id; module.exports = { importReturnVersions, importReturnRequirements, - importReturnRequirementPoints, importReturnRequirementPurposes, importReturnVersionsCreateNotesFromDescriptions, importReturnVersionsMultipleUpload, diff --git a/src/routes.js b/src/routes.js index 1ec5276c..7aab2ee0 100644 --- a/src/routes.js +++ b/src/routes.js @@ -7,6 +7,7 @@ const naldImportRoutes = require('./modules/nald-import/routes') const returnsRoutes = require('./modules/returns/routes') const returnVersionsRoutes = require('./modules/return-versions/routes.js') const modLogsRoutes = require('./modules/mod-logs/routes.js') +const pointsRoutes = require('./modules/points/routes.js') module.exports = [ ...chargingImportRoutes, @@ -17,5 +18,6 @@ module.exports = [ ...naldImportRoutes, ...returnsRoutes, ...returnVersionsRoutes, - ...modLogsRoutes + ...modLogsRoutes, + ...pointsRoutes ]