Merge pull request #22 from DEFRA/fix/csv-file-loader-line-count

Added line count to csv loader and surfaced results from destination

suityou01 authored Sep 6, 2024
2 parents cf43003 + a506852 commit 457f7da

Showing 10 changed files with 155 additions and 31 deletions.
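In practice, the change means a pipeline consumer can observe the last row that reached the destination, now carrying the loader's `_linecount`, via a `result` event. A minimal sketch based on the updated README and tests below; the file paths and column names are placeholders:

```js
const { Etl, Loaders, Destinations } = require("ffc-pay-etl-framework")

const etl = new Etl.Etl()

etl
  .loader(Loaders.CSVLoader({ path: "./input.csv", columns: ["column1", "column2", "column3"] }))
  .destination(Destinations.CSVFileDestination({
    fileName: "output.csv",
    headers: true,
    includeErrors: false,
    quotationMarks: true,
  }))
  .pump()
  .on("result", (row) => {
    // Last row written, with row._linecount set by the loader.
    console.log(row._linecount, row)
  })
```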
72 changes: 51 additions & 21 deletions README.md
@@ -1,15 +1,20 @@
## 🚀 Features

<!-- ALL-CONTRIBUTORS-BADGE:START - Do not remove or modify this section -->

[![All Contributors](https://img.shields.io/badge/all_contributors-2-orange.svg?style=flat-square)](#contributors-)

<!-- ALL-CONTRIBUTORS-BADGE:END -->

- 💾 CSV Source and Destination
- 👮 Validators
- 🤖 Transformers
- 🚨 Error Checking
- 🐘 Write directly to Postgres
- 👨‍⚕️ Intellisense

## 📦 Install

```bash
npm install --save-dev ffc-pay-etl-framework
```
@@ -18,30 +23,56 @@ npm install --save-dev ffc-pay-etl-framework

```js
// ESM
import {
Etl,
Loaders,
Validators,
Transformers,
Destinations,
} from "ffc-pay-etl-framework";

// CJS
const {
Etl,
Loaders,
Validators,
Transformers,
Destinations,
} = require("ffc-pay-etl-framework");

let csvFile = `${process.cwd()}/test/fixtures/SoilType.csv`;
const spinner = new pkg.Spinner().start("Running ETL Pipeline");

const etl = new Etl.Etl();

etl
.loader(Loaders.CSVLoader({ path: csvFile, columns: columns }))
.transform(
Transformers.FakerTransformer({
columns: [
{
name: "Dist Name",
faker: "location.city",
},
],
})
)
.destination(
Destinations.CSVFileDestination({
fileName: "SoilType_Output.csv",
headers: true,
includeErrors: false,
quotationMarks: true,
})
)
.pump()
.on("finish", () => {
//Update spinner
spinner.succeed("ETL Pipeline - succeeded");
})
.on("result", (data) => {
console.log(data); // emits the last row with error information
});
```

## 📢 Shout outs
@@ -56,7 +87,6 @@ etl

Please make sure to read the [Contributing Guide](https://github.com/DEFRA/ffc-pay-etl-framework/blob/next/CONTRIBUTING.md) before making a pull request.

## Contributors ✨

Thanks goes to these wonderful people ([emoji key](https://allcontributors.org/docs/en/emoji-key)):
@@ -87,4 +117,4 @@ Thanks goes to these wonderful people ([emoji key](https://allcontributors.org/d

<!-- ALL-CONTRIBUTORS-LIST:END -->

This project follows the [all-contributors](https://github.com/all-contributors/all-contributors) specification. Contributions of any kind welcome!
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "ffc-pay-etl-framework",
"version": "0.0.10",
"version": "0.0.11",
"publisher": "Defra",
"main": "dist/cjs/index.js",
"private": false,
12 changes: 11 additions & 1 deletion src/destinations/csvFileDestination.js
@@ -1,3 +1,5 @@
const EventEmitter = require('node:events')
const util = require('node:util')
const { Writable } = require("node:stream")
const fs = require("fs")

@@ -11,7 +13,8 @@ const fs = require("fs")
* @returns Writable
*/
function CSVFileDestination(options){
EventEmitter.call(this)
let lastChunk
const fileName = options.fileName
const headers = options.headers
const includeErrors = options.includeErrors
@@ -39,11 +42,18 @@ function CSVFileDestination(options){
fs.writeFileSync(fileHandle, `${chunk.map(c => `"${c}"`).join(",")}\n`)
}
}
lastChunk = chunk
callback()
},
final(callback){
this.emit('result', lastChunk)
callback()
}
})
}

util.inherits(CSVFileDestination, EventEmitter)

module.exports = {
CSVFileDestination
}
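The `result` event is emitted from the returned stream's `final()` hook, which runs after all chunks have been written; because every Node stream is already an `EventEmitter`, consumers can subscribe directly on the destination. A stripped-down sketch of the pattern, with illustrative names and the file writing omitted:

```js
const { Writable } = require("node:stream")

// Illustrative reduction of the pattern above: remember the most
// recent chunk, then surface it once the stream has flushed.
function LastChunkDestination() {
  let lastChunk
  return new Writable({
    objectMode: true,
    write(chunk, _encoding, callback) {
      lastChunk = chunk
      callback()
    },
    final(callback) {
      // Runs before 'finish'; streams inherit emit() from EventEmitter.
      this.emit("result", lastChunk)
      callback()
    },
  })
}

const dest = LastChunkDestination()
dest.on("result", (chunk) => console.log("last row:", chunk))
dest.write(["a", "b"])
dest.end(["c", "d"]) // logs: last row: [ 'c', 'd' ]
```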
11 changes: 10 additions & 1 deletion src/destinations/postgresDestination.js
@@ -1,3 +1,5 @@
const EventEmitter = require('node:events')
const util = require('node:util')
const { Transform } = require('node:stream')
const { Sequelize } = require('sequelize')
const debug = require('debug')('destination')
@@ -16,7 +18,7 @@ const DEFAULT_PORT = 5432
* @returns Transform
*/
function PostgresDestination(options){
EventEmitter.call(this)
const table = options.table
const username = options.username
const password = options.password
@@ -25,6 +27,7 @@ function PostgresDestination(options){
const port = options.port || DEFAULT_PORT
const mapping = options.mapping
const includeErrors = options.includeErrors
let lastChunk

const sequelize = new Sequelize(database, username, password, {
host: host,
@@ -93,17 +96,23 @@ function PostgresDestination(options){
.then(result => {
debug('result %o', result)
chunk._result = result
lastChunk = chunk
// @ts-ignore
callback(null, chunk)
}).catch(error => {
debug('error %o', error)
chunk.errors.push(error)
lastChunk = chunk
// @ts-ignore
callback(null, chunk)
})
} else {
debug('Chunk has errors %o', chunk)
}
},
final(callback){
this.emit('result', lastChunk)
callback()
}
})
}
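The Postgres destination follows the same last-chunk pattern, with the difference that the surfaced chunk also carries the Sequelize query result in `_result` (or any accumulated row errors). A hedged consumer sketch; the connection values are placeholders, and the export path assumes `PostgresDestination` is exposed alongside the other destinations:

```js
const { Destinations } = require("ffc-pay-etl-framework")

// Placeholder connection details; the option names follow the
// destructuring shown above.
const destination = new Destinations.PostgresDestination({
  table: "mytable",
  username: "user",
  password: "secret",
  database: "mydb",
  host: "localhost",
  port: 5432,
  mapping: [], // column-to-field mapping; shape not shown in this diff
  includeErrors: false,
})

destination.on("result", (lastChunk) => {
  // lastChunk._result holds the Sequelize result for the final row;
  // lastChunk.errors collects any row-level errors.
  console.log(lastChunk._result, lastChunk.errors)
})
```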
2 changes: 1 addition & 1 deletion src/lib/index.js
@@ -37,7 +37,7 @@ function Etl(){
RowMetaData(),
...self.validatorList,
...self.transformationList,
-...self.destinationList
+...self.destinationList.map(dl => dl.on('result', (data) => self.emit('result', data)))
)
// @ts-ignore
).on('finish', (data) => self.emit('finish', data))
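The one-line change works because `EventEmitter#on` returns the emitter it was called on: the `map()` attaches a forwarding listener to each destination while handing the same stream objects through to `pipeline()`. A standalone illustration:

```js
const EventEmitter = require("node:events")

const etl = new EventEmitter()
const destinations = [new EventEmitter(), new EventEmitter()]

// .on() returns its emitter, so map() wires up forwarding and
// yields the original objects unchanged.
const wired = destinations.map((d) =>
  d.on("result", (data) => etl.emit("result", data))
)

console.log(wired[0] === destinations[0]) // true
etl.on("result", (data) => console.log("forwarded:", data))
destinations[1].emit("result", { rowId: 1 }) // forwarded: { rowId: 1 }
```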
5 changes: 4 additions & 1 deletion src/loaders/csvloader.js
@@ -2,7 +2,7 @@
const fs = require("fs")
const { Transform } = require("stream")
const { parse } = require("csv-parse")
-const { stdout } = require("process")
+const { stdout, stderr } = require("process")

/**
*
@@ -12,6 +12,7 @@ const { stdout } = require("process")
*/
function CSVLoader(options){
let csvLoader = fs.createReadStream(options.path)
let lineCount = 1
csvLoader._columns = options.columns
csvLoader.pump = (csvLoader) => {
return csvLoader
@@ -22,6 +23,8 @@ function CSVLoader(options){
emitClose: true,
transform(chunk, _, callback){
chunk["_columns"] = options.columns
chunk["_linecount"] = lineCount
lineCount +=1
callback(null, chunk)
}
})
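Each parsed row now carries a one-based `_linecount`, incremented once per chunk that reaches this transform; as the test below shows, the header row is not counted. A hedged usage sketch, where the path and columns are placeholders and object mode is assumed as in the tests:

```js
const { Loaders } = require("ffc-pay-etl-framework")

const loader = Loaders.CSVLoader({
  path: "./input.csv", // placeholder fixture
  columns: ["column1", "column2", "column3"],
})

// pump() pipes the file stream through csv-parse and the tagging
// transform; each emitted row carries its data-row index.
loader.pump(loader).on("data", (row) => {
  console.log(row._linecount, row)
})
```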
23 changes: 21 additions & 2 deletions test/destinations/postgresDestination.test.js
@@ -42,7 +42,7 @@ describe('postgresDestination tests', () => {
jest.clearAllMocks()
})
it('should write a row', (done) => {
-const uut = PostgresDestination(config)
+const uut = new PostgresDestination(config)
const testData =["a", "b", "c"]
testData.errors = []
testData.rowId = 1
@@ -55,8 +55,27 @@
})
.pipe(uut)
})
it('should fire result event', (done) => {
const uut = new PostgresDestination(config)
const testData =["a", "b", "c"]
testData.errors = []
testData.rowId = 1
testData._columns = ["column1", "column2", "column3"]
const readable = Readable.from([testData])
readable
.pipe(uut.on('result', (data) => {
expect(data[0]).toEqual('a')
expect(data[1]).toEqual('b')
expect(data[2]).toEqual('c')
expect(data['errors']).toEqual([])
expect(data['rowId']).toEqual(1)
expect(data['_columns']).toEqual([ 'column1', 'column2', 'column3' ])
expect(data['_result']).toEqual([ [], 1 ])
done()
}))
})
it('should produce debug output', (done) => {
-const uut = PostgresDestination(config)
+const uut = new PostgresDestination(config)
const testData =["a", "b", "c"]
testData.errors = []
testData.rowId = 1
35 changes: 33 additions & 2 deletions test/etl/etl.test.js
@@ -21,8 +21,8 @@ describe('csvFileDestination tests', () => {
const etl = new Etl.Etl()

etl
-.loader(Loaders.CSVLoader({path: testPath, columns: ["column1","column2","column3"]}))
-.destination(Destinations.CSVFileDestination({
+.loader(new Loaders.CSVLoader({path: testPath, columns: ["column1","column2","column3"]}))
+.destination(new Destinations.CSVFileDestination({
fileName: "SoilType_Output.csv",
headers: true,
includeErrors: false,
Expand All @@ -33,4 +33,35 @@ describe('csvFileDestination tests', () => {
done()
})
})
it('should fire result event', (done) => {
const testData = [
"column1, column2, column3\n",
"1,2,3\n",
"4,5,6\n"
]

const testPath = "someRandomPath"
fs.__setMockFileContent(testPath, testData)
const etl = new Etl.Etl()

etl
.loader(new Loaders.CSVLoader({path: testPath, columns: ["column1","column2","column3"]}))
.destination(new Destinations.CSVFileDestination({
fileName: "SoilType_Output.csv",
headers: true,
includeErrors: false,
quotationMarks: true
}))
.pump()
.on('result', (data) => {
expect(data[0]).toEqual("4")
expect(data[1]).toEqual("5")
expect(data[2]).toEqual("6")
expect(data['_columns']).toEqual([ 'column1', 'column2', 'column3' ])
expect(data['_linecount']).toEqual(2)
expect(data['_rowId']).toEqual(1)
expect(data['errors']).toEqual([])
done()
})
})
})
23 changes: 23 additions & 0 deletions test/loaders/csvLoader.test.js
@@ -29,4 +29,27 @@ describe('csvLoader tests', () => {
}
}))
})
it('should count csv file lines', (done) => {
const testData = [
"column1, column2, column3\n",
"1,2,3\n",
"4,5,6\n"
]
let lineCount = 1
const testPath = "someRandomPath"
fs.__setMockFileContent(testPath, testData)
const uut = CSVLoader({ path: testPath, columns: ["a","b","c"]})
uut
.pump(uut)
.pipe(new PassThrough({
objectMode: true,
transform(chunk, _, callback){
expect(chunk._linecount).toEqual(lineCount)
if(lineCount === testData.length - 1) //Ignore header row
done()
lineCount +=1
callback(null, chunk)
}
}))
})
})
1 change: 0 additions & 1 deletion test/transformers/fakerTransformer.test.js
@@ -50,7 +50,6 @@ describe('fakerTransformer tests', () => {
.pipe(new PassThrough({
objectMode: true,
transform(chunk,_,callback){
-console.log(chunk)
expect(chunk.errors.length).toEqual(0)
expect(chunk[1]).not.toEqual("b")
const regex = /^([A-Za-z]{2}[\d]{1,2}[A-Za-z]?)[\s]+([\d][A-Za-z]{2})$/
