Skip to content

Commit

Permalink
Merge pull request #40 from DEFRA/feature/return-values-from-raw-sql
Browse files Browse the repository at this point in the history
Return values
  • Loading branch information
suityou01 authored Oct 14, 2024
2 parents dc89533 + d995fa4 commit 54a05bb
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 27 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ffc-pay-etl-framework",
"version": "1.0.0",
"version": "1.1.0",
"publisher": "Defra",
"main": "dist/cjs/index.js",
"private": false,
Expand All @@ -13,7 +13,7 @@
"build:esm": "tsc -p ./tsconfig.esm.json && mv dist/esm/index.js dist/esm/index.mjs",
"build:cjs": "tsc -p ./tsconfig.cjs.json",
"prepack": "npm run build",
"example": "node ./src/examples/csv-multi-tool-validator.js"
"example": "node ./src/examples/sql-return-values.js"
},
"keywords": [
"ETL"
Expand Down
35 changes: 24 additions & 11 deletions src/destinations/postgresDestination.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const EventEmitter = require('node:events')
const { Transform } = require('node:stream')
const { Sequelize } = require('sequelize')
const debug = require('debug')('destination')
const DEFAULT_PORT = 5432

Expand All @@ -9,37 +8,48 @@ function isKeyWord(column) {
}

function getMappingForColumn(mapping, column){
if(mapping.length === 0) { return {} }
const [map] = mapping.filter(m => m.column === column)
return map
}

function hasReturningColumns(mapping){
return mapping.filter(m => m.returning === true).length > 0
}

function getReturningColumns(mapping){
return mapping.filter(m => m.returning === true).map(m => m?.targetColumn)
}

function writeInsertStatement(columnMapping, table, chunk){
let statement = `INSERT INTO ${table} (${chunk._columns.map(column => {
const mapping = getMappingForColumn(columnMapping, column)
return mapping.targetColumn
return mapping?.targetColumn
?
isKeyWord(mapping.targetColumn)
? `"${mapping.targetColumn}"`
: mapping.targetColumn
: isKeyWord(column)
? `"${mapping.column}"`
: mapping.column
? `"${mapping?.column}"`
: mapping?.column
})
.join(",")}) VALUES (${chunk._columns.map((column,index) => {
const mapping = getMappingForColumn(columnMapping, column)
if(!mapping) debug('Mapping not found for column %s', column)
if (mapping.targetType === "number" && (isNaN(chunk[index]) || chunk[index] === '')) {
if (mapping?.targetType === "number" && (isNaN(chunk[index]) || chunk[index] === '')) {
debug('Source data is not a number')
return 0
}
if(mapping.targetType === "varchar" || mapping.targetType === "char"){
if(mapping?.targetType === "varchar" || mapping?.targetType === "char"){
return `'${chunk[index]}'`
}
if(mapping.targetType === "date"){
return `to_date('${chunk[index]}','${mapping.format}')`
if(mapping?.targetType === "date"){
return `to_date('${chunk[index]}','${mapping?.format}')`
}
return chunk[index] ? chunk[index] : 'null'
})})`
if(hasReturningColumns(columnMapping)){
statement = statement + ` RETURNING ${getReturningColumns(columnMapping).join(',')}`
}
return statement
}

Expand Down Expand Up @@ -71,7 +81,7 @@ function PostgresDestination(options){
write(chunk, _, callback){
let insertStatement
// @ts-ignore
if(chunk.errors.length === 0 | options.includeErrors){
if(chunk.errors.length === 0 || options.includeErrors){
insertStatement = writeInsertStatement(mapping, table, chunk)
debug('Insert statement: [%s]', insertStatement)
// @ts-ignore
Expand Down Expand Up @@ -101,6 +111,7 @@ function PostgresDestination(options){
}
})
Object.assign(Transform.prototype, {
type: 'PostgresDestination',
setConnection: function (connection){
this.connection = connection
}.bind(transform),
Expand All @@ -118,5 +129,7 @@ module.exports = {
PostgresDestination,
writeInsertStatement,
isKeyWord,
getMappingForColumn
getMappingForColumn,
hasReturningColumns,
getReturningColumns
}
37 changes: 37 additions & 0 deletions src/examples/sql-return-values.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
const { Etl, Loaders, Destinations, Connections } = require("ffc-pay-etl-framework")

const csvFile = `${process.cwd()}/test/fixtures/SoilType.csv`

const columns = [
"Dist Code",
"Year",
"State Code",
"State Name",
"Dist Name",
"SOIL TYPE PERCENT (Percent)"
]

const etl = new Etl.Etl()

etl
.connection(new Connections.PostgresDatabaseConnection({
name: 'MyConnection',
username: "postgres",
password: "ppp",
host: "localhost"
}))
.loader(Loaders.CSVLoader({path: csvFile, columns: columns}))
.destination(Destinations.PostgresDestination({
table: "target",
connection: "MyConnection",
mapping: [
{
column: "Dist Code",
targetColumn: "dist_code",
targetType: "string",
returning: true
}
],
includeErrors: false
}))
.pump()
19 changes: 14 additions & 5 deletions src/lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const EventEmitter = require('node:events')
const util = require('node:util')
const { RowMetaData } = require("./rowMetaData")
const { compose } = require("node:stream")
const { Destinations } = require('..')

/**
* @typedef {Object} Etl
Expand All @@ -20,7 +21,8 @@ const { compose } = require("node:stream")
*/
function Etl(){
EventEmitter.call(this)
let self = this
const self = this
self.store = []
self.beforeETLList = []
self.connectionList = []
self.validatorList = []
Expand Down Expand Up @@ -57,6 +59,7 @@ function Etl(){
throw new Error(`Connection with name ${connectionname} not found`)
}
pipelineTask.setConnection(connection)
pipelineTask.setETL(self)
self.beforeETLList.push(pipelineTask)
return self
}
Expand All @@ -73,13 +76,19 @@ function Etl(){

this.destination = (destination, ...tasks) => {
const connectionname = destination.getConnectionName()

const connection = this.connectionList.filter(c => c.name === connectionname)[0]
if (connection) {
if (!connection && destination.type === 'PostgresDestination') {
throw new Error(`No connection could be found with name ${connectionname}`)
} else {
destination.setConnection(connection)
}
}

tasks && destination.setTasks(tasks)
if(tasks){
for(const task of tasks){
task.setETL(self)
}
destination.setTasks(tasks)
}
self.destinationList.push(destination)
return self
}
Expand Down
10 changes: 9 additions & 1 deletion src/misc/postgresSQLTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ function PostgresSQLTask(options){
}
else {
const interpolatedSql = doPlaceHolderValueInterpolations(chunk, this.sql, placeholders)
//TODO add more interpolation mechanisms to specify return values
// e.g. 'myReturnVal = SELECT MAX ID FROM TABLE;'
// and write to etl.store.myReturnVal or
// e.g. 'chunk.myReturnVal = SELECT MAX ID FROM TABLE;'
// and write to the chunk in say chunk.store.myReturnVal
this.connection.db.query(interpolatedSql)
}
callback(null, chunk)
Expand All @@ -51,7 +56,10 @@ function PostgresSQLTask(options){
this.connection = connection
}.bind(passthrough),
getConnectionName: function (){
return this.connectionname
return this.connection.name
}.bind(passthrough),
setETL: function (etl) {
this.etl = etl
}.bind(passthrough)
})
return passthrough
Expand Down
52 changes: 47 additions & 5 deletions test/destinations/postgresDestination.test.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
const { expect } = require('@jest/globals')
const {
PostgresDestination, writeInsertStatement
PostgresDestination, writeInsertStatement, hasReturningColumns,
getReturningColumns, getMappingForColumn
} = require('../../src/destinations/postgresDestination')
const { Readable } = require('node:stream')
const { Sequelize } = require('sequelize').Sequelize

jest.mock('fs', () => ({
writeFileSync: jest.fn(),
open: jest.fn().mockReturnValue({ fd : 1 })
}))

const logSpy = jest.spyOn(process.stderr, 'write')

const config = {
username: "postgres",
password : "ppp",
Expand Down Expand Up @@ -50,7 +49,6 @@ describe('postgresDestination tests', () => {
jest.clearAllMocks()
})
it('should write a row', (done) => {
// This test was working fine until today
const uut = new PostgresDestination(config)
uut.setConnection(mockConnection)
const testData =["a", "b", "c"]
Expand All @@ -65,6 +63,26 @@ describe('postgresDestination tests', () => {
})
.pipe(uut)
})
it('should fail to write a row', (done) => {
const uut = new PostgresDestination(config)
uut.setConnection({
name: 'Mock Connection',
db: {
query: jest.fn().mockResolvedValue(Promise.reject())
}
})

const testData =["a", "b", "c"]
testData.errors = []
testData.rowId = 1
testData._columns = ["column1", "column2", "column3"]
const readable = Readable.from([testData])
readable
.on('close', (result) => {
done()
})
.pipe(uut)
})
it('should fire result event', (done) => {
// This test was working fine until today
const uut = new PostgresDestination(config)
Expand Down Expand Up @@ -103,7 +121,6 @@ describe('postgresDestination tests', () => {
.pipe(uut)
})
it('should format a date as specified', (done) => {
// This test was working fine until today
const newConfig = JSON.parse(JSON.stringify(config))
newConfig.mapping[1].targetType = "date"
newConfig.mapping[1].format = "DD-MM-YYYY HH24:MI:SS"
Expand Down Expand Up @@ -193,6 +210,17 @@ describe('postgresDestination tests', () => {
const result = writeInsertStatement(newMapping, mockTable, mockChunk)
expect(result).toEqual("INSERT INTO MockTable (target_column1,target_column2,target_column3) VALUES ('a',0,'c')")
})
it('should write a sql statement when a target column has the returning flag set', () => {
const newMapping = JSON.parse(JSON.stringify(config.mapping))
newMapping[1].returning = true
const mockTable = "MockTable"
const mockChunk = ["a", "a999", "c"]
mockChunk.errors = []
mockChunk.rowId = 1
mockChunk._columns = ["column1", "column2", "column3"]
const result = writeInsertStatement(newMapping, mockTable, mockChunk)
expect(result).toEqual("INSERT INTO MockTable (target_column1,target_column2,target_column3) VALUES ('a','a999','c') RETURNING target_column2")
})
it('should fire off any addtional tasks', (done) => {
const mockTasks = [{
write: jest.fn()
Expand Down Expand Up @@ -221,4 +249,18 @@ describe('postgresDestination tests', () => {
uut.setConnection(mockConnection)
expect(uut.getConnectionName()).toEqual(mockConnection.name)
})
it('should return true if mapping contains one or more mappings that have the returning flag set', () => {
const newConfig = JSON.parse(JSON.stringify(config))
newConfig.mapping[0].returning = true
expect(hasReturningColumns(newConfig.mapping)).toEqual(true)
})
it('should return false if mapping does not contain one or more mappings that have the returning flag set', () => {
const newConfig = JSON.parse(JSON.stringify(config))
expect(hasReturningColumns(newConfig.mapping)).toEqual(false)
})
it('should return array of mappings that have the returning flag set', () => {
const newConfig = JSON.parse(JSON.stringify(config))
newConfig.mapping[0].returning = true
expect(getReturningColumns(newConfig.mapping)).toEqual(['target_column1'])
})
})
Loading

0 comments on commit 54a05bb

Please sign in to comment.