Skip to content

Commit

Permalink
helpers: introduce Stream()
Browse files Browse the repository at this point in the history
Introduces `Stream()`, a much more user friendly way of composing
streaming operations out of bob components.

`Stream()`s also act as a bare passthrough to their subcomponents and
can be use as a component when composing higher-order `Stream()`s.

Relys on the newly added `start(exitCb)` to keep things nice.

Refs: #26
PR-URL: #31
  • Loading branch information
Fishrock123 committed Jun 11, 2019
1 parent ab3308d commit 5a460b4
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 15 deletions.
52 changes: 52 additions & 0 deletions helpers/stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
'use strict'

class Stream {
source = null
sink = null

constructor (...components) {
const last = components.length - 1
this.source = components[0]
this.sink = components[last]

let above = this.source // Above the next sink in the flow
for (const intermediate of components.slice(1)) {
above = intermediate.bindSource(above)
}
}

start (exitCb) {
// If sink is undefined a programmer error has been made.
this.sink.start(exitCb)
return this
}

stop () {
// If source is undefined or does not have stop(), a programmer error has been made.
this.source.stop()
return this
}

// Implements a passthrough
//
// This is done to avoid having excess setup code to extract
// "underlying" sources and sinks.

bindSource (source) {
return this.source.bindSource(source)
}

bindSink (sink) {
return this.sink.bindSink(sink)
}

next (status, error, buffer, bytes) {
this.source.next(status, error, buffer, bytes)
}

pull (error, buffer) {
this.sink.pull(error, buffer)
}
}

module.exports = Stream
8 changes: 4 additions & 4 deletions reference-passthrough.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
'use strict'

class PassThrough {
constructor () {
this.sink = null
this.source = null
}
sink = null
source = null

constructor () {}

bindSource (source) {
source.bindSink(this)
Expand Down
11 changes: 6 additions & 5 deletions tests/file-to-file-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

// node --expose-internals file-to-file-test.js ./fixtures/test

const Stream = require('../helpers/stream')
const FileSource = require('fs-source')
const FileSink = require('fs-sink')
const PassThrough = require('../reference-passthrough')
Expand All @@ -10,10 +11,10 @@ const fileSource = new FileSource(process.argv[2])
const fileSink = new FileSink(process.argv[2] + '_')
const passThrough = new PassThrough()

fileSink.bindSource(passThrough.bindSource(fileSource), error => {
if (error)
console.error('ERROR!', error)
else {
console.log('done')
const stream = new Stream(fileSource, passThrough, fileSink)
stream.start(err => {
if (err) {
return console.error('ERROR!', err)
}
console.log('done')
})
29 changes: 29 additions & 0 deletions tests/stream-stream-async.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
'use strict'

// node --expose-internals stream-stream-async.js ./fixtures/test

const zlib = require('zlib')
const util = require('util')

const Stream = require('../helpers/stream')
const FileSource = require('fs-source')
const FileSink = require('fs-sink')
const ZlibTransform = require('zlib-transform')
const PassThrough = require('../reference-passthrough')

const fileSource = new FileSource(process.argv[2])
const fileSink = new FileSink(process.argv[2] + '.gz')
const zlibTransform = new ZlibTransform({}, zlib.constants.GZIP)
const passThrough = new PassThrough()

;(async function main () {
const streamSource = new Stream(fileSource, zlibTransform)
const streamSink = new Stream(passThrough, fileSink)
const stream = new Stream(streamSource, new PassThrough(), streamSink)

await util.promisify(stream.start.bind(stream))()

console.log('done (resolved)')
})().catch(err => {
console.error('ERROR! (rejected)', err)
})
12 changes: 6 additions & 6 deletions tests/zlib-transform-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

const zlib = require('zlib')

const Stream = require('../helpers/stream')
const FileSource = require('fs-source')
const FileSink = require('fs-sink')
const ZlibTransform = require('zlib-transform')
Expand All @@ -12,11 +13,10 @@ const fileSource = new FileSource(process.argv[2])
const fileSink = new FileSink(process.argv[2] + '.gz')
const zlibTransform = new ZlibTransform({}, zlib.constants.GZIP)

fileSink.bindSource(zlibTransform.bindSource(fileSource), error => {
if (error) {
console.error('ERROR!', error)
console.error((new Error()).stack)
} else {
console.log('done')
const stream = new Stream(fileSource, zlibTransform, fileSink)
stream.start(err => {
if (err) {
return console.error('ERROR!', err)
}
console.log('done')
})

0 comments on commit 5a460b4

Please sign in to comment.