From 12dd46811ceab21760c8a27d545a3145cb7f840e Mon Sep 17 00:00:00 2001 From: RAJI ABDUL <119139404+rajiabdul@users.noreply.github.com> Date: Mon, 23 Dec 2024 21:01:08 +0000 Subject: [PATCH 1/3] feat: startknet indexer --- land-registry-indexer/.env.example | 2 +- land-registry-indexer/Dockerfile | 11 - land-registry-indexer/docker-compose.yml | 27 -- land-registry-indexer/package.json | 25 +- land-registry-indexer/src/config.ts | 63 +++- land-registry-indexer/src/eventHandlers.ts | 339 --------------------- land-registry-indexer/src/index.ts | 123 -------- land-registry-indexer/src/indexer.ts | 152 +++++++++ land-registry-indexer/src/tsconfig.json | 14 + 9 files changed, 229 insertions(+), 527 deletions(-) delete mode 100644 land-registry-indexer/Dockerfile delete mode 100644 land-registry-indexer/docker-compose.yml delete mode 100644 land-registry-indexer/src/eventHandlers.ts delete mode 100644 land-registry-indexer/src/index.ts create mode 100644 land-registry-indexer/src/indexer.ts create mode 100644 land-registry-indexer/src/tsconfig.json diff --git a/land-registry-indexer/.env.example b/land-registry-indexer/.env.example index 61688101..7031bd41 100644 --- a/land-registry-indexer/.env.example +++ b/land-registry-indexer/.env.example @@ -1,4 +1,4 @@ STARTING_BLOCK=0 LAND_REGISTRY_ADDRESS=0x5a4054a1b1389dcd48b650637977280d32f1ad8b3027bc6c7eb606bf7e28bf5 DATABASE_URL=postgresql://username:password@localhost:5432/land_registry -APIBARA_URL= \ No newline at end of file +APIBARA_URL=https://goerli.starknet.a5a.ch \ No newline at end of file diff --git a/land-registry-indexer/Dockerfile b/land-registry-indexer/Dockerfile deleted file mode 100644 index 6801bf78..00000000 --- a/land-registry-indexer/Dockerfile +++ /dev/null @@ -1,11 +0,0 @@ -FROM node:18-alpine - -WORKDIR /app - -COPY package*.json ./ -RUN npm install - -COPY . . -RUN npm run build - -CMD ["npm", "start"] \ No newline at end of file diff --git a/land-registry-indexer/docker-compose.yml b/land-registry-indexer/docker-compose.yml deleted file mode 100644 index 574edbe8..00000000 --- a/land-registry-indexer/docker-compose.yml +++ /dev/null @@ -1,27 +0,0 @@ -version: '3.8' - -services: - postgres: - image: postgres:14 - environment: - POSTGRES_USER: ${POSTGRES_USER:-postgres} - POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-postgres} - POSTGRES_DB: ${POSTGRES_DB:-land_registry} - volumes: - - postgres_data:/var/lib/postgresql/data - - ./schema.sql:/docker-entrypoint-initdb.d/schema.sql - ports: - - "5432:5432" - - indexer: - build: . - environment: - - DATABASE_URL=postgresql://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-postgres}@postgres:5432/${POSTGRES_DB:-land_registry} - - LAND_REGISTRY_ADDRESS=${LAND_REGISTRY_ADDRESS} - - STARTING_BLOCK=${STARTING_BLOCK:-0} - - APIBARA_URL=${APIBARA_URL} - depends_on: - - postgres - -volumes: - postgres_data: \ No newline at end of file diff --git a/land-registry-indexer/package.json b/land-registry-indexer/package.json index 0d9eaf63..c1f9403d 100644 --- a/land-registry-indexer/package.json +++ b/land-registry-indexer/package.json @@ -1,24 +1,23 @@ { "name": "land-registry-indexer", "version": "1.0.0", - "description": "Apibara indexer for Land Registry events", - "main": "src/index.ts", + "main": "dist/indexer.js", "scripts": { "build": "tsc", - "start": "node dist/index.js", - "dev": "ts-node src/index.ts" + "start": "node dist/indexer.js", + "dev": "ts-node src/indexer.ts" }, "dependencies": { - "@apibara/indexer": "^0.3.0", - "@apibara/protocol": "^0.4.0", - "@apibara/starknet": "^0.3.0", - "dotenv": "^16.0.3", - "pg": "^8.11.0", - "typescript": "^5.0.4" + "@apibara/protocol": "^2.0.0-beta.28", + "@apibara/starknet": "^2.0.0-beta.28", + "pg": "^8.11.3", + "dotenv": "^16.3.1" }, "devDependencies": { - "@types/node": "^20.2.5", - "@types/pg": "^8.10.1", - "ts-node": "^10.9.1" + "@types/node": "^20.8.0", + "@types/pg": "^8.10.3", + "@types/dotenv": "^8.2.0", + "ts-node": "^10.9.1", + "typescript": "^5.2.2" } } \ No newline at end of file diff --git a/land-registry-indexer/src/config.ts b/land-registry-indexer/src/config.ts index 59b4d7d5..bbf25fc5 100644 --- a/land-registry-indexer/src/config.ts +++ b/land-registry-indexer/src/config.ts @@ -1,16 +1,53 @@ -/** - * Configuration settings for the Land Registry Indexer - * - * This module loads environment variables from a .env file and provides - * configuration constants used throughout the application. - */ +import { createClient } from "@apibara/protocol"; +import { Filter, StarknetStream } from "@apibara/starknet"; +import * as dotenv from "dotenv"; -import dotenv from 'dotenv'; dotenv.config(); -export const config = { - startingBlock: Number(process.env.STARTING_BLOCK || 0), - landRegistryAddress: process.env.LAND_REGISTRY_ADDRESS || '', - pgConnection: process.env.DATABASE_URL || '', - apibaraUrl: process.env.APIBARA_URL || '', -}; +// Helper function to convert hex string to decimal string +const hexToDec = (hex: string): string => { + return BigInt(hex).toString(10); +}; + +// Event keys for the Land Registry contract +export const EVENT_KEYS = { + LAND_REGISTERED: hexToDec("0x0000000000000000000000000000000000000000000000000000000000000001"), + LAND_TRANSFERRED: hexToDec("0x0000000000000000000000000000000000000000000000000000000000000002"), + LAND_VERIFIED: hexToDec("0x0000000000000000000000000000000000000000000000000000000000000003"), + LAND_UPDATED: hexToDec("0x0000000000000000000000000000000000000000000000000000000000000004"), + INSPECTOR_ADDED: hexToDec("0x0000000000000000000000000000000000000000000000000000000000000005"), + INSPECTOR_REMOVED: hexToDec("0x0000000000000000000000000000000000000000000000000000000000000006"), + LAND_LISTED: hexToDec("0x0000000000000000000000000000000000000000000000000000000000000007"), + LISTING_UPDATED: hexToDec("0x0000000000000000000000000000000000000000000000000000000000000008"), + LAND_SOLD: hexToDec("0x0000000000000000000000000000000000000000000000000000000000000009"), +}; + +export const createIndexerConfig = () => { + const filter = Filter.make({ + header: "unknown", + events: [{ + address: process.env.LAND_REGISTRY_ADDRESS as `0x${string}`, + keys: Object.values(EVENT_KEYS).map((key) => `0x${key.toString()}` as `0x${string}`), + transactionStatus: "succeeded", + }], + }); + + const request = StarknetStream.Request.make({ + filter: [filter], + finality: "pending" || "accepted" || "succeeded", + startingCursor: { + orderKey: BigInt(process.env.STARTING_BLOCK || "0"), + }, + }); + + return { + client: createClient(StarknetStream, process.env.APIBARA_URL || ""), + request, + dbUrl: process.env.DATABASE_URL || "", + }; +}; + +export type IndexerMessage = { + _tag: "data" | "invalidate"; + data?: any; +}; \ No newline at end of file diff --git a/land-registry-indexer/src/eventHandlers.ts b/land-registry-indexer/src/eventHandlers.ts deleted file mode 100644 index e1440bdb..00000000 --- a/land-registry-indexer/src/eventHandlers.ts +++ /dev/null @@ -1,339 +0,0 @@ -/** - * Event handler functions for the Land Registry smart contract - * - * These functions process events emitted by the contract and store them in PostgreSQL. - * Each handler receives: - * - client: PostgreSQL client for database operations - * - data: Event data from the contract - * - cursor: Block/transaction metadata - * - * The handlers maintain the state of: - * - Land parcels (registration, transfers, verification) - * - Inspectors (adding/removing) - * - Marketplace listings (creation, updates, sales) - * - * All monetary values are stored as strings to preserve precision. - * Addresses are stored as strings in their full StarkNet format. - * Timestamps are converted from Unix seconds to JavaScript Date objects. - */ - -import { PoolClient } from 'pg'; -import { StarkNetCursor } from '@apibara/protocol'; - -export async function handleLandRegistered( - client: PoolClient, - data: any, - cursor: StarkNetCursor -) { - const { land_id, owner, location, area, land_use, fee } = data; - - await client.query( - `INSERT INTO lands ( - land_id, owner_address, location_latitude, - location_longitude, area, land_use, status, fee - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, - [ - land_id.toString(), - owner.toString(), - location.latitude, - location.longitude, - area.toString(), - land_use, - 'PENDING', - fee.toString() - ] - ); -} - -export async function handleLandTransferred( - client: PoolClient, - data: any, - cursor: StarkNetCursor -) { - const { land_id, from_owner, to_owner } = data; - - await client.query( - `INSERT INTO land_transfers ( - land_id, from_address, to_address, - transaction_hash, block_number, timestamp - ) VALUES ($1, $2, $3, $4, $5, $6)`, - [ - land_id.toString(), - from_owner.toString(), - to_owner.toString(), - cursor.transactionHash, - cursor.blockNumber, - new Date(cursor.timestamp * 1000) - ] - ); - - await client.query( - `UPDATE lands SET owner_address = $1, updated_at = NOW() - WHERE land_id = $2`, - [to_owner.toString(), land_id.toString()] - ); -} - -export async function handleLandVerified( - client: PoolClient, - data: any, - cursor: StarkNetCursor -) { - const { land_id, inspector } = data; - - await client.query( - `INSERT INTO land_verifications ( - land_id, inspector_address, transaction_hash, - block_number, timestamp - ) VALUES ($1, $2, $3, $4, $5)`, - [ - land_id.toString(), - inspector.toString(), - cursor.transactionHash, - cursor.blockNumber, - new Date(cursor.timestamp * 1000) - ] - ); - - await client.query( - `UPDATE lands SET - status = 'VERIFIED', - inspector_address = $1, - updated_at = NOW() - WHERE land_id = $2`, - [inspector.toString(), land_id.toString()] - ); -} - -export async function handleLandUpdated( - client: PoolClient, - data: any, - cursor: StarkNetCursor -) { - const { land_id, area, land_use } = data; - - await client.query( - `INSERT INTO land_updates ( - land_id, area, land_use, transaction_hash, - block_number, timestamp - ) VALUES ($1, $2, $3, $4, $5, $6)`, - [ - land_id.toString(), - area.toString(), - land_use, - cursor.transactionHash, - cursor.blockNumber, - new Date(cursor.timestamp * 1000) - ] - ); - - await client.query( - `UPDATE lands SET - area = $1, - land_use = $2, - updated_at = NOW() - WHERE land_id = $3`, - [area.toString(), land_use, land_id.toString()] - ); -} - -export async function handleInspectorAdded( - client: PoolClient, - data: any, - cursor: StarkNetCursor -) { - const { inspector } = data; - - await client.query( - `INSERT INTO inspectors ( - address, is_active, added_at - ) VALUES ($1, $2, $3)`, - [ - inspector.toString(), - true, - new Date(cursor.timestamp * 1000) - ] - ); -} - -export async function handleInspectorRemoved( - client: PoolClient, - data: any, - cursor: StarkNetCursor -) { - const { inspector } = data; - - await client.query( - `UPDATE inspectors SET - is_active = false, - removed_at = $1 - WHERE address = $2`, - [new Date(cursor.timestamp * 1000), inspector.toString()] - ); -} - -export async function handleLandInspectorSet( - client: PoolClient, - data: any, - cursor: StarkNetCursor -) { - const { land_id, inspector } = data; - - await client.query( - `INSERT INTO inspector_assignments ( - land_id, inspector_address, transaction_hash, - block_number, timestamp - ) VALUES ($1, $2, $3, $4, $5)`, - [ - land_id.toString(), - inspector.toString(), - cursor.transactionHash, - cursor.blockNumber, - new Date(cursor.timestamp * 1000) - ] - ); - - await client.query( - `UPDATE lands SET - inspector_address = $1, - updated_at = NOW() - WHERE land_id = $2`, - [inspector.toString(), land_id.toString()] - ); -} - -export async function handleFeeUpdated( - client: PoolClient, - data: any, - cursor: StarkNetCursor -) { - const { old_fee, new_fee } = data; - - await client.query( - `INSERT INTO fee_updates ( - old_fee, new_fee, transaction_hash, - block_number, timestamp - ) VALUES ($1, $2, $3, $4, $5)`, - [ - old_fee.toString(), - new_fee.toString(), - cursor.transactionHash, - cursor.blockNumber, - new Date(cursor.timestamp * 1000) - ] - ); -} - -export async function handleListingCreated( - client: PoolClient, - data: any, - cursor: StarkNetCursor -) { - const { listing_id, land_id, seller, price } = data; - - await client.query( - `INSERT INTO listings ( - id, land_id, seller_address, price, status, - created_at, updated_at, transaction_hash, block_number - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`, - [ - listing_id.toString(), - land_id.toString(), - seller.toString(), - price.toString(), - 'ACTIVE', - new Date(cursor.timestamp * 1000), - new Date(cursor.timestamp * 1000), - cursor.transactionHash, - cursor.blockNumber - ] - ); -} - -export async function handleListingPriceUpdated( - client: PoolClient, - data: any, - cursor: StarkNetCursor -) { - const { listing_id, old_price, new_price } = data; - - await client.query( - `INSERT INTO listing_price_updates ( - listing_id, old_price, new_price, transaction_hash, - block_number, timestamp - ) VALUES ($1, $2, $3, $4, $5, $6)`, - [ - listing_id.toString(), - old_price.toString(), - new_price.toString(), - cursor.transactionHash, - cursor.blockNumber, - new Date(cursor.timestamp * 1000) - ] - ); - - await client.query( - `UPDATE listings SET - price = $1, - updated_at = NOW() - WHERE id = $2`, - [new_price.toString(), listing_id.toString()] - ); -} - -export async function handleListingCancelled( - client: PoolClient, - data: any, - cursor: StarkNetCursor -) { - const { listing_id } = data; - - await client.query( - `UPDATE listings SET - status = 'CANCELLED', - updated_at = NOW() - WHERE id = $1`, - [listing_id.toString()] - ); -} - -export async function handleLandSold( - client: PoolClient, - data: any, - cursor: StarkNetCursor -) { - const { listing_id, land_id, seller, buyer, price } = data; - - await client.query( - `INSERT INTO land_sales ( - listing_id, land_id, seller_address, buyer_address, - price, transaction_hash, block_number, timestamp - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, - [ - listing_id.toString(), - land_id.toString(), - seller.toString(), - buyer.toString(), - price.toString(), - cursor.transactionHash, - cursor.blockNumber, - new Date(cursor.timestamp * 1000) - ] - ); - - await client.query( - `UPDATE listings SET - status = 'SOLD', - updated_at = NOW() - WHERE id = $1`, - [listing_id.toString()] - ); - - await client.query( - `UPDATE lands SET - owner_address = $1, - updated_at = NOW() - WHERE land_id = $2`, - [buyer.toString(), land_id.toString()] - ); -} \ No newline at end of file diff --git a/land-registry-indexer/src/index.ts b/land-registry-indexer/src/index.ts deleted file mode 100644 index c0a11173..00000000 --- a/land-registry-indexer/src/index.ts +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Main entry point for the Land Registry Indexer - * - * This script initializes the indexer, connects to the PostgreSQL database, - * and starts processing events from the StarkNet node using Apibara. - - */ - -import { Indexer, IndexerRunner } from '@apibara/indexer'; -import { StarkNetCursor, Filter } from '@apibara/protocol'; -import { Pool } from 'pg'; -import { config } from './config'; -import { - handleLandRegistered, - handleLandTransferred, - handleLandVerified, - handleLandUpdated, - handleInspectorAdded, - handleInspectorRemoved, - handleLandInspectorSet, - handleListingCreated, - handleListingPriceUpdated, - handleListingCancelled, - handleLandSold -} from './eventHandlers'; - -const pool = new Pool({ - connectionString: config.pgConnection, -}); - -class LandRegistryIndexer implements Indexer { - async handleData(cursor: StarkNetCursor, data: any[]): Promise { - const client = await pool.connect(); - - try { - await client.query('BEGIN'); - - for (const event of data) { - const { name, data: eventData } = event; - - switch (name) { - case 'LandRegistered': - await handleLandRegistered(client, eventData, cursor); - break; - case 'LandTransferred': - await handleLandTransferred(client, eventData, cursor); - break; - case 'LandVerified': - await handleLandVerified(client, eventData, cursor); - break; - case 'LandUpdated': - await handleLandUpdated(client, eventData, cursor); - break; - case 'InspectorAdded': - await handleInspectorAdded(client, eventData, cursor); - break; - case 'InspectorRemoved': - await handleInspectorRemoved(client, eventData, cursor); - break; - case 'LandInspectorSet': - await handleLandInspectorSet(client, eventData, cursor); - break; - - case 'ListingCreated': - await handleListingCreated(client, eventData, cursor); - break; - case 'ListingPriceUpdated': - await handleListingPriceUpdated(client, eventData, cursor); - break; - case 'ListingCancelled': - await handleListingCancelled(client, eventData, cursor); - break; - case 'LandSold': - await handleLandSold(client, eventData, cursor); - break; - } - } - - await client.query('COMMIT'); - } catch (error) { - await client.query('ROLLBACK'); - throw error; - } finally { - client.release(); - } - } - - getFilter(): Filter { - return { - header: { weak: true }, - events: [{ - fromAddress: config.landRegistryAddress, - keys: [ - ['LandRegistered'], - ['LandTransferred'], - ['LandVerified'], - ['LandUpdated'], - ['InspectorAdded'], - ['InspectorRemoved'], - ['LandInspectorSet'], - ['FeeUpdated'], - ['ListingCreated'], - ['ListingPriceUpdated'], - ['ListingCancelled'], - ['LandSold'] - ], - }], - }; - } -} - -const runner = new IndexerRunner({ - indexer: new LandRegistryIndexer(), - startingBlock: config.startingBlock, - network: { - url: config.apibaraUrl, - }, -}); - -runner.start().catch((error) => { - console.error('Indexer failed:', error); - process.exit(1); -}); \ No newline at end of file diff --git a/land-registry-indexer/src/indexer.ts b/land-registry-indexer/src/indexer.ts new file mode 100644 index 00000000..0aac783e --- /dev/null +++ b/land-registry-indexer/src/indexer.ts @@ -0,0 +1,152 @@ +import { ClientError, Status } from "@apibara/protocol"; +import { Client } from "pg"; +import { createIndexerConfig, IndexerMessage, EVENT_KEYS } from "./config"; + +export class LandRegistryIndexer { + private pgClient: Client; + private running: boolean = false; + + constructor() { + this.pgClient = new Client({ + connectionString: createIndexerConfig().dbUrl, + }); + } + + async start() { + this.running = true; + await this.pgClient.connect(); + + while (this.running) { + try { + const { client, request } = createIndexerConfig(); + + for await (const message of client.streamData(request)) { + await this.handleMessage(message as IndexerMessage); + } + } catch (err) { + if (err instanceof ClientError) { + if (err.code !== Status.INTERNAL) { + console.error("Non-recoverable error:", err); + throw err; + } + + // Internal errors are caused by disconnection + console.log("Connection lost, reconnecting in 2 seconds..."); + await new Promise(r => setTimeout(r, 2000)); + continue; + } + throw err; + } + + } + } + + private async handleMessage(message: IndexerMessage) { + switch (message._tag) { + case "data": { + if (!message.data) return; + + const block = message.data; + // Process events + for (const event of block.events) { + const transaction = block.transactions.find( + (tx: { transactionIndex: number }) => tx.transactionIndex === event.transactionIndex + ); + + // Get event key to determine event type + const eventKey = event.keys[0]; + + try { + switch (eventKey) { + case EVENT_KEYS.LAND_REGISTERED: + await this.handleLandRegistered(event, transaction); + break; + case EVENT_KEYS.LAND_TRANSFERRED: + await this.handleLandTransferred(event, transaction); + break; + case EVENT_KEYS.LAND_VERIFIED: + await this.handleLandVerified(event, transaction); + break; + default: + console.log("Unknown event:", eventKey); + } + } catch (error) { + console.error(`Error processing event ${eventKey}:`, error); + } + } + break; + } + case "invalidate": { + // Handle chain reorg + console.log("Chain reorganization detected"); + break; + } + } + } + + // Event handler methods + private async handleLandRegistered(event: any, transaction: any) { + const [landId, ownerAddress, latitude, longitude, area, landUse] = event.data; + await this.pgClient.query( + `INSERT INTO lands ( + land_id, owner_address, location_latitude, location_longitude, + area, land_use, status, created_at, updated_at + ) VALUES ($1, $2, $3, $4, $5, $6, 'registered', NOW(), NOW())`, + [landId, ownerAddress, latitude, longitude, area, landUse] + ); + } + + private async handleLandTransferred(event: any, transaction: any) { + const [landId, fromAddress, toAddress] = event.data; + await this.pgClient.query( + `INSERT INTO land_transfers ( + land_id, from_address, to_address, transaction_hash, + block_number, timestamp + ) VALUES ($1, $2, $3, $4, $5, NOW())`, + [landId, fromAddress, toAddress, transaction.hash, transaction.blockNumber] + ); + + // Update land ownership + await this.pgClient.query( + `UPDATE lands SET owner_address = $1, updated_at = NOW() WHERE land_id = $2`, + [toAddress, landId] + ); + } + + private async handleLandVerified(event: any, transaction: any) { + const [landId, inspectorAddress] = event.data; + await this.pgClient.query( + `INSERT INTO land_verifications ( + land_id, inspector_address, transaction_hash, + block_number, timestamp + ) VALUES ($1, $2, $3, $4, NOW())`, + [landId, inspectorAddress, transaction.hash, transaction.blockNumber] + ); + + // Update land status + await this.pgClient.query( + `UPDATE lands SET status = 'verified', updated_at = NOW() WHERE land_id = $1`, + [landId] + ); + } + + async stop() { + this.running = false; + await this.pgClient.end(); + } +} + +// Start the indexer +if (require.main === module) { + const indexer = new LandRegistryIndexer(); + indexer.start().catch((error) => { + console.error("Failed to start indexer:", error); + process.exit(1); + }); + + // Handle shutdown gracefully + process.on("SIGINT", async () => { + await indexer.stop(); + process.exit(0); + }); +} \ No newline at end of file diff --git a/land-registry-indexer/src/tsconfig.json b/land-registry-indexer/src/tsconfig.json new file mode 100644 index 00000000..060dce46 --- /dev/null +++ b/land-registry-indexer/src/tsconfig.json @@ -0,0 +1,14 @@ +{ + "compilerOptions": { + "target": "ES2020", + "module": "commonjs", + "outDir": "./dist", + "rootDir": "./src", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true + }, + "include": ["src/**/*"], + "exclude": ["node_modules"] + } \ No newline at end of file From 97f35514c744f7b5c8cde75e9e761ae5f2170c03 Mon Sep 17 00:00:00 2001 From: RAJI ABDUL <119139404+rajiabdul@users.noreply.github.com> Date: Wed, 25 Dec 2024 16:12:40 +0000 Subject: [PATCH 2/3] updates --- land-registry-indexer/src/indexer.ts | 124 ++++++------------ land-registry-indexer/src/services/db.ts | 77 +++++++++++ .../src/services/eventHandlers.ts | 32 +++++ land-registry-indexer/src/tsconfig.json | 26 ++-- land-registry-indexer/src/types/index.ts | 23 ++++ 5 files changed, 187 insertions(+), 95 deletions(-) create mode 100644 land-registry-indexer/src/services/db.ts create mode 100644 land-registry-indexer/src/services/eventHandlers.ts create mode 100644 land-registry-indexer/src/types/index.ts diff --git a/land-registry-indexer/src/indexer.ts b/land-registry-indexer/src/indexer.ts index 0aac783e..11616e47 100644 --- a/land-registry-indexer/src/indexer.ts +++ b/land-registry-indexer/src/indexer.ts @@ -1,20 +1,22 @@ import { ClientError, Status } from "@apibara/protocol"; -import { Client } from "pg"; -import { createIndexerConfig, IndexerMessage, EVENT_KEYS } from "./config"; +import { createIndexerConfig, EVENT_KEYS } from "./config"; +import { DatabaseService } from "./services/db"; +import { EventHandlerService } from "./services/eventHandlers"; +import { IndexerMessage, Event, Transaction, Block } from "./types"; export class LandRegistryIndexer { - private pgClient: Client; + private db: DatabaseService; + private eventHandler: EventHandlerService; private running: boolean = false; constructor() { - this.pgClient = new Client({ - connectionString: createIndexerConfig().dbUrl, - }); + this.db = new DatabaseService(createIndexerConfig().dbUrl); + this.eventHandler = new EventHandlerService(this.db); } async start() { this.running = true; - await this.pgClient.connect(); + await this.db.connect(); while (this.running) { try { @@ -29,15 +31,12 @@ export class LandRegistryIndexer { console.error("Non-recoverable error:", err); throw err; } - - // Internal errors are caused by disconnection console.log("Connection lost, reconnecting in 2 seconds..."); await new Promise(r => setTimeout(r, 2000)); continue; } throw err; } - } } @@ -45,94 +44,56 @@ export class LandRegistryIndexer { switch (message._tag) { case "data": { if (!message.data) return; - - const block = message.data; - // Process events - for (const event of block.events) { - const transaction = block.transactions.find( - (tx: { transactionIndex: number }) => tx.transactionIndex === event.transactionIndex - ); - - // Get event key to determine event type - const eventKey = event.keys[0]; - - try { - switch (eventKey) { - case EVENT_KEYS.LAND_REGISTERED: - await this.handleLandRegistered(event, transaction); - break; - case EVENT_KEYS.LAND_TRANSFERRED: - await this.handleLandTransferred(event, transaction); - break; - case EVENT_KEYS.LAND_VERIFIED: - await this.handleLandVerified(event, transaction); - break; - default: - console.log("Unknown event:", eventKey); - } - } catch (error) { - console.error(`Error processing event ${eventKey}:`, error); - } - } + await this.processBlock(message.data); break; } case "invalidate": { - // Handle chain reorg console.log("Chain reorganization detected"); break; } } } - // Event handler methods - private async handleLandRegistered(event: any, transaction: any) { - const [landId, ownerAddress, latitude, longitude, area, landUse] = event.data; - await this.pgClient.query( - `INSERT INTO lands ( - land_id, owner_address, location_latitude, location_longitude, - area, land_use, status, created_at, updated_at - ) VALUES ($1, $2, $3, $4, $5, $6, 'registered', NOW(), NOW())`, - [landId, ownerAddress, latitude, longitude, area, landUse] - ); - } + private async processBlock(block: Block) { + for (const event of block.events) { + const transaction = block.transactions.find( + tx => tx.transactionIndex === event.transactionIndex + ) as Transaction; + + if (!transaction) { + console.error(`No transaction found for event at index ${event.transactionIndex}`); + continue; + } - private async handleLandTransferred(event: any, transaction: any) { - const [landId, fromAddress, toAddress] = event.data; - await this.pgClient.query( - `INSERT INTO land_transfers ( - land_id, from_address, to_address, transaction_hash, - block_number, timestamp - ) VALUES ($1, $2, $3, $4, $5, NOW())`, - [landId, fromAddress, toAddress, transaction.hash, transaction.blockNumber] - ); - - // Update land ownership - await this.pgClient.query( - `UPDATE lands SET owner_address = $1, updated_at = NOW() WHERE land_id = $2`, - [toAddress, landId] - ); + try { + await this.processEvent(event, transaction); + } catch (error) { + console.error(`Error processing event:`, error); + } + } } - private async handleLandVerified(event: any, transaction: any) { - const [landId, inspectorAddress] = event.data; - await this.pgClient.query( - `INSERT INTO land_verifications ( - land_id, inspector_address, transaction_hash, - block_number, timestamp - ) VALUES ($1, $2, $3, $4, NOW())`, - [landId, inspectorAddress, transaction.hash, transaction.blockNumber] - ); + private async processEvent(event: Event, transaction: Transaction) { + const eventKey = event.keys[0]; - // Update land status - await this.pgClient.query( - `UPDATE lands SET status = 'verified', updated_at = NOW() WHERE land_id = $1`, - [landId] - ); + switch (eventKey) { + case EVENT_KEYS.LAND_REGISTERED: + await this.eventHandler.handleLandRegistered(event, transaction); + break; + case EVENT_KEYS.LAND_TRANSFERRED: + await this.eventHandler.handleLandTransferred(event, transaction); + break; + case EVENT_KEYS.LAND_VERIFIED: + await this.eventHandler.handleLandVerified(event, transaction); + break; + default: + console.log("Unknown event:", eventKey); + } } async stop() { this.running = false; - await this.pgClient.end(); + await this.db.disconnect(); } } @@ -144,7 +105,6 @@ if (require.main === module) { process.exit(1); }); - // Handle shutdown gracefully process.on("SIGINT", async () => { await indexer.stop(); process.exit(0); diff --git a/land-registry-indexer/src/services/db.ts b/land-registry-indexer/src/services/db.ts new file mode 100644 index 00000000..1d26f3ea --- /dev/null +++ b/land-registry-indexer/src/services/db.ts @@ -0,0 +1,77 @@ +import { Client } from "pg"; + +export class DatabaseService { + private client: Client; + + constructor(connectionString: string) { + this.client = new Client({ connectionString }); + } + + async connect() { + await this.client.connect(); + } + + async disconnect() { + await this.client.end(); + } + + // Land operations + async registerLand(landId: string, ownerAddress: string, latitude: number, longitude: number, area: number, landUse: string) { + return this.client.query( + `INSERT INTO lands ( + land_id, owner_address, location_latitude, location_longitude, + area, land_use, status, created_at, updated_at + ) VALUES ($1, $2, $3, $4, $5, $6, 'registered', NOW(), NOW())`, + [landId, ownerAddress, latitude, longitude, area, landUse] + ); + } + + async transferLand(landId: string, fromAddress: string, toAddress: string, txHash: string, blockNumber: number) { + // Use transaction to ensure data consistency + try { + await this.client.query('BEGIN'); + + await this.client.query( + `INSERT INTO land_transfers ( + land_id, from_address, to_address, transaction_hash, + block_number, timestamp + ) VALUES ($1, $2, $3, $4, $5, NOW())`, + [landId, fromAddress, toAddress, txHash, blockNumber] + ); + + await this.client.query( + `UPDATE lands SET owner_address = $1, updated_at = NOW() WHERE land_id = $2`, + [toAddress, landId] + ); + + await this.client.query('COMMIT'); + } catch (e) { + await this.client.query('ROLLBACK'); + throw e; + } + } + + async verifyLand(landId: string, inspectorAddress: string, txHash: string, blockNumber: number) { + try { + await this.client.query('BEGIN'); + + await this.client.query( + `INSERT INTO land_verifications ( + land_id, inspector_address, transaction_hash, + block_number, timestamp + ) VALUES ($1, $2, $3, $4, NOW())`, + [landId, inspectorAddress, txHash, blockNumber] + ); + + await this.client.query( + `UPDATE lands SET status = 'verified', updated_at = NOW() WHERE land_id = $1`, + [landId] + ); + + await this.client.query('COMMIT'); + } catch (e) { + await this.client.query('ROLLBACK'); + throw e; + } + } +} \ No newline at end of file diff --git a/land-registry-indexer/src/services/eventHandlers.ts b/land-registry-indexer/src/services/eventHandlers.ts new file mode 100644 index 00000000..f9263533 --- /dev/null +++ b/land-registry-indexer/src/services/eventHandlers.ts @@ -0,0 +1,32 @@ +import { DatabaseService } from './db'; +import { Event, Transaction } from '../types'; + +export class EventHandlerService { + constructor(private db: DatabaseService) {} + + async handleLandRegistered(event: Event, transaction: Transaction) { + const [landId, ownerAddress, latitude, longitude, area, landUse] = event.data; + await this.db.registerLand(landId, ownerAddress, latitude, longitude, area, landUse); + } + + async handleLandTransferred(event: Event, transaction: Transaction) { + const [landId, fromAddress, toAddress] = event.data; + await this.db.transferLand( + landId, + fromAddress, + toAddress, + transaction.hash, + transaction.blockNumber + ); + } + + async handleLandVerified(event: Event, transaction: Transaction) { + const [landId, inspectorAddress] = event.data; + await this.db.verifyLand( + landId, + inspectorAddress, + transaction.hash, + transaction.blockNumber + ); + } +} \ No newline at end of file diff --git a/land-registry-indexer/src/tsconfig.json b/land-registry-indexer/src/tsconfig.json index 060dce46..08cb7bed 100644 --- a/land-registry-indexer/src/tsconfig.json +++ b/land-registry-indexer/src/tsconfig.json @@ -1,14 +1,14 @@ { - "compilerOptions": { - "target": "ES2020", - "module": "commonjs", - "outDir": "./dist", - "rootDir": "./src", - "strict": true, - "esModuleInterop": true, - "skipLibCheck": true, - "forceConsistentCasingInFileNames": true - }, - "include": ["src/**/*"], - "exclude": ["node_modules"] - } \ No newline at end of file + "compilerOptions": { + "target": "ES2020", + "module": "commonjs", + "outDir": "./dist", + "rootDir": "./src", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true + }, + "include": ["src/**/*"], + "exclude": ["node_modules"] +} \ No newline at end of file diff --git a/land-registry-indexer/src/types/index.ts b/land-registry-indexer/src/types/index.ts new file mode 100644 index 00000000..af98d03d --- /dev/null +++ b/land-registry-indexer/src/types/index.ts @@ -0,0 +1,23 @@ +export interface Event { + keys: string[]; + data: any[]; + transactionIndex: number; + transactionHash: string; + blockNumber: number; + } + + export interface Transaction { + hash: string; + blockNumber: number; + transactionIndex: number; + } + + export interface Block { + events: Event[]; + transactions: Transaction[]; + } + + export interface IndexerMessage { + _tag: "data" | "invalidate"; + data?: Block; + } \ No newline at end of file From 1837c8f7acf7be60b660740df7eef21f44d1c94c Mon Sep 17 00:00:00 2001 From: RAJI ABDUL <119139404+rajiabdul@users.noreply.github.com> Date: Wed, 25 Dec 2024 16:15:51 +0000 Subject: [PATCH 3/3] fixes --- land-registry-indexer/src/config.ts | 2 +- land-registry-indexer/{src => }/tsconfig.json | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename land-registry-indexer/{src => }/tsconfig.json (100%) diff --git a/land-registry-indexer/src/config.ts b/land-registry-indexer/src/config.ts index bbf25fc5..6a81a116 100644 --- a/land-registry-indexer/src/config.ts +++ b/land-registry-indexer/src/config.ts @@ -34,7 +34,7 @@ export const createIndexerConfig = () => { const request = StarknetStream.Request.make({ filter: [filter], - finality: "pending" || "accepted" || "succeeded", + finality: "accepted", startingCursor: { orderKey: BigInt(process.env.STARTING_BLOCK || "0"), }, diff --git a/land-registry-indexer/src/tsconfig.json b/land-registry-indexer/tsconfig.json similarity index 100% rename from land-registry-indexer/src/tsconfig.json rename to land-registry-indexer/tsconfig.json