Skip to content

Commit

Permalink
improve logging & cursor handling
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisCarriere committed Feb 20, 2024
1 parent ca360a7 commit 998e489
Show file tree
Hide file tree
Showing 7 changed files with 1,918 additions and 38 deletions.
3 changes: 3 additions & 0 deletions bin/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,20 @@ import { ping } from "../src/ping.js";
export interface WebhookRunOptions extends commander.RunOptions {
webhookUrl: string;
privateKey?: string;
cursorPath: string;
maximumAttempts: number;
}

const webhookUrlOption = new Option("--webhook-url <string>", "Webhook URL to send POST").makeOptionMandatory().env("WEBHOOK_URL");
const privateKeyOption = new Option("--private-key <string>", "Ed25519 private key to sign POST data payload").env("PRIVATE_KEY");
const cursorOption = new Option("--cursor-path <string>", "Path to cursor file").env("CURSOR_PATH").default("cursor.lock");

// Run Webhook Sink
const program = commander.program(pkg);
const command = commander.run(program, pkg);
command.addOption(webhookUrlOption);
command.addOption(privateKeyOption);
command.addOption(cursorOption);
command.addOption(new Option("--maximum-attempts <number>", "Maximum attempts to retry POST").env("MAXIMUM_ATTEMPTS").default(100));
command.action(action);

Expand Down
8 changes: 6 additions & 2 deletions index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import PQueue from "p-queue";
import { http, setup, logger } from "substreams-sink";
import fs from "fs";
import { http, setup, logger, fileCursor } from "substreams-sink";
import { postWebhook } from "./src/postWebhook.js";

import type { SessionInit } from "@substreams/core/proto";
Expand All @@ -10,8 +11,10 @@ import { ping } from "./src/ping.js";
import { keyPair, parsePrivateKey } from "./src/auth.js";

export async function action(options: WebhookRunOptions) {
const cursor = fileCursor.readCursor(options.cursorPath);

// Block Emitter
const { emitter, moduleHash } = await setup(options);
const { emitter, moduleHash } = await setup({...options, cursor});

// Queue
const queue = new PQueue({ concurrency: 1 }); // all messages are sent in block order, no need to parallelize
Expand Down Expand Up @@ -58,6 +61,7 @@ export async function action(options: WebhookRunOptions) {
// Queue POST
queue.add(async () => {
await postWebhook(options.webhookUrl, body, privateKey, options);
fs.writeFileSync(options.cursorPath, cursor);
});
});
emitter.start();
Expand Down
Loading

0 comments on commit 998e489

Please sign in to comment.