Skip to content

Commit

Permalink
feat: e2e testing and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jenspots committed Jul 20, 2024
1 parent 06940d4 commit 5eb55f2
Show file tree
Hide file tree
Showing 37 changed files with 636 additions and 245 deletions.
9 changes: 4 additions & 5 deletions processors/file-utils-ts/index.ttl
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,10 @@ rdfc:FileWriterTS
sh:path rdfc:outgoing ;
sh:class rdfc:Writer;
], [
sh:maxCount 1 ;
sh:minCount 1 ;
sh:name "path" ;
sh:path rdfc:path ;
sh:datatype xsd:string ;
sh:name "paths" ;
sh:path rdfc:paths ;
sh:nodeKind sh:IRIOrLiteral ;
] ;
] ;
].
Expand All @@ -76,7 +75,7 @@ rdfc:FileWriterTS
sh:minCount 1 ;
sh:name "path" ;
sh:path rdfc:path ;
sh:datatype xsd:string ;
sh:nodeKind sh:IRIOrLiteral ;
] ;
] ;
].
1 change: 1 addition & 0 deletions processors/file-utils-ts/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"keywords": [
"rdf-connect"
],
"type": "module",
"author": "Jens Pots",
"license": "MIT",
"bugs": {
Expand Down
35 changes: 29 additions & 6 deletions processors/file-utils-ts/src/FileReader.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,45 @@
import { Processor, Log } from "jvm-runner-ts";
import * as fs from "node:fs";

export default class FileWriter extends Processor {
export default class FileReader extends Processor {
private outgoing = this.args.get("outgoing", {
type: "writer",
list: "false",
nullable: "false",
});

private path = this.args.get("path", {
private paths = this.args.get("paths", {
type: "string",
list: "false",
list: "true",
nullable: "false",
});

async exec(): Promise<void> {
Log.shared.debug(() => `Reading file: ${this.path}`);
const data = fs.readFileSync(this.path);
this.outgoing.write(data);
for (const path of this.paths) {
await this.readFile(path);
}
this.outgoing.close();
}

async readFile(path: string) {
Log.shared.debug(() => `Reading file: ${path}`);

// Remove the file prefix, since that is not valid in Node.js.
if (path.startsWith("file://")) {
path = path.slice(7);
}

let data: Buffer;
try {
data = fs.readFileSync(path);
} catch (e) {
if (e instanceof Error) {
Log.shared.fatal(`Failed to read file: ${e.message}`);
} else {
Log.shared.fatal("Failed to read file");
}
}

this.outgoing.write(data!);
}
}
16 changes: 14 additions & 2 deletions processors/file-utils-ts/src/FileWriter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,21 @@ export default class FileWriter extends Processor {
});

async exec(): Promise<void> {
while (true) {
// Remove the file prefix, since that is not valid in Node.js.
if (this.path.startsWith("file://")) {
this.path = this.path.slice(7);
}

// Remove file.
try {
fs.unlinkSync(this.path);
} catch (e) {
Log.shared.debug(() => `Could not remove file: ${this.path}`);
}

while (!this.incoming.isClosed()) {
const data = await this.incoming.read();
Log.shared.debug(() => `Writing to ${this.path}: ${data.toString()}`);
Log.shared.debug(() => `Writing file: ${this.path}`);
fs.writeFileSync(this.path, data, { flag: "a" });
}
}
Expand Down
8 changes: 4 additions & 4 deletions processors/file-utils-ts/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
// "moduleDetection": "auto", /* Control what method is used to detect module-format JS files. */

/* Modules */
"module": "commonjs" /* Specify what module code is generated. */,
"module": "es2022" /* Specify what module code is generated. */,
"rootDir": "./src" /* Specify the root folder within your source files. */,
// "moduleResolution": "node10", /* Specify how TypeScript looks up a file from a given module specifier. */
"moduleResolution": "node" /* Specify how TypeScript looks up a file from a given module specifier. */,
// "baseUrl": "./", /* Specify the base directory to resolve non-relative module names. */
// "paths": {}, /* Specify a set of entries that re-map imports to additional lookup locations. */
// "rootDirs": [], /* Allow multiple folders to be treated as one when resolving modules. */
Expand Down Expand Up @@ -76,7 +76,7 @@
/* Interop Constraints */
// "isolatedModules": true, /* Ensure that each file can be safely transpiled without relying on other imports. */
// "verbatimModuleSyntax": true, /* Do not transform or elide any imports or exports not marked as type-only, ensuring they are written in the output file's format based on the 'module' setting. */
// "allowSyntheticDefaultImports": true, /* Allow 'import x from y' when a module doesn't have a default export. */
"allowSyntheticDefaultImports": true /* Allow 'import x from y' when a module doesn't have a default export. */,
"esModuleInterop": true /* Emit additional JavaScript to ease support for importing CommonJS modules. This enables 'allowSyntheticDefaultImports' for type compatibility. */,
// "preserveSymlinks": true, /* Disable resolving symlinks to their realpath. This correlates to the same flag in node. */
"forceConsistentCasingInFileNames": true /* Ensure that casing is correct in imports. */,
Expand Down Expand Up @@ -106,6 +106,6 @@
// "skipDefaultLibCheck": true, /* Skip type checking .d.ts files that are included with TypeScript. */
"skipLibCheck": true /* Skip type checking all .d.ts files. */
},
"include": ["src/**/*", "types/**/*.d.ts"],
"include": ["src/**/*"],
"exclude": ["node_modules"]
}
6 changes: 6 additions & 0 deletions processors/shacl-validator-ts/index.ttl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ rdfc:SHACLValidatorTS
sh:path rdfc:arguments ;
sh:node [
sh:property [
sh:maxCount 1 ;
sh:minCount 1 ;
sh:name "shapes" ;
sh:path rdfc:shapes ;
sh:nodeKind sh:IRIOrLiteral ;
], [
sh:maxCount 1 ;
sh:minCount 1 ;
sh:name "incoming" ;
Expand Down
1 change: 1 addition & 0 deletions processors/shacl-validator-ts/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"keywords": [
"rdf-connect"
],
"type": "module",
"author": "Jens Pots",
"license": "MIT",
"bugs": {
Expand Down
28 changes: 18 additions & 10 deletions processors/shacl-validator-ts/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { Arguments, Processor } from "jvm-runner-ts";
import { Arguments, Log, Processor } from "jvm-runner-ts";
import rdf, { PrefixMapFactory } from "rdf-ext";
import Serializer from "@rdfjs/serializer-turtle";
import formatsPretty from "@rdfjs/formats/pretty.js";
import { Validator } from "shacl-engine";
import { Readable } from "stream";

export default class SHACLValidator extends Processor {
private incoming = this.args.get("input", {
private incoming = this.args.get("incoming", {
type: "reader",
list: "false",
nullable: "false",
Expand All @@ -24,7 +24,7 @@ export default class SHACLValidator extends Processor {
nullable: "true",
});

private path = this.args.get("path", {
private path = this.args.get("shapes", {
type: "string",
list: "false",
nullable: "false",
Expand Down Expand Up @@ -66,42 +66,50 @@ export default class SHACLValidator extends Processor {
// Create a new validator.
const res = await rdf.fetch(this.path);
if (!res.ok) {
throw Error("Could not parse SHACL path.");
Log.shared.fatal("Could not fetch SHACL file.");
}

const shapes = await res.dataset().catch(() => {
throw Error("Could not parse SHACL file.");
// Read the shapes file.
const shapes = await res.dataset().catch((e) => {
Log.shared.fatal(`Could not parse SHACL file: ${e}`);
});

// Parse input stream using shape stream.
// @ts-expect-error Factory is valid.
const validator = new Validator(shapes, { factory: rdf });

// eslint-ignore no-constant-condition
while (true) {
// Parse data into a dataset.
while (!this.incoming.isClosed()) {
// Convert incoming data to a quad stream.
const data = await this.incoming.read();
const rawStream = Readable.from(data);
const quadStream = parser.import(rawStream);

// Create a new dataset.
const dataset = await rdf
.dataset()
.import(quadStream)
.catch(() => {
throw new Error("The incoming data could not be parsed");
Log.shared.fatal("The incoming data could not be parsed");
});

// Run through validator.
const result = await validator.validate({ dataset });

// Pass through data if valid.
if (result.conforms) {
Log.shared.debug("Validation passed.");
this.outgoing.write(data);
} else if (this.validationIsFatal) {
throw new Error("Validation failed");
Log.shared.fatal("Validation failed and is fatal.");
} else if (this.report) {
Log.shared.debug("Validation failed.");
const resultRaw = serializer.transform(result.dataset);
this.report.write(new TextEncoder().encode(resultRaw));
}
}

this.outgoing.close();
this.report?.close();
}
}
6 changes: 3 additions & 3 deletions processors/shacl-validator-ts/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
// "moduleDetection": "auto", /* Control what method is used to detect module-format JS files. */

/* Modules */
"module": "commonjs" /* Specify what module code is generated. */,
"module": "es2022" /* Specify what module code is generated. */,
"rootDir": "./src" /* Specify the root folder within your source files. */,
// "moduleResolution": "node10", /* Specify how TypeScript looks up a file from a given module specifier. */
"moduleResolution": "node" /* Specify how TypeScript looks up a file from a given module specifier. */,
// "baseUrl": "./", /* Specify the base directory to resolve non-relative module names. */
// "paths": {}, /* Specify a set of entries that re-map imports to additional lookup locations. */
// "rootDirs": [], /* Allow multiple folders to be treated as one when resolving modules. */
Expand Down Expand Up @@ -76,7 +76,7 @@
/* Interop Constraints */
// "isolatedModules": true, /* Ensure that each file can be safely transpiled without relying on other imports. */
// "verbatimModuleSyntax": true, /* Do not transform or elide any imports or exports not marked as type-only, ensuring they are written in the output file's format based on the 'module' setting. */
// "allowSyntheticDefaultImports": true, /* Allow 'import x from y' when a module doesn't have a default export. */
"allowSyntheticDefaultImports": true /* Allow 'import x from y' when a module doesn't have a default export. */,
"esModuleInterop": true /* Emit additional JavaScript to ease support for importing CommonJS modules. This enables 'allowSyntheticDefaultImports' for type compatibility. */,
// "preserveSymlinks": true, /* Disable resolving symlinks to their realpath. This correlates to the same flag in node. */
"forceConsistentCasingInFileNames": true /* Ensure that casing is correct in imports. */,
Expand Down
1 change: 1 addition & 0 deletions runners/nodejs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"test": "vitest run --coverage --coverage.include src",
"format": "eslint --fix . && prettier --write ."
},
"type": "module",
"main": "./build/index.js",
"types": "./build/index.d.ts",
"files": [
Expand Down
4 changes: 4 additions & 0 deletions runners/nodejs/src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,8 @@ export class RunnerError extends Error {
static unexpectedBehaviour(): never {
throw new RunnerError("Unexpected behaviour");
}

static stageError(message: string): never {
throw new RunnerError(message);
}
}
75 changes: 75 additions & 0 deletions runners/nodejs/src/interfaces/buffered_callback_channel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { Writer } from "./writer";
import { RunnerError } from "../error";

/**
* A buffered callback channel is a simple implementation of a writer that calls
* a callback whenever a value is written to it. The class does therefore not
* implement the `Reader` interface, as it is not possible to read from the
* channel.
*
* The class buffers all values that are written to the channel before
* the callback is set. Once the callback is set, all buffered values are
* written to the callback.
*
* Note that the callback cannot be overwritten once it is set, and if the
* channel is closed before a callback is set, an error is thrown.
*/
export class BufferedCallbackChannel<T> implements Writer<T> {
/**
* The buffer that stores the values written to the channel as long as there
* is no callback set.
* @private
*/
private buffer: Array<T> = [];

/**
* The callback that is called whenever a value is written to the channel. If
* it is not set, the values are buffered in the `buffer` array.
* @private
*/
private callback: null | ((value: T) => void | Promise<void>) = null;

/**
* Whether the channel has been closed or not.
* @private
*/
private closed = false;

close(): void {
// The channel was closed before a callback was set, which results in a loss of data.
if (this.callback === null) {
RunnerError.channelError();
}

this.closed = true;
}

isClosed(): boolean {
return this.closed;
}

write(data: T): void {
if (this.callback === null) {
this.buffer.push(data);
} else {
this.callback(data);
}
}

/**
* Set the callback that is called whenever a value is written to the
* channel. All values that were written to the channel before the callback
* was set are written to the callback as well.
* @param callback The callback to call whenever a value is written to the
*/
setCallback(callback: (value: T) => void | Promise<void>): void {
// The callback cannot be overwritten.
if (this.callback != null) {
RunnerError.channelError();
}

this.callback = callback;
this.buffer.forEach((value) => callback(value));
this.buffer = [];
}
}
Loading

0 comments on commit 5eb55f2

Please sign in to comment.