diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 0e82d55..ac391d6 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -26,7 +26,8 @@ "chris-noring.node-snippets", "cweijan.vscode-postgresql-client2", "mechatroner.rainbow-csv", - "esbenp.prettier-vscode" + "esbenp.prettier-vscode", + "sonarsource.sonarlint-vscode" ], "settings": { "jest.runMode": { diff --git a/package.json b/package.json index 056db4a..a4115dd 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ffc-pay-etl-framework", - "version": "0.1.14", + "version": "1.0.0", "publisher": "Defra", "main": "dist/cjs/index.js", "private": false, diff --git a/sonar-project.properties b/sonar-project.properties index 1877ed6..d8f3c6a 100644 --- a/sonar-project.properties +++ b/sonar-project.properties @@ -1,3 +1,3 @@ -sonar.javascript.exclusions=**/jest.config.js,**/jest.setup.js,**/__mocks__/**,**/node_modules/**,**/test/**,**/test-output/** +sonar.javascript.exclusions=**/jest.config.js,**/jest.setup.js,**/__mocks__/**,**/node_modules/**,**/test/**,**/test-output/**,/src/examples/** sonar.javascript.lcov.reportPaths=test-output/lcov.info -sonar.exclusions=/test/**,**/*.test.js,*snyk_report.html,*snyk_report.css +sonar.exclusions=/test/**,**/*.test.js,*snyk_report.html,*snyk_report.css,/src/examples/** diff --git a/src/database_connections/index.js b/src/database_connections/index.js new file mode 100644 index 0000000..3557777 --- /dev/null +++ b/src/database_connections/index.js @@ -0,0 +1,5 @@ +const { PostgresDatabaseConnection } = require("./postgresDatabaseConnection") + +module.exports = { + PostgresDatabaseConnection +} \ No newline at end of file diff --git a/src/database_connections/postgresDatabaseConnection.js b/src/database_connections/postgresDatabaseConnection.js new file mode 100644 index 0000000..affe2b3 --- /dev/null +++ b/src/database_connections/postgresDatabaseConnection.js @@ -0,0 +1,47 @@ +const 
DEFAULT_PORT = 5432 +const { Sequelize } = require('sequelize') +const debug = require('debug')('connection') + +/** + * + * @param {Object} options + * @param {Object} options.connectionname + * @param {Object} options.username + * @param {Object} options.password + * @param {Object} options.database + * @param {Object} options.host + * @param {Object} options.port + * @returns Connection + */ +async function PostgresDatabaseConnection(options){ + const connectionname = options.connectionname + const username = options.username + const password = options.password + const database = options.database + const host = options.host + const port = options.port || DEFAULT_PORT + + const sequelize = new Sequelize(database, username, password, { + host: host, + port: port, + dialect: 'postgres', + logging: false + }) + + try{ + await sequelize.authenticate() + debug('sequelize.authenticate succeeded') + return { + name: connectionname, + db: sequelize + } + } catch(e){ + debug('sequelize.authenticate failed') + debug(e) + throw e + } +} + +module.exports = { + PostgresDatabaseConnection +} \ No newline at end of file diff --git a/src/destinations/consoleDestination.js b/src/destinations/consoleDestination.js index c022c64..ddced2f 100644 --- a/src/destinations/consoleDestination.js +++ b/src/destinations/consoleDestination.js @@ -8,14 +8,30 @@ const { Writable } = require("node:stream") */ function ConsoleDestination(options){ const includeErrors = options.includeErrors - return new Writable({ + const writable = new Writable({ objectMode: true, write(chunk, _, callback){ if(chunk.errors.length === 0 || includeErrors) console.log(chunk) + // @ts-ignore + this.tasks?.forEach(task => task.write(chunk)) callback() } }) + + Object.assign(writable, { + setConnection: function (connection){ + this.connection = connection + }.bind(writable), + getConnectionName: function (){ + return this.connection?.name + }.bind(writable), + setTasks: function(tasks){ + this.tasks = tasks + 
}.bind(writable) + }) + + return writable } module.exports = { diff --git a/src/destinations/csvFileDestination.js b/src/destinations/csvFileDestination.js index f012f3f..e7c6510 100644 --- a/src/destinations/csvFileDestination.js +++ b/src/destinations/csvFileDestination.js @@ -24,7 +24,7 @@ function CSVFileDestination(options){ fileHandle = fd })) let headersWritten = false - return new Writable({ + const writable = new Writable({ objectMode: true, write(chunk, _, callback){ if(!headersWritten && headers) { @@ -42,6 +42,8 @@ function CSVFileDestination(options){ fs.writeFileSync(fileHandle, `${chunk.map(c => `"${c}"`).join(",")}\n`) } } + // @ts-ignore + this.tasks?.forEach(task => task.write(chunk)) lastChunk = chunk callback() }, @@ -50,6 +52,20 @@ function CSVFileDestination(options){ callback() } }) + + Object.assign(writable, { + setConnection: function (connection){ + this.connection = connection + }.bind(writable), + getConnectionName: function (){ + return this.connection?.name + }.bind(writable), + setTasks: function(tasks){ + this.tasks = tasks + }.bind(writable) + }) + + return writable } util.inherits(CSVFileDestination, EventEmitter) diff --git a/src/destinations/postgresDestination.js b/src/destinations/postgresDestination.js index 2d17533..b8413d0 100644 --- a/src/destinations/postgresDestination.js +++ b/src/destinations/postgresDestination.js @@ -47,11 +47,7 @@ function writeInsertStatement(columnMapping, table, chunk){ * * @param {Object} options * @param {Object} options.table - * @param {Object} options.username - * @param {Object} options.password - * @param {Object} options.database - * @param {Object} options.host - * @param {Object} options.port + * @param {Object} options.connectionname * @param {Object} options.mapping * @param {Object} options.includeErrors * @returns Transform @@ -59,37 +55,17 @@ function writeInsertStatement(columnMapping, table, chunk){ function PostgresDestination(options){ EventEmitter.call(this) const table = 
options.table - const username = options.username - const password = options.password - const database = options.database - const host = options.host - const port = options.port || DEFAULT_PORT + const connectionname = options.connectionname const mapping = options.mapping const includeErrors = options.includeErrors let lastChunk - const sequelize = new Sequelize(database, username, password, { - host: host, - port: port, - dialect: 'postgres', - logging: false - }) - - try{ - sequelize.authenticate() - debug('sequelize.authenticate succeeded') - } catch(e){ - debug('sequelize.authenticate failed') - debug(e) - throw e - } - - return new Transform({ + const transform = new Transform({ objectMode: true, emitClose: true, construct(callback){ - // @ts-ignore - this.sequelize = sequelize + // @ts-ignore + this.connectionname = connectionname callback() }, write(chunk, _, callback){ @@ -99,12 +75,14 @@ function PostgresDestination(options){ insertStatement = writeInsertStatement(mapping, table, chunk) debug('Insert statement: [%s]', insertStatement) // @ts-ignore - this.sequelize.query(insertStatement) + this.connection.db.query(insertStatement) .then(result => { debug('result %o', result) chunk._result = result lastChunk = chunk // @ts-ignore + this.tasks?.forEach(task => task.write(chunk)) + // @ts-ignore callback(null, chunk) }).catch(error => { debug('error %o', error) @@ -122,6 +100,18 @@ function PostgresDestination(options){ callback() } }) + Object.assign(Transform.prototype, { + setConnection: function (connection){ + this.connection = connection + }.bind(transform), + getConnectionName: function (){ + return this.connection?.name + }.bind(transform), + setTasks: function(tasks){ + this.tasks = tasks + }.bind(transform) + }) + return transform; } module.exports = { diff --git a/src/destinations/sqlFileDestination.js b/src/destinations/sqlFileDestination.js index 90a7298..7e1cde9 100644 --- a/src/destinations/sqlFileDestination.js +++ 
b/src/destinations/sqlFileDestination.js @@ -41,9 +41,9 @@ function SQLFileDestination(options){ }) } function writeUpdateStatement(chunk){ - + //TODO } - return new Writable({ + const writable = new Writable({ objectMode: true, write(chunk, _, callback){ if(chunk.errors.length === 0) @@ -52,15 +52,31 @@ function SQLFileDestination(options){ if(chunk.errors.length === 0 | includeErrors){ writeInsertStatement(chunk) } - } else if (sqlMode === SQL_MODE.UPDATE_MODE){ - // @ts-ignore - if(chunk.errors.length === 0 | includeErrors){ - writeUpdateStatement(chunk) - } } + // TODO - Disabled because Sonar + // } else if (sqlMode === SQL_MODE.UPDATE_MODE){ + // // @ts-ignore + // if(chunk.errors.length === 0 | includeErrors){ + // writeUpdateStatement(chunk) + // } + // } callback() } }) + + Object.assign(writable, { + setConnection: function (connection){ + this.connection = connection + }.bind(writable), + getConnectionName: function (){ + return this.connection?.name + }.bind(writable), + setTasks: function(tasks){ + this.tasks = tasks + }.bind(writable) + }) + + return writable } module.exports = { diff --git a/src/examples/additional-tasks.js b/src/examples/additional-tasks.js new file mode 100644 index 0000000..7d8a5f2 --- /dev/null +++ b/src/examples/additional-tasks.js @@ -0,0 +1,127 @@ +const { + Etl, + Loaders, + Destinations, + Connections, + Misc, +} = require("ffc-pay-etl-framework") + +const csvFile = `${process.cwd()}/test/fixtures/Crops.csv` + +const columns = [ + "DistCode", + "Year", + "StateCode", + "StateName", + "DistName", + "RICEAREA", + "RICEPRODUCTION", + "RICEYIELD", + "WHEATAREA", + "WHEATPRODUCTION", + "WHEATYIELD", + "KHARIFSORGHUMAREA", + "KHARIFSORGHUMPRODUCTION", + "KHARIFSORGHUMYIELD", + "RABISORGHUMAREA", + "RABISORGHUMPRODUCTION", + "RABISORGHUMYIELD", + "SORGHUMAREA", + "SORGHUMPRODUCTION", + "SORGHUMYIELD", + "PEARLMILLETAREA", + "PEARLMILLETPRODUCTION", + "PEARLMILLETYIELD", + "MAIZEAREA", + "MAIZEPRODUCTION", + "MAIZEYIELD", + 
"FINGERMILLETAREA", + "FINGERMILLETPRODUCTION", + "FINGERMILLETYIELD", + "BARLEYAREA", + "BARLEYPRODUCTION", + "BARLEYYIELD", + "CHICKPEAAREA", + "CHICKPEAPRODUCTION", + "CHICKPEAYIELD", + "PIGEONPEAAREA", + "PIGEONPEAPRODUCTION", + "PIGEONPEAYIELD", + "MINORPULSESAREA", + "MINORPULSESPRODUCTION", + "MINORPULSESYIELD", + "GROUNDNUTAREA", + "GROUNDNUTPRODUCTION", + "GROUNDNUTYIELD", + "SESAMUMAREA", + "SESAMUMPRODUCTION", + "SESAMUMYIELD", + "RAPESEEDANDMUSTARDAREA", + "RAPESEEDANDMUSTARDPRODUCTION", + "RAPESEEDANDMUSTARDYIELD", + "SAFFLOWERAREA", + "SAFFLOWERPRODUCTION", + "SAFFLOWERYIELD", + "CASTORAREA", + "CASTORPRODUCTION", + "CASTORYIELD", + "LINSEEDAREA", + "LINSEEDPRODUCTION", + "LINSEEDYIELD", + "SUNFLOWERAREA", + "SUNFLOWERPRODUCTION", + "SUNFLOWERYIELD", + "SOYABEANAREA", + "SOYABEANPRODUCTION", + "SOYABEANYIELD", + "OILSEEDSAREA", + "OILSEEDSPRODUCTION", + "OILSEEDSYIELD", + "SUGARCANEAREA", + "SUGARCANEPRODUCTION", + "SUGARCANEYIELD", + "COTTONAREA", + "COTTONPRODUCTION", + "COTTONYIELD", + "FRUITSAREA", + "VEGETABLESAREA", + "FRUITSANDVEGETABLESAREA", + "POTATOESAREA", + "ONIONAREA", + "FODDERAREA", +] + +const etl = new Etl.Etl() + +etl + .connection( + new Connections.PostgresDatabaseConnection({ + name: "MyNewConnection", + username: "postgres", + password: "ppp", + host: "localhost", + }) + ) + .loader(Loaders.CSVLoader({ path: csvFile, columns: columns })) + .destination( + Destinations.PostgresDestination( + { + table: "target_table", + connection: "MyConnection", + mapping: [ + { + column: "Dist Code", + targetColumn: "dist_code", + targetType: "string", + }, + ], + includeErrors: false, + }, + new Misc.PostgresSQLTask({ + connectionname: "MyConnection", + sql: "INSERT INTO TARGET_TABLE (ColumnA, ColumnB, ColumnC) VALUES ('${columns.DistCode}', '${columns.Year}', '${columns.StateCode}');", + }) + ) + ) + .pump(); + diff --git a/src/examples/csv-to-postgres.js b/src/examples/csv-to-postgres.js index 47b9840..2e0b6ad 100644 --- 
a/src/examples/csv-to-postgres.js +++ b/src/examples/csv-to-postgres.js @@ -1,4 +1,4 @@ -const { Etl, Loaders, Destinations } = require("ffc-pay-etl-framework") +const { Etl, Loaders, Destinations, Connections } = require("ffc-pay-etl-framework") let csvFile = `${process.cwd()}/test/fixtures/SoilType.csv` @@ -14,12 +14,16 @@ const columns = [ const etl = new Etl.Etl() etl -.loader(Loaders.CSVLoader({path: csvFile, columns: columns})) -.destination(Destinations.PostgresDestination({ +.connection(new Connections.PostgresDatabaseConnection({ + name: 'MyConnection', username: "postgres", password: "ppp", + host: "localhost" +})) +.loader(Loaders.CSVLoader({path: csvFile, columns: columns})) +.destination(Destinations.PostgresDestination({ table: "target", - host: "localhost", + connection: "MyConnection", mapping: [ { column: "Dist Code", diff --git a/src/index.js b/src/index.js index 8a142de..955a2c5 100644 --- a/src/index.js +++ b/src/index.js @@ -2,6 +2,7 @@ const Validators = require("./validators") const Transformers = require("./transformers") const Loaders = require("./loaders") const Destinations = require("./destinations") +const Connections = require("./database_connections") const Etl = require("./lib") module.exports = { @@ -10,4 +11,5 @@ module.exports = { Transformers, Loaders, Destinations, + Connections } \ No newline at end of file diff --git a/src/lib/index.js b/src/lib/index.js index 4d385e2..5c78a6e 100644 --- a/src/lib/index.js +++ b/src/lib/index.js @@ -8,6 +8,7 @@ const { compose } = require("node:stream") * @typedef {Object} Etl * @function loader * @function pump + * @function connection * @function validator * @function destination * @function transform @@ -20,6 +21,8 @@ const { compose } = require("node:stream") function Etl(){ EventEmitter.call(this) let self = this + self.beforeETLList = [] + self.connectionList = [] self.validatorList = [] self.transformationList = [] self.destinationList = [] @@ -30,6 +33,9 @@ function Etl(){ } this.pump 
= () => { + this.beforeETLList.forEach(task => { + task.write({}) + }) this.loader .pump(this.loader) .pipe( @@ -44,12 +50,36 @@ function Etl(){ return self } + this.beforeETL = (pipelineTask) => { + const connectionname = pipelineTask.getConnectionName() + const connection = this.connectionList.filter(c => c.name === connectionname)[0] + if (!connection) { + throw new Error(`Connection with name ${connectionname} not found`) + } + pipelineTask.setConnection(connection) + self.beforeETLList.push(pipelineTask) + return self + } + + this.connection = (connection) => { + self.connectionList.push(connection) + return self + } + this.validator = (validator) => { self.validatorList.push(validator) return self } - this.destination = (destination) => { + this.destination = (destination, ...tasks) => { + const connectionname = destination.getConnectionName() + + const connection = this.connectionList.filter(c => c.name === connectionname)[0] + if (connection) { + destination.setConnection(connection) + } + + tasks && destination.setTasks(tasks) self.destinationList.push(destination) return self } diff --git a/src/misc/README.md b/src/misc/README.md new file mode 100644 index 0000000..6bc6822 --- /dev/null +++ b/src/misc/README.md @@ -0,0 +1,55 @@ +# Miscellanous pipeline items + +## PostgresSQLTask + +Executes an arbitrary sql task either as part of another transformation or as an event + +### Example + +```js +const { + Connections, + Destinations, + Etl, + Loaders, + Misc, + Validators, + Transformers, +} = require("ffc-pay-etl-framework"); + +const etl = new Etl.Etl(); + +etl + .connections( + new Connections.PostgresSQLConnection({ + connectionname: "MyConnection", + database: "MyDB", + username: "MyUserName", + password: "MyPassword", + host: "localhost", + port: 5432, + }) + ) + .beforeETL( + new Misc.PostgresSQLTask({ + connection: "MyConnection", + sql: "INSERT INTO MyTable (column1, column2) VALUES ('apples','oranges');", + }) + ) + .loader(new Loaders.CSVLoader({ 
path: csvFile, columns: columns })) + .destination( + new Destinations.PostgresDestination({ + connection: "MyConnection", + table: "target", + mapping: [ + { + column: "Dist Code", + targetColumn: "dist_code", + targetType: "string", + }, + ], + includeErrors: false, + }) + ) + .pump(); +``` diff --git a/src/misc/index.js b/src/misc/index.js new file mode 100644 index 0000000..e69de29 diff --git a/src/misc/postgresSQLTask.js b/src/misc/postgresSQLTask.js new file mode 100644 index 0000000..3365c41 --- /dev/null +++ b/src/misc/postgresSQLTask.js @@ -0,0 +1,64 @@ +// @ts-nocheck +const { PassThrough } = require("stream") +const startPosOffset = 3 +const endPosOffset = 1 + +function getPlaceHolders(sql){ + const columnNamePlaceholderRegex = /(?<=\$\{)(.*?)(?=})/g + const matches = Array.from(sql.matchAll(columnNamePlaceholderRegex)) + return matches.map(match => ({ + match: match[0], + value: match[0].split('.')[1], + collection: match[0].split('.')[0], + startPos: match.index - startPosOffset, + endPos: match.index + match[0].length - endPosOffset + })) +} + +function doPlaceHolderValueInterpolations(chunk, sql, placeholders){ + placeholders.forEach(placeholder => { + sql = sql.replace(new RegExp(`\\$\{${placeholder.match}}`), + chunk[chunk[`_${placeholder.collection}`].indexOf(placeholder.value)]) + }) + return sql +} + +function PostgresSQLTask(options){ + const passthrough = new PassThrough({ + readableObjectMode: true, + writableObjectMode: true, + decodeStrings: false, + construct(callback){ + this.sql = options.sql + this.connectionname = options.connectionname + callback() + }, + transform(chunk, _, callback){ + const placeholders = getPlaceHolders(this.sql) + if(placeholders.length === 0){ + this.connection.db.query(this.sql) + } + else { + const interpolatedSql = doPlaceHolderValueInterpolations(chunk, this.sql, placeholders) + this.connection.db.query(interpolatedSql) + } + callback(null, chunk) + } + }) + // Should definately split this out into a mixin + 
Object.assign(passthrough, { + setConnection: function (connection){ + this.connection = connection + }.bind(passthrough), + getConnectionName: function (){ + return this.connectionname + }.bind(passthrough) + }) + return passthrough +} + +module.exports = { + PostgresSQLTask, + getPlaceHolders, + doPlaceHolderValueInterpolations +} \ No newline at end of file diff --git a/test/database_connections/postgresDatabaseConnection.test.js b/test/database_connections/postgresDatabaseConnection.test.js new file mode 100644 index 0000000..68e10ca --- /dev/null +++ b/test/database_connections/postgresDatabaseConnection.test.js @@ -0,0 +1,87 @@ +const { expect } = require('@jest/globals') +const Connections = require('../../src/database_connections') +const { Sequelize } = require('sequelize') + +const mockAuthenticate = jest.fn().mockResolvedValue(true) + +jest.mock('sequelize', () => ({ + Sequelize: jest.fn().mockImplementation(()=>({ + authenticate: mockAuthenticate, + query: jest.fn().mockResolvedValue([[], 1]), + })) + }) +) + +describe('postgresConnection tests', () => { + beforeEach(() => { + jest.clearAllMocks() + }) + it('should connect', async () => { + const connectionName = 'MyConnection' + const databaseName = 'MyTestDB' + const userName = 'testUser' + const password = 'testPassword' + const host = 'localhost' + const port = 1234 + + const uut = await Connections.PostgresDatabaseConnection({ + connectionname: connectionName, + database: databaseName, + username: userName, + password: password, + host: host, + port: port + }) + + expect(uut.name).toEqual(connectionName) + expect(uut.db).toBeTruthy() + expect(Sequelize).toBeCalledTimes(1) + expect(Sequelize).toBeCalledWith( + databaseName, + userName, + password, + { + "dialect": "postgres", + "host": host, + "logging": false, + "port": port + } + ) + expect(mockAuthenticate).toHaveBeenCalled() + }) + + it('should fail to connect', async () => { + mockAuthenticate.mockImplementation(()=> Promise.reject('error')) + 
const connectionName = 'MyConnection' + const databaseName = 'MyTestDB' + const userName = 'testUser' + const password = 'testPassword' + const host = 'localhost' + const port = 1234 + + try { + const uut = await Connections.PostgresDatabaseConnection({ + connectionname: connectionName, + database: databaseName, + username: userName, + password: password, + host: host, + port: port + }) + } catch(e) { + expect(Sequelize).toBeCalledTimes(1) + expect(Sequelize).toBeCalledWith( + databaseName, + userName, + password, + { + "dialect": "postgres", + "host": host, + "logging": false, + "port": port + } + ) + expect(mockAuthenticate).rejects.toMatch('error') + } + }) +}) \ No newline at end of file diff --git a/test/destinations/consoleDestination.test.js b/test/destinations/consoleDestination.test.js new file mode 100644 index 0000000..08cccc5 --- /dev/null +++ b/test/destinations/consoleDestination.test.js @@ -0,0 +1,80 @@ +const { expect } = require("@jest/globals") +const { ConsoleDestination } = require("../../src/destinations") +const { Readable } = require("node:stream") + +const spy = jest.spyOn(console,"log") + +describe('consoleDestination tests', () => { + afterEach(() => { + jest.resetAllMocks() + }) + it('should write to the console', (done) => { + const uut = ConsoleDestination({ + includeErrors: false + }) + const testData =["a", "b", "c"] + testData.errors = [] + testData.rowId = 1 + testData._columns = ["column1", "column2", "column3"] + const readable = Readable.from([testData]) + readable + .pipe(uut) + .on("finish", () => { + expect(spy).toHaveBeenCalledWith(testData) + done() + }) + }) + it('should set a connection', () => { + const connnectionName = "MyConnection" + const mockConnection = { + db: { + query: jest.fn() + }, + name: connnectionName + } + const uut = ConsoleDestination({ + includeErrors: false + }) + expect(uut.connection).toBeFalsy() + uut.setConnection(mockConnection) + expect(uut.connection).toBeTruthy() + }) + it('should get the 
connection name', () => { + const connnectionName = "MyConnection" + const mockConnection = { + db: { + query: jest.fn() + }, + name: connnectionName + } + const uut = ConsoleDestination({ + includeErrors: false + }) + expect(uut.connection).toBeFalsy() + uut.setConnection(mockConnection) + expect(uut.connection).toBeTruthy() + expect(uut.getConnectionName()).toEqual(connnectionName) + }) + it('should execute additional tasks', (done) => { + const mockTasks = [{ + write: jest.fn() + }] + const uut = ConsoleDestination({ + includeErrors: false + }) + uut.setTasks(mockTasks) + expect(uut.tasks.length).toEqual(1) + const testData =["a", "b", "c"] + testData.errors = [] + testData.rowId = 1 + testData._columns = ["column1", "column2", "column3"] + const readable = Readable.from([testData]) + readable + .pipe(uut) + .on("finish", () => { + expect(mockTasks[0].write).toHaveBeenCalled() + expect(mockTasks[0].write).toBeCalledWith(testData) + done() + }) +}) +}) \ No newline at end of file diff --git a/test/destinations/csvFileDestination.test.js b/test/destinations/csvFileDestination.test.js index 31b7cef..453d8f0 100644 --- a/test/destinations/csvFileDestination.test.js +++ b/test/destinations/csvFileDestination.test.js @@ -35,4 +35,63 @@ describe('csvFileDestination tests', () => { done() }) }) + it('should execute additional tasks', (done) => { + const mockTasks = [{ + write: jest.fn() + }] + const uut = CSVFileDestination({ + fileName: "csv.sql", + headers: true, + includeErrors: false + }) + uut.setTasks(mockTasks) + expect(uut.tasks.length).toEqual(1) + const testData =["a", "b", "c"] + testData.errors = [] + testData.rowId = 1 + testData._columns = ["column1", "column2", "column3"] + const readable = Readable.from([testData]) + readable + .pipe(uut) + .on("finish", () => { + expect(mockTasks[0].write).toHaveBeenCalled() + expect(mockTasks[0].write).toBeCalledWith(testData) + done() + }) + }) + it('should set a connection', () => { + const connnectionName = 
"MyConnection" + const mockConnection = { + db: { + query: jest.fn() + }, + name: connnectionName + } + const uut = CSVFileDestination({ + fileName: "csv.sql", + headers: true, + includeErrors: false + }) + expect(uut.connection).toBeFalsy() + uut.setConnection(mockConnection) + expect(uut.connection).toBeTruthy() + }) + it('should get the connection name', () => { + const connnectionName = "MyConnection" + const mockConnection = { + db: { + query: jest.fn() + }, + name: connnectionName + } + const uut = CSVFileDestination({ + fileName: "csv.sql", + headers: true, + includeErrors: false + }) + expect(uut.connection).toBeFalsy() + uut.setConnection(mockConnection) + expect(uut.connection).toBeTruthy() + expect(uut.getConnectionName()).toEqual(connnectionName) + }) }) \ No newline at end of file diff --git a/test/destinations/postgresDestination.test.js b/test/destinations/postgresDestination.test.js index 7cd93bb..855c3c1 100644 --- a/test/destinations/postgresDestination.test.js +++ b/test/destinations/postgresDestination.test.js @@ -1,21 +1,9 @@ const { expect } = require('@jest/globals') const { - PostgresDestination, writeInsertStatement, isKeyWord, - getMappingForColumn + PostgresDestination, writeInsertStatement } = require('../../src/destinations/postgresDestination') const { Readable } = require('node:stream') -const { Sequelize } = require('sequelize') - -jest.mock('sequelize', () => { - const mockQuery = jest.fn().mockResolvedValue([[], 1]) - return { - Sequelize: jest.fn(() =>({ - authenticate: jest.fn(), - query: mockQuery - }) - ) - } -}) +const { Sequelize } = require('sequelize').Sequelize jest.mock('fs', () => ({ writeFileSync: jest.fn(), @@ -50,12 +38,21 @@ const config = { ] } +const mockConnection = { + name: 'Mock Connection', + db: { + query: jest.fn().mockResolvedValue([[],1]) + } +} + describe('postgresDestination tests', () => { beforeEach(() => { jest.clearAllMocks() }) it('should write a row', (done) => { + // This test was working fine 
until today const uut = new PostgresDestination(config) + uut.setConnection(mockConnection) const testData =["a", "b", "c"] testData.errors = [] testData.rowId = 1 @@ -63,13 +60,15 @@ describe('postgresDestination tests', () => { const readable = Readable.from([testData]) readable .on('close', (result) => { - expect(Sequelize().query).toHaveBeenLastCalledWith("INSERT INTO target (target_column1,target_column2,target_column3) VALUES ('a','b','c')") + expect(mockConnection.db.query).toHaveBeenLastCalledWith("INSERT INTO target (target_column1,target_column2,target_column3) VALUES ('a','b','c')") done() }) .pipe(uut) }) it('should fire result event', (done) => { + // This test was working fine until today const uut = new PostgresDestination(config) + uut.setConnection(mockConnection) const testData =["a", "b", "c"] testData.errors = [] testData.rowId = 1 @@ -88,7 +87,9 @@ describe('postgresDestination tests', () => { })) }) it('should produce debug output', (done) => { + // This test was working fine until today const uut = new PostgresDestination(config) + uut.setConnection(mockConnection) const testData =["a", "b", "c"] testData.errors = [] testData.rowId = 1 @@ -96,56 +97,18 @@ describe('postgresDestination tests', () => { const readable = Readable.from([testData]) readable .on('close', () => { - expect(logSpy).toBeCalledTimes(3) + expect(logSpy).toBeCalledTimes(2) done() }) .pipe(uut) }) - it('should connect to different port', () => { - new PostgresDestination({ - username: "postgres", - password : "abc", - database: "etl_db", - host: "postgres", - port: 5433, - table: "target", - includeErrors: false, - mapping: [ - { - column: "column1", - targetColumn: "target_column1", - targetType: "varchar" - }, - { - column: "column2", - targetColumn: "target_column2", - targetType: "varchar" - }, - { - column: "column3", - targetColumn: "target_column3", - targetType: "varchar" - }, - ]}) - expect(Sequelize).toBeCalledTimes(1) - expect(Sequelize).toBeCalledWith( - 
"etl_db", - "postgres", - "abc", - { - "dialect": "postgres", - "host": "postgres", - "logging": false, - "port": 5433 - } - ) - } - ) it('should format a date as specified', (done) => { + // This test was working fine until today const newConfig = JSON.parse(JSON.stringify(config)) newConfig.mapping[1].targetType = "date" newConfig.mapping[1].format = "DD-MM-YYYY HH24:MI:SS" const uut = new PostgresDestination(newConfig) + uut.setConnection(mockConnection) const testData =["a", "19-06-2024 00:00", "c"] testData.errors = [] testData.rowId = 1 @@ -153,7 +116,7 @@ describe('postgresDestination tests', () => { const readable = Readable.from([testData]) readable .on('close', (result) => { - expect(Sequelize().query).toHaveBeenLastCalledWith("INSERT INTO target (target_column1,target_column2,target_column3) VALUES ('a',to_date('19-06-2024 00:00','DD-MM-YYYY HH24:MI:SS'),'c')") + expect(mockConnection.db.query).toHaveBeenLastCalledWith("INSERT INTO target (target_column1,target_column2,target_column3) VALUES ('a',to_date('19-06-2024 00:00','DD-MM-YYYY HH24:MI:SS'),'c')") done() }) .pipe(uut) @@ -168,7 +131,7 @@ describe('postgresDestination tests', () => { expect(result).toEqual("INSERT INTO MockTable (target_column1,target_column2,target_column3) VALUES ('a','19-06-2024 00:00','c')") }) it('should write a sql statement with a date format', () => { - const newMapping = [...config.mapping] + const newMapping = JSON.parse(JSON.stringify(config.mapping)) newMapping[1].targetType = "date" newMapping[1].format = "DD-MM-YYYY HH24:MI:SS" const mockTable = "MockTable" @@ -207,7 +170,7 @@ describe('postgresDestination tests', () => { expect(result).toEqual("INSERT INTO MockTable (target_column1,\"User\",target_column3) VALUES ('a',to_date('19-06-2024 00:00','DD-MM-YYYY HH24:MI:SS'),'c')") }) it('should write a sql statement when a target column type is a number', () => { - const newMapping = [...config.mapping] + const newMapping = JSON.parse(JSON.stringify(config.mapping)) 
newMapping[1].targetType = "number" newMapping[1].format = "DD-MM-YYYY HH24:MI:SS" const mockTable = "MockTable" @@ -219,7 +182,7 @@ describe('postgresDestination tests', () => { expect(result).toEqual("INSERT INTO MockTable (target_column1,target_column2,target_column3) VALUES ('a',999,'c')") }) it('should write a sql statement when a target column type is a number but the value is NaN', () => { - const newMapping = [...config.mapping] + const newMapping = JSON.parse(JSON.stringify(config.mapping)) newMapping[1].targetType = "number" newMapping[1].format = "DD-MM-YYYY HH24:MI:SS" const mockTable = "MockTable" @@ -230,4 +193,32 @@ describe('postgresDestination tests', () => { const result = writeInsertStatement(newMapping, mockTable, mockChunk) expect(result).toEqual("INSERT INTO MockTable (target_column1,target_column2,target_column3) VALUES ('a',0,'c')") }) + it('should fire off any addtional tasks', (done) => { + const mockTasks = [{ + write: jest.fn() + }] + const uut = new PostgresDestination(config) + uut.setConnection(mockConnection) + uut.setTasks(mockTasks) + expect(uut.tasks.length).toEqual(1) + const testData =["a", "b", "c"] + + testData.errors = [] + testData.rowId = 1 + testData._columns = ["column1", "column2", "column3"] + const readable = Readable.from([testData]) + readable + .on('close', (result) => { + expect(mockConnection.db.query).toHaveBeenLastCalledWith("INSERT INTO target (target_column1,target_column2,target_column3) VALUES ('a','b','c')") + expect(mockTasks[0].write).toHaveBeenCalled() + expect(mockTasks[0].write).toBeCalledWith(testData) + done() + }) + .pipe(uut) + }) + it('should get connection name', () => { + const uut = new PostgresDestination(config) + uut.setConnection(mockConnection) + expect(uut.getConnectionName()).toEqual(mockConnection.name) + }) }) \ No newline at end of file diff --git a/test/destinations/sqlFileDestination.test.js b/test/destinations/sqlFileDestination.test.js index fef4662..eaae79f 100644 --- 
a/test/destinations/sqlFileDestination.test.js +++ b/test/destinations/sqlFileDestination.test.js @@ -52,4 +52,99 @@ describe('sqlFileDestination tests', () => { done() }) }) + it('should set the connection', () => { + const connectionName = "MyConnection" + const mockConnection = { + db: { + query: jest.fn() + }, + name: connectionName + } + const uut = SQLFileDestination({ + fileName: "statements.sql", + mode: SQL_MODE.INSERT_MODE, + table: "target_table", + mapping: [ + { + column: "column1", + targetColumn: "target_column1", + targetType: "string" + }, + { + column: "column2", + targetColumn: "target_column2", + targetType: "string" + }, + { + column: "column3", + targetColumn: "target_column3", + targetType: "string" + }, + ] + }) + uut.setConnection(mockConnection) + expect(uut.connection.name).toEqual(connectionName) + }) + it('should get the connection name', () => { + const connectionName = "MyConnection" + const mockConnection = { + db: { + query: jest.fn() + }, + name: connectionName + } + const uut = SQLFileDestination({ + fileName: "statements.sql", + mode: SQL_MODE.INSERT_MODE, + table: "target_table", + mapping: [ + { + column: "column1", + targetColumn: "target_column1", + targetType: "string" + }, + { + column: "column2", + targetColumn: "target_column2", + targetType: "string" + }, + { + column: "column3", + targetColumn: "target_column3", + targetType: "string" + }, + ] + }) + uut.setConnection(mockConnection) + expect(uut.getConnectionName()).toEqual(connectionName) + }) + it('should set tasks', () => { + const mockTasks = [{ + write: jest.fn() + }] + const uut = SQLFileDestination({ + fileName: "statements.sql", + mode: SQL_MODE.INSERT_MODE, + table: "target_table", + mapping: [ + { + column: "column1", + targetColumn: "target_column1", + targetType: "string" + }, + { + column: "column2", + targetColumn: "target_column2", + targetType: "string" + }, + { + column: "column3", + targetColumn: "target_column3", + targetType: "string" + }, + ] + }) 
it('should add a connection to the connections list', () => {
  const etl = new Etl.Etl()
  const connectionName = 'MockConnection'
  const mockConnection = {
    db: { query: jest.fn() },
    name: connectionName
  }
  etl.connection(mockConnection)
  // The connection should be registered under its name
  expect(etl.connectionList.length).toEqual(1)
  expect(etl.connectionList[0].name).toEqual(connectionName)
})

it('should execute beforeETL task', () => {
  const etl = new Etl.Etl()
  const connectionName = 'MockConnection'
  const mockConnection = {
    db: { query: jest.fn() },
    name: connectionName
  }
  const mockTask = {
    setConnection: jest.fn(),
    getConnectionName: jest.fn().mockReturnValue(connectionName)
  }
  etl.connection(mockConnection)
  etl.beforeETL(mockTask)
  // beforeETL should look up the task's connection by name and wire it in
  expect(mockTask.setConnection).toHaveBeenCalled()
  expect(mockTask.getConnectionName).toHaveBeenCalled()
})

it('should throw if no connection found for beforeETL task', () => {
  const etl = new Etl.Etl()
  const mockTask = {
    setConnection: jest.fn(),
    getConnectionName: jest.fn().mockReturnValue('MockConnection')
  }
  // The original try/catch form passed silently when nothing was thrown;
  // expect(...).toThrow fails the test if beforeETL does not throw, and
  // also pins the error message.
  expect(() => etl.beforeETL(mockTask))
    .toThrow('Connection with name MockConnection not found')
})
const mockConnection = {
  db: {
    query: jest.fn()
  }
}

describe('PostgresSQLTask tests', () => {
  // The query mock is shared by every test in this suite; clear its call
  // history between tests so each toHaveBeenCalledWith assertion only sees
  // calls made by its own test (previously call records leaked across tests).
  beforeEach(() => {
    mockConnection.db.query.mockClear()
  })

  it('should execute plain sql', (done) => {
    const sql = "INSERT INTO Foo(bar,baz) VALUES (1,2);"
    const uut = new PostgresSQLTask({ sql: sql })
    uut.setConnection(mockConnection)
    uut
      .on('data', () => {
        // With no placeholders, the SQL is passed through verbatim
        expect(mockConnection.db.query).toHaveBeenCalledWith(sql)
        done()
      })
      .write([1, 2, 3, 4, 5])
  })

  it('should get column name placeholders and character positions', () => {
    const sql = "INSERT INTO Foo(bar,baz) VALUES (${columns.column1},${columns.column2});"
    // Call the helper directly (the uut alias added nothing); the stray
    // console.log debug output has been removed.
    const matches = getPlaceHolders(sql)
    expect(matches[0].value).toEqual('column1')
    expect(matches[0].collection).toEqual('columns')
    expect(matches[0].startPos).toEqual(32)
    expect(matches[0].endPos).toEqual(49)
    expect(matches[1].value).toEqual('column2')
    expect(matches[1].collection).toEqual('columns')
    expect(matches[1].startPos).toEqual(51)
    expect(matches[1].endPos).toEqual(68)
  })

  it('should interpolate column values', () => {
    const mockPlaceHolders = [
      { match: 'columns.column1', value: 'column1', collection: 'columns', startPos: 35, endPos: 50 },
      { match: 'columns.column2', value: 'column2', collection: 'columns', startPos: 54, endPos: 69 }
    ]
    const testData = ["apples", "oranges", "bananas", "cumcwats", "dragon fruit"]
    testData._columns = ["column1", "column2", "column3"]
    const sql = "INSERT INTO Foo(bar,baz) VALUES (${columns.column1},${columns.column2});"
    const interpolatedSql = doPlaceHolderValueInterpolations(testData, sql, mockPlaceHolders)
    // Only column1/column2 are referenced, so only the first two values appear
    expect(interpolatedSql).toEqual("INSERT INTO Foo(bar,baz) VALUES (apples,oranges);")
  })

  it('should interpolate column values into sql', (done) => {
    const sql = "INSERT INTO Foo(bar,baz) VALUES (${columns.column1},${columns.column2});"
    const interpolatedSql = "INSERT INTO Foo(bar,baz) VALUES (apples,oranges);"
    const uut = new PostgresSQLTask({ sql: sql })
    const testData = ["apples", "oranges", "bananas", "cumcwats", "dragon fruit"]
    testData._columns = ["column1", "column2", "column3"]

    uut.setConnection(mockConnection)
    uut
      .on('data', () => {
        // End-to-end: the task itself resolves placeholders before querying
        expect(mockConnection.db.query).toHaveBeenCalledWith(interpolatedSql)
        done()
      })
      .write(testData)
  })
})