diff --git a/processors/file-utils-ts/index.ttl b/processors/file-utils-ts/index.ttl index 9eb1963..1918211 100644 --- a/processors/file-utils-ts/index.ttl +++ b/processors/file-utils-ts/index.ttl @@ -45,11 +45,10 @@ rdfc:FileWriterTS sh:path rdfc:outgoing ; sh:class rdfc:Writer; ], [ - sh:maxCount 1 ; sh:minCount 1 ; - sh:name "path" ; - sh:path rdfc:path ; - sh:datatype xsd:string ; + sh:name "paths" ; + sh:path rdfc:paths ; + sh:nodeKind sh:IRIOrLiteral ; ] ; ] ; ]. @@ -76,7 +75,7 @@ rdfc:FileWriterTS sh:minCount 1 ; sh:name "path" ; sh:path rdfc:path ; - sh:datatype xsd:string ; + sh:nodeKind sh:IRIOrLiteral ; ] ; ] ; ]. diff --git a/processors/file-utils-ts/package.json b/processors/file-utils-ts/package.json index 0bfb49d..095320e 100644 --- a/processors/file-utils-ts/package.json +++ b/processors/file-utils-ts/package.json @@ -15,6 +15,7 @@ "keywords": [ "rdf-connect" ], + "type": "module", "author": "Jens Pots", "license": "MIT", "bugs": { diff --git a/processors/file-utils-ts/src/FileReader.ts b/processors/file-utils-ts/src/FileReader.ts index e849d1c..9a57103 100644 --- a/processors/file-utils-ts/src/FileReader.ts +++ b/processors/file-utils-ts/src/FileReader.ts @@ -1,22 +1,45 @@ import { Processor, Log } from "jvm-runner-ts"; import * as fs from "node:fs"; -export default class FileWriter extends Processor { +export default class FileReader extends Processor { private outgoing = this.args.get("outgoing", { type: "writer", list: "false", nullable: "false", }); - private path = this.args.get("path", { + private paths = this.args.get("paths", { type: "string", - list: "false", + list: "true", nullable: "false", }); async exec(): Promise { - Log.shared.debug(() => `Reading file: ${this.path}`); - const data = fs.readFileSync(this.path); - this.outgoing.write(data); + for (const path of this.paths) { + await this.readFile(path); + } + this.outgoing.close(); + } + + async readFile(path: string) { + Log.shared.debug(() => `Reading file: ${path}`); + + // Remove the file prefix, since that is not valid in Node.js. + if (path.startsWith("file://")) { + path = path.slice(7); + } + + let data: Buffer; + try { + data = fs.readFileSync(path); + } catch (e) { + if (e instanceof Error) { + Log.shared.fatal(`Failed to read file: ${e.message}`); + } else { + Log.shared.fatal("Failed to read file"); + } + } + + this.outgoing.write(data!); } } diff --git a/processors/file-utils-ts/src/FileWriter.ts b/processors/file-utils-ts/src/FileWriter.ts index 17699e4..e814c83 100644 --- a/processors/file-utils-ts/src/FileWriter.ts +++ b/processors/file-utils-ts/src/FileWriter.ts @@ -15,9 +15,21 @@ export default class FileWriter extends Processor { }); async exec(): Promise { - while (true) { + // Remove the file prefix, since that is not valid in Node.js. + if (this.path.startsWith("file://")) { + this.path = this.path.slice(7); + } + + // Remove file. + try { + fs.unlinkSync(this.path); + } catch (e) { + Log.shared.debug(() => `Could not remove file: ${this.path}`); + } + + while (!this.incoming.isClosed()) { const data = await this.incoming.read(); - Log.shared.debug(() => `Writing to ${this.path}: ${data.toString()}`); + Log.shared.debug(() => `Writing file: ${this.path}`); fs.writeFileSync(this.path, data, { flag: "a" }); } } diff --git a/processors/file-utils-ts/tsconfig.json b/processors/file-utils-ts/tsconfig.json index f2ee90d..36b3afc 100644 --- a/processors/file-utils-ts/tsconfig.json +++ b/processors/file-utils-ts/tsconfig.json @@ -25,9 +25,9 @@ // "moduleDetection": "auto", /* Control what method is used to detect module-format JS files. */ /* Modules */ - "module": "commonjs" /* Specify what module code is generated. */, + "module": "es2022" /* Specify what module code is generated. */, "rootDir": "./src" /* Specify the root folder within your source files. */, - // "moduleResolution": "node10", /* Specify how TypeScript looks up a file from a given module specifier. */ + "moduleResolution": "node" /* Specify how TypeScript looks up a file from a given module specifier. */, // "baseUrl": "./", /* Specify the base directory to resolve non-relative module names. */ // "paths": {}, /* Specify a set of entries that re-map imports to additional lookup locations. */ // "rootDirs": [], /* Allow multiple folders to be treated as one when resolving modules. */ @@ -76,7 +76,7 @@ /* Interop Constraints */ // "isolatedModules": true, /* Ensure that each file can be safely transpiled without relying on other imports. */ // "verbatimModuleSyntax": true, /* Do not transform or elide any imports or exports not marked as type-only, ensuring they are written in the output file's format based on the 'module' setting. */ - // "allowSyntheticDefaultImports": true, /* Allow 'import x from y' when a module doesn't have a default export. */ + "allowSyntheticDefaultImports": true /* Allow 'import x from y' when a module doesn't have a default export. */, "esModuleInterop": true /* Emit additional JavaScript to ease support for importing CommonJS modules. This enables 'allowSyntheticDefaultImports' for type compatibility. */, // "preserveSymlinks": true, /* Disable resolving symlinks to their realpath. This correlates to the same flag in node. */ "forceConsistentCasingInFileNames": true /* Ensure that casing is correct in imports. */, @@ -106,6 +106,6 @@ // "skipDefaultLibCheck": true, /* Skip type checking .d.ts files that are included with TypeScript. */ "skipLibCheck": true /* Skip type checking all .d.ts files. */ }, - "include": ["src/**/*", "types/**/*.d.ts"], + "include": ["src/**/*"], "exclude": ["node_modules"] } diff --git a/processors/shacl-validator-ts/index.ttl b/processors/shacl-validator-ts/index.ttl index 8b6a969..748431c 100644 --- a/processors/shacl-validator-ts/index.ttl +++ b/processors/shacl-validator-ts/index.ttl @@ -32,6 +32,12 @@ rdfc:SHACLValidatorTS sh:path rdfc:arguments ; sh:node [ sh:property [ + sh:maxCount 1 ; + sh:minCount 1 ; + sh:name "shapes" ; + sh:path rdfc:shapes ; + sh:nodeKind sh:IRIOrLiteral ; + ], [ sh:maxCount 1 ; sh:minCount 1 ; sh:name "incoming" ; diff --git a/processors/shacl-validator-ts/package.json b/processors/shacl-validator-ts/package.json index 0bfb49d..095320e 100644 --- a/processors/shacl-validator-ts/package.json +++ b/processors/shacl-validator-ts/package.json @@ -15,6 +15,7 @@ "keywords": [ "rdf-connect" ], + "type": "module", "author": "Jens Pots", "license": "MIT", "bugs": { diff --git a/processors/shacl-validator-ts/src/index.ts b/processors/shacl-validator-ts/src/index.ts index b2e5a55..ff78a00 100644 --- a/processors/shacl-validator-ts/src/index.ts +++ b/processors/shacl-validator-ts/src/index.ts @@ -1,4 +1,4 @@ -import { Arguments, Processor } from "jvm-runner-ts"; +import { Arguments, Log, Processor } from "jvm-runner-ts"; import rdf, { PrefixMapFactory } from "rdf-ext"; import Serializer from "@rdfjs/serializer-turtle"; import formatsPretty from "@rdfjs/formats/pretty.js"; @@ -6,7 +6,7 @@ import { Validator } from "shacl-engine"; import { Readable } from "stream"; export default class SHACLValidator extends Processor { - private incoming = this.args.get("input", { + private incoming = this.args.get("incoming", { type: "reader", list: "false", nullable: "false", @@ -24,7 +24,7 @@ export default class SHACLValidator extends Processor { nullable: "true", }); - private path = this.args.get("path", { + private path = this.args.get("shapes", { type: "string", list: "false", nullable: "false", @@ -66,11 +66,12 @@ export default class SHACLValidator extends Processor { // Create a new validator. const res = await rdf.fetch(this.path); if (!res.ok) { - throw Error("Could not parse SHACL path."); + Log.shared.fatal("Could not fetch SHACL file."); } - const shapes = await res.dataset().catch(() => { - throw Error("Could not parse SHACL file."); + // Read the shapes file. + const shapes = await res.dataset().catch((e) => { + Log.shared.fatal(`Could not parse SHACL file: ${e}`); }); // Parse input stream using shape stream. @@ -78,16 +79,18 @@ export default class SHACLValidator extends Processor { const validator = new Validator(shapes, { factory: rdf }); // eslint-ignore no-constant-condition - while (true) { - // Parse data into a dataset. + while (!this.incoming.isClosed()) { + // Convert incoming data to a quad stream. const data = await this.incoming.read(); const rawStream = Readable.from(data); const quadStream = parser.import(rawStream); + + // Create a new dataset. const dataset = await rdf .dataset() .import(quadStream) .catch(() => { - throw new Error("The incoming data could not be parsed"); + Log.shared.fatal("The incoming data could not be parsed"); }); // Run through validator. @@ -95,13 +98,18 @@ export default class SHACLValidator extends Processor { // Pass through data if valid. if (result.conforms) { + Log.shared.debug("Validation passed."); this.outgoing.write(data); } else if (this.validationIsFatal) { - throw new Error("Validation failed"); + Log.shared.fatal("Validation failed and is fatal."); } else if (this.report) { + Log.shared.debug("Validation failed."); const resultRaw = serializer.transform(result.dataset); this.report.write(new TextEncoder().encode(resultRaw)); } } + + this.outgoing.close(); + this.report?.close(); } } diff --git a/processors/shacl-validator-ts/tsconfig.json b/processors/shacl-validator-ts/tsconfig.json index f2ee90d..5d0b0de 100644 --- a/processors/shacl-validator-ts/tsconfig.json +++ b/processors/shacl-validator-ts/tsconfig.json @@ -25,9 +25,9 @@ // "moduleDetection": "auto", /* Control what method is used to detect module-format JS files. */ /* Modules */ - "module": "commonjs" /* Specify what module code is generated. */, + "module": "es2022" /* Specify what module code is generated. */, "rootDir": "./src" /* Specify the root folder within your source files. */, - // "moduleResolution": "node10", /* Specify how TypeScript looks up a file from a given module specifier. */ + "moduleResolution": "node" /* Specify how TypeScript looks up a file from a given module specifier. */, // "baseUrl": "./", /* Specify the base directory to resolve non-relative module names. */ // "paths": {}, /* Specify a set of entries that re-map imports to additional lookup locations. */ // "rootDirs": [], /* Allow multiple folders to be treated as one when resolving modules. */ @@ -76,7 +76,7 @@ /* Interop Constraints */ // "isolatedModules": true, /* Ensure that each file can be safely transpiled without relying on other imports. */ // "verbatimModuleSyntax": true, /* Do not transform or elide any imports or exports not marked as type-only, ensuring they are written in the output file's format based on the 'module' setting. */ - // "allowSyntheticDefaultImports": true, /* Allow 'import x from y' when a module doesn't have a default export. */ + "allowSyntheticDefaultImports": true /* Allow 'import x from y' when a module doesn't have a default export. */, "esModuleInterop": true /* Emit additional JavaScript to ease support for importing CommonJS modules. This enables 'allowSyntheticDefaultImports' for type compatibility. */, // "preserveSymlinks": true, /* Disable resolving symlinks to their realpath. This correlates to the same flag in node. */ "forceConsistentCasingInFileNames": true /* Ensure that casing is correct in imports. */, diff --git a/runners/nodejs/package.json b/runners/nodejs/package.json index e45e941..72d1d11 100644 --- a/runners/nodejs/package.json +++ b/runners/nodejs/package.json @@ -8,6 +8,7 @@ "test": "vitest run --coverage --coverage.include src", "format": "eslint --fix . && prettier --write ." }, + "type": "module", "main": "./build/index.js", "types": "./build/index.d.ts", "files": [ diff --git a/runners/nodejs/src/error.ts b/runners/nodejs/src/error.ts index 152faee..23dac10 100644 --- a/runners/nodejs/src/error.ts +++ b/runners/nodejs/src/error.ts @@ -43,4 +43,8 @@ export class RunnerError extends Error { static unexpectedBehaviour(): never { throw new RunnerError("Unexpected behaviour"); } + + static stageError(message: string): never { + throw new RunnerError(message); + } } diff --git a/runners/nodejs/src/interfaces/buffered_callback_channel.ts b/runners/nodejs/src/interfaces/buffered_callback_channel.ts new file mode 100644 index 0000000..82857eb --- /dev/null +++ b/runners/nodejs/src/interfaces/buffered_callback_channel.ts @@ -0,0 +1,75 @@ +import { Writer } from "./writer"; +import { RunnerError } from "../error"; + +/** + * A buffered callback channel is a simple implementation of a writer that calls + * a callback whenever a value is written to it. The class does therefore not + * implement the `Reader` interface, as it is not possible to read from the + * channel. + * + * The class buffers all values that are written to the channel before + * the callback is set. Once the callback is set, all buffered values are + * written to the callback. + * + * Note that the callback cannot be overwritten once it is set, and if the + * channel is closed before a callback is set, an error is thrown. + */ +export class BufferedCallbackChannel implements Writer { + /** + * The buffer that stores the values written to the channel as long as there + * is no callback set. + * @private + */ + private buffer: Array = []; + + /** + * The callback that is called whenever a value is written to the channel. If + * it is not set, the values are buffered in the `buffer` array. + * @private + */ + private callback: null | ((value: T) => void | Promise) = null; + + /** + * Whether the channel has been closed or not. + * @private + */ + private closed = false; + + close(): void { + // The channel was closed before a callback was set, which results in a loss of data. + if (this.callback === null) { + RunnerError.channelError(); + } + + this.closed = true; + } + + isClosed(): boolean { + return this.closed; + } + + write(data: T): void { + if (this.callback === null) { + this.buffer.push(data); + } else { + this.callback(data); + } + } + + /** + * Set the callback that is called whenever a value is written to the + * channel. All values that were written to the channel before the callback + * was set are written to the callback as well. + * @param callback The callback to call whenever a value is written to the + */ + setCallback(callback: (value: T) => void | Promise): void { + // The callback cannot be overwritten. + if (this.callback != null) { + RunnerError.channelError(); + } + + this.callback = callback; + this.buffer.forEach((value) => callback(value)); + this.buffer = []; + } +} diff --git a/runners/nodejs/src/interfaces/callback_channel.ts b/runners/nodejs/src/interfaces/callback_channel.ts new file mode 100644 index 0000000..661c8d0 --- /dev/null +++ b/runners/nodejs/src/interfaces/callback_channel.ts @@ -0,0 +1,47 @@ +import { Writer } from "./writer"; +import { RunnerError } from "../error"; + +/** + * A callback channel is a simple implementation of a writer that calls a + * callback whenever a value is written to it. The class does therefore + * not implement the `Reader` interface, as it is not possible to read from + * the channel. + */ +export class CallbackChannel implements Writer { + /** + * The callback that is called whenever a value is written to the channel. + * @private + */ + private readonly callback: (value: T) => void | Promise; + + /** + * Whether the channel has been closed or not. + * @private + */ + private closed = false; + + /** + * Create a new callback channel with a specific callback. + * @param callback The callback to call whenever a value is written to the + * channel. + */ + constructor(callback: (value: T) => void | Promise) { + this.callback = callback; + } + + close(): void { + this.closed = true; + } + + isClosed(): boolean { + return false; + } + + write(data: T): void { + if (!this.closed) { + this.callback(data); + } else { + RunnerError.channelError(); + } + } +} diff --git a/runners/nodejs/src/interfaces/channel.ts b/runners/nodejs/src/interfaces/channel.ts new file mode 100644 index 0000000..6e8a210 --- /dev/null +++ b/runners/nodejs/src/interfaces/channel.ts @@ -0,0 +1,80 @@ +import { RunnerError } from "../error"; +import { Reader } from "./reader"; +import { Writer } from "./writer"; + +/** + * A channel is a communication mechanism that allows for the transfer of values + * between two endpoints. This is a simple implementation of a channel that + * allows for the writing and reading of values. + */ +export class Channel implements Reader, Writer { + /** + * The values stored in the channel, which are buffered here if there is no + * reader to consume them. + * @private + */ + private readonly values: Array = []; + + /** + * Outstanding reads that are waiting for a value to be written to the channel. + * @private + */ + private readonly reads: Array<(value: T) => void> = []; + + /** + * Whether the channel has been closed or not. + * @private + */ + private closed: boolean = false; + + /** + * Write a value to the channel. If there is a reader waiting for a value, it + * will be resolved immediately. Otherwise, the value will be buffered. + * @param value The value to write to the channel. + */ + write(value: T) { + if (this.closed) { + RunnerError.channelError(); + } + + const read = this.reads.shift(); + + if (read) { + read(value); + } else { + this.values.push(value); + } + } + + /** + * Read a value from the channel. If there is a value buffered, it will be + * resolved immediately. Otherwise, the read will be buffered. + */ + read(): Promise { + return new Promise((resolve, reject) => { + const result = this.values.shift(); + + if (result) { + resolve(result); + } else if (this.closed) { + reject(RunnerError.channelError()); + } else { + this.reads.push(resolve); + } + }); + } + + /** + * Close the channel, preventing any further writes from occurring. + */ + close() { + this.closed = true; + } + + /** + * Check if the channel is closed or not. + */ + isClosed(): boolean { + return this.closed; + } +} diff --git a/runners/nodejs/src/interfaces/log.ts b/runners/nodejs/src/interfaces/log.ts index 9898867..088c5c6 100644 --- a/runners/nodejs/src/interfaces/log.ts +++ b/runners/nodejs/src/interfaces/log.ts @@ -1,5 +1,6 @@ import { Subject } from "rxjs"; import { LogEntry, LogLevel } from "../proto"; +import { RunnerError } from "../error"; /** * Simple wrapper class which exposes an observable to which log messages are @@ -22,6 +23,13 @@ export class Log { this.push({ level: LogLevel.INFO, message }); } + /** + * Write a fatal message to the log stream and do not return. + */ + fatal(message: string): never { + throw RunnerError.stageError(message); + } + /** * Write a message to the log stream with the DEBUG level. * @param message The message to write, either as literal or a function @@ -42,14 +50,6 @@ export class Log { this.push({ level: LogLevel.SEVERE, message }); } - /** - * Write a fatal message, indicating that the program must halt afterward. - * @param message The message to write. - */ - fatal(message: string): void { - this.push({ level: LogLevel.FATAL, message }); - } - /** * Add a new subscriber to the log stream. * @param next The callback to call when a new log message is pushed. diff --git a/runners/nodejs/src/interfaces/processor.ts b/runners/nodejs/src/interfaces/processor.ts index 6b4fdaa..b36780a 100644 --- a/runners/nodejs/src/interfaces/processor.ts +++ b/runners/nodejs/src/interfaces/processor.ts @@ -11,7 +11,7 @@ export abstract class Processor { } /* The actual implementation of the processor must be overridden here. */ - public exec(): void { + public async exec(): Promise { throw RunnerError.missingImplementation(); } } diff --git a/runners/nodejs/src/interfaces/reader.ts b/runners/nodejs/src/interfaces/reader.ts index a921966..840776f 100644 --- a/runners/nodejs/src/interfaces/reader.ts +++ b/runners/nodejs/src/interfaces/reader.ts @@ -1,26 +1,5 @@ -import { firstValueFrom, Observable } from "rxjs"; -import { RunnerError } from "../error"; -import { Log } from "./log"; - -export class Reader { - private channel: Observable; - - constructor(channel: Observable) { - this.channel = channel; - } - - async read(): Promise { - try { - const result = await firstValueFrom(this.channel.pipe()); - - Log.shared.debug(() => { - const serialized = result.toString().replace("\n", "\\n"); - return `[unknown] -> '${serialized}`; - }); - - return result; - } catch (error) { - throw RunnerError.channelError(); - } - } +export interface Reader { + read(): Promise; + isClosed(): boolean; + close(): void; } diff --git a/runners/nodejs/src/interfaces/writer.ts b/runners/nodejs/src/interfaces/writer.ts index fd775d8..ebf1172 100644 --- a/runners/nodejs/src/interfaces/writer.ts +++ b/runners/nodejs/src/interfaces/writer.ts @@ -1,19 +1,5 @@ -import { Observer } from "rxjs"; -import { Log } from "./log"; - -export class Writer { - private channel: Observer; - - constructor(channel: Observer) { - this.channel = channel; - } - - write(bytes: Uint8Array): void { - Log.shared.debug(() => { - const serialized = bytes.toString().replace("\n", "\\n"); - return `'${serialized}' -> [unknown]`; - }); - - this.channel.next(bytes); - } +export interface Writer { + write(data: T): void; + isClosed(): boolean; + close(): void; } diff --git a/runners/nodejs/src/proto/empty.ts b/runners/nodejs/src/proto/empty.ts index 0c6be95..81782ae 100644 --- a/runners/nodejs/src/proto/empty.ts +++ b/runners/nodejs/src/proto/empty.ts @@ -5,7 +5,7 @@ // source: empty.proto /* eslint-disable */ -import * as _m0 from "protobufjs/minimal"; +import _m0 from "protobufjs/minimal.js"; export const protobufPackage = ""; diff --git a/runners/nodejs/src/proto/index.ts b/runners/nodejs/src/proto/index.ts index 5543373..d17073c 100644 --- a/runners/nodejs/src/proto/index.ts +++ b/runners/nodejs/src/proto/index.ts @@ -21,9 +21,9 @@ import { type ServiceError, type UntypedServiceImplementation, } from "@grpc/grpc-js"; -import * as _m0 from "protobufjs/minimal"; -import { Empty } from "./empty"; -import { IRStage } from "./intermediate"; +import _m0 from "protobufjs/minimal.js"; +import { Empty } from "./empty.js"; +import { IRStage } from "./intermediate.js"; export const protobufPackage = ""; diff --git a/runners/nodejs/src/proto/intermediate.ts b/runners/nodejs/src/proto/intermediate.ts index ac802a3..b20fdc1 100644 --- a/runners/nodejs/src/proto/intermediate.ts +++ b/runners/nodejs/src/proto/intermediate.ts @@ -5,7 +5,7 @@ // source: intermediate.proto /* eslint-disable */ -import * as _m0 from "protobufjs/minimal"; +import _m0 from "protobufjs/minimal.js"; export const protobufPackage = ""; diff --git a/runners/nodejs/src/runtime/arguments.ts b/runners/nodejs/src/runtime/arguments.ts index 509989d..b27be27 100644 --- a/runners/nodejs/src/runtime/arguments.ts +++ b/runners/nodejs/src/runtime/arguments.ts @@ -15,12 +15,15 @@ import { Writer } from "../interfaces/writer"; import { Reader } from "../interfaces/reader"; import { RunnerError } from "../error"; +import { Channel } from "../interfaces/channel"; +import { CallbackChannel } from "../interfaces/callback_channel"; +import { BufferedCallbackChannel } from "../interfaces/buffered_callback_channel"; /** * Argument types supported by RDF-Connect. These are enumerated as strings, in * order to support usage at runtime through literals. */ -type Type = +export type Type = | "boolean" | "byte" | "date" @@ -53,9 +56,9 @@ type GetType = T extends "boolean" : T extends "string" ? string : T extends "writer" - ? Writer + ? Writer : T extends "reader" - ? Reader + ? Reader : T extends "map" ? Arguments : never; @@ -84,9 +87,13 @@ function conforms(value: unknown, type: Type): boolean { case "string": return typeof value === "string"; case "writer": - return value instanceof Writer; + return ( + value instanceof Channel || + value instanceof CallbackChannel || + value instanceof BufferedCallbackChannel + ); case "reader": - return value instanceof Reader; + return value instanceof Channel; case "map": return value instanceof Arguments; default: @@ -97,12 +104,12 @@ function conforms(value: unknown, type: Type): boolean { /** * Literal type which indicates if the requested type is a singleton or a list. */ -type List = "true" | "false"; +export type List = "true" | "false"; /** * Literal type which indicate if the requested type is a nullable or not. */ -type Nullable = "true" | "false"; +export type Nullable = "true" | "false"; /** * Given a type `T`, return either `T` or `T[]` based on a `List` value. @@ -125,7 +132,7 @@ type GetNullable = N extends undefined /** * Describes the return type of a returned argument function. */ -type Options< +export type Options< T extends Type, L extends List | undefined, N extends Nullable | undefined, diff --git a/runners/nodejs/src/runtime/runner.ts b/runners/nodejs/src/runtime/runner.ts index cb50b92..aae266a 100644 --- a/runners/nodejs/src/runtime/runner.ts +++ b/runners/nodejs/src/runtime/runner.ts @@ -4,16 +4,17 @@ import { IRParameterType, IRStage, } from "../proto/intermediate"; -import { ChannelData, LogEntry } from "../proto"; -import { Subject, Subscription } from "rxjs"; +import { ChannelData } from "../proto"; import { Processor } from "../interfaces/processor"; -import * as path from "node:path"; import { Reader } from "../interfaces/reader"; import { Writer } from "../interfaces/writer"; import { RunnerError } from "../error"; -import { asMap, tryOrPanic } from "./util"; +import { asMap } from "./util"; import { Arguments } from "./arguments"; import { Log } from "../interfaces/log"; +import { Channel } from "../interfaces/channel"; +import { CallbackChannel } from "../interfaces/callback_channel"; +import { BufferedCallbackChannel } from "../interfaces/buffered_callback_channel"; /** * The actual implementation of the runner, and the core of the program. It is @@ -27,31 +28,22 @@ import { Log } from "../interfaces/log"; export class Runner { // The incoming channel is bound to by an external object. Whenever data is // written to it, it is handled by the runner as an incoming message. - public incoming = new Subject(); + public incoming = new CallbackChannel((data) => { + this.handleMessage(data); + }); // All writers are bound to the outgoing channel, and after it is written to, // the runner will delegate the messages to the server implementation. - public outgoing = new Subject(); - - // The handler for incoming message. Note that this value is not used, but - // kept as a reference to ensure the subscription is not dropped. - private incomingSubscription: Subscription; + public outgoing = new BufferedCallbackChannel(); // Maps the URIs of channels to their corresponding readers. We use this map // to route incoming messages to their correct receiver. - private readers: Map> = new Map(); + private readers: Map> = new Map(); // We keep track of the stages that are loaded into the runner by URI. These // are instantiated beforehand and can be executed or interrupted. private stages: Map = new Map(); - // The constructor binds the handler to the incoming message stream. - constructor() { - this.incomingSubscription = this.incoming.subscribe((x) => - this.handleMessage(x), - ); - } - /** * Handle an incoming message by routing it to the correct reader. This is * done by looking up the destination URI in the readers map and calling the @@ -63,7 +55,7 @@ export class Runner { if (!reader) { throw new Error(`Reader not found for payload ${payload.destinationUri}`); } - reader.next(payload.data); + reader.write(payload.data); } /** @@ -73,15 +65,13 @@ export class Runner { * @param channelURI The channel to write to as a URI. * @private */ - private createWriter(channelURI: string): Writer { - const subject = new Subject(); - subject.subscribe((data) => { - this.outgoing.next({ + private createWriter(channelURI: string): Writer { + return new CallbackChannel((data) => { + this.outgoing.write({ destinationUri: channelURI, data: data, }); }); - return new Writer(subject); } /** @@ -91,10 +81,10 @@ export class Runner { * @param channelURI The channel to read from as a URI. * @private */ - private createReader(channelURI: string): Reader { - const subject = new Subject(); - this.readers.set(channelURI, subject); - return new Reader(subject); + private createReader(channelURI: string): Reader { + const channel = new Channel(); + this.readers.set(channelURI, channel); + return channel; } /** @@ -106,6 +96,8 @@ export class Runner { * @private */ private parseSimpleArgument(type: IRParameterType, value: string): unknown { + Log.shared.debug(() => `Parsing '${value}' as ${IRParameterType[type]}`); + if (type == IRParameterType.BOOLEAN) { return value == "true"; } else if (type == IRParameterType.BYTE) { @@ -134,12 +126,17 @@ export class Runner { /** * Parse a single parameter, either simple or complex, into its native Node.js * representation. + * @param name The nam eof the argument. * @param arg The arguments to parse. * @param param The parameter to parse. */ - private parseArgument(arg: IRArgument, param: IRParameter): unknown[] { + private parseArgument( + name: string, + arg: IRArgument, + param: IRParameter, + ): unknown[] { // If the argument is complex, we need to recursively parse the arguments. - if (arg.complex && param.complex) { + if (arg.complex != undefined && param.complex != undefined) { const params = asMap(param.complex.parameters); // Recursively call for each value. @@ -150,14 +147,10 @@ export class Runner { } // If the argument is a single value, we can parse it directly. - if (arg.simple && param.simple) { - const params = - param.simple ?? - RunnerError.inconsistency("Expected simple parameter, found complex."); - + if (arg.simple != undefined && param.simple != undefined) { // Recursively call for each value. return arg.simple.value.map((value) => - this.parseSimpleArgument(params, value), + this.parseSimpleArgument(param.simple!, value), ); } @@ -184,7 +177,7 @@ export class Runner { // if required. for (const [name, arg] of args) { const param = params.get(name) ?? RunnerError.missingParameter(name); - const parsed = this.parseArgument(arg, param); + const parsed = this.parseArgument(name, arg, param); // Set the argument. result.set(name, parsed); @@ -220,18 +213,13 @@ export class Runner { asMap(params), ); - try { - // Instantiate the processor with the parsed arguments. - Log.shared.debug(() => `Instantiating stage: ${stage.uri}`); - const instance = tryOrPanic(() => { - return new constructor(new Arguments(parsedArguments)); - }); + // Instantiate the processor with the parsed arguments. + Log.shared.debug(() => `Instantiating stage: ${stage.uri}`); + const args = new Arguments(parsedArguments); + const instance = new constructor(args); - // Keep track of it in the stages map. - this.stages.set(stage.uri, instance); - } catch (e) { - Log.shared.fatal(`Could not instantiate stage: ${stage.uri}`); - } + // Keep track of it in the stages map. + this.stages.set(stage.uri, instance); } /** @@ -242,16 +230,15 @@ export class Runner { async exec(): Promise { Log.shared.info("Execution started."); - this.stages.forEach((stage) => { - new Promise(() => { - try { - return stage.exec(); - } catch (e) { - Log.shared.fatal("Error while executing stage."); - throw e; - } - }); + // Execute all stations. + const execs = [...this.stages.entries()].map(async ([uri, stage]) => { + Log.shared.debug(() => `Executing stage: ${uri}`); + await stage.exec(); + Log.shared.debug(() => `Finished stage: ${uri}`); }); + + // Discard results. + return Promise.all(execs).then(() => {}); } /** diff --git a/runners/nodejs/src/runtime/server.ts b/runners/nodejs/src/runtime/server.ts index ca5a55b..658c201 100644 --- a/runners/nodejs/src/runtime/server.ts +++ b/runners/nodejs/src/runtime/server.ts @@ -28,12 +28,15 @@ export class ServerImplementation implements RunnerServer { channel(call: ServerDuplexStream): void { // On incoming data, call the appropriate reader. call.on("data", function (payload: ChannelData) { - Runner.shared.incoming.next(payload); + Log.shared.debug( + () => `'${payload.destinationUri} -> [${payload.data.length} bytes]'`, + ); + Runner.shared.incoming.write(payload); }); - // On outgoing data, propagate to gRPC. - Runner.shared.outgoing.subscribe((payload) => { - call.write(payload); + // On outgoing data, write it to the stream. + Runner.shared.outgoing.setCallback(async (data) => { + call.write(data); }); } @@ -51,7 +54,8 @@ export class ServerImplementation implements RunnerServer { callback(null, {}); }) .catch((e) => { - callback(e, {}); + console.error(e); + callback(null, {}); }); } @@ -63,9 +67,15 @@ export class ServerImplementation implements RunnerServer { call: ServerUnaryCall, callback: sendUnaryData, ): void { - Runner.shared.exec().then(() => { - callback(null, {}); - }); + Runner.shared.exec().then( + () => { + callback(null, {}); + }, + (e) => { + console.error(e); + callback(null, {}); + }, + ); } /** diff --git a/runners/nodejs/tsconfig.json b/runners/nodejs/tsconfig.json index df35b80..cbc932d 100644 --- a/runners/nodejs/tsconfig.json +++ b/runners/nodejs/tsconfig.json @@ -3,20 +3,20 @@ /* Visit https://aka.ms/tsconfig to read more about this file */ /* Projects */ - "incremental": false /* Save .tsbuildinfo files to allow for incremental compilation of projects. */, - // "composite": true, /* Enable constraints that allow a TypeScript project to be used with project references. */ + "incremental": false, + /* Save .tsbuildinfo files to allow for incremental compilation of projects. */ // "composite": true, /* Enable constraints that allow a TypeScript project to be used with project references. */ // "tsBuildInfoFile": "./.tsbuildinfo", /* Specify the path to .tsbuildinfo incremental compilation file. */ // "disableSourceOfProjectReferenceRedirect": true, /* Disable preferring source files instead of declaration files when referencing composite projects. */ // "disableSolutionSearching": true, /* Opt a project out of multi-project reference checking when editing. */ // "disableReferencedProjectLoad": true, /* Reduce the number of projects loaded automatically by TypeScript. */ /* Language and Environment */ - "target": "es2016" /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */, - // "lib": [], /* Specify a set of bundled library declaration files that describe the target runtime environment. */ + "target": "es2016", + /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */ // "lib": [], /* Specify a set of bundled library declaration files that describe the target runtime environment. */ // "jsx": "preserve", /* Specify what JSX code is generated. */ - "experimentalDecorators": true /* Enable experimental support for legacy experimental decorators. */, - "emitDecoratorMetadata": true /* Emit design-type metadata for decorated declarations in source files. */, - // "jsxFactory": "", /* Specify the JSX factory function used when targeting React JSX emit, e.g. 'React.createElement' or 'h'. */ + "experimentalDecorators": true, + /* Enable experimental support for legacy experimental decorators. */ "emitDecoratorMetadata": true, + /* Emit design-type metadata for decorated declarations in source files. */ // "jsxFactory": "", /* Specify the JSX factory function used when targeting React JSX emit, e.g. 'React.createElement' or 'h'. */ // "jsxFragmentFactory": "", /* Specify the JSX Fragment reference used for fragments when targeting React JSX emit e.g. 'React.Fragment' or 'Fragment'. */ // "jsxImportSource": "", /* Specify module specifier used to import the JSX factory functions when using 'jsx: react-jsx*'. */ // "reactNamespace": "", /* Specify the object invoked for 'createElement'. This only applies when targeting 'react' JSX emit. */ @@ -25,9 +25,10 @@ // "moduleDetection": "auto", /* Control what method is used to detect module-format JS files. */ /* Modules */ - "module": "commonjs" /* Specify what module code is generated. */, - "rootDir": "./src" /* Specify the root folder within your source files. */, - // "moduleResolution": "node10", /* Specify how TypeScript looks up a file from a given module specifier. */ + "module": "es2022", + /* Specify what module code is generated. */ "rootDir": "./src", + /* Specify the root folder within your source files. */ "moduleResolution": "node", + /* Specify how TypeScript looks up a file from a given module specifier. */ // "baseUrl": "./", /* Specify the base directory to resolve non-relative module names. */ // "paths": {}, /* Specify a set of entries that re-map imports to additional lookup locations. */ // "rootDirs": [], /* Allow multiple folders to be treated as one when resolving modules. */ @@ -55,8 +56,8 @@ "sourceMap": true /* Create source map files for emitted JavaScript files. */, // "inlineSourceMap": true, /* Include sourcemap files inside the emitted JavaScript. */ // "outFile": "", /* Specify a file that bundles all outputs into one JavaScript file. If 'declaration' is true, also designates a file that bundles all .d.ts output. */ - "outDir": "./build/" /* Specify an output folder for all emitted files. */, - // "removeComments": true, /* Disable emitting comments. */ + "outDir": "./build/", + /* Specify an output folder for all emitted files. */ // "removeComments": true, /* Disable emitting comments. */ // "noEmit": true, /* Disable emitting files from a compilation. */ // "importHelpers": true, /* Allow importing helper functions from tslib once per project, instead of including them per-file. */ // "importsNotUsedAsValues": "remove", /* Specify emit/checking behavior for imports that are only used for types. */ @@ -65,8 +66,8 @@ // "mapRoot": "", /* Specify the location where debugger should locate map files instead of generated locations. */ // "inlineSources": true, /* Include source code in the sourcemaps inside the emitted JavaScript. */ // "emitBOM": true, /* Emit a UTF-8 Byte Order Mark (BOM) in the beginning of output files. */ - "newLine": "lf" /* Set the newline character for emitting files. */, - // "stripInternal": true, /* Disable emitting declarations that have '@internal' in their JSDoc comments. */ + "newLine": "lf", + /* Set the newline character for emitting files. */ // "stripInternal": true, /* Disable emitting declarations that have '@internal' in their JSDoc comments. */ // "noEmitHelpers": true, /* Disable generating custom helper functions like '__extends' in compiled output. */ // "noEmitOnError": true, /* Disable emitting files if any type checking errors are reported. */ // "preserveConstEnums": true, /* Disable erasing 'const enum' declarations in generated code. */ @@ -76,14 +77,14 @@ /* Interop Constraints */ // "isolatedModules": true, /* Ensure that each file can be safely transpiled without relying on other imports. */ // "verbatimModuleSyntax": true, /* Do not transform or elide any imports or exports not marked as type-only, ensuring they are written in the output file's format based on the 'module' setting. */ - // "allowSyntheticDefaultImports": true, /* Allow 'import x from y' when a module doesn't have a default export. */ - "esModuleInterop": true /* Emit additional JavaScript to ease support for importing CommonJS modules. This enables 'allowSyntheticDefaultImports' for type compatibility. */, - // "preserveSymlinks": true, /* Disable resolving symlinks to their realpath. This correlates to the same flag in node. */ - "forceConsistentCasingInFileNames": true /* Ensure that casing is correct in imports. */, - - /* Type Checking */ - "strict": true /* Enable all strict type-checking options. */, - // "noImplicitAny": true, /* Enable error reporting for expressions and declarations with an implied 'any' type. */ + "allowSyntheticDefaultImports": true, + /* Allow 'import x from y' when a module doesn't have a default export. */ + "esModuleInterop": true, + /* Emit additional JavaScript to ease support for importing CommonJS modules. This enables 'allowSyntheticDefaultImports' for type compatibility. */ // "preserveSymlinks": true, /* Disable resolving symlinks to their realpath. This correlates to the same flag in node. */ + "forceConsistentCasingInFileNames": true, + /* Ensure that casing is correct in imports. */ /* Type Checking */ + "strict": true, + /* Enable all strict type-checking options. */ // "noImplicitAny": true, /* Enable error reporting for expressions and declarations with an implied 'any' type. */ // "strictNullChecks": true, /* When type checking, take into account 'null' and 'undefined'. */ // "strictFunctionTypes": true, /* When assigning functions, check to ensure parameters and the return values are subtype-compatible. */ // "strictBindCallApply": true, /* Check that the arguments for 'bind', 'call', and 'apply' methods match the original function. */ @@ -104,8 +105,14 @@ /* Completeness */ // "skipDefaultLibCheck": true, /* Skip type checking .d.ts files that are included with TypeScript. */ - "skipLibCheck": true /* Skip type checking all .d.ts files. */ + "skipLibCheck": true + /* Skip type checking all .d.ts files. */ }, "include": ["src/**/*"], - "exclude": ["node_modules"] + "exclude": ["node_modules"], + "tsc-alias": { + "verbose": true, + "resolveFullPaths": true, + "resolveFullExtension": ".js" + } } diff --git a/src/main/kotlin/Orchestrator.kt b/src/main/kotlin/Orchestrator.kt index bca5073..ed0a4e2 100644 --- a/src/main/kotlin/Orchestrator.kt +++ b/src/main/kotlin/Orchestrator.kt @@ -58,11 +58,11 @@ class Orchestrator( try { withTimeout(1000) { val message = channel.receive() - val target = readers[message.channel]!! - Log.shared.info( - "Brokering message '${ - message.data.decodeToString().replace("\n", "\\n") - }' to ${message.channel}.") + val target = + readers[message.channel] ?: Log.shared.fatal("Unknown reader: ${message.channel}") + + Log.shared.debug { "'${message.channel}' <> [${message.data.size} bytes]" } + target.toProcessors.send(message) } } catch (_: TimeoutCancellationException) {} diff --git a/src/main/kotlin/parser/impl/JenaParser.kt b/src/main/kotlin/parser/impl/JenaParser.kt index e3d54f2..f8420ba 100644 --- a/src/main/kotlin/parser/impl/JenaParser.kt +++ b/src/main/kotlin/parser/impl/JenaParser.kt @@ -86,6 +86,7 @@ private fun Model.parseSHACLProperty(property: Resource): Pair { return result } +private fun Model.isSimpleSHACLShape(path: Resource): Boolean { + val property = + subjectWithProperty(SHACLM.path, path) + ?: Log.shared.fatal("No property found for path: $path") + + val datatype = objectOfProperty(property, SHACLM.datatype)?.asResource() + val clazz = objectOfProperty(property, SHACLM.class_)?.asResource() + val kind = objectOfProperty(property, SHACLM.nodeKind)?.asResource() + + if (listOfNotNull(datatype, clazz, kind).size > 1) { + Log.shared.fatal("Cannot combine sh:datatype, sh:class or sh:nodeKind.") + } + + // A datatype always points to a literal. + if (datatype != null) { + return true + } + + // Specific classes are always simple. + if (clazz != null && listOf(RDFC.channel, RDFC.reader, RDFC.writer).contains(clazz)) { + return true + } + + // If the kind can optionally be a literal, it should be handled as such. + if (kind != null && kind == SHACLM.IRIOrLiteral) { + return true + } + + // Default case: it is a complex object. + return false +} + private fun Model.nameOfSHACLPath(path: Resource): String { val property = subjectWithProperty(SHACLM.path, path) @@ -161,32 +196,32 @@ private fun Model.parseArguments(node: Resource): Map { // Go over each triple of the resource. If it is a literal, add it to the simple list. Otherwise, // call recursively and add it to the complex list. for (triple in listStatements(node, null, null as RDFNode?)) { + // The predicate must equal the SHACL path. + val path = triple.predicate + + // Skip the type predicate, if it is given. if (triple.predicate == RDF.type) { continue } - val key = nameOfSHACLPath(triple.predicate) + // Get the name of the argument. + val key = nameOfSHACLPath(path) val value = triple.`object` - // If the value is a literal, it is always simple. - if (value.isLiteral) { - val list = simple.getOrPut(key) { mutableListOf() } - list.add(value.asLiteral().string) - continue - } - - // If the value is a resource, pointing to a Reader or Writer, it is always simple as well. - val type = objectOfProperty(value.asResource(), RDF.type) - if (type == RDFC.channel || type == RDFC.writer || type == RDFC.reader) { + if (isSimpleSHACLShape(path)) { val list = simple.getOrPut(key) { mutableListOf() } - list.add(value.toString()) - continue + val v = + if (value.isLiteral) { + value.asLiteral().value + } else { + value.asResource() + } + list.add(v.toString()) + } else { + val list = complex.getOrPut(key) { mutableListOf() } + val nested = parseArguments(value.asResource()) + list.add(nested) } - - // Else, parse it as a complex argument. - val list = complex.getOrPut(key) { mutableListOf() } - val nested = parseArguments(value.asResource()) - list.add(nested) } // Combine both simple and complex mappings as a single map to IRArguments. diff --git a/src/main/kotlin/runner/impl/grpc/GRPCRunner.kt b/src/main/kotlin/runner/impl/grpc/GRPCRunner.kt index 7931060..df9b10c 100644 --- a/src/main/kotlin/runner/impl/grpc/GRPCRunner.kt +++ b/src/main/kotlin/runner/impl/grpc/GRPCRunner.kt @@ -4,7 +4,9 @@ import RunnerGrpcKt import com.google.protobuf.ByteString import io.grpc.ManagedChannel import io.grpc.ManagedChannelBuilder +import io.grpc.StatusException import kotlin.concurrent.thread +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.map @@ -15,7 +17,6 @@ import runner.Runner import technology.idlab.intermediate.IRProcessor import technology.idlab.intermediate.IRStage import technology.idlab.util.Log -import technology.idlab.util.retries /** * This runner has GRPC built-in, so the only configuration that an extending class needs to provide @@ -72,17 +73,19 @@ abstract class GRPCRunner( override suspend fun load(processor: IRProcessor, stage: IRStage) { val payload = stage.toGRPC(processor.toGRPC()) - retries(5, 1000) { grpc.load(payload) } + + try { + grpc.load(payload) + } catch (e: StatusException) { + Log.shared.fatal("Failed to load stage: ${e.message}") + } } override suspend fun exec() = coroutineScope { // Create a flow for outgoing messages. val toGRPCProcessors = toProcessors.receiveAsFlow().map { - Log.shared.debug { - val value = it.data.decodeToString().replace("\n", "\\n") - "'$value' -> [${it.channel}]" - } + Log.shared.debug { "'${it.channel}' -> [${it.data.size} bytes]" } val builder = Index.ChannelData.newBuilder() builder.setDestinationUri(it.channel) @@ -99,18 +102,24 @@ abstract class GRPCRunner( .channel(toGRPCProcessors) .map { Payload(it.destinationUri, it.data.toByteArray()) } .collect { - Log.shared.debug { - val value = it.data.decodeToString().replace("\n", "\\n") - "'$value' -> [${it.channel}]" + Log.shared.debug { "'${it.channel}' <- [${it.data.size} bytes]" } + + try { + fromProcessors.send(it) + } catch (e: CancellationException) { + Log.shared.debug("Cancellation exception: ${e.message}") } - fromProcessors.send(it) } Log.shared.debug("Ending routing messages in GRPCRunner.") } // Attempt to execute the pipelines. - retries(5, 1000) { grpc.exec(empty) } + try { + grpc.exec(empty) + } catch (e: StatusException) { + Log.shared.fatal("Failed to execute pipeline: ${e.message}") + } // Wait for the router to finish. router.join() diff --git a/src/main/kotlin/util/Log.kt b/src/main/kotlin/util/Log.kt index 4d37886..610ca83 100644 --- a/src/main/kotlin/util/Log.kt +++ b/src/main/kotlin/util/Log.kt @@ -2,6 +2,7 @@ package technology.idlab.util import java.time.format.DateTimeFormatter import java.util.* +import kotlin.system.exitProcess import technology.idlab.exception.RunnerException private const val TIME_PADDING = 15 @@ -27,6 +28,18 @@ class Log private constructor(header: Boolean = true) { FATAL, } + /** + * The mode in which the logger will handle fatal messages. If the mode is set to EXCEPTION, the + * logger will throw an exception. If the mode is set to EXIT, the logger will exit the program. + */ + internal enum class FatalMode { + EXCEPTION, + EXIT, + } + + /** The mode in which the logger will handle fatal messages. The default mode is set to EXIT. */ + private var fatalMode = FatalMode.EXIT + init { if (header) { val builder = StringBuilder() @@ -97,6 +110,11 @@ class Log private constructor(header: Boolean = true) { builder.append("\u001B[31m") } + // Color the background red in fatal cases. + if (level == Level.FATAL) { + builder.append("\u001B[97m\u001B[48;5;52m") + } + // The actual message. builder.append(time.padEnd(TIME_PADDING, ' ')) builder.append(thread.padEnd(TASK_PADDING, ' ')) @@ -106,9 +124,7 @@ class Log private constructor(header: Boolean = true) { builder.append("\n") // Reset coloring. - if (level == Level.DEBUG || level == Level.SEVERE || level == Level.CMD) { - builder.append("\u001B[0m") - } + builder.append("\u001B[0m") // Print to the console, thread safe. synchronized(System.out) { print(builder) } @@ -130,7 +146,11 @@ class Log private constructor(header: Boolean = true) { */ fun fatal(message: String, location: String? = null): Nothing { output(message, Level.FATAL, location = location) - throw RunnerException() + + when (this.fatalMode) { + FatalMode.EXCEPTION -> throw RunnerException() + FatalMode.EXIT -> exitProcess(1) + } } /** @@ -166,6 +186,10 @@ class Log private constructor(header: Boolean = true) { output(message, Level.CMD, location = location, pid = pid) } + internal fun setFatalMode(fatalMode: Log.FatalMode) { + this.fatalMode = fatalMode + } + companion object { /** * A globally available instance of the logger. Note that at its creation, the logger will diff --git a/src/main/kotlin/util/ManagedProcess.kt b/src/main/kotlin/util/ManagedProcess.kt index d3256a0..2076c78 100644 --- a/src/main/kotlin/util/ManagedProcess.kt +++ b/src/main/kotlin/util/ManagedProcess.kt @@ -46,7 +46,7 @@ class ManagedProcess(private val process: Process, private val name: String) { private val outgoing = thread { val stream = process.errorStream.bufferedReader() for (line in stream.lines()) { - Log.shared.fatal(line, location = name) + Log.shared.severe(line, location = name) } } diff --git a/src/main/kotlin/util/Retries.kt b/src/main/kotlin/util/Retries.kt index 80d3594..2852bc5 100644 --- a/src/main/kotlin/util/Retries.kt +++ b/src/main/kotlin/util/Retries.kt @@ -9,8 +9,7 @@ suspend fun retries(times: Int, milliseconds: Long = 1000, block: suspend () try { return@coroutineScope block() } catch (e: Exception) { - Log.shared.severe( - "An exception occurred: ${e.message}. Retrying in $milliseconds milliseconds.") + Log.shared.severe("[$i/$times] ${e.message.toString()}") delay(milliseconds) } } diff --git a/src/test/kotlin/e2e/E2ETest.kt b/src/test/kotlin/e2e/E2ETest.kt index 96ee6f2..e2e0506 100644 --- a/src/test/kotlin/e2e/E2ETest.kt +++ b/src/test/kotlin/e2e/E2ETest.kt @@ -2,7 +2,6 @@ package e2e import java.io.File import kotlin.test.Test -import kotlin.test.assertEquals import kotlin.test.assertNotNull import kotlinx.coroutines.TimeoutCancellationException import kotlinx.coroutines.runBlocking @@ -13,26 +12,35 @@ import technology.idlab.exec class E2ETest { @Test fun node() { - val pipeline = this::class.java.getResource("/e2e/node.ttl") - assertNotNull(pipeline, "The file should exist.") - + // Create the output directory. val directory = File("/tmp/rdfc-testing") directory.createDirectory() - val input = File("/tmp/rdfc-testing/input.txt") - input.createNewFile() - input.writeText("Hello, World!") + // Reset output files. + val valid = File("/tmp/rdfc-testing/valid.ttl") + valid.delete() + val report = File("/tmp/rdfc-testing/report.ttl") + report.delete() - val output = File("/tmp/rdfc-testing/output.txt") - output.delete() - output.createNewFile() + // Read the pipeline file. + val pipeline = this::class.java.getResource("/e2e/node.ttl") + assertNotNull(pipeline, "The file should exist.") + // Execute the pipeline. runBlocking { try { - withTimeout(10000) { exec(pipeline.path) } + withTimeout(20_000) { exec(pipeline.path) } } catch (_: TimeoutCancellationException) {} } - assertEquals("Hello, World!", output.readText()) + // Check the output files. + assert(valid.exists()) { "The valid file should exist." } + assert(report.exists()) { "The invalid file should exist." } + + assert(valid.readText().isNotEmpty()) { "The valid file should not be empty." } + assert(report.readText().isNotEmpty()) { "The invalid file should not be empty." } + + assert(valid.readText().contains("")) + assert(report.readText().contains("sh:focusNode ")) } } diff --git a/src/test/kotlin/runner/impl/jvm/ArgumentsTest.kt b/src/test/kotlin/runner/impl/jvm/ArgumentsTest.kt index 94e4182..d95d0cf 100644 --- a/src/test/kotlin/runner/impl/jvm/ArgumentsTest.kt +++ b/src/test/kotlin/runner/impl/jvm/ArgumentsTest.kt @@ -5,6 +5,7 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertThrows import technology.idlab.exception.RunnerException import technology.idlab.runner.impl.jvm.Arguments +import technology.idlab.util.Log class ArgumentsTest { @Test @@ -97,6 +98,9 @@ class ArgumentsTest { @Test fun inheritance() { + // Do not exit on fatal messages. + Log.shared.setFatalMode(Log.FatalMode.EXCEPTION) + // The base class. open class A diff --git a/src/test/resources/e2e/data/invalid.ttl b/src/test/resources/e2e/data/invalid.ttl new file mode 100644 index 0000000..ec9eddc --- /dev/null +++ b/src/test/resources/e2e/data/invalid.ttl @@ -0,0 +1,14 @@ +@prefix test: . +@prefix xsd: . + +# The coordinates should be given as floats, but are given as integers instead. + + a test:Coordinate ; + test:latitude "51"^^xsd:int ; + test:longitude "3"^^xsd:int . + +# The coordinates are given by the incorrect URI. + + a test:Coordinate ; + test:lat "41.39"^^xsd:float ; + test:lon "2.14"^^xsd:float . diff --git a/src/test/resources/e2e/data/valid.ttl b/src/test/resources/e2e/data/valid.ttl new file mode 100644 index 0000000..fca41f8 --- /dev/null +++ b/src/test/resources/e2e/data/valid.ttl @@ -0,0 +1,14 @@ +@prefix test: . +@prefix xsd: . + +# Correct entry. + + a test:Coordinate ; + test:latitude "51.05"^^xsd:float ; + test:longitude "3.73"^^xsd:float . + +# Correct entry. + + a test:Coordinate ; + test:latitude "41.39"^^xsd:float ; + test:longitude "2.14"^^xsd:float . diff --git a/src/test/resources/e2e/node.ttl b/src/test/resources/e2e/node.ttl index 7d76bda..9f78e15 100644 --- a/src/test/resources/e2e/node.ttl +++ b/src/test/resources/e2e/node.ttl @@ -6,27 +6,54 @@ @prefix owl: . @prefix xsd: . -test:channel a rdfc:Channel . +test:report a rdfc:Channel . +test:validated a rdfc:Channel . +test:unvalidated a rdfc:Channel . test:NodePipeline a rdfc:Pipeline ; rdfc:name "Node.js Testing Pipeline" ; rdfc:description "Pipeline for end-to-end testing of the Node.js runner." ; - rdfc:stages test:FileReader, test:FileWriter ; + rdfc:stages + test:FileReader , + test:SHACLValidator , + test:FileWriter , + test:ReportWriter ; rdfc:dependency <../../../../runners/nodejs> , - <../../../../processors/file-utils-ts> . + <../../../../processors/file-utils-ts> , + <../../../../processors/shacl-validator-ts> . test:FileReader a rdfc:FileReaderTS ; rdfc:arguments [ - rdfc:path "/tmp/rdfc-testing/input.txt" ; - rdfc:outgoing test:channel ; + rdfc:paths + <./data/valid.ttl> , + <./data/invalid.ttl> ; + rdfc:outgoing test:unvalidated ; + ] . + +test:SHACLValidator + a rdfc:SHACLValidatorTS ; + rdfc:arguments [ + rdfc:incoming test:unvalidated ; + rdfc:outgoing test:validated ; + rdfc:report test:report ; + rdfc:shapes <./shacl/shapes.ttl> ; + rdfc:mime "text/turtle" ; + rdfc:fatal "false"^^xsd:boolean ; ] . test:FileWriter a rdfc:FileWriterTS ; rdfc:arguments [ - rdfc:path "/tmp/rdfc-testing/output.txt" ; - rdfc:incoming test:channel ; + rdfc:path ; + rdfc:incoming test:validated ; + ] . + +test:ReportWriter + a rdfc:FileWriterTS ; + rdfc:arguments [ + rdfc:path ; + rdfc:incoming test:report ; ] . diff --git a/src/test/resources/e2e/shacl/shapes.ttl b/src/test/resources/e2e/shacl/shapes.ttl new file mode 100644 index 0000000..7191ef1 --- /dev/null +++ b/src/test/resources/e2e/shacl/shapes.ttl @@ -0,0 +1,24 @@ +@prefix rdf: . +@prefix sh: . +@prefix test: . +@prefix xsd: . + +# Definition of a simple shape which requires a shape to have it's coordinates +# within the range of -90 to 90 for latitude and -180 to 180 for longitude as +# floats. +[] + a sh:NodeShape ; + sh:closed true ; + sh:targetClass test:Coordinate ; + sh:ignoredProperties ( rdf:type ) ; + sh:property [ + sh:path test:latitude ; + sh:datatype xsd:float ; + sh:minInclusive -90 ; + sh:maxInclusive 90 ; + ], [ + sh:path test:longitude ; + sh:datatype xsd:float ; + sh:minInclusive -180 ; + sh:maxInclusive 180 ; + ] .