Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
ivoputzer committed Oct 14, 2023
1 parent d2ed567 commit db7e409
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 115 deletions.
22 changes: 10 additions & 12 deletions lib/date.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,18 @@ export function jsonDateReviver (_, value, iso8601Regex = /^\d{4}-\d{2}-\d{2}T\d
: value
}

export function csvDateReplacer (_, value) {
return Array.isArray(value)
? value.map(replaceIfDate)
: replaceIfDate(value)
export function noopDateReplacer (_, value) {
return value
}

function replaceIfDate (date) {
return date instanceof Date
? date.toISOString().slice(0, 16).replace('T', ' ')
: date
}
export function isoDateReplacer (_, value) {
return !isNaN(Date.parse(value)) && value === new Date(value).toISOString()
? value?.substring(0, 10)
: value
}

export function dateReplacerFor (interval) {
export function shortDateFor (interval) {
return /\d+[dwmy]/.test(interval)
? (_, value) => value === new Date(value).toJSON() ? new Date(value).toJSON().substring(0, 10) : value
: (_, value) => value
? isoDateReplacer
: noopDateReplacer
}
96 changes: 40 additions & 56 deletions lib/jsonl.mjs
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
import os from 'node:os'
import readline from 'node:readline'
import stream, { PassThrough } from 'node:stream'
import stream from 'node:stream'

import { jsonDateReviver, csvDateReplacer } from './date.mjs'
import { jsonDateReviver, noopDateReplacer } from './date.mjs'

/*
createReadSteam(file)
.pipe(parseJsonl())
->
.pipe(
groupBy(line => line[0].getFullYear())
-> un readable stream per ogni anno: 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023
)
.pipe(
flatMap(groupReadableStream => groupReadableStream.pipe(writeJsonl()))
)
export function parseJsonl ({ objectMode, reviver } = { objectMode: true, reviver: jsonDateReviver }, { createInterface } = readline, { Duplex } = stream) {
return new Duplex({ objectMode, read () {}, write (_0, _1, next) { next(null) } })
.on('pipe', function (input) {
createInterface({ input, terminal: false })
.on('line', (line) => this.push(objectMode ? JSON.parse(line, reviver) : line))
.on('close', () => this.push(null))
})
}

*/
export function groupBy (groupFn, options = { objectMode: true }, { Transform } = stream) {
export function groupBy (groupFn, options = { objectMode: true }, { Transform, PassThrough } = stream) {
const groups = new Map()
return new Transform({
...options,
Expand All @@ -36,16 +32,7 @@ export function groupBy (groupFn, options = { objectMode: true }, { Transform }
})
}

export function parseJsonl ({ objectMode, reviver } = { objectMode: true, reviver: jsonDateReviver }, { createInterface } = readline, { Duplex } = stream) {
return new Duplex({ objectMode, read () {}, write (_0, _1, next) { next(null) } })
.on('pipe', function (input) {
createInterface({ input, terminal: false })
.on('line', (line) => this.push(objectMode ? JSON.parse(line, reviver) : line))
.on('close', () => this.push(null))
})
}

export function writeJsonl ({ objectMode, replacer } = { objectMode: true, replacer: (key, value) => value }, { createInterface } = readline, { Transform } = stream, { EOL } = os) {
export function writeJsonl ({ objectMode, replacer } = { objectMode: true, replacer: noopDateReplacer }, { createInterface } = readline, { Transform } = stream, { EOL } = os) {
return new Transform({
objectMode,
transform (line, _, next) {
Expand All @@ -54,7 +41,7 @@ export function writeJsonl ({ objectMode, replacer } = { objectMode: true, repla
})
}

export function writeJson ({ objectMode, replacer } = { objectMode: true, replacer: (_, value) => value }, { Transform } = stream, { EOL } = os) {
export function writeJson ({ objectMode, replacer } = { objectMode: true, replacer: noopDateReplacer }, { Transform } = stream, { EOL } = os) {
let isFirstLine = true
return new Transform({
objectMode: true,
Expand All @@ -71,8 +58,8 @@ export function writeJson ({ objectMode, replacer } = { objectMode: true, replac
}
})
}
// .pipe( writeCsv(['date', 'open, 'high', 'low, 'close', 'volume', 'metadata']) )
export function writeCsv (headers = null, { objectMode, replacer } = { objectMode: true, replacer: csvDateReplacer }, { Transform } = stream, { EOL } = os) {

export function writeCsv (headers = null, { objectMode, replacer } = { objectMode: true, replacer: noopDateReplacer }, { Transform } = stream, { EOL } = os) {
let isFirstLine = true
return new Transform({
objectMode: true,
Expand All @@ -94,32 +81,29 @@ export function writeCsv (headers = null, { objectMode, replacer } = { objectMod
})
}

// export function writeXml (headers, elementName = 'candle', { objectMode, replacer } = { objectMode: true, replacer: csvDateReplacer }, { Transform } = stream, { EOL } = os) {
// const isFirstLine = true
// return new Transform({
// objectMode: true,
// transform (line, _, next) {
// if (isFirstLine) {
// isFirstLine = false
// this.push(`<?xml version="1.0" encoding="UTF-8"?>${EOL}`)
// this.push(`<?xml-stylesheet type="text/xsl" href="/candles.xsl"?>${EOL}`)
// this.push(`<${elementName}s>${EOL}`)
// const [date, open, high, low, close, volume] = line
// return `\t<candle>${EOL}\t\t<date>${date.toJSON().substr(0, 10)}</date>${EOL}\t\t<open>${open}</open>${EOL}\t\t<high>${high}</high>${EOL}\t\t<low>${low}</low>${EOL}\t\t<close>${close}</close>${EOL}\t\t<volume>${volume}</volume>${EOL}\t</candle>${EOL}`

// // ${(headers ?? Object.keys(line)).join(',')}${EOL}`)
// // // return next(null)
// } else {
// // // this.push(`${objectMode ? JSON.stringify(Object.values(line), replacer).slice(1, -1).replaceAll('"', '') : line}${EOL}`)
// // // next(null)
// }
// },
// flush (next) {
// next(null, `</${elementName}s>${EOL}`)
// }
// })
export function writeXml (documentRoot = 'candle', { objectMode, replacer } = { objectMode: true, replacer: noopDateReplacer }, { Stream } = stream, { EOL } = os) {
throw new Error('not implemented')
// const isFirstLine = true
// return new Transform({
// objectMode: true,
// transform (line, _, next) {
// if (isFirstLine) {
// isFirstLine = false
// this.push(`<?xml version="1.0" encoding="UTF-8"?>${EOL}`)
// this.push(`<?xml-stylesheet type="text/xsl" href="/candles.xsl"?>${EOL}`)
// this.push(`<${elementName}s>${EOL}`)
// const [date, open, high, low, close, volume] = line
// return `\t<candle>${EOL}\t\t<date>${date.toJSON().substr(0, 10)}</date>${EOL}\t\t<open>${open}</open>${EOL}\t\t<high>${high}</high>${EOL}\t\t<low>${low}</low>${EOL}\t\t<close>${close}</close>${EOL}\t\t<volume>${volume}</volume>${EOL}\t</candle>${EOL}`

// function toXML () {

// }
// }
// // ${(headers ?? Object.keys(line)).join(',')}${EOL}`)
// // // return next(null)
// } else {
// // // this.push(`${objectMode ? JSON.stringify(Object.values(line), replacer).slice(1, -1).replaceAll('"', '') : line}${EOL}`)
// // // next(null)
// }
// },
// flush (next) {
// next(null, `</${elementName}s>${EOL}`)
// }
// })
}
91 changes: 68 additions & 23 deletions test/lib/date.mjs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { describe, it } from 'node:test'
import { ok, equal } from 'node:assert/strict'
import { ok, equal, deepEqual } from 'node:assert/strict'

import { INTERVALS, parseInterval, daysBetween, jsonDateReviver, utcDate } from '../../lib/date.mjs'
import {
INTERVALS,
parseInterval,
daysBetween, utcDate,
jsonDateReviver,
noopDateReplacer, isoDateReplacer, shortDateFor
} from '../../lib/date.mjs'

describe('lib/date', () => {
describe('.INTERVALS', () => {
Expand All @@ -24,20 +30,6 @@ describe('lib/date', () => {
ok(INTERVALS.has('1'))
})
})
describe('.daysBetween', () => {
it('is callable', () => {
equal(typeof daysBetween, 'function')
})
it('returns a number', () => {
equal(typeof daysBetween(new Date(), new Date()), 'number')
})
it('returns the number of days between two dates', () => {
equal(daysBetween(new Date('2021-09-26'), new Date('2021-09-27')), 1)
})
it('returns the number of days between two dates with iso8601 format', () => {
equal(daysBetween(new Date('2021-09-26T10:30:00.000Z'), new Date('2021-09-27T10:30:00.000Z')), 1)
})
})
describe('.parseInterval', () => {
it('is callable', () => {
equal(typeof parseInterval, 'function')
Expand Down Expand Up @@ -76,6 +68,32 @@ describe('lib/date', () => {
equal(parseInterval('2'), INTERVALS.get('2'))
})
})
describe('.daysBetween', () => {
it('is callable', () => {
equal(typeof daysBetween, 'function')
})
it('returns a number', () => {
equal(typeof daysBetween(new Date(), new Date()), 'number')
})
it('returns the number of days between two dates', () => {
equal(daysBetween(new Date('2021-09-26'), new Date('2021-09-27')), 1)
})
it('returns the number of days between two dates with iso8601 format', () => {
equal(daysBetween(new Date('2021-09-26T10:30:00.000Z'), new Date('2021-09-27T10:30:00.000Z')), 1)
})
})
describe('.utcDate', () => {
it('is callable', () => {
equal(typeof utcDate, 'function')
})
it('returns a Date', () => {
ok(utcDate() instanceof Date)
})
it('returns a Date that is adjusted to .getTimezoneOffset ', () => {
const now = new Date()
equal(utcDate(now).toJSON(), new Date(now.getTime() + (now.getTimezoneOffset() * 6e4)).toJSON())
})
})
describe('.jsonDateReviver', () => {
it('is callable', () => {
equal(typeof jsonDateReviver, 'function')
Expand All @@ -96,16 +114,43 @@ describe('lib/date', () => {
ok(jsonDateReviver(null, '2023/09/26 10:30:00', /^\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2}$/) instanceof Date) // Custom format regex
})
})
describe('.utcDate', () => {
describe('.noopDateReplacer', () => {
it('is callable', () => {
equal(typeof utcDate, 'function')
ok(noopDateReplacer instanceof Function)
})
it('returns a Date', () => {
ok(utcDate() instanceof Date)
it('returns without applying any replacement', () => {
deepEqual(noopDateReplacer('key', 'value'), 'value')
})
it('returns a Date that is adjusted to .getTimezoneOffset ', () => {
const now = new Date()
equal(utcDate(now).toJSON(), new Date(now.getTime() + (now.getTimezoneOffset() * 6e4)).toJSON())
})
describe('.isoDateReplacer', () => {
it('is callable', () => {
ok(isoDateReplacer instanceof Function)
})
it('shortens iso dates down to its date value', () => {
const aJsonDateString = new Date().toISOString()
equal(isoDateReplacer(null, aJsonDateString), aJsonDateString.slice(0, 10))
})
it('returns the original value otherwise', () => {
equal(isoDateReplacer(null, 42), 42)
})
it('is compatible with JSON.stringify', () => {
const aDate = new Date() // note: to my understanding .toJSON get's invoked prior calling the replacer function
equal(JSON.stringify(aDate, isoDateReplacer), JSON.stringify(aDate.toISOString().slice(0, 10)))
})
})
describe('.shortDateFor', () => {
it('is callable', () => {
ok(shortDateFor instanceof Function)
})
it('returns noopDateReplacer intervals that require hours minutes and seconds', () => {
equal(shortDateFor('1'), noopDateReplacer)
equal(shortDateFor('2h'), noopDateReplacer)
})
it('returns isoDateReplacer for date intervals that are daily timeframe and above', () => {
equal(shortDateFor('1d'), isoDateReplacer)
equal(shortDateFor('2w'), isoDateReplacer)
equal(shortDateFor('3m'), isoDateReplacer)
equal(shortDateFor('4y'), isoDateReplacer)
})
})
})
56 changes: 32 additions & 24 deletions test/lib/jsonl.mjs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { describe, it } from 'node:test'
import { ok, equal, deepEqual } from 'node:assert/strict'
import { throws, ok, equal, deepEqual } from 'node:assert/strict'
import { Duplex, Readable, Transform } from 'node:stream'

import { groupBy, parseJsonl, writeJsonl, writeJson, writeCsv } from '../../lib/jsonl.mjs'
import { parseJsonl, groupBy, writeJsonl, writeJson, writeCsv, writeXml } from '../../lib/jsonl.mjs'
import { EOL } from 'node:os'
import { fromAsync } from '../../lib/async.mjs'

Expand Down Expand Up @@ -70,6 +70,31 @@ describe('lib/jsonl', () => {
it.todo('handles object mode true -> and converts to objects')
it.todo('handles object mode false -> splits by line but keeps buffers|strings and does not parse nor rewrite dates')
})
describe('.groupBy', () => {
it('is callable', () => {
equal(typeof groupBy, 'function')
})
it('returns a stream.Transform', () => {
ok(groupBy() instanceof Transform)
})
it('groups by a given function', async () => {
const aByYearPredicate = ([date]) => date.getUTCFullYear()
const groups = groupBy(aByYearPredicate)

createReadableObjectsWith(
[new Date(Date.UTC(2022, 1, 1)), 'foo'],
[new Date(Date.UTC(2023, 1, 1)), 'bar'],
[new Date(Date.UTC(2023, 2, 1)), 'baz']
).pipe(groups)

const groupStreams = []
for await (const group of groups) {
groupStreams.push(group)
}

equal(groupStreams.length, 2)
})
})
describe('.writeJsonl', () => {
it('is callable', () => {
equal(typeof writeJsonl, 'function')
Expand Down Expand Up @@ -157,30 +182,13 @@ describe('lib/jsonl', () => {
deepEqual(await fromAsync(await csvLines), ['xyz' + EOL, 'bar' + EOL, 'baz' + EOL])
})
})
describe.todo('.writeXml')
describe('.groupBy', () => {

describe('.writeXml', () => {
it('is callable', () => {
equal(typeof groupBy, 'function')
})
it('returns a stream.Transform', () => {
ok(groupBy() instanceof Transform)
ok(writeXml instanceof Function)
})
it('groups by a given function', async () => {
const aByYearPredicate = ([date]) => date.getUTCFullYear()
const groups = groupBy(aByYearPredicate)

createReadableObjectsWith(
[new Date(Date.UTC(2022, 1, 1)), 'foo'],
[new Date(Date.UTC(2023, 1, 1)), 'bar'],
[new Date(Date.UTC(2023, 2, 1)), 'baz']
).pipe(groups)

const groupStreams = []
for await (const group of groups) {
groupStreams.push(group)
}

equal(groupStreams.length, 2)
it('returns a stream', () => {
throws(() => writeXml())
})
})
})

0 comments on commit db7e409

Please sign in to comment.