From 13b1aafe8d7346db25346cf603ad30220f724f78 Mon Sep 17 00:00:00 2001 From: Nick Redmark Date: Tue, 30 Jan 2018 15:11:44 +0100 Subject: [PATCH 1/3] feat: allow to start with observable (#44) --- src/Etl.ts | 8 ++++---- test/Etl.spec.ts | 14 ++++++++++++++ 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/Etl.ts b/src/Etl.ts index 983c737..47b9637 100644 --- a/src/Etl.ts +++ b/src/Etl.ts @@ -89,14 +89,14 @@ export class Etl { * during the "next" process step you get update on how many are processed yet. * Throws when any step produces an error. */ - public start(): Observable { + public start(observable: Observable = Observable.empty()): Observable { this._state = EtlState.Running; - const observable = Observable - .merge(...this._extractors.map(extractor => extractor.read(this._context))); + const o: Observable = Observable + .merge(observable, ...this._extractors.map(extractor => extractor.read(this._context))); return this._generalTransformers - .reduce((observable, transformer) => transformer.process(observable, this._context), observable) + .reduce((observable, transformer) => transformer.process(observable, this._context), o) .flatMap(object => Observable.merge(...this._loaders.map(loader => loader.write(object, this._context)))) .do( () => { }, diff --git a/test/Etl.spec.ts b/test/Etl.spec.ts index d952e7a..aaccc7e 100644 --- a/test/Etl.spec.ts +++ b/test/Etl.spec.ts @@ -242,4 +242,18 @@ describe('Etl', () => { }); }); + it('should pipe inital observable', done => { + const context = 1; + etl = new Etl(context); + etl + .addTransformer(dummyTransformer) + .addLoader(dummyLoader) + .start(Observable.of('hi')) + .subscribe(null, null, () => { + expect((dummyTransformer.process as any).mock.calls[0]).toContain('hi'); + expect((dummyLoader.write as any).mock.calls[0]).toContain('hi'); + done(); + }); + }); + }); From 984fd45ce04e337144efacb3977343adca0fc2cc Mon Sep 17 00:00:00 2001 From: "greenkeeper[bot]" Date: Thu, 1 Feb 2018 15:24:14 +0100 Subject: [PATCH 2/3] chore(package): update typedoc to version 0.10.0 (#46) --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index f9d5dd3..cf1d5c7 100644 --- a/package.json +++ b/package.json @@ -38,7 +38,7 @@ "ts-jest": "^22.0.1", "tslint": "^5.9.1", "tsutils": "^2.18.0", - "typedoc": "^0.9.0", + "typedoc": "^0.10.0", "typescript": "^2.6.2" }, "dependencies": { From c588708491461f8e728893039252fc70ede7be07 Mon Sep 17 00:00:00 2001 From: Thilo Haas Date: Tue, 20 Dec 2022 10:38:38 +0100 Subject: [PATCH 3/3] feat: Support for rxjs v7 (#59) BREAKING CHANGE: Removed support for rxjs v5, minimum version is rxjs v7. Removed support for Node.js v6, minimum version is v16 --- .appveyor.yml | 19 - .eslintrc.json | 3 + .github/workflows/release.yml | 25 ++ .github/workflows/test.yml | 23 + .prettierrc.json | 1 + .travis.yml | 41 -- README.md | 26 +- jest.json | 25 +- package.json | 37 +- src/Etl.ts | 206 ++++----- src/extractors/JsonExtractor.ts | 30 +- src/interfaces/Extractor.ts | 4 +- src/interfaces/GeneralTransformer.ts | 4 +- src/interfaces/Loader.ts | 2 +- src/interfaces/Transformer.ts | 2 +- src/loaders/ConsoleLoader.ts | 10 +- src/transformers/MapTransformer.ts | 10 +- src/transformers/MatchMergeTransformer.ts | 47 +- test/Etl.spec.ts | 506 ++++++++++++---------- test/JsonExtractor.spec.ts | 85 ++-- test/MapTransformer.spec.ts | 31 +- test/MatchMergeTransformer.spec.ts | 44 +- tslint.json | 12 - 23 files changed, 606 insertions(+), 587 deletions(-) delete mode 100644 .appveyor.yml create mode 100644 .eslintrc.json create mode 100644 .github/workflows/release.yml create mode 100644 .github/workflows/test.yml create mode 100644 .prettierrc.json delete mode 100644 .travis.yml delete mode 100644 tslint.json diff --git a/.appveyor.yml b/.appveyor.yml deleted file mode 100644 index c4a2f0f..0000000 --- a/.appveyor.yml +++ /dev/null @@ -1,19 +0,0 @@ -version: "{build} - {branch}" -skip_tags: true -skip_branch_with_pr: true - -environment: - matrix: - - nodejs_version: "9" - - nodejs_version: "8" - - nodejs_version: "7" - - nodejs_version: "6" - -install: - - ps: Install-Product node $env:nodejs_version - - npm install - -test_script: - - npm test - -build: off diff --git a/.eslintrc.json b/.eslintrc.json new file mode 100644 index 0000000..cbd4d25 --- /dev/null +++ b/.eslintrc.json @@ -0,0 +1,3 @@ +{ + "extends": ["@smartive/eslint-config"] +} diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..bbf0ffb --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,25 @@ +name: Release +on: + push: + branches: + - master +jobs: + release: + name: Release + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v2 + with: + fetch-depth: 0 + - name: Setup Node.js + uses: actions/setup-node@v2 + with: + node-version: "lts/*" + - name: Install dependencies + run: npm ci + - name: Release + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + NPM_TOKEN: ${{ secrets.NPM_TOKEN }} + run: npx semantic-release diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..3887cf7 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,23 @@ +name: Unit Tests +on: + push: + branches: + - master + pull_request: + branches: [master, develop] +jobs: + test: + runs-on: ubuntu-latest + strategy: + matrix: + node-version: [16.x, 18.x] + steps: + - uses: actions/checkout@v3 + - name: Use Node.js ${{ matrix.node-version }} + uses: actions/setup-node@v3 + with: + node-version: ${{ matrix.node-version }} + - name: Run Tests + run: | + npm install + npm test diff --git a/.prettierrc.json b/.prettierrc.json new file mode 100644 index 0000000..65c08dc --- /dev/null +++ b/.prettierrc.json @@ -0,0 +1 @@ +"@smartive/prettier-config" diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index f831765..0000000 --- a/.travis.yml +++ /dev/null @@ -1,41 +0,0 @@ -language: node_js - -stages: - - name: test - if: tag IS blank - - name: deploy - if: branch = master AND type != pull_request - -notifications: - email: false - -jobs: - include: - - stage: test - node_js: '9' - after_success: - - npm install coveralls@^2.11.9 && cat ./coverage/lcov.info | coveralls - - stage: test - node_js: '8' - after_success: - - npm install coveralls@^2.11.9 && cat ./coverage/lcov.info | coveralls - - stage: test - node_js: '7' - after_success: - - npm install coveralls@^2.11.9 && cat ./coverage/lcov.info | coveralls - - stage: test - node_js: '6' - after_success: - - npm install coveralls@^2.11.9 && cat ./coverage/lcov.info | coveralls - - stage: deploy - node_js: '9' - script: npm run typedoc - deploy: - provider: pages - skip_cleanup: true - github_token: $GH_TOKEN - local_dir: ./docs - - stage: deploy - node_js: '9' - before_script: npm run build - script: npm run semantic-release diff --git a/README.md b/README.md index 0c0b898..0f428d6 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ Basically instantiate the `Etl` class and add extractors (which pull data from a A basic, hypothetic example could be: "Load data from a JSON array, snake_case all properties and store those objects into a mongoDB." -The package is written in `typescript` but can be used in plain javascript as well. +The package is written in `typescript` but can be used in plain javascript as well ##### A bunch of badges @@ -21,14 +21,14 @@ The package is written in `typescript` but can be used in plain javascript as we ## Usage ```typescript -import {Etl} from 'proc-that'; +import { Etl } from "proc-that"; new Etl() - .addExtractor(/* class that implements Extractor */) - .addTransformer(/* class that implements Transformer */) - .addLoader(/* class that implements Loader */) - .start() - .subscribe(progress, error, success); + .addExtractor(/* class that implements Extractor */) + .addTransformer(/* class that implements Transformer */) + .addLoader(/* class that implements Loader */) + .start() + .subscribe(progress, error, success); ``` After all objects are extracted, transformed and loaded, the `.start()` observable completes and the process is finished. @@ -37,15 +37,15 @@ Below is a list if extractors and loaders that are already implemented. Feel fre ## Extractors - Name | Description | Link ---------------------------------|--------------------------------------------------|------------------------------------------------------- - `proc-that-rest-extractor` | Extract objects from GET requests | https://github.com/smartive/proc-that-rest-extractor +| Name | Description | Link | +| -------------------------- | --------------------------------- | ---------------------------------------------------- | +| `proc-that-rest-extractor` | Extract objects from GET requests | https://github.com/smartive/proc-that-rest-extractor | ## Loaders - Name | Description | Link ---------------------------------|--------------------------------------------------|------------------------------------------------------- - `proc-that-elastic-loader` | Load transformed objects into elasticsearch | https://github.com/smartive/proc-that-elastic-loader +| Name | Description | Link | +| -------------------------- | ------------------------------------------- | ---------------------------------------------------- | +| `proc-that-elastic-loader` | Load transformed objects into elasticsearch | https://github.com/smartive/proc-that-elastic-loader | ## Implement your own diff --git a/jest.json b/jest.json index 8e751c3..a33ba5c 100644 --- a/jest.json +++ b/jest.json @@ -1,19 +1,10 @@ { - "collectCoverage": true, - "mapCoverage": true, - "transform": { - "^.+\\.tsx?$": "/node_modules/ts-jest/preprocessor.js" - }, - "testMatch": [ - "**/test/**/*.spec.ts" - ], - "testPathIgnorePatterns": [ - "/node_modules/" - ], - "moduleFileExtensions": [ - "ts", - "tsx", - "js", - "json" - ] + "collectCoverage": true, + "mapCoverage": true, + "transform": { + "^.+\\.tsx?$": "ts-jest" + }, + "testMatch": ["**/test/**/*.spec.ts"], + "testPathIgnorePatterns": ["/node_modules/"], + "moduleFileExtensions": ["ts", "tsx", "js", "json"] } diff --git a/package.json b/package.json index cf1d5c7..26704a1 100644 --- a/package.json +++ b/package.json @@ -8,7 +8,12 @@ "clean": "del-cli ./build ./coverage", "build": "npm run clean && tsc -p ./config/tsconfig.build.json", "develop": "npm run clean && tsc -p .", - "lint": "tslint -c ./tslint.json -p ./config/tsconfig.build.json", + "lint": "npm run lint:ts && npm run prettier", + "lint:fix": "npm run lint:ts:fix && npm run prettier:fix", + "lint:ts": "eslint --max-warnings=-1", + "lint:ts:fix": "eslint --max-warnings=-1 --fix", + "prettier": "prettier --config .prettierrc.json --list-different \"./**/*.{ts,tsx}\"", + "prettier:fix": "prettier --config .prettierrc.json --list-different \"./**/*.{ts,tsx}\" --write", "test": "npm run lint && npm run clean && jest -c ./jest.json", "test:watch": "npm run clean && jest -c ./jest.json --watch", "typedoc": "del-cli ./docs && typedoc --ignoreCompilerErrors --out ./docs --mode file --tsconfig ./config/tsconfig.build.json ./src/", @@ -20,7 +25,7 @@ "typescript" ], "engines": { - "node": ">=6" + "node": ">=16" }, "repository": { "type": "git", @@ -30,20 +35,22 @@ "author": "Christoph Bühler ", "license": "MIT", "devDependencies": { - "@smartive/tslint-config": "^2.0.0", - "@types/jest": "^22.0.1", - "del-cli": "^1.1.0", - "jest": "^22.1.1", - "semantic-release": "^12.2.2", - "ts-jest": "^22.0.1", - "tslint": "^5.9.1", - "tsutils": "^2.18.0", - "typedoc": "^0.10.0", - "typescript": "^2.6.2" + "@smartive/eslint-config": "^3.1.1", + "@smartive/prettier-config": "^3.0.0", + "@types/jest": "^29.2.4", + "del-cli": "^5.0.0", + "eslint": "^8.30.0", + "jest": "^29.3.1", + "prettier": "^2.8.1", + "semantic-release": "^19.0.5", + "ts-jest": "^29.0.3", + "tsutils": "^3.21.0", + "typedoc": "^0.23.23", + "typescript": "^4.9.4" }, "dependencies": { - "@types/node": "^9.3.0", - "rxjs": "^5.5.6", - "tslib": "^1.8.1" + "@types/node": "^18.11.17", + "rxjs": "^7.8.0", + "tslib": "^2.4.1" } } diff --git a/src/Etl.ts b/src/Etl.ts index 47b9637..72b1b12 100644 --- a/src/Etl.ts +++ b/src/Etl.ts @@ -1,4 +1,4 @@ -import { Observable } from 'rxjs'; +import { EMPTY, merge, mergeMap, Observable, tap, throwError } from 'rxjs'; import { Extractor } from './interfaces/Extractor'; import { GeneralTransformer } from './interfaces/GeneralTransformer'; @@ -7,9 +7,9 @@ import { Transformer } from './interfaces/Transformer'; import { MapTransformer } from './transformers/MapTransformer'; export enum EtlState { - Running, - Stopped, - Error, + Running, + Stopped, + Error, } /** @@ -19,105 +19,105 @@ export enum EtlState { * This processor is modular, you can find other implemented loaders and extractors in the README */ export class Etl { - private _extractors: Extractor[] = []; - private _generalTransformers: GeneralTransformer[] = []; - private _transformers: Transformer[] = []; - private _loaders: Loader[] = []; - private _state: EtlState = EtlState.Stopped; - private _context: any = null; - - public constructor(context?: any) { - this.setContext(context); + private _extractors: Extractor[] = []; + private _generalTransformers: GeneralTransformer[] = []; + private _transformers: Transformer[] = []; + private _loaders: Loader[] = []; + private _state: EtlState = EtlState.Stopped; + private _context: any = null; + + public constructor(context?: any) { + this.setContext(context); + } + + public get extractors(): Extractor[] { + return this._extractors; + } + + public get generalTransformers(): GeneralTransformer[] { + return this._generalTransformers; + } + + public get transformers(): Transformer[] { + return this._transformers; + } + + public get loaders(): Loader[] { + return this._loaders; + } + + public get state(): EtlState { + return this._state; + } + + public setContext(context: any): this { + if (this._state !== EtlState.Stopped) { + this._state = EtlState.Error; + throw new Error('Tried to set context on invalid state.'); } - - public get extractors(): Extractor[] { - return this._extractors; - } - - public get generalTransformers(): GeneralTransformer[] { - return this._generalTransformers; - } - - public get transformers(): Transformer[] { - return this._transformers; - } - - public get loaders(): Loader[] { - return this._loaders; - } - - public get state(): EtlState { - return this._state; - } - - public setContext(context: any): this { - if (this._state !== EtlState.Stopped) { + this._context = context; + return this; + } + + public addExtractor(extract: Extractor): Etl { + this._extractors.push(extract); + return this; + } + + public addGeneralTransformer(transformer: GeneralTransformer): Etl { + this._generalTransformers.push(transformer); + return this; + } + + public addTransformer(transformer: Transformer): Etl { + this.addGeneralTransformer(new MapTransformer(transformer)); + this._transformers.push(transformer); + return this; + } + + public addLoader(loader: Loader): Etl { + this._loaders.push(loader); + return this; + } + + /** + * Starts the etl process. First, all extractors are run in parallel and deliver their results into an observable. + * Once the buffer gets a result, it transfers all objects through the transformers (one by one). + * After that, the transformed results are run through all loaders in parallel. + * + * @returns {Observable} Observable that completes when the process is finished, + * during the "next" process step you get update on how many are processed yet. + * Throws when any step produces an error. + */ + public start(observable: Observable = EMPTY): Observable { + this._state = EtlState.Running; + + const o: Observable = merge(observable, ...this._extractors.map((extractor) => extractor.read(this._context))); + + return this._generalTransformers + .reduce((observable, transformer) => transformer.process(observable, this._context), o) + .pipe(mergeMap((object) => merge(...this._loaders.map((loader) => loader.write(object, this._context))))) + .pipe( + tap({ + error: (err) => { this._state = EtlState.Error; - throw new Error('Tried to set context on invalid state.'); - } - this._context = context; - return this; - } - - public addExtractor(extract: Extractor): Etl { - this._extractors.push(extract); - return this; - } - - public addGeneralTransformer(transformer: GeneralTransformer): Etl { - this._generalTransformers.push(transformer); - return this; - } - - public addTransformer(transformer: Transformer): Etl { - this.addGeneralTransformer(new MapTransformer(transformer)); - this._transformers.push(transformer); - return this; - } - - public addLoader(loader: Loader): Etl { - this._loaders.push(loader); - return this; - } - - /** - * Starts the etl process. First, all extractors are run in parallel and deliver their results into an observable. - * Once the buffer gets a result, it transfers all objects through the transformers (one by one). - * After that, the transformed results are run through all loaders in parallel. - * - * @returns {Observable} Observable that completes when the process is finished, - * during the "next" process step you get update on how many are processed yet. - * Throws when any step produces an error. - */ - public start(observable: Observable = Observable.empty()): Observable { - this._state = EtlState.Running; - - const o: Observable = Observable - .merge(observable, ...this._extractors.map(extractor => extractor.read(this._context))); - - return this._generalTransformers - .reduce((observable, transformer) => transformer.process(observable, this._context), o) - .flatMap(object => Observable.merge(...this._loaders.map(loader => loader.write(object, this._context)))) - .do( - () => { }, - (err) => { - this._state = EtlState.Error; - return Observable.throw(err); - }, - () => { - this._state = EtlState.Stopped; - }, - ); - } - - /** - * Resets the whole Etl object. Deletes all modifiers and resets the state. - */ - public reset(): void { - this._extractors = []; - this._transformers = []; - this._loaders = []; - this._state = EtlState.Stopped; - this._context = null; - } + return throwError(() => err); + }, + complete: () => { + this._state = EtlState.Stopped; + }, + }) + ); + } + + /** + * Resets the whole Etl object. Deletes all modifiers and resets the state. + */ + public reset(): void { + this._extractors = []; + this._transformers = []; + this._loaders = []; + this._state = EtlState.Stopped; + this._context = null; + } } diff --git a/src/extractors/JsonExtractor.ts b/src/extractors/JsonExtractor.ts index 5d9ba44..7203896 100644 --- a/src/extractors/JsonExtractor.ts +++ b/src/extractors/JsonExtractor.ts @@ -1,5 +1,5 @@ import { resolve } from 'path'; -import { Observable } from 'rxjs'; +import { from, Observable, throwError } from 'rxjs'; import { Extractor } from '../interfaces/Extractor'; @@ -7,21 +7,21 @@ import { Extractor } from '../interfaces/Extractor'; * Extractor that reads a JSON file at a given filepath. The path is resolved relatively to the running tasks root dir. */ export class JsonExtractor implements Extractor { - private filePath: string; + private filePath: string; - constructor(filePath: string) { - this.filePath = resolve(process.cwd(), filePath); - } + constructor(filePath: string) { + this.filePath = resolve(process.cwd(), filePath); + } - public read(): Observable { - try { - const content = require(this.filePath); - if (!(content instanceof Array) && content.constructor !== Array) { - return Observable.from([content]); - } - return Observable.from(content); - } catch (e) { - return Observable.throw(e); - } + public read(): Observable { + try { + const content = require(this.filePath); + if (!(content instanceof Array) && content.constructor !== Array) { + return from([content]); + } + return from(content); + } catch (e) { + return throwError(() => e); } + } } diff --git a/src/interfaces/Extractor.ts b/src/interfaces/Extractor.ts index 52da085..ed4137d 100644 --- a/src/interfaces/Extractor.ts +++ b/src/interfaces/Extractor.ts @@ -2,10 +2,10 @@ import { Observable } from 'rxjs'; /** * Extractor interface. Only provides "read()" method that returns an observable with the result. - * + * * @export * @interface Extractor */ export interface Extractor { - read(context?: any): Observable; + read(context?: any): Observable; } diff --git a/src/interfaces/GeneralTransformer.ts b/src/interfaces/GeneralTransformer.ts index 2f91ac5..d2c88e2 100644 --- a/src/interfaces/GeneralTransformer.ts +++ b/src/interfaces/GeneralTransformer.ts @@ -3,10 +3,10 @@ import { Observable } from 'rxjs'; /** * GeneralTransformer interface. Provides a "process(observable)" method that processes an observable. * Represents a stage in the ETL pipeline. - * + * * @export * @interface GeneralTransformer */ export interface GeneralTransformer { - process(observable: Observable, context?: any): Observable; + process(observable: Observable, context?: any): Observable; } diff --git a/src/interfaces/Loader.ts b/src/interfaces/Loader.ts index 7f838b1..ffffcb4 100644 --- a/src/interfaces/Loader.ts +++ b/src/interfaces/Loader.ts @@ -4,5 +4,5 @@ import { Observable } from 'rxjs'; * Loader interface. Provides ".write(obj)" method that returns an observable with the loaded value. */ export interface Loader { - write(object: any, context?: any): Observable; + write(object: any, context?: any): Observable; } diff --git a/src/interfaces/Transformer.ts b/src/interfaces/Transformer.ts index b92bcd0..9d70b8e 100644 --- a/src/interfaces/Transformer.ts +++ b/src/interfaces/Transformer.ts @@ -5,5 +5,5 @@ import { Observable } from 'rxjs'; * the new result (array will be flattend). */ export interface Transformer { - process(object: any, context?: any): Observable; + process(object: any, context?: any): Observable; } diff --git a/src/loaders/ConsoleLoader.ts b/src/loaders/ConsoleLoader.ts index 0a0a35b..dbc25f7 100644 --- a/src/loaders/ConsoleLoader.ts +++ b/src/loaders/ConsoleLoader.ts @@ -1,4 +1,4 @@ -import { Observable } from 'rxjs'; +import { Observable, of } from 'rxjs'; import { Loader } from '../interfaces/Loader'; @@ -10,8 +10,8 @@ import { Loader } from '../interfaces/Loader'; * @implements {Loader} */ export class ConsoleLoader implements Loader { - public write(object: any): Observable { - console.log(object); - return Observable.of(object); - } + public write(object: any): Observable { + console.log(object); + return of(object); + } } diff --git a/src/transformers/MapTransformer.ts b/src/transformers/MapTransformer.ts index 6430510..00ee2ac 100644 --- a/src/transformers/MapTransformer.ts +++ b/src/transformers/MapTransformer.ts @@ -1,12 +1,12 @@ -import { Observable } from 'rxjs'; +import { mergeMap, Observable } from 'rxjs'; import { GeneralTransformer } from '../interfaces/GeneralTransformer'; import { Transformer } from '../interfaces/Transformer'; export class MapTransformer implements GeneralTransformer { - constructor(private transformer: Transformer) { } + constructor(private transformer: Transformer) {} - process(observable: Observable, context?: any): Observable { - return observable.flatMap(o => this.transformer.process(o, context)); - } + public process(observable: Observable, context?: any): Observable { + return observable.pipe(mergeMap((o) => this.transformer.process(o, context))); + } } diff --git a/src/transformers/MatchMergeTransformer.ts b/src/transformers/MatchMergeTransformer.ts index a69fa63..41b8b89 100644 --- a/src/transformers/MatchMergeTransformer.ts +++ b/src/transformers/MatchMergeTransformer.ts @@ -1,35 +1,30 @@ -import { Observable } from 'rxjs'; +import { from, mergeMap, Observable, reduce } from 'rxjs'; import { GeneralTransformer } from '../interfaces/GeneralTransformer'; export abstract class MatchMergeTransformer implements GeneralTransformer { + public process(observable: Observable, context?: any): Observable { + const matchMerge = (merged: any[], o2: any) => { + return this.matchMerge(merged, o2, context); + }; + return observable.pipe(reduce(matchMerge, [])).pipe(mergeMap((v) => from(v))); + } - public process(observable: Observable, context?: any): Observable { - const matchMerge = (merged: any[], o2: any) => { - return this.matchMerge(merged, o2, context); - }; - return observable.reduce(matchMerge, []).flatMap((merged) => { - return Observable.from(merged); - }); - } - - protected abstract match(o1: any, o2: any, context?: any): boolean; + protected abstract match(o1: any, o2: any, context?: any): boolean; - protected abstract merge(o1: any, o2: any, context?: any): any; + protected abstract merge(o1: any, o2: any, context?: any): any; - private matchMerge(merged: any[], o2: any, context?: any): any[] { - for (let i = 0; i < merged.length; i++) { - if (this.match(merged[i], o2, context)) { - const o1 = merged.splice(i, 1)[0]; - // tslint:disable-next-line - o2 = this.merge(o1, o2, context); - // Try to merge the merged element with the remaining elements, - // starting from the current position - i--; - } - } - merged.push(o2); - return merged; + private matchMerge(merged: any[], o2: any, context?: any): any[] { + for (let i = 0; i < merged.length; i++) { + if (this.match(merged[i], o2, context)) { + const o1 = merged.splice(i, 1)[0]; + o2 = this.merge(o1, o2, context); + // Try to merge the merged element with the remaining elements, + // starting from the current position + i--; + } } - + merged.push(o2); + return merged; + } } diff --git a/test/Etl.spec.ts b/test/Etl.spec.ts index aaccc7e..f9708bd 100644 --- a/test/Etl.spec.ts +++ b/test/Etl.spec.ts @@ -1,259 +1,305 @@ -import { Observable } from 'rxjs'; +import { from, of, reduce, throwError } from 'rxjs'; import { Etl, EtlState, Extractor, JsonExtractor, Loader, MatchMergeTransformer, Transformer } from '../src'; describe('Etl', () => { + let etl: Etl; + const extractor: Extractor = new JsonExtractor('./test/.testdata/json-extractor.object.json'); + const arrayExtractor: Extractor = new JsonExtractor('./test/.testdata/json-extractor.array.json'); + const matchMergeExtractor: Extractor = new JsonExtractor('./test/.testdata/match-merge.json'); + let o; + let dummyExtractor: Extractor; + let dummyTransformer: Transformer; + let dummyLoader: Loader; - let etl: Etl; - let extractor: Extractor = new JsonExtractor('./test/.testdata/json-extractor.object.json'); - let arrayExtractor: Extractor = new JsonExtractor('./test/.testdata/json-extractor.array.json'); - let matchMergeExtractor: Extractor = new JsonExtractor('./test/.testdata/match-merge.json'); - let o; - let dummyExtractor: Extractor; - let dummyTransformer: Transformer; - let dummyLoader: Loader; + beforeEach(() => { + etl = new Etl(); - beforeEach(() => { - etl = new Etl(); + o = { _id: '001' }; - o = {_id: "001"}; + dummyExtractor = { + read: () => of(o), + }; + dummyExtractor.read = jest.fn(dummyExtractor.read); - dummyExtractor = { - read: () => Observable.of(o), - }; - dummyExtractor.read = jest.fn(dummyExtractor.read); + dummyTransformer = { + process: (o) => of(o), + }; + dummyTransformer.process = jest.fn(dummyTransformer.process); - dummyTransformer = { - process: o => Observable.of(o), - }; - dummyTransformer.process = jest.fn(dummyTransformer.process); + dummyLoader = { + write: (o) => of(o), + }; + dummyLoader.write = jest.fn(dummyLoader.write); + }); - dummyLoader = { - write: o => Observable.of(o), - }; - dummyLoader.write = jest.fn(dummyLoader.write); + it('should initialize with correct default params', () => { + expect(etl.state).toBe(EtlState.Stopped); + expect(etl.extractors.length).toBe(0); + expect(etl.transformers.length).toBe(0); + expect(etl.loaders.length).toBe(0); + }); + it('should reset correctly', () => { + etl.addExtractor({ + read: function () { + return of(null); + }, }); - it('should initialize with correct default params', () => { - expect(etl.state).toBe(EtlState.Stopped); - expect(etl.extractors.length).toBe(0); - expect(etl.transformers.length).toBe(0); - expect(etl.loaders.length).toBe(0); - }); + expect(etl.extractors.length).toBe(1); + etl.reset(); + expect(etl.extractors.length).toBe(0); + }); - it('should reset correctly', () => { - etl.addExtractor({ - read: function() { - return null; - } - }); + it('should pass context down the pipeline', (done) => { + const context = 1; + etl = new Etl(context); + etl + .addExtractor(dummyExtractor) + .addTransformer(dummyTransformer) + .addLoader(dummyLoader) + .start() + .subscribe({ + complete: () => { + expect((dummyExtractor.read as any).mock.calls[0]).toContain(context); + expect((dummyTransformer.process as any).mock.calls[0]).toContain(context); + expect((dummyLoader.write as any).mock.calls[0]).toContain(context); + done(); + }, + }); + }); - expect(etl.extractors.length).toBe(1); - etl.reset(); - expect(etl.extractors.length).toBe(0); - }); + it('should pass newly set context down the pipeline', (done) => { + const context = 1; + etl + .addExtractor(dummyExtractor) + .addTransformer(dummyTransformer) + .addLoader(dummyLoader) + .setContext(context) + .start() + .subscribe({ + complete: () => { + expect((dummyExtractor.read as any).mock.calls[0]).toContain(context); + expect((dummyTransformer.process as any).mock.calls[0]).toContain(context); + expect((dummyLoader.write as any).mock.calls[0]).toContain(context); + done(); + }, + }); + }); - it('should pass context down the pipeline', done => { - const context = 1; - etl = new Etl(context); - etl - .addExtractor(dummyExtractor) - .addTransformer(dummyTransformer) - .addLoader(dummyLoader) - .start() - .subscribe(null, null, () => { - expect((dummyExtractor.read as any).mock.calls[0]).toContain(context); - expect((dummyTransformer.process as any).mock.calls[0]).toContain(context); - expect((dummyLoader.write as any).mock.calls[0]).toContain(context); - done(); - }); - }); + it('should process simple object', (done) => { + etl + .addExtractor(extractor) + .addLoader(dummyLoader) + .start() + .subscribe({ + complete: () => { + expect((dummyLoader.write as any).mock.calls).toHaveLength(1); + expect((dummyLoader.write as any).mock.calls[0][0]).toMatchObject({ + foo: 'bar', + hello: 'world', + }); + done(); + }, + }); + }); - it('should pass newly set context down the pipeline', done => { - const context = 1; - etl - .addExtractor(dummyExtractor) - .addTransformer(dummyTransformer) - .addLoader(dummyLoader) - .setContext(context) - .start() - .subscribe(null, null, () => { - expect((dummyExtractor.read as any).mock.calls[0]).toContain(context); - expect((dummyTransformer.process as any).mock.calls[0]).toContain(context); - expect((dummyLoader.write as any).mock.calls[0]).toContain(context); - done(); - }); - }); + it('should process simple array', (done) => { + etl + .addExtractor(arrayExtractor) + .addLoader(dummyLoader) + .start() + .subscribe({ + complete: () => { + expect((dummyLoader.write as any).mock.calls).toHaveLength(3); + expect((dummyLoader.write as any).mock.calls[0][0]).toMatchObject({ + objId: 1, + name: 'foobar', + }); + expect((dummyLoader.write as any).mock.calls[1][0]).toMatchObject({ + objId: 2, + name: 'hello world', + }); + expect((dummyLoader.write as any).mock.calls[2][0]).toMatchObject({ + objId: 3, + name: 'third test', + }); + done(); + }, + }); + }); - it('should process simple object', done => { - etl - .addExtractor(extractor) - .addLoader(dummyLoader) - .start() - .subscribe(null, null, () => { - expect((dummyLoader.write as any).mock.calls[0][0]).toMatchObject({ foo: 'bar', hello: 'world' }); - done(); - }); - }); + it('should call error on extractor error', (done) => { + etl + .addExtractor({ + read: () => throwError(() => new Error('test')), + }) + .addLoader(dummyLoader) + .start() + .subscribe({ + error: () => { + done(); + }, + complete: () => { + done(new Error('did not throw')); + }, + }); + }); - it('should process simple array', done => { - etl - .addExtractor(arrayExtractor) - .addLoader(dummyLoader) - .start() - .subscribe(null, null, () => { - expect((dummyLoader.write as any).mock.calls[0][0]).toMatchObject({ objId: 1, name: 'foobar' }); - expect((dummyLoader.write as any).mock.calls[1][0]).toMatchObject({ objId: 2, name: 'hello world' }); - expect((dummyLoader.write as any).mock.calls[2][0]).toMatchObject({ objId: 3, name: 'third test' }); - done(); - }); - }); + it('should call error on loader error', (done) => { + etl + .addExtractor(extractor) + .addLoader({ + write: () => throwError(() => new Error('test')), + }) + .start() + .subscribe({ + error: () => { + done(); + }, + complete: () => { + done(new Error('did not throw')); + }, + }); + }); - it('should call error on extractor error', done => { - etl - .addExtractor({ - read: () => Observable.throw(new Error('test')) - }) - .addLoader(dummyLoader) - .start() - .subscribe(null, () => { - done(); - }, () => { - done(new Error('did not throw')); - }); - }); + it('should call error on transformer error', (done) => { + etl + .addExtractor(extractor) + .addLoader(dummyLoader) + .addTransformer({ + process: () => throwError(() => new Error('test')), + }) + .start() + .subscribe({ + error: () => { + done(); + }, + complete: () => { + done(new Error('did not throw')); + }, + }); + }); - it('should call error on loader error', done => { - etl - .addExtractor(extractor) - .addLoader({ - write: o => Observable.throw(new Error('test')) - }) - .start() - .subscribe(null, () => { - done(); - }, () => { - done(new Error('did not throw')); - }); - }); + it('should process simple object with transformer', (done) => { + const spy = jest.fn(); + etl + .addExtractor(extractor) + .addLoader(dummyLoader) + .addTransformer({ + process: (o) => of(o), + }) + .start() + .subscribe({ + next: spy, + error: () => { + done(new Error('did throw')); + }, + complete: () => { + expect(spy.mock.calls.length).toBe(1); + done(); + }, + }); + }); - it('should call error on transformer error', done => { - etl - .addExtractor(extractor) - .addLoader(dummyLoader) - .addTransformer({ - process: o => Observable.throw(new Error('test')) - }) - .start() - .subscribe(null, () => { - done(); - }, () => { - done(new Error('did not throw')); - }); - }); - - it('should process simple object with transformer', done => { - let spy = jest.fn(); - etl - .addExtractor(extractor) - .addLoader(dummyLoader) - .addTransformer({ - process: o => Observable.of(o) - }) - .start() - .subscribe(spy, () => { - done(new Error('did throw')); - }, () => { - expect(spy.mock.calls.length).toBe(1); - done(); - }); - }); - - it('should process simple array with transformer (flat)', done => { - let spy = jest.fn(); - etl - .addExtractor(arrayExtractor) - .addLoader(dummyLoader) - .addTransformer({ - process: o => Observable.from([o, o]) - }) - .start() - .subscribe(spy, () => { - done(new Error('did throw')); - }, () => { - expect(spy.mock.calls.length).toBe(6); - done(); - }); - }); + it('should process simple array with transformer (flat)', (done) => { + const spy = jest.fn(); + etl + .addExtractor(arrayExtractor) + .addLoader(dummyLoader) + .addTransformer({ + process: (o) => from([o, o]), + }) + .start() + .subscribe({ + next: spy, + error: () => { + done(new Error('did throw')); + }, + complete: () => { + expect(spy.mock.calls.length).toBe(6); + done(); + }, + }); + }); - it('should process a general transformer', done => { - let spy = jest.fn(); - etl - .addExtractor(arrayExtractor) - .addLoader(dummyLoader) - .addGeneralTransformer({ - process: o => o.reduce((x, y) => x + y.objId, 0) - }) - .start() - .subscribe(spy, () => { - done(new Error('did throw')); - }, () => { - expect(spy.mock.calls.length).toBe(1); - expect(spy.mock.calls[0][0]).toBe(6); - done(); - }); - }); - - it('should process a match-merge transformer', done => { - let spy = jest.fn(); + it('should process a general transformer', (done) => { + const spy = jest.fn(); + etl + .addExtractor(arrayExtractor) + .addLoader(dummyLoader) + .addGeneralTransformer({ + process: (o) => o.pipe(reduce((x, y) => x + y.objId, 0)), + }) + .start() + .subscribe({ + next: spy, + error: () => { + done(new Error('did throw')); + }, + complete: () => { + expect(spy.mock.calls.length).toBe(1); + expect(spy.mock.calls[0][0]).toBe(6); + done(); + }, + }); + }); - class TestMatchTransformer extends MatchMergeTransformer { - match(o1, o2) { - return o1.location === o2.location; - } + it('should process a match-merge transformer', (done) => { + const spy = jest.fn(); - merge(o1, o2) { - return { - location: o1.location, - things: [...o1.things, ...o2.things] - }; - } - } + class TestMatchTransformer extends MatchMergeTransformer { + match(o1, o2) { + return o1.location === o2.location; + } - etl - .addExtractor(matchMergeExtractor) - .addLoader(dummyLoader) - .addGeneralTransformer(new TestMatchTransformer) - .start() - .subscribe(spy, () => { - done(new Error('did throw')); - }, () => { - expect(spy.mock.calls.length).toBe(2); - expect(spy.mock.calls[0][0]).toMatchObject({ - location: "A", - things: ["a", "c"] - }); - expect(spy.mock.calls[1][0]).toMatchObject({ - location: "B", - things: ["b", "d"] - }); - done(); - }); - }); + merge(o1, o2) { + return { + location: o1.location, + things: [...o1.things, ...o2.things], + }; + } + } - it('should pipe inital observable', done => { - const context = 1; - etl = new Etl(context); - etl - .addTransformer(dummyTransformer) - .addLoader(dummyLoader) - .start(Observable.of('hi')) - .subscribe(null, null, () => { - expect((dummyTransformer.process as any).mock.calls[0]).toContain('hi'); - expect((dummyLoader.write as any).mock.calls[0]).toContain('hi'); - done(); - }); - }); + etl + .addExtractor(matchMergeExtractor) + .addLoader(dummyLoader) + .addGeneralTransformer(new TestMatchTransformer()) + .start() + .subscribe({ + next: spy, + error: () => { + done(new Error('did throw')); + }, + complete: () => { + expect(spy.mock.calls.length).toBe(2); + expect(spy.mock.calls[0][0]).toMatchObject({ + location: 'A', + things: ['a', 'c'], + }); + expect(spy.mock.calls[1][0]).toMatchObject({ + location: 'B', + things: ['b', 'd'], + }); + done(); + }, + }); + }); + it('should pipe inital observable', (done) => { + const context = 1; + etl = new Etl(context); + etl + .addTransformer(dummyTransformer) + .addLoader(dummyLoader) + .start(of('hi')) + .subscribe({ + complete: () => { + expect((dummyTransformer.process as any).mock.calls[0]).toContain('hi'); + expect((dummyLoader.write as any).mock.calls[0]).toContain('hi'); + done(); + }, + }); + }); }); diff --git a/test/JsonExtractor.spec.ts b/test/JsonExtractor.spec.ts index 64e189e..7ae23d5 100644 --- a/test/JsonExtractor.spec.ts +++ b/test/JsonExtractor.spec.ts @@ -3,49 +3,52 @@ import { join } from 'path'; import { JsonExtractor } from '../src'; describe('JsonExtractor', () => { - - it('should return an observable', () => { - const ext = new JsonExtractor('./test/.testdata/json-extractor.object.json'); - expect(ext.read()).toBeInstanceOf(Object); - }); - - it('should get correct path', () => { - const ext = new JsonExtractor('hello'); - const anyExt: any = ext; - const result = join(process.cwd(), 'hello'); - expect(anyExt.filePath).toBe(result); - }); - - it('should receive a json object', (done) => { - const ext = new JsonExtractor('./test/.testdata/json-extractor.object.json'); - ext.read().subscribe((obj) => { - expect(obj).toMatchObject({ - foo: 'bar', - hello: 'world', - }); - done(); + it('should return an observable', () => { + const ext = new JsonExtractor('./test/.testdata/json-extractor.object.json'); + expect(ext.read()).toBeInstanceOf(Object); + }); + + it('should get correct path', () => { + const ext = new JsonExtractor('hello'); + const anyExt: any = ext; + const result = join(process.cwd(), 'hello'); + expect(anyExt.filePath).toBe(result); + }); + + it('should receive a json object', (done) => { + const ext = new JsonExtractor('./test/.testdata/json-extractor.object.json'); + ext.read().subscribe({ + next: (obj) => { + expect(obj).toMatchObject({ + foo: 'bar', + hello: 'world', }); + done(); + }, }); - - it('should receive a json array', (done) => { - const ext = new JsonExtractor('./test/.testdata/json-extractor.array.json'); - const spy = jest.fn(); - ext.read().subscribe(spy, null, () => { - expect(spy.mock.calls.length).toBe(3); - done(); - }); + }); + + it('should receive a json array', (done) => { + const ext = new JsonExtractor('./test/.testdata/json-extractor.array.json'); + const spy = jest.fn(); + ext.read().subscribe({ + next: spy, + complete: () => { + expect(spy.mock.calls.length).toBe(3); + done(); + }, }); - - it('should throw on not found file', (done) => { - const ext = new JsonExtractor('404.json'); - ext.read().subscribe( - () => { - done(new Error('did not throw')); - }, - () => { - done(); - }, - ); + }); + + it('should throw on not found file', (done) => { + const ext = new JsonExtractor('404.json'); + ext.read().subscribe({ + next: () => { + done(new Error('did not throw')); + }, + error: () => { + done(); + }, }); - + }); }); diff --git a/test/MapTransformer.spec.ts b/test/MapTransformer.spec.ts index 165dbf4..5f72208 100644 --- a/test/MapTransformer.spec.ts +++ b/test/MapTransformer.spec.ts @@ -1,27 +1,24 @@ -import { Observable } from 'rxjs/Rx'; +import { from, of } from 'rxjs'; import { MapTransformer } from '../src'; describe('MapTransformer', () => { + it('should return an observable', () => { + const spy = jest.fn(); - it('should return an observable', () => { - const spy = jest.fn(); + const subt = { + process(o) { + return of(o); + }, + }; - const subt = { - process(o) { - return Observable.of(o); - } - } + subt.process = jest.fn(subt.process); - subt.process = jest.fn(subt.process); + const t = new MapTransformer(subt); - const t = new MapTransformer(subt); - - t.process(Observable.from([1]), 2) - .subscribe(spy, null, () => { - expect(spy.mock.calls.length).toBe(1); - expect((subt.process as any).mock.calls.length).toBe(1); - }); + t.process(from([1]), 2).subscribe(spy, null, () => { + expect(spy.mock.calls.length).toBe(1); + expect((subt.process as any).mock.calls.length).toBe(1); }); - + }); }); diff --git a/test/MatchMergeTransformer.spec.ts b/test/MatchMergeTransformer.spec.ts index 06660f4..e88e21e 100644 --- a/test/MatchMergeTransformer.spec.ts +++ b/test/MatchMergeTransformer.spec.ts @@ -1,28 +1,28 @@ -import { Observable } from 'rxjs/Rx'; +import { from } from 'rxjs'; import { MatchMergeTransformer } from '../src/transformers/MatchMergeTransformer'; describe('MatchMergeTransformer', () => { - - it('should return an observable', () => { - const spy = jest.fn(); - - class TestMatchMergeTransformer extends MatchMergeTransformer { - match(o1, o2) { - return o1 === o2; - } - - merge(o1, o2) { - return o1; - } - } - - const t = new TestMatchMergeTransformer(); - - t.process(Observable.from([1, 2, 3, 2, 3])) - .subscribe(spy, null, () => { - expect(spy.mock.calls.length).toBe(3); - }); + it('should return an observable', () => { + const spy = jest.fn(); + + class TestMatchMergeTransformer extends MatchMergeTransformer { + match(o1, o2) { + return o1 === o2; + } + + merge(o1) { + return o1; + } + } + + const t = new TestMatchMergeTransformer(); + + t.process(from([1, 2, 3, 2, 3])).subscribe({ + next: spy, + complete: () => { + expect(spy.mock.calls.length).toBe(3); + }, }); - + }); }); diff --git a/tslint.json b/tslint.json deleted file mode 100644 index d5c5f71..0000000 --- a/tslint.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "extends": "@smartive/tslint-config", - "rules": { - "ter-indent": [ - true, - 4, - { - "SwitchCase": 1 - } - ] - } -}