Skip to content

Commit

Permalink
feat: allow to start with observable (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
nickredmark authored and buehler committed Jan 30, 2018
1 parent 66cd416 commit 13b1aaf
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
8 changes: 4 additions & 4 deletions src/Etl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@ export class Etl {
* during the "next" process step you get update on how many are processed yet.
* Throws when any step produces an error.
*/
public start(): Observable<any> {
public start(observable: Observable<any> = Observable.empty()): Observable<any> {
this._state = EtlState.Running;

const observable = Observable
.merge(...this._extractors.map(extractor => extractor.read(this._context)));
const o: Observable<any> = Observable
.merge(observable, ...this._extractors.map(extractor => extractor.read(this._context)));

return this._generalTransformers
.reduce((observable, transformer) => transformer.process(observable, this._context), observable)
.reduce((observable, transformer) => transformer.process(observable, this._context), o)
.flatMap(object => Observable.merge(...this._loaders.map(loader => loader.write(object, this._context))))
.do(
() => { },
Expand Down
14 changes: 14 additions & 0 deletions test/Etl.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,4 +242,18 @@ describe('Etl', () => {
});
});

it('should pipe inital observable', done => {
const context = 1;
etl = new Etl(context);
etl
.addTransformer(dummyTransformer)
.addLoader(dummyLoader)
.start(Observable.of('hi'))
.subscribe(null, null, () => {
expect((dummyTransformer.process as any).mock.calls[0]).toContain('hi');
expect((dummyLoader.write as any).mock.calls[0]).toContain('hi');
done();
});
});

});

0 comments on commit 13b1aaf

Please sign in to comment.