Skip to content

Commit

Permalink
Merge pull request #14 from DEFRA/fix/postgresDestination-back-pressure
Browse files Browse the repository at this point in the history
Fix/postgres destination back pressure
  • Loading branch information
suityou01 authored Aug 15, 2024
2 parents b6e448a + 35f683d commit 25539f2
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 30 deletions.
13 changes: 13 additions & 0 deletions __mocks__/sequelize.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"use strict"

const Sequelize = jest.fn()
const authenticate = jest.fn().mockImplementation(()=>Promise.resolve(true))
const query = jest.fn().mockImplementation((sql)=>Promise.resolve([[],1]))
Sequelize.mockImplementation((database, username, password, options) => ({
authenticate: authenticate,
query: query
}))

module.exports = Sequelize
module.exports.Sequelize = Sequelize
module.exports.default = Sequelize
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ffc-pay-etl-framework",
"version": "0.0.6",
"version": "0.0.7",
"publisher": "Defra",
"main": "dist/cjs/index.js",
"private": false,
Expand Down
19 changes: 16 additions & 3 deletions src/destinations/postgresDestination.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ function PostgresDestination(options){
dialect: 'postgres',
logging: false
})

try{
sequelize.authenticate()
debug('sequelize.authenticate succeeded')
} catch(e){
debug('sequelize.authenticate failed')
debug(e)
throw e
}

function getMappingForColumn(column){
Expand All @@ -54,28 +56,39 @@ function PostgresDestination(options){
if(!mapping) debug('Mapping not found for column %s', column)
if(mapping.targetType === "varchar" || mapping.targetType === "char")
return `'${chunk[index]}'`
return chunk[index]
return chunk[index] ? chunk[index] : 'null'
})})`
return statement
}
return new Transform({
objectMode: true,
transform(chunk, _, callback){
emitClose: true,
construct(callback){
// @ts-ignore
this.sequelize = sequelize
callback()
},
write(chunk, _, callback){
let insertStatement
// @ts-ignore
if(chunk.errors.length === 0 | options.includeErrors){
insertStatement = writeInsertStatement(chunk)
debug('Insert statement: [%s]', insertStatement)
sequelize.query(insertStatement)
// @ts-ignore
this.sequelize.query(insertStatement)
.then(result => {
debug('result %o', result)
chunk._result = result
// @ts-ignore
callback(null, chunk)
}).catch(error => {
debug('error %o', error)
chunk.errors.push(error)
// @ts-ignore
callback(null, chunk)
})
} else {
debug('Chunk has errors %o', chunk)
}
}
})
Expand Down
2 changes: 1 addition & 1 deletion src/lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ function Etl(){
.pump(this.loader)
.pipe(
compose(
RowMetaData,
RowMetaData(),
...self.validatorList,
...self.transformationList,
...self.destinationList
Expand Down
3 changes: 2 additions & 1 deletion src/loaders/csvloader.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ function CSVLoader(options){
.pipe(new Transform({
readableObjectMode: true,
writableObjectMode: true,
emitClose: true,
transform(chunk, _, callback){
chunk["_columns"] = options.columns
callback(null, chunk)
}
})
)
)
}
return csvLoader
}
Expand Down
33 changes: 10 additions & 23 deletions test/destinations/postgresDestination.test.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,16 @@
const { expect } = require('@jest/globals')
const { PostgresDestination } = require('../../src/destinations')
const { Readable } = require('node:stream')
const { Readable, PassThrough } = require('node:stream')
const { Sequelize } = require('sequelize')

jest.mock("sequelize", () => ({
Sequelize: jest.fn().mockImplementation(()=> {
return {
authenticate: jest.fn().mockResolvedValue(true),
query: jest.fn().mockResolvedValue([[],1])
}
})
})
)

Sequelize.prototype.authenticate = jest.fn()
jest.mock('sequelize')

jest.mock('fs', () => ({
writeFileSync: jest.fn(),
open: jest.fn().mockReturnValue({ fd : 1 })
}))

const logSpy = jest.spyOn(process.stderr, 'write')

const config = {
username: "postgres",
password : "ppp",
Expand Down Expand Up @@ -47,12 +38,8 @@ const config = {
}

describe('postgresDestination tests', () => {
beforeEach(()=> {
Sequelize.prototype.authenticate = jest.fn().mockResolvedValue(true)
Sequelize.prototype.query = jest.fn().mockResolvedValue([[],1])
})
afterEach(() => {
jest.resetAllMocks()
jest.clearAllMocks()
})
it('should write a row', (done) => {
const uut = PostgresDestination(config)
Expand All @@ -62,25 +49,25 @@ describe('postgresDestination tests', () => {
testData._columns = ["column1", "column2", "column3"]
const readable = Readable.from([testData])
readable
.pipe(uut)
.on("data", (chunk) => {
.on('close', (result) => {
expect(Sequelize().query).toHaveBeenLastCalledWith("INSERT INTO target (target_column1,target_column2,target_column3) VALUES ('a','b','c')")
done()
})
.pipe(uut)
})
it('should produce debug output', (done) => {
const logSpy = jest.spyOn(process.stderr, 'write')
const uut = PostgresDestination(config)
const testData =["a", "b", "c"]
testData.errors = []
testData.rowId = 1
testData._columns = ["column1", "column2", "column3"]
const readable = Readable.from([testData])
readable
.pipe(uut)
.on("data", (chunk) => {
.on('close', () => {
expect(logSpy).toBeCalledTimes(3)
done()
})
.pipe(uut)
})
it('should connect to different port', () => {
PostgresDestination({
Expand Down

0 comments on commit 25539f2

Please sign in to comment.