chaining data loaders #1522

Open · wants to merge 5 commits into base: main
24 changes: 24 additions & 0 deletions docs/data-loaders.md
@@ -322,3 +322,27 @@ RuntimeError: Unable to load file: quakes.csv
```

When any data loader fails, the entire build fails.

## File server

Data loaders can request a file from Framework by querying the `FILE_SERVER` HTTP endpoint passed to them as an environment variable. For example, a bash data loader `mags.txt.sh` can read `quakes.json` and use [`jq`](https://jqlang.github.io/jq/) to extract the magnitudes of recent earthquakes by calling:

```sh
curl "${FILE_SERVER}quakes.json" | jq '.features[].properties.mag'
```

Similarly, the following JavaScript data loader `quakecount.txt.js` returns the number of recent earthquakes:

```js run=false
const {FILE_SERVER} = process.env;
const quakes = await fetch(`${FILE_SERVER}quakes.json`).then((reponse) => response.json());
process.stdout.write(quakes.features.length);
```

In the preview server, when `quakes.json` is updated, `mags.txt` and `quakecount.txt` are automatically refreshed. If `quakes.json` is itself generated by a data loader `quakes.json.sh`, modifying that script live-updates both files. (Even though `quakes.json` is requested by two loaders, the file server ensures that its loader runs only once, guaranteeing consistency and avoiding redundant work.) The choice of interpreter doesn’t matter: this mechanism lets a Python data loader talk to TypeScript, and vice versa.

<div class="note">

The `FILE_SERVER` endpoint is typically used to chain data loaders: one loader downloads a large dataset from the Web or from a database, that payload is cached, and the data loaders that query it can then run various kinds of analysis. Make sure you don’t have circular dependencies, as they will cause the preview server and the build process to hang! Like [archives](#archives), files queried through this endpoint are added to the build only if [statically referenced](./files#static-analysis) by `FileAttachment`. A minimal sketch of such a chain follows this note.

</div>
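
For example, here is a minimal sketch of what the upstream loader in such a chain might look like. This is an illustration only: the loader name `quakes.json.js` and the USGS feed URL are assumptions, and you would substitute your own data source.

```js run=false
// quakes.json.js: hypothetical upstream loader (illustrative sketch).
// It downloads the dataset once; downstream loaders such as mags.txt.sh and
// quakecount.txt.js then read the cached copy via the FILE_SERVER endpoint.
const url = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_day.geojson"; // assumed source
const response = await fetch(url);
if (!response.ok) throw new Error(`failed to fetch ${url}: ${response.status}`);
process.stdout.write(JSON.stringify(await response.json()));
```

Since the file server caches this payload, `mags.txt.sh` and `quakecount.txt.js` share a single download of `quakes.json`.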
60 changes: 32 additions & 28 deletions src/fileWatchers.ts
@@ -2,6 +2,7 @@ import type {FSWatcher} from "node:fs";
import {watch} from "node:fs";
import {isEnoent} from "./error.js";
import {maybeStat} from "./files.js";
import {chainDependencies} from "./loader.js";
import type {LoaderResolver} from "./loader.js";
import {resolvePath} from "./path.js";

@@ -11,38 +12,41 @@ export class FileWatchers {
static async of(loaders: LoaderResolver, path: string, names: Iterable<string>, callback: (name: string) => void) {
const that = new FileWatchers();
const {watchers} = that;
const {root} = loaders;
for (const name of names) {
const watchPath = loaders.getWatchPath(resolvePath(path, name));
if (!watchPath) continue;
let currentStat = await maybeStat(watchPath);
let watcher: FSWatcher;
const index = watchers.length;
try {
watcher = watch(watchPath, async function watched(type) {
// Re-initialize the watcher on the original path on rename.
if (type === "rename") {
watcher.close();
try {
watcher = watchers[index] = watch(watchPath, watched);
} catch (error) {
if (!isEnoent(error)) throw error;
console.error(`file no longer exists: ${watchPath}`);
for (const p of chainDependencies(root, resolvePath(path, name))) {
const watchPath = loaders.getWatchPath(p);
if (!watchPath) continue;
let currentStat = await maybeStat(watchPath);
let watcher: FSWatcher;
const index = watchers.length;
try {
watcher = watch(watchPath, async function watched(type) {
// Re-initialize the watcher on the original path on rename.
if (type === "rename") {
watcher.close();
try {
watcher = watchers[index] = watch(watchPath, watched);
} catch (error) {
if (!isEnoent(error)) throw error;
console.error(`file no longer exists: ${watchPath}`);
return;
}
setTimeout(() => watched("change"), 100); // delay to avoid a possibly-empty file
return;
}
setTimeout(() => watched("change"), 100); // delay to avoid a possibly-empty file
return;
}
const newStat = await maybeStat(watchPath);
// Ignore if the file was truncated or not modified.
if (currentStat?.mtimeMs === newStat?.mtimeMs || newStat?.size === 0) return;
currentStat = newStat;
callback(name);
});
} catch (error) {
if (!isEnoent(error)) throw error;
continue;
const newStat = await maybeStat(watchPath);
// Ignore if the file was truncated or not modified.
if (currentStat?.mtimeMs === newStat?.mtimeMs || newStat?.size === 0) return;
currentStat = newStat;
callback(name);
});
} catch (error) {
if (!isEnoent(error)) throw error;
continue;
}
watchers[index] = watcher;
}
watchers[index] = watcher;
}
return that;
}
99 changes: 84 additions & 15 deletions src/loader.ts
@@ -1,13 +1,13 @@
import {createHash} from "node:crypto";
import type {FSWatcher, WatchListener, WriteStream} from "node:fs";
import {createReadStream, existsSync, statSync, watch} from "node:fs";
import {open, readFile, rename, unlink} from "node:fs/promises";
import {createReadStream, existsSync, readFileSync, statSync, watch} from "node:fs";
import {open, readFile, rename, rm, unlink, writeFile} from "node:fs/promises";
import {dirname, extname, join} from "node:path/posix";
import {createGunzip} from "node:zlib";
import {spawn} from "cross-spawn";
import JSZip from "jszip";
import {extract} from "tar-stream";
import {enoent} from "./error.js";
import {enoent, isEnoent} from "./error.js";
import {maybeStat, prepareOutput, visitFiles} from "./files.js";
import {FileWatchers} from "./fileWatchers.js";
import {formatByteSize} from "./format.js";
@@ -16,6 +16,7 @@ import {findModule, getFileInfo, getLocalModuleHash, getModuleHash} from "./java
import type {Logger, Writer} from "./logger.js";
import type {MarkdownPage, ParseOptions} from "./markdown.js";
import {parseMarkdown} from "./markdown.js";
import {preview} from "./preview.js";
import {getModuleResolver, resolveImportPath} from "./resolvers.js";
import type {Params} from "./route.js";
import {isParameterized, requote, route} from "./route.js";
@@ -51,6 +52,9 @@ const defaultEffects: LoadEffects = {
export interface LoadOptions {
/** Whether to use a stale cache; true when building. */
useStale?: boolean;

/** An asset server for chained data loaders. */
FILE_SERVER?: string;
}

export interface LoaderOptions {
@@ -61,7 +65,7 @@ }
}

export class LoaderResolver {
private readonly root: string;
readonly root: string;
private readonly interpreters: Map<string, string[]>;

constructor({root, interpreters}: {root: string; interpreters?: Record<string, string[] | null>}) {
@@ -304,7 +308,13 @@ export class LoaderResolver {
const info = getFileInfo(this.root, path);
if (!info) return createHash("sha256").digest("hex");
const {hash} = info;
return path === name ? hash : createHash("sha256").update(hash).update(String(info.mtimeMs)).digest("hex");
if (path === name) return hash;
const hash2 = createHash("sha256");
for (const p of chainDependencies(this.root, name)) {
const info = getFileInfo(this.root, this.getSourceFilePath(p));
if (info) hash2.update(info.hash).update(String(info.mtimeMs));
}
return hash2.digest("hex");
}

getOutputFileHash(name: string): string {
@@ -414,15 +424,33 @@ abstract class AbstractLoader implements Loader {
let command = runningCommands.get(key);
if (!command) {
command = (async () => {
const loaderStat = await maybeStat(loaderPath);
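// Check every file in the dependency chain: the cached output is missing if any cached copy is absent, and stale if any is older than this loader.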
const paths = chainDependencies(this.root, this.targetPath);
const FRESH = 0;
const STALE = 1;
const MISSING = 2;
let status = FRESH;
for (const path of paths) {
const cachePath = join(this.root, ".observablehq", "cache", path);
const cacheStat = await maybeStat(cachePath);
if (!cacheStat) {
status = MISSING;
break;
} else if (cacheStat.mtimeMs < loaderStat!.mtimeMs) status = Math.max(status, STALE);
}
const outputPath = join(".observablehq", "cache", this.targetPath);
const cachePath = join(this.root, outputPath);
const loaderStat = await maybeStat(loaderPath);
const cacheStat = await maybeStat(cachePath);
if (!cacheStat) effects.output.write(faint("[missing] "));
else if (cacheStat.mtimeMs < loaderStat!.mtimeMs) {
if (useStale) return effects.output.write(faint("[using stale] ")), outputPath;
else effects.output.write(faint("[stale] "));
} else return effects.output.write(faint("[fresh] ")), outputPath;
switch (status) {
case FRESH:
return effects.output.write(faint("[fresh] ")), outputPath;
case STALE:
if (useStale) return effects.output.write(faint("[using stale] ")), outputPath;
effects.output.write(faint("[stale] "));
break;
case MISSING:
effects.output.write(faint("[missing] "));
break;
}
const tempPath = join(this.root, ".observablehq", "cache", `${this.targetPath}.${process.pid}`);
const errorPath = tempPath + ".err";
const errorStat = await maybeStat(errorPath);
@@ -434,15 +462,37 @@
await prepareOutput(tempPath);
await prepareOutput(cachePath);
const tempFd = await open(tempPath, "w");

// Launch a server for chained data loaders. TODO configure host?
const dependencies = new Set<string>();
const {server} = await preview({root: this.root, verbose: false, hostname: "127.0.0.1", dependencies});
const address = server.address();
if (!address || typeof address !== "object")
throw new Error("Couldn't launch server for chained data loaders!");
const FILE_SERVER = `http://${address.address}:${address.port}/_file/`;

try {
await this.exec(tempFd.createWriteStream({highWaterMark: 1024 * 1024}), {useStale}, effects);
await this.exec(tempFd.createWriteStream({highWaterMark: 1024 * 1024}), {useStale, FILE_SERVER}, effects);
await rename(tempPath, cachePath);
} catch (error) {
await rename(tempPath, errorPath);
throw error;
} finally {
await tempFd.close();
}

const cachedeps = `${cachePath}__dependencies`;
if (dependencies.size) await writeFile(cachedeps, JSON.stringify([...dependencies]), "utf-8");
else
try {
await rm(cachedeps);
} catch (error) {
if (!isEnoent(error)) throw error;
}

// TODO: server.close() might be enough?
await new Promise((closed) => server.close(closed));

return outputPath;
})();
command.finally(() => runningCommands.delete(key)).catch(() => {});
@@ -495,8 +545,12 @@ class CommandLoader extends AbstractLoader {
this.args = args;
}

async exec(output: WriteStream): Promise<void> {
const subprocess = spawn(this.command, this.args, {windowsHide: true, stdio: ["ignore", output, "inherit"]});
async exec(output: WriteStream, {FILE_SERVER}): Promise<void> {
const subprocess = spawn(this.command, this.args, {
windowsHide: true,
stdio: ["ignore", output, "inherit"],
env: {...process.env, FILE_SERVER}
});
const code = await new Promise((resolve, reject) => {
subprocess.on("error", reject);
subprocess.on("close", resolve);
@@ -582,3 +636,18 @@ function formatElapsed(start: number): string {
const elapsed = performance.now() - start;
return `${Math.floor(elapsed)}ms`;
}

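// Returns the target path plus, transitively, every dependency recorded in the cache's *__dependencies sidecar files (written after each loader run).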
export function chainDependencies(root: string, path: string): Set<string> {
const paths = new Set([path]);
for (const path of paths) {
try {
for (const f of JSON.parse(readFileSync(join(root, ".observablehq", "cache", `${path}__dependencies`), "utf-8")))
paths.add(f);
} catch (error) {
if (!isEnoent(error)) {
throw error;
}
}
}
return paths;
}
8 changes: 7 additions & 1 deletion src/preview.ts
@@ -45,6 +45,7 @@ export interface PreviewOptions {
port?: number;
origins?: string[];
verbose?: boolean;
dependencies?: Set<string>;
}

export async function preview(options: PreviewOptions): Promise<PreviewServer> {
@@ -58,19 +59,22 @@ export class PreviewServer {
private readonly _server: ReturnType<typeof createServer>;
private readonly _socketServer: WebSocketServer;
private readonly _verbose: boolean;
private readonly dependencies: Set<string> | undefined;

private constructor({
config,
root,
origins = [],
server,
verbose
verbose,
dependencies
}: {
config?: string;
root?: string;
origins?: string[];
server: Server;
verbose: boolean;
dependencies?: Set<string>;
}) {
this._config = config;
this._root = root;
@@ -80,6 +84,7 @@ export class PreviewServer {
this._server.on("request", this._handleRequest);
this._socketServer = new WebSocketServer({server: this._server});
this._socketServer.on("connection", this._handleConnection);
this.dependencies = dependencies;
}

static async start({verbose = true, hostname, port, open, ...options}: PreviewOptions) {
@@ -172,6 +177,7 @@ export class PreviewServer {
}
throw enoent(path);
} else if (pathname.startsWith("/_file/")) {
if (this.dependencies) this.dependencies.add(pathname.slice("/_file".length));
send(req, await loaders.loadFile(pathname.slice("/_file".length)), {root}).pipe(res);
} else {
if ((pathname = normalize(pathname)).startsWith("..")) throw new Error("Invalid path: " + pathname);
1 change: 1 addition & 0 deletions test/build-test.ts
@@ -158,6 +158,7 @@ function* findFiles(root: string): Iterable<string> {
visited.add(status.ino);
for (const entry of readdirSync(path)) {
if (entry === ".DS_Store") continue; // macOS
if (entry === ".ignoreme") continue; // see inputs/build/chain/
queue.push(join(path, entry));
}
} else {
10 changes: 10 additions & 0 deletions test/input/build/chain/chain-source.json.ts
@@ -0,0 +1,10 @@
import {existsSync, writeFileSync} from "node:fs";
const testFile = "./test/output/build/chain-changed/.ignoreme";

const x = existsSync(testFile) ? 0 : 3;

try {
writeFileSync(testFile, "—");
} catch (error) { }

process.stdout.write(JSON.stringify({ x }));
1 change: 1 addition & 0 deletions test/input/build/chain/chain.json.ts
@@ -0,0 +1 @@
console.log(JSON.stringify(process.env.address, null, 2));
9 changes: 9 additions & 0 deletions test/input/build/chain/chain.md
@@ -0,0 +1,9 @@
# Chained data loaders

```js
FileAttachment("chain1.json").json()
```

```js
FileAttachment("chain2.csv").csv({typed: true})
```
3 changes: 3 additions & 0 deletions test/input/build/chain/chain1.json.ts
@@ -0,0 +1,3 @@
const {FILE_SERVER} = process.env;
const {x} = await fetch(`${FILE_SERVER}chain-source.json`).then((response) => response.json());
process.stdout.write(JSON.stringify({x, "x^2": x * x}, null, 2));
3 changes: 3 additions & 0 deletions test/input/build/chain/chain2.csv.ts
@@ -0,0 +1,3 @@
const {FILE_SERVER} = process.env;
const {x} = await fetch(`${FILE_SERVER}chain-source.json`).then((response) => response.json());
process.stdout.write(`name,value\nx,${x}\nx^2,${x * x}`);
4 changes: 4 additions & 0 deletions test/output/build/chain/_file/chain1.e1f60496.json
@@ -0,0 +1,4 @@
{
"x": 3,
"x^2": 9
}
3 changes: 3 additions & 0 deletions test/output/build/chain/_file/chain2.18991dde.csv
@@ -0,0 +1,3 @@
name,value
x,3
x^2,9
Empty file.
Empty file.
Empty file.
Empty file.