Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: detect block gaps when streaming from ordhook #349

Merged
merged 2 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/pg/block-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export class BlockCache {
locations: DbLocationInsert[] = [];
currentLocations = new Map<string, DbCurrentLocationInsert>();
recursiveRefs = new Map<string, string[]>();
revealedNumbers: number[] = [];

mimeTypeCounts = new Map<string, number>();
satRarityCounts = new Map<string, number>();
Expand Down Expand Up @@ -72,6 +73,7 @@ export class BlockCache {
parent: reveal.parent,
timestamp: this.timestamp,
});
this.revealedNumbers.push(reveal.inscription_number.jubilee);
this.increaseMimeTypeCount(mime_type);
this.increaseSatRarityCount(satoshi.rarity);
this.increaseInscriptionTypeCount(reveal.inscription_number.classic < 0 ? 'cursed' : 'blessed');
Expand Down
64 changes: 52 additions & 12 deletions src/pg/pg-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ import {
runMigrations,
stopwatch,
} from '@hirosystems/api-toolkit';
import { BitcoinEvent, BitcoinPayload } from '@hirosystems/chainhook-client';
import {
BadPayloadRequestError,
BitcoinEvent,
BitcoinPayload,
} from '@hirosystems/chainhook-client';
import * as path from 'path';
import * as postgres from 'postgres';
import { Order, OrderBy } from '../api/schemas';
Expand All @@ -35,6 +39,8 @@ export const INSERT_BATCH_SIZE = 4000;

type InscriptionIdentifier = { genesis_id: string } | { number: number };

class BlockAlreadyIngestedError extends Error {}

export class PgStore extends BasePgStore {
readonly brc20: Brc20PgStore;
readonly counts: CountsPgStore;
Expand Down Expand Up @@ -90,14 +96,17 @@ export class PgStore extends BasePgStore {
);
}
for (const event of payload.apply) {
if (await this.isBlockIngested(event)) {
logger.warn(`PgStore skipping previously seen block ${event.block_identifier.index}`);
continue;
}
logger.info(`PgStore apply block ${event.block_identifier.index}`);
const time = stopwatch();
await this.updateInscriptionsEvent(sql, event, 'apply', streamed);
await this.brc20.updateBrc20Operations(sql, event, 'apply');
try {
await this.updateInscriptionsEvent(sql, event, 'apply', streamed);
await this.brc20.updateBrc20Operations(sql, event, 'apply');
} catch (error) {
if (error instanceof BlockAlreadyIngestedError) {
logger.warn(error);
continue;
} else throw error;
}
await this.updateChainTipBlockHeight(sql, event.block_identifier.index);
logger.info(
`PgStore apply block ${
Expand All @@ -119,6 +128,7 @@ export class PgStore extends BasePgStore {
normalizedHexString(event.block_identifier.hash),
event.timestamp
);
if (direction === 'apply') await this.assertNextBlockIsNotIngested(sql, event);
for (const tx of event.transactions) {
const tx_id = normalizedHexString(tx.transaction_identifier.hash);
for (const operation of tx.metadata.ordinal_operations) {
Expand All @@ -138,6 +148,7 @@ export class PgStore extends BasePgStore {
}
switch (direction) {
case 'apply':
if (streamed) await this.assertNextBlockIsContiguous(sql, event, cache);
await this.applyInscriptions(sql, cache, streamed);
break;
case 'rollback':
Expand Down Expand Up @@ -348,15 +359,44 @@ export class PgStore extends BasePgStore {
}
}

private async isBlockIngested(event: BitcoinEvent): Promise<boolean> {
const currentBlockHeight = await this.getChainTipBlockHeight();
private async assertNextBlockIsNotIngested(sql: PgSqlClient, event: BitcoinEvent) {
const result = await sql<{ block_height: number }[]>`
SELECT block_height::int FROM chain_tip
`;
if (!result.count) return false;
const currentHeight = result[0].block_height;
if (
event.block_identifier.index <= currentBlockHeight &&
event.block_identifier.index <= currentHeight &&
event.block_identifier.index !== ORDINALS_GENESIS_BLOCK
) {
return true;
throw new BlockAlreadyIngestedError(
`Block ${event.block_identifier.index} is already ingested, chain tip is at ${currentHeight}`
);
}
}

private async assertNextBlockIsContiguous(
sql: PgSqlClient,
event: BitcoinEvent,
cache: BlockCache
) {
if (!cache.revealedNumbers.length) {
// TODO: How do we check blocks with only transfers?
return;
}
return false;
const result = await sql<{ max: number | null; block_height: number }[]>`
WITH tip AS (SELECT block_height::int FROM chain_tip)
SELECT MAX(number)::int AS max, (SELECT block_height FROM tip)
FROM inscriptions WHERE number >= 0
`;
if (!result.count) return;
const data = result[0];
const firstReveal = cache.revealedNumbers.sort()[0];
if (data.max === null && firstReveal === 0) return;
if ((data.max ?? 0) + 1 != firstReveal)
throw new BadPayloadRequestError(
`Streamed block ${event.block_identifier.index} is non-contiguous, attempting to reveal #${firstReveal} when current max is #${data.max} at block height ${data.block_height}`
);
}

private async updateChainTipBlockHeight(sql: PgSqlClient, block_height: number): Promise<void> {
Expand Down
10 changes: 10 additions & 0 deletions tests/api/cache.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ describe('ETag cache', () => {

test('inscription cache control', async () => {
const block = new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({ height: 775617 })
.transaction({ hash: '0x38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dc' })
Expand Down Expand Up @@ -88,6 +89,7 @@ describe('ETag cache', () => {
// Perform transfer and check cache
await db.updateInscriptions(
new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({ height: 775618, timestamp: 1678122360 })
.transaction({
Expand Down Expand Up @@ -125,6 +127,7 @@ describe('ETag cache', () => {
// Perform transfer GAP FILL and check cache
await db.updateInscriptions(
new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({ height: 775619, timestamp: 1678122360 })
.transaction({
Expand Down Expand Up @@ -161,6 +164,7 @@ describe('ETag cache', () => {

test('inscriptions index cache control', async () => {
const block1 = new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({ height: 778575 })
.transaction({ hash: '0x9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201' })
Expand Down Expand Up @@ -194,6 +198,7 @@ describe('ETag cache', () => {
.build();
await db.updateInscriptions(block1);
const block2 = new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({ height: 778576 })
.transaction({ hash: '0x00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d' })
Expand Down Expand Up @@ -246,6 +251,7 @@ describe('ETag cache', () => {

// New location
const block3 = new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({ height: 778577 })
.transaction({ hash: 'ae9d273a10e899f0d2cad47ee2b0e77ab8a9addd9dd5bb5e4b03d6971c060d52' })
Expand Down Expand Up @@ -274,6 +280,7 @@ describe('ETag cache', () => {

test('inscriptions stats per block cache control', async () => {
const block1 = new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({ height: 778575, hash: randomHash() })
.transaction({ hash: '0x9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201' })
Expand Down Expand Up @@ -326,6 +333,7 @@ describe('ETag cache', () => {

// New block
const block2 = new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({ height: 778576, hash: randomHash() })
.transaction({ hash: '0x00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d' })
Expand Down Expand Up @@ -370,6 +378,7 @@ describe('ETag cache', () => {

test('status etag changes with new block', async () => {
const block1 = new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({ height: 778575, hash: randomHash() })
.transaction({ hash: '0x9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201' })
Expand Down Expand Up @@ -422,6 +431,7 @@ describe('ETag cache', () => {

// New block
const block2 = new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({ height: 778576, hash: randomHash() })
.transaction({ hash: '0x00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d' })
Expand Down
2 changes: 1 addition & 1 deletion tests/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export class TestChainhookPayloadBuilder {
operation: 'inscription_feed',
meta_protocols: ['brc-20'],
},
is_streaming_blocks: true,
is_streaming_blocks: false,
},
};
private action: 'apply' | 'rollback' = 'apply';
Expand Down
1 change: 1 addition & 0 deletions tests/ordhook/replay.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ describe('Replay', () => {

test('shuts down when streaming on replay mode', async () => {
const payload1 = new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({
height: 767430,
Expand Down
126 changes: 126 additions & 0 deletions tests/ordhook/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,132 @@ describe('EventServer', () => {
});

describe('gap detection', () => {
test('server rejects payload with first inscription gap when streaming', async () => {
await db.updateInscriptions(
new TestChainhookPayloadBuilder()
.streamingBlocks(false)
.apply()
.block({
height: 778575,
hash: '0x00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d',
timestamp: 1676913207,
})
.transaction({
hash: '9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201',
})
.inscriptionRevealed({
content_bytes: '0x48656C6C6F',
content_type: 'text/plain;charset=utf-8',
content_length: 5,
inscription_number: { classic: 0, jubilee: 0 },
inscription_fee: 705,
inscription_id: '9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201i0',
inscription_output_value: 10000,
inscriber_address: 'bc1pscktlmn99gyzlvymvrezh6vwd0l4kg06tg5rvssw0czg8873gz5sdkteqj',
ordinal_number: 257418248345364,
ordinal_block_height: 650000,
ordinal_offset: 0,
satpoint_post_inscription:
'9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201:0:0',
inscription_input_index: 0,
transfers_pre_inscription: 0,
tx_index: 0,
curse_type: null,
inscription_pointer: null,
delegate: null,
metaprotocol: null,
metadata: null,
parent: null,
})
.build()
);
const errorPayload1 = new TestChainhookPayloadBuilder()
.streamingBlocks(false)
.apply()
.block({
height: 778576,
hash: '00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d',
timestamp: 1676913207,
})
.transaction({
hash: '38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dc',
})
.inscriptionRevealed({
content_bytes: '0x48656C6C6F',
content_type: 'text/plain;charset=utf-8',
content_length: 5,
inscription_number: { classic: 5, jubilee: 5 }, // Gap at 5 but block is not streamed
inscription_fee: 705,
inscription_id: '38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dci0',
inscription_output_value: 10000,
inscriber_address: 'bc1p3cyx5e2hgh53w7kpxcvm8s4kkega9gv5wfw7c4qxsvxl0u8x834qf0u2td',
ordinal_number: 1050000000000000,
ordinal_block_height: 650000,
ordinal_offset: 0,
satpoint_post_inscription:
'38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dc:0:0',
inscription_input_index: 0,
transfers_pre_inscription: 0,
tx_index: 0,
curse_type: null,
inscription_pointer: null,
delegate: null,
metaprotocol: null,
metadata: null,
parent: null,
})
.build();
// Not streamed, accepts block.
await expect(db.updateInscriptions(errorPayload1)).resolves.not.toThrow(
BadPayloadRequestError
);

const errorPayload2 = new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({
height: 778579,
hash: '00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d',
timestamp: 1676913207,
})
.transaction({
hash: '38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dc',
})
.inscriptionRevealed({
content_bytes: '0x48656C6C6F',
content_type: 'text/plain;charset=utf-8',
content_length: 5,
inscription_number: { classic: 10, jubilee: 10 }, // Gap at 10
inscription_fee: 705,
inscription_id: '38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dci0',
inscription_output_value: 10000,
inscriber_address: 'bc1p3cyx5e2hgh53w7kpxcvm8s4kkega9gv5wfw7c4qxsvxl0u8x834qf0u2td',
ordinal_number: 1050000000000000,
ordinal_block_height: 650000,
ordinal_offset: 0,
satpoint_post_inscription:
'38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dc:0:0',
inscription_input_index: 0,
transfers_pre_inscription: 0,
tx_index: 0,
curse_type: null,
inscription_pointer: null,
delegate: null,
metaprotocol: null,
metadata: null,
parent: null,
})
.build();
await expect(db.updateInscriptions(errorPayload2)).rejects.toThrow(BadPayloadRequestError);
const response = await server['fastify'].inject({
method: 'POST',
url: `/payload`,
headers: { authorization: `Bearer ${ENV.ORDHOOK_NODE_AUTH_TOKEN}` },
payload: errorPayload2,
});
expect(response.statusCode).toBe(400);
});

test('server ignores past blocks', async () => {
const payload = new TestChainhookPayloadBuilder()
.apply()
Expand Down
Loading