Skip to content

Commit

Permalink
Merge pull request #27 from pinax-network/rename-finalBlocksOnly
Browse files Browse the repository at this point in the history
rename finalBlocksOnly
  • Loading branch information
DenisCarriere authored Feb 21, 2024
2 parents ad0cb09 + ef5ed26 commit 26a4a4e
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 15 deletions.
8 changes: 5 additions & 3 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import { banner } from "./src/banner.js";
import { toJSON, toText } from "./src/http.js";
import { ping } from "./src/ping.js";
import { keyPair, parsePrivateKey } from "./src/auth.js";
import { Metadata, boolean } from "./src/schemas.js";

export async function action(options: WebhookRunOptions) {
const cursor = fileCursor.readCursor(options.cursorPath);
const finalBlocksOnly = boolean.parse(options.finalBlocksOnly);

// Block Emitter
const { emitter, moduleHash } = await setup({...options, cursor});
Expand All @@ -34,7 +36,7 @@ export async function action(options: WebhookRunOptions) {
emitter.on("output", async (data, cursor, clock) => {
if (!clock.timestamp) return;
const chain = new URL(options.substreamsEndpoint).hostname.split(".")[0];
const metadata = {
const metadata: Metadata = {
status: 200,
cursor,
session: {
Expand All @@ -49,7 +51,7 @@ export async function action(options: WebhookRunOptions) {
manifest: {
substreamsEndpoint: options.substreamsEndpoint,
chain,
finalBlockOnly: options.finalBlocksOnly,
finalBlocksOnly,
moduleName: options.moduleName,
type: data.getType().typeName,
moduleHash,
Expand All @@ -60,7 +62,7 @@ export async function action(options: WebhookRunOptions) {

// Queue POST
queue.add(async () => {
await postWebhook(options.webhookUrl, body, privateKey, options);
await postWebhook(options.webhookUrl, body, metadata, privateKey, options);
fs.writeFileSync(options.cursorPath, cursor);
});
});
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"version": "0.9.1",
"version": "0.9.2",
"name": "substreams-sink-webhook",
"description": "Substreams Sink Webhook",
"type": "module",
Expand Down
4 changes: 2 additions & 2 deletions src/ping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export async function ping(url: string, privateKey?: Hex) {

// send valid signature (must respond with 200)
try {
await postWebhook(url, body, privateKey, { maximumAttempts: 0 });
await postWebhook(url, body, undefined, privateKey, { maximumAttempts: 0 });
} catch (_e) {
return false;
}
Expand All @@ -20,7 +20,7 @@ export async function ping(url: string, privateKey?: Hex) {

// send invalid signature (must NOT respond with 200)
try {
await postWebhook(url, body, invalidprivateKey, { maximumAttempts: 0 });
await postWebhook(url, body, undefined, invalidprivateKey, { maximumAttempts: 0 });
return false;
} catch (_e) {
return true;
Expand Down
10 changes: 6 additions & 4 deletions src/postWebhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Hex } from "@noble/curves/abstract/utils";
import { logger } from "substreams-sink";
import { createTimestamp, sign } from "./auth.js";
import logUpdate from "log-update";
import { Metadata } from "./schemas.js";

function awaitSetTimeout(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
Expand All @@ -21,16 +22,17 @@ let start = now();
// let lastUpdate = now();

// TO-DO replace with Prometheus metrics
function logProgress() {
function logProgress(metadata?: Metadata) {
if ( !metadata ) return;
const delta = now() - start;
const rate = Math.round(blocks / delta);
const minutes = Math.round(delta / 60);
const seconds = delta % 60;
if ( blocks ) logUpdate(`[app] blocks=${blocks} [${rate} b/s] (${minutes}m ${seconds}s)`);
if ( blocks ) logUpdate(`[app] timestamp=${metadata.clock.timestamp} block_number=${metadata.clock.number} blocks=${blocks} [${rate} b/s] (${minutes}m ${seconds}s)`);
blocks++;
}

export async function postWebhook(url: string, body: string, secretKey?: Hex, options: PostWebhookOptions = {}) {
export async function postWebhook(url: string, body: string, metadata?: Metadata, secretKey?: Hex, options: PostWebhookOptions = {}) {
// Retry Policy
const initialInterval = 1000; // 1s
const maximumAttempts = options.maximumAttempts ?? 100 * initialInterval;
Expand Down Expand Up @@ -81,7 +83,7 @@ export async function postWebhook(url: string, body: string, secretKey?: Hex, op
continue;
}
// success
logProgress();
logProgress(metadata);
return { url, status };
} catch (e: any) {
const error = e.cause;
Expand Down
22 changes: 17 additions & 5 deletions src/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,29 @@ export const ManifestSchema = z.object({
type: z.string(),
moduleHash: z.string(),
chain: z.string(),
finalBlockOnly: boolean,
finalBlocksOnly: boolean,
});
export type Manifest = z.infer<typeof ManifestSchema>;

export const SessionSchema = z.object({
traceId: z.string(),
resolvedStartBlock: z.number(),
});
export type Session = z.infer<typeof SessionSchema>;

export const MetadataSchema = z.object({
status: z.number(),
cursor: z.string(),
session: SessionSchema,
clock: ClockSchema,
manifest: ManifestSchema,
});
export type Metadata = z.infer<typeof MetadataSchema>;

export const makePayloadBody = <S extends z.Schema>(dataSchema: S) =>
z.object({
cursor: z.string(),
session: z.object({
traceId: z.string(),
resolvedStartBlock: z.number(),
}),
session: SessionSchema,
clock: ClockSchema,
manifest: ManifestSchema,
data: dataSchema,
Expand Down

0 comments on commit 26a4a4e

Please sign in to comment.