Skip to content

Commit

Permalink
Merge pull request #39 from DEFRA/feature/add-sql-task
Browse files Browse the repository at this point in the history
Feature/add sql task
  • Loading branch information
suityou01 authored Oct 10, 2024
2 parents 9051fa6 + 12fa904 commit dc89533
Show file tree
Hide file tree
Showing 23 changed files with 900 additions and 108 deletions.
3 changes: 2 additions & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
"chris-noring.node-snippets",
"cweijan.vscode-postgresql-client2",
"mechatroner.rainbow-csv",
"esbenp.prettier-vscode"
"esbenp.prettier-vscode",
"sonarsource.sonarlint-vscode"
],
"settings": {
"jest.runMode": {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ffc-pay-etl-framework",
"version": "0.1.14",
"version": "1.0.0",
"publisher": "Defra",
"main": "dist/cjs/index.js",
"private": false,
Expand Down
4 changes: 2 additions & 2 deletions sonar-project.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
sonar.javascript.exclusions=**/jest.config.js,**/jest.setup.js,**/__mocks__/**,**/node_modules/**,**/test/**,**/test-output/**
sonar.javascript.exclusions=**/jest.config.js,**/jest.setup.js,**/__mocks__/**,**/node_modules/**,**/test/**,**/test-output/**,/src/examples/**
sonar.javascript.lcov.reportPaths=test-output/lcov.info
sonar.exclusions=/test/**,**/*.test.js,*snyk_report.html,*snyk_report.css
sonar.exclusions=/test/**,**/*.test.js,*snyk_report.html,*snyk_report.css,/src/examples/**
5 changes: 5 additions & 0 deletions src/database_connections/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
const { PostgresDatabaseConnection } = require("./postgresDatabaseConnection")

module.exports = {
PostgresDatabaseConnection
}
47 changes: 47 additions & 0 deletions src/database_connections/postgresDatabaseConnection.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
const DEFAULT_PORT = 5432
const { Sequelize } = require('sequelize')
const debug = require('debug')('connection')

/**
*
* @param {Object} options
* @param {Object} options.connectionname
* @param {Object} options.username
* @param {Object} options.password
* @param {Object} options.database
* @param {Object} options.host
* @param {Object} options.port
* @returns Connection
*/
async function PostgresDatabaseConnection(options){
const connectionname = options.connectionname
const username = options.username
const password = options.password
const database = options.database
const host = options.host
const port = options.port || DEFAULT_PORT

const sequelize = new Sequelize(database, username, password, {
host: host,
port: port,
dialect: 'postgres',
logging: false
})

try{
await sequelize.authenticate()
debug('sequelize.authenticate succeeded')
return {
name: connectionname,
db: sequelize
}
} catch(e){
debug('sequelize.authenticate failed')
debug(e)
throw e
}
}

module.exports = {
PostgresDatabaseConnection
}
18 changes: 17 additions & 1 deletion src/destinations/consoleDestination.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,30 @@ const { Writable } = require("node:stream")
*/
function ConsoleDestination(options){
const includeErrors = options.includeErrors
return new Writable({
const writable = new Writable({
objectMode: true,
write(chunk, _, callback){
if(chunk.errors.length === 0 || includeErrors)
console.log(chunk)
// @ts-ignore
this.tasks?.forEach(task => task.write(chunk))
callback()
}
})

Object.assign(writable, {
setConnection: function (connection){
this.connection = connection
}.bind(writable),
getConnectionName: function (){
return this.connection?.name
}.bind(writable),
setTasks: function(tasks){
this.tasks = tasks
}.bind(writable)
})

return writable
}

module.exports = {
Expand Down
18 changes: 17 additions & 1 deletion src/destinations/csvFileDestination.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ function CSVFileDestination(options){
fileHandle = fd
}))
let headersWritten = false
return new Writable({
const writable = new Writable({
objectMode: true,
write(chunk, _, callback){
if(!headersWritten && headers) {
Expand All @@ -42,6 +42,8 @@ function CSVFileDestination(options){
fs.writeFileSync(fileHandle, `${chunk.map(c => `"${c}"`).join(",")}\n`)
}
}
// @ts-ignore
this.tasks?.forEach(task => task.write(chunk))
lastChunk = chunk
callback()
},
Expand All @@ -50,6 +52,20 @@ function CSVFileDestination(options){
callback()
}
})

Object.assign(writable, {
setConnection: function (connection){
this.connection = connection
}.bind(writable),
getConnectionName: function (){
return this.connection?.name
}.bind(writable),
setTasks: function(tasks){
this.tasks = tasks
}.bind(writable)
})

return writable
}

util.inherits(CSVFileDestination, EventEmitter)
Expand Down
50 changes: 20 additions & 30 deletions src/destinations/postgresDestination.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,49 +47,25 @@ function writeInsertStatement(columnMapping, table, chunk){
*
* @param {Object} options
* @param {Object} options.table
* @param {Object} options.username
* @param {Object} options.password
* @param {Object} options.database
* @param {Object} options.host
* @param {Object} options.port
* @param {Object} options.connectionname
* @param {Object} options.mapping
* @param {Object} options.includeErrors
* @returns Transform
*/
function PostgresDestination(options){
EventEmitter.call(this)
const table = options.table
const username = options.username
const password = options.password
const database = options.database
const host = options.host
const port = options.port || DEFAULT_PORT
const connectionname = options.connectionname
const mapping = options.mapping
const includeErrors = options.includeErrors
let lastChunk

const sequelize = new Sequelize(database, username, password, {
host: host,
port: port,
dialect: 'postgres',
logging: false
})

try{
sequelize.authenticate()
debug('sequelize.authenticate succeeded')
} catch(e){
debug('sequelize.authenticate failed')
debug(e)
throw e
}

return new Transform({
const transform = new Transform({
objectMode: true,
emitClose: true,
construct(callback){
// @ts-ignore
this.sequelize = sequelize
// @ts-ignore
this.connectionname = connectionname
callback()
},
write(chunk, _, callback){
Expand All @@ -99,12 +75,14 @@ function PostgresDestination(options){
insertStatement = writeInsertStatement(mapping, table, chunk)
debug('Insert statement: [%s]', insertStatement)
// @ts-ignore
this.sequelize.query(insertStatement)
this.connection.db.query(insertStatement)
.then(result => {
debug('result %o', result)
chunk._result = result
lastChunk = chunk
// @ts-ignore
this.tasks?.forEach(task => task.write(chunk))
// @ts-ignore
callback(null, chunk)
}).catch(error => {
debug('error %o', error)
Expand All @@ -122,6 +100,18 @@ function PostgresDestination(options){
callback()
}
})
Object.assign(Transform.prototype, {
setConnection: function (connection){
this.connection = connection
}.bind(transform),
getConnectionName: function (){
return this.connection?.name
}.bind(transform),
setTasks: function(tasks){
this.tasks = tasks
}.bind(transform)
})
return transform;
}

module.exports = {
Expand Down
30 changes: 23 additions & 7 deletions src/destinations/sqlFileDestination.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ function SQLFileDestination(options){
})
}
function writeUpdateStatement(chunk){

//TODO
}
return new Writable({
const writable = new Writable({
objectMode: true,
write(chunk, _, callback){
if(chunk.errors.length === 0)
Expand All @@ -52,15 +52,31 @@ function SQLFileDestination(options){
if(chunk.errors.length === 0 | includeErrors){
writeInsertStatement(chunk)
}
} else if (sqlMode === SQL_MODE.UPDATE_MODE){
// @ts-ignore
if(chunk.errors.length === 0 | includeErrors){
writeUpdateStatement(chunk)
}
}
// TODO - Disabled because Sonar
// } else if (sqlMode === SQL_MODE.UPDATE_MODE){
// // @ts-ignore
// if(chunk.errors.length === 0 | includeErrors){
// writeUpdateStatement(chunk)
// }
// }
callback()
}
})

Object.assign(writable, {
setConnection: function (connection){
this.connection = connection
}.bind(writable),
getConnectionName: function (){
return this.connection?.name
}.bind(writable),
setTasks: function(tasks){
this.tasks = tasks
}.bind(writable)
})

return writable
}

module.exports = {
Expand Down
Loading

0 comments on commit dc89533

Please sign in to comment.