diff --git a/index.ts b/index.ts index 18ee1d8..0870fff 100644 --- a/index.ts +++ b/index.ts @@ -9,9 +9,11 @@ import { banner } from "./src/banner.js"; import { toJSON, toText } from "./src/http.js"; import { ping } from "./src/ping.js"; import { keyPair, parsePrivateKey } from "./src/auth.js"; +import { Metadata, boolean } from "./src/schemas.js"; export async function action(options: WebhookRunOptions) { const cursor = fileCursor.readCursor(options.cursorPath); + const finalBlocksOnly = boolean.parse(options.finalBlocksOnly); // Block Emitter const { emitter, moduleHash } = await setup({...options, cursor}); @@ -34,7 +36,7 @@ export async function action(options: WebhookRunOptions) { emitter.on("output", async (data, cursor, clock) => { if (!clock.timestamp) return; const chain = new URL(options.substreamsEndpoint).hostname.split(".")[0]; - const metadata = { + const metadata: Metadata = { status: 200, cursor, session: { @@ -49,7 +51,7 @@ export async function action(options: WebhookRunOptions) { manifest: { substreamsEndpoint: options.substreamsEndpoint, chain, - finalBlockOnly: options.finalBlocksOnly, + finalBlocksOnly, moduleName: options.moduleName, type: data.getType().typeName, moduleHash, @@ -60,7 +62,7 @@ export async function action(options: WebhookRunOptions) { // Queue POST queue.add(async () => { - await postWebhook(options.webhookUrl, body, privateKey, options); + await postWebhook(options.webhookUrl, body, metadata, privateKey, options); fs.writeFileSync(options.cursorPath, cursor); }); }); diff --git a/package.json b/package.json index 902a70b..1be0229 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,5 @@ { - "version": "0.9.1", + "version": "0.9.2", "name": "substreams-sink-webhook", "description": "Substreams Sink Webhook", "type": "module", diff --git a/src/ping.ts b/src/ping.ts index 4f6d4f7..112cbc5 100644 --- a/src/ping.ts +++ b/src/ping.ts @@ -8,7 +8,7 @@ export async function ping(url: string, privateKey?: Hex) { // send valid signature (must respond with 200) try { - await postWebhook(url, body, privateKey, { maximumAttempts: 0 }); + await postWebhook(url, body, undefined, privateKey, { maximumAttempts: 0 }); } catch (_e) { return false; } @@ -20,7 +20,7 @@ export async function ping(url: string, privateKey?: Hex) { // send invalid signature (must NOT respond with 200) try { - await postWebhook(url, body, invalidprivateKey, { maximumAttempts: 0 }); + await postWebhook(url, body, undefined, invalidprivateKey, { maximumAttempts: 0 }); return false; } catch (_e) { return true; diff --git a/src/postWebhook.ts b/src/postWebhook.ts index c6b51c1..6ec2293 100644 --- a/src/postWebhook.ts +++ b/src/postWebhook.ts @@ -2,6 +2,7 @@ import { Hex } from "@noble/curves/abstract/utils"; import { logger } from "substreams-sink"; import { createTimestamp, sign } from "./auth.js"; import logUpdate from "log-update"; +import { Metadata } from "./schemas.js"; function awaitSetTimeout(ms: number) { return new Promise((resolve) => setTimeout(resolve, ms)); @@ -21,16 +22,17 @@ let start = now(); // let lastUpdate = now(); // TO-DO replace with Prometheus metrics -function logProgress() { +function logProgress(metadata?: Metadata) { + if ( !metadata ) return; const delta = now() - start; const rate = Math.round(blocks / delta); const minutes = Math.round(delta / 60); const seconds = delta % 60; - if ( blocks ) logUpdate(`[app] blocks=${blocks} [${rate} b/s] (${minutes}m ${seconds}s)`); + if ( blocks ) logUpdate(`[app] timestamp=${metadata.clock.timestamp} block_number=${metadata.clock.number} blocks=${blocks} [${rate} b/s] (${minutes}m ${seconds}s)`); blocks++; } -export async function postWebhook(url: string, body: string, secretKey?: Hex, options: PostWebhookOptions = {}) { +export async function postWebhook(url: string, body: string, metadata?: Metadata, secretKey?: Hex, options: PostWebhookOptions = {}) { // Retry Policy const initialInterval = 1000; // 1s const maximumAttempts = options.maximumAttempts ?? 100 * initialInterval; @@ -81,7 +83,7 @@ export async function postWebhook(url: string, body: string, secretKey?: Hex, op continue; } // success - logProgress(); + logProgress(metadata); return { url, status }; } catch (e: any) { const error = e.cause; diff --git a/src/schemas.ts b/src/schemas.ts index 1739de6..fd4a0f5 100644 --- a/src/schemas.ts +++ b/src/schemas.ts @@ -18,17 +18,29 @@ export const ManifestSchema = z.object({ type: z.string(), moduleHash: z.string(), chain: z.string(), - finalBlockOnly: boolean, + finalBlocksOnly: boolean, }); export type Manifest = z.infer; +export const SessionSchema = z.object({ + traceId: z.string(), + resolvedStartBlock: z.number(), +}); +export type Session = z.infer; + +export const MetadataSchema = z.object({ + status: z.number(), + cursor: z.string(), + session: SessionSchema, + clock: ClockSchema, + manifest: ManifestSchema, +}); +export type Metadata = z.infer; + export const makePayloadBody = (dataSchema: S) => z.object({ cursor: z.string(), - session: z.object({ - traceId: z.string(), - resolvedStartBlock: z.number(), - }), + session: SessionSchema, clock: ClockSchema, manifest: ManifestSchema, data: dataSchema,