From 5a460b4f1800f37f15ea78f857137bd6b276a88a Mon Sep 17 00:00:00 2001 From: Jeremiah Senkpiel Date: Thu, 6 Jun 2019 16:36:23 -0700 Subject: [PATCH] helpers: introduce Stream() Introduces `Stream()`, a much more user friendly way of composing streaming operations out of bob components. `Stream()`s also act as a bare passthrough to their subcomponents and can be use as a component when composing higher-order `Stream()`s. Relys on the newly added `start(exitCb)` to keep things nice. Refs: https://github.com/Fishrock123/bob/issues/26 PR-URL: https://github.com/Fishrock123/bob/pull/31 --- helpers/stream.js | 52 ++++++++++++++++++++++++++++++++++++ reference-passthrough.js | 8 +++--- tests/file-to-file-test.js | 11 ++++---- tests/stream-stream-async.js | 29 ++++++++++++++++++++ tests/zlib-transform-test.js | 12 ++++----- 5 files changed, 97 insertions(+), 15 deletions(-) create mode 100644 helpers/stream.js create mode 100644 tests/stream-stream-async.js diff --git a/helpers/stream.js b/helpers/stream.js new file mode 100644 index 0000000..d45a8b9 --- /dev/null +++ b/helpers/stream.js @@ -0,0 +1,52 @@ +'use strict' + +class Stream { + source = null + sink = null + + constructor (...components) { + const last = components.length - 1 + this.source = components[0] + this.sink = components[last] + + let above = this.source // Above the next sink in the flow + for (const intermediate of components.slice(1)) { + above = intermediate.bindSource(above) + } + } + + start (exitCb) { + // If sink is undefined a programmer error has been made. + this.sink.start(exitCb) + return this + } + + stop () { + // If source is undefined or does not have stop(), a programmer error has been made. + this.source.stop() + return this + } + + // Implements a passthrough + // + // This is done to avoid having excess setup code to extract + // "underlying" sources and sinks. + + bindSource (source) { + return this.source.bindSource(source) + } + + bindSink (sink) { + return this.sink.bindSink(sink) + } + + next (status, error, buffer, bytes) { + this.source.next(status, error, buffer, bytes) + } + + pull (error, buffer) { + this.sink.pull(error, buffer) + } +} + +module.exports = Stream diff --git a/reference-passthrough.js b/reference-passthrough.js index 7efe6f1..2546b42 100644 --- a/reference-passthrough.js +++ b/reference-passthrough.js @@ -1,10 +1,10 @@ 'use strict' class PassThrough { - constructor () { - this.sink = null - this.source = null - } + sink = null + source = null + + constructor () {} bindSource (source) { source.bindSink(this) diff --git a/tests/file-to-file-test.js b/tests/file-to-file-test.js index 0b185d7..b647159 100644 --- a/tests/file-to-file-test.js +++ b/tests/file-to-file-test.js @@ -2,6 +2,7 @@ // node --expose-internals file-to-file-test.js ./fixtures/test +const Stream = require('../helpers/stream') const FileSource = require('fs-source') const FileSink = require('fs-sink') const PassThrough = require('../reference-passthrough') @@ -10,10 +11,10 @@ const fileSource = new FileSource(process.argv[2]) const fileSink = new FileSink(process.argv[2] + '_') const passThrough = new PassThrough() -fileSink.bindSource(passThrough.bindSource(fileSource), error => { - if (error) - console.error('ERROR!', error) - else { - console.log('done') +const stream = new Stream(fileSource, passThrough, fileSink) +stream.start(err => { + if (err) { + return console.error('ERROR!', err) } + console.log('done') }) diff --git a/tests/stream-stream-async.js b/tests/stream-stream-async.js new file mode 100644 index 0000000..31375b2 --- /dev/null +++ b/tests/stream-stream-async.js @@ -0,0 +1,29 @@ +'use strict' + +// node --expose-internals stream-stream-async.js ./fixtures/test + +const zlib = require('zlib') +const util = require('util') + +const Stream = require('../helpers/stream') +const FileSource = require('fs-source') +const FileSink = require('fs-sink') +const ZlibTransform = require('zlib-transform') +const PassThrough = require('../reference-passthrough') + +const fileSource = new FileSource(process.argv[2]) +const fileSink = new FileSink(process.argv[2] + '.gz') +const zlibTransform = new ZlibTransform({}, zlib.constants.GZIP) +const passThrough = new PassThrough() + +;(async function main () { + const streamSource = new Stream(fileSource, zlibTransform) + const streamSink = new Stream(passThrough, fileSink) + const stream = new Stream(streamSource, new PassThrough(), streamSink) + + await util.promisify(stream.start.bind(stream))() + + console.log('done (resolved)') +})().catch(err => { + console.error('ERROR! (rejected)', err) +}) diff --git a/tests/zlib-transform-test.js b/tests/zlib-transform-test.js index 2ee3dd2..35b4003 100644 --- a/tests/zlib-transform-test.js +++ b/tests/zlib-transform-test.js @@ -4,6 +4,7 @@ const zlib = require('zlib') +const Stream = require('../helpers/stream') const FileSource = require('fs-source') const FileSink = require('fs-sink') const ZlibTransform = require('zlib-transform') @@ -12,11 +13,10 @@ const fileSource = new FileSource(process.argv[2]) const fileSink = new FileSink(process.argv[2] + '.gz') const zlibTransform = new ZlibTransform({}, zlib.constants.GZIP) -fileSink.bindSource(zlibTransform.bindSource(fileSource), error => { - if (error) { - console.error('ERROR!', error) - console.error((new Error()).stack) - } else { - console.log('done') +const stream = new Stream(fileSource, zlibTransform, fileSink) +stream.start(err => { + if (err) { + return console.error('ERROR!', err) } + console.log('done') })