diff --git a/src/lib/File.class.ts b/src/lib/File.class.ts
index 2799107..14e7158 100644
--- a/src/lib/File.class.ts
+++ b/src/lib/File.class.ts
@@ -1,10 +1,15 @@
-import { type WriteStream, createWriteStream, existsSync, statSync } from 'fs'
+import { type WriteStream, createWriteStream, existsSync, statSync, mkdirSync } from 'fs'
 import { isFile, isFilePathString } from '../utils/guards.js'
+import { dirname } from 'path'
 
 export default class File {
   public static $id = 'File'
   private readonly $isValid?: boolean
-  public constructor(private $path: string) {}
+  /**
+   * @param $path location of the file; `validate()` strips a leading `file://` from it.
+   * @param skipExistsCheck when true, `validate()` does not throw if the file does not exist (yet).
+   */
+  public constructor(private $path: string, private readonly skipExistsCheck: boolean = false) {}
 
   public validate(): File {
     if (this.$isValid !== undefined) return this
@@ -13,7 +18,7 @@ export default class File {
       throw new Error(`The filename \`${wrongFilePath}\` should start with \`file://\``)
     }
     this.$path = this.$path.replace(/^file:\/\//, '')
-    if (!existsSync(this.$path) || !statSync(this.$path).isFile()) {
+    if (!this.skipExistsCheck && (!existsSync(this.$path) || !statSync(this.$path).isFile())) {
       throw new Error(`File not found: \`${this.$path}\``)
     }
     return this
@@ -23,11 +28,17 @@ export default class File {
     return this.$path
   }
 
-  public getStream(): WriteStream {
+  /**
+   * Opens a write stream to this file, creating missing parent directories first.
+   * @param append when true the stream is opened with the `a` flag, otherwise with the default flags.
+   */
+  public getStream(append: boolean = false): WriteStream {
     if (existsSync(this.$path)) {
       // throw new Error(`File already exists: \`${this.$path}\``)
     }
-    return createWriteStream(this.$path)
+    // A recursive mkdir is a no-op for an existing directory, so no existsSync() pre-check is needed.
+    mkdirSync(dirname(this.$path), { recursive: true })
+    return createWriteStream(this.$path, append ? { flags: 'a' } : {})
   }
 
   public toString(): string {
diff --git a/src/lib/LDWorkbenchConfiguration.d.ts b/src/lib/LDWorkbenchConfiguration.d.ts
index fb58e03..a0623b9 100644
--- a/src/lib/LDWorkbenchConfiguration.d.ts
+++ b/src/lib/LDWorkbenchConfiguration.d.ts
@@ -21,7 +21,7 @@ export interface LDWorkbenchConfiguration {
   /**
    * The file where the final result of your pipeline is saved.
    */
-  destination: string;
+  destination?: string;
   /**
    * This is where you define the individual iterator/generator for each step.
    *
diff --git a/src/lib/Pipeline.class.ts b/src/lib/Pipeline.class.ts
index 0907ff6..d4d91cb 100644
--- a/src/lib/Pipeline.class.ts
+++ b/src/lib/Pipeline.class.ts
@@ -4,8 +4,10 @@ import type { LDWorkbenchConfiguration } from "./LDWorkbenchConfiguration.js";
 import chalk from "chalk";
 import Stage from "./Stage.class.js";
 import duration from "../utils/duration.js";
+import File from './File.class.js'
 import path from "node:path";
 import * as fs from "node:fs";
+import { isFilePathString } from '../utils/guards.js';
 
 class Pipeline {
   public readonly stages = new Map();
@@ -13,6 +15,7 @@ class Pipeline {
   private $isValidated: boolean = false;
   private stageNames: string[] = [];
   private now = new Date();
+  private readonly destination: File
 
   public constructor(
     private readonly $configuration: LDWorkbenchConfiguration
@@ -20,6 +23,17 @@ class Pipeline {
     // create data folder:
     this.dataDir = path.join("data", kebabcase(this.$configuration.name));
     fs.mkdirSync(this.dataDir, { recursive: true });
+    // `destination` is optional; fall back to `statements.nt` inside the pipeline's data folder.
+    const destinationFile = this.$configuration.destination ?? `file://${path.join(this.dataDir, 'statements.nt')}`
+    if (!isFilePathString(destinationFile)) {
+      throw new Error('We currently only allow publishing data to local files.')
+    }
+    if (!destinationFile.endsWith('.nt')) {
+      throw new Error('We currently only support writing results in N-Triples format,\nmake sure your destination filename ends with \'.nt\'.')
+    }
+    // validate() strips the `file://` prefix so getStream() receives a plain filesystem path;
+    // the exists check is skipped because the pipeline creates the destination itself.
+    this.destination = new File(destinationFile, true).validate()
   }
 
   private error(e: Error, stage?: string): void {
@@ -142,7 +156,7 @@ class Pipeline {
       if (this.stageNames.length !== 0) {
         this.runRecursive();
       } else {
-        this.concat()
+        this.writeResult()
         console.info(
           chalk.green(
             `✔ your pipeline "${chalk.bold(
@@ -159,10 +173,13 @@ class Pipeline {
     }
   }
 
-  private concat(): void {
+  /**
+   * Combines the statements of all stages into the destination file.
+   */
+  private writeResult(): void {
     const spinner = ora("Combining statements from all stages:").start();
-    const destinationPath = path.join(this.dataDir, 'statements.nt')
-    const destinationStream = fs.createWriteStream(destinationPath, {flags:'a'})
+    // The destination (including its default) was resolved and checked in the constructor.
+    const destinationStream = this.destination.getStream()
     const stageNames = Array.from(this.stages.keys())
     for (const stageName of stageNames) {
       spinner.suffixText = chalk.bold(stageName)
@@ -171,7 +188,7 @@ class Pipeline {
         destinationStream.write(buffer)
       })
     }
-    spinner.suffixText = chalk.bold(destinationPath)
+    spinner.suffixText = chalk.bold(this.destination.toString())
     spinner.succeed()
   }
 
diff --git a/static/example/config.yml b/static/example/config.yml
index 70eb982..748587c 100644
--- a/static/example/config.yml
+++ b/static/example/config.yml
@@ -1,6 +1,5 @@
 # Metadata for your pipeline:
 name: Example Pipeline
-destination: file://data/example/example.ttl
 description: >
   This is an example pipeline. It uses files that are available in this repository
   and SPARQL endpoints that should work.
@@ -8,7 +7,7 @@ description: >
 # The individual stages for your pipeline
 stages:
   - name: "Stage 1"
-    iterator: 
+    iterator:
       query: file://static/example/iterator-stage-1.rq
       endpoint: https://api.triplydb.com/datasets/Triply/iris/services/demo-service/sparql
     generator:
diff --git a/static/ld-workbench.schema.json b/static/ld-workbench.schema.json
index 79c4772..6f813de 100644
--- a/static/ld-workbench.schema.json
+++ b/static/ld-workbench.schema.json
@@ -77,5 +77,5 @@
       }
     }
   },
-  "required": ["name", "destination", "stages"]
+  "required": ["name", "stages"]
 }
\ No newline at end of file