Skip to content

Commit

Permalink
CLI to compare eden-watcher entities and fix mapping code (#149)
Browse files Browse the repository at this point in the history
* Make sumStaked variable local in eden network mapping

* Implement compare CLI to fetch and query by ids

* Set filterLogs to true for eden-watcher

* Use varchar for bigint array type in eden-watcher

* Store subgraph entities by id in IPLD state

* Store bigint vales as string in IPLD state

* Update eden watcher hook to store single Block entity in IPLD checkpoint

* Fix entity enum type property

* Fix parsing big numbers in event params

* Fix event bigint params parsing in all watchers

* Set default limit to query result and process block after events
  • Loading branch information
nikugogoi authored Aug 8, 2022
1 parent 6990eb8 commit 1a903fc
Show file tree
Hide file tree
Showing 47 changed files with 363 additions and 132 deletions.
21 changes: 12 additions & 9 deletions packages/codegen/src/templates/indexer-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import { {{subgraphEntity.className}} } from './entity/{{subgraphEntity.classNam
{{/each}}

const log = debug('vulcanize:indexer');
const JSONbigNative = JSONbig({ useNativeBigInt: true });

{{#each contracts as | contract |}}
const KIND_{{capitalize contract.contractName}} = '{{contract.contractKind}}';
Expand Down Expand Up @@ -177,8 +178,8 @@ export class Indexer implements IPLDIndexerInterface {

getResultEvent (event: Event): ResultEvent {
const block = event.block;
const eventFields = JSONbig.parse(event.eventInfo);
const { tx, eventSignature } = JSON.parse(event.extraInfo);
const eventFields = JSONbigNative.parse(event.eventInfo);
const { tx, eventSignature } = JSONbigNative.parse(event.extraInfo);

return {
block: {
Expand Down Expand Up @@ -288,7 +289,7 @@ export class Indexer implements IPLDIndexerInterface {
{{/if}}

await this._db.{{query.saveQueryName}}({ blockHash, blockNumber, contractAddress
{{~#each query.params}}, {{this.name~}} {{/each}}, value: result.value, proof: JSONbig.stringify(result.proof) });
{{~#each query.params}}, {{this.name~}} {{/each}}, value: result.value, proof: JSONbigNative.stringify(result.proof) });

{{#if query.stateVariableType}}
{{#if (compare query.stateVariableType 'Mapping')}}
Expand Down Expand Up @@ -441,13 +442,15 @@ export class Indexer implements IPLDIndexerInterface {
async processBlock (blockHash: string, blockNumber: number): Promise<void> {
// Call a function to create initial state for contracts.
await this._baseIndexer.createInit(this, blockHash, blockNumber);
{{#if (subgraphPath)}}
}

{{#if (subgraphPath)}}
async processBlockAfterEvents (blockHash: string): Promise<void> {
// Call subgraph handler for block.
await this._graphWatcher.handleBlock(blockHash);
{{/if}}
}

{{/if}}
parseEventNameAndArgs (kind: string, logObj: any): any {
const { topics, data } = logObj;

Expand Down Expand Up @@ -770,10 +773,10 @@ export class Indexer implements IPLDIndexerInterface {
txHash,
contract,
eventName,
eventInfo: JSONbig.stringify(eventInfo),
extraInfo: JSONbig.stringify(extraInfo),
proof: JSONbig.stringify({
data: JSONbig.stringify({
eventInfo: JSONbigNative.stringify(eventInfo),
extraInfo: JSONbigNative.stringify(extraInfo),
proof: JSONbigNative.stringify({
data: JSONbigNative.stringify({
blockHash,
receiptCID,
log: {
Expand Down
8 changes: 4 additions & 4 deletions packages/eden-watcher/environments/local.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
checkpointInterval = 2000

# IPFS API address (can be taken from the output on running the IPFS daemon).
ipfsApiAddr = "/ip4/127.0.0.1/tcp/5001"
# ipfsApiAddr = "/ip4/127.0.0.1/tcp/5001"

subgraphPath = "../graph-node/test/subgraph/eden"
wasmRestartBlocksInterval = 20

# Boolean to filter logs by contract.
filterLogs = false
filterLogs = true

# Max block range for which to return events in eventsInRange GQL query.
# Use -1 for skipping check on block range.
Expand All @@ -38,8 +38,8 @@

[upstream]
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081"
gqlApiEndpoint = "http://127.0.0.1:8083/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8082"
blockDelayInMilliSecs = 2000

[upstream.cache]
Expand Down
3 changes: 2 additions & 1 deletion packages/eden-watcher/src/entity/Network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export class Network {
@Column('numeric', { transformer: bigintTransformer })
totalStaked!: bigint;

@Column('numeric', { transformer: bigintArrayTransformer, array: true })
// https://github.com/brianc/node-postgres/issues/1943#issuecomment-520500053
@Column('varchar', { transformer: bigintArrayTransformer, array: true })
stakedPercentiles!: bigint[];
}
4 changes: 2 additions & 2 deletions packages/eden-watcher/src/entity/ProducerSetChange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { Entity, PrimaryColumn, Column } from 'typeorm';
import { bigintTransformer } from '@vulcanize/util';

enum ProducerSetChangeType {
Added,
Removed
Added = 'Added',
Removed = 'Removed'
}

@Entity()
Expand Down
4 changes: 2 additions & 2 deletions packages/eden-watcher/src/gql/queries/account.gql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
query account($id: String!, $blockHash: Bytes!){
account(id: $id, block: { hash: $blockHash }){
query account($id: String!, $block: Block_height){
account(id: $id, block: $block){
id
totalClaimed
totalSlashed
Expand Down
4 changes: 2 additions & 2 deletions packages/eden-watcher/src/gql/queries/block.gql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
query block($id: String!, $blockHash: Bytes!){
block(id: $id, block: { hash: $blockHash }){
query block($id: String!, $block: Block_height){
block(id: $id, block: $block){
id
fromActiveProducer
hash
Expand Down
4 changes: 2 additions & 2 deletions packages/eden-watcher/src/gql/queries/claim.gql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
query claim($id: String!, $blockHash: Bytes!){
claim(id: $id, block: { hash: $blockHash }){
query claim($id: String!, $block: Block_height){
claim(id: $id, block: $block){
id
timestamp
index
Expand Down
4 changes: 2 additions & 2 deletions packages/eden-watcher/src/gql/queries/distribution.gql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
query distribution($id: String!, $blockHash: Bytes!){
distribution(id: $id, block: { hash: $blockHash }){
query distribution($id: String!, $block: Block_height){
distribution(id: $id, block: $block){
id
distributor{
id
Expand Down
4 changes: 2 additions & 2 deletions packages/eden-watcher/src/gql/queries/distributor.gql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
query distributor($id: String!, $blockHash: Bytes!){
distributor(id: $id, block: { hash: $blockHash }){
query distributor($id: String!, $block: Block_height){
distributor(id: $id, block: $block){
id
currentDistribution{
id
Expand Down
4 changes: 2 additions & 2 deletions packages/eden-watcher/src/gql/queries/epoch.gql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
query epoch($id: String!, $blockHash: Bytes!){
epoch(id: $id, block: { hash: $blockHash }){
query epoch($id: String!, $block: Block_height){
epoch(id: $id, block: $block){
id
finalized
epochNumber
Expand Down
4 changes: 2 additions & 2 deletions packages/eden-watcher/src/gql/queries/network.gql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
query network($id: String!, $blockHash: Bytes!){
network(id: $id, block: { hash: $blockHash }){
query network($id: String!, $block: Block_height){
network(id: $id, block: $block){
id
slot0{
id
Expand Down
4 changes: 2 additions & 2 deletions packages/eden-watcher/src/gql/queries/producer.gql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
query producer($id: String!, $blockHash: Bytes!){
producer(id: $id, block: { hash: $blockHash }){
query producer($id: String!, $block: Block_height){
producer(id: $id, block: $block){
id
active
rewardCollector
Expand Down
4 changes: 2 additions & 2 deletions packages/eden-watcher/src/gql/queries/producerEpoch.gql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
query producerEpoch($id: String!, $blockHash: Bytes!){
producerEpoch(id: $id, block: { hash: $blockHash }){
query producerEpoch($id: String!, $block: Block_height){
producerEpoch(id: $id, block: $block){
id
address
epoch{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
query producerRewardCollectorChange($id: String!, $blockHash: Bytes!){
producerRewardCollectorChange(id: $id, block: { hash: $blockHash }){
query producerRewardCollectorChange($id: String!, $block: Block_height){
producerRewardCollectorChange(id: $id, block: $block){
id
blockNumber
producer
Expand Down
4 changes: 2 additions & 2 deletions packages/eden-watcher/src/gql/queries/producerSet.gql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
query producerSet($id: String!, $blockHash: Bytes!){
producerSet(id: $id, block: { hash: $blockHash }){
query producerSet($id: String!, $block: Block_height){
producerSet(id: $id, block: $block){
id
producers{
id
Expand Down
4 changes: 2 additions & 2 deletions packages/eden-watcher/src/gql/queries/producerSetChange.gql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
query producerSetChange($id: String!, $blockHash: Bytes!){
producerSetChange(id: $id, block: { hash: $blockHash }){
query producerSetChange($id: String!, $block: Block_height){
producerSetChange(id: $id, block: $block){
id
blockNumber
producer
Expand Down
4 changes: 2 additions & 2 deletions packages/eden-watcher/src/gql/queries/rewardSchedule.gql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
query rewardSchedule($id: String!, $blockHash: Bytes!){
rewardSchedule(id: $id, block: { hash: $blockHash }){
query rewardSchedule($id: String!, $block: Block_height){
rewardSchedule(id: $id, block: $block){
id
rewardScheduleEntries{
id
Expand Down
4 changes: 2 additions & 2 deletions packages/eden-watcher/src/gql/queries/rewardScheduleEntry.gql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
query rewardScheduleEntry($id: String!, $blockHash: Bytes!){
rewardScheduleEntry(id: $id, block: { hash: $blockHash }){
query rewardScheduleEntry($id: String!, $block: Block_height){
rewardScheduleEntry(id: $id, block: $block){
id
startTime
epochDuration
Expand Down
4 changes: 2 additions & 2 deletions packages/eden-watcher/src/gql/queries/slash.gql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
query slash($id: String!, $blockHash: Bytes!){
slash(id: $id, block: { hash: $blockHash }){
query slash($id: String!, $block: Block_height){
slash(id: $id, block: $block){
id
timestamp
account{
Expand Down
4 changes: 2 additions & 2 deletions packages/eden-watcher/src/gql/queries/slot.gql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
query slot($id: String!, $blockHash: Bytes!){
slot(id: $id, block: { hash: $blockHash }){
query slot($id: String!, $block: Block_height){
slot(id: $id, block: $block){
id
owner
delegate
Expand Down
4 changes: 2 additions & 2 deletions packages/eden-watcher/src/gql/queries/slotClaim.gql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
query slotClaim($id: String!, $blockHash: Bytes!){
slotClaim(id: $id, block: { hash: $blockHash }){
query slotClaim($id: String!, $block: Block_height){
slotClaim(id: $id, block: $block){
id
slot{
id
Expand Down
4 changes: 2 additions & 2 deletions packages/eden-watcher/src/gql/queries/staker.gql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
query staker($id: String!, $blockHash: Bytes!){
staker(id: $id, block: { hash: $blockHash }){
query staker($id: String!, $block: Block_height){
staker(id: $id, block: $block){
id
staked
rank
Expand Down
58 changes: 56 additions & 2 deletions packages/eden-watcher/src/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// Copyright 2021 Vulcanize, Inc.
//

import { IPLDBlockInterface, StateKind } from '@vulcanize/util';
import assert from 'assert';
import * as codec from '@ipld/dag-cbor';
import _ from 'lodash';

import { Indexer, ResultEvent } from './indexer';

Expand Down Expand Up @@ -51,9 +54,60 @@ export async function createStateCheckpoint (indexer: Indexer, contractAddress:
assert(blockHash);
assert(contractAddress);

// Use indexer.createStateCheckpoint() method to create a custom checkpoint.
// TODO: Pass blockProgress instead of blockHash to hook method.
const block = await indexer.getBlockProgress(blockHash);
assert(block);

return false;
// Fetch the latest 'checkpoint' | 'init' for the contract to fetch diffs after it.
let prevNonDiffBlock: IPLDBlockInterface;
let getDiffBlockNumber: number;
const checkpointBlock = await indexer.getLatestIPLDBlock(contractAddress, StateKind.Checkpoint, block.blockNumber);

if (checkpointBlock) {
const checkpointBlockNumber = checkpointBlock.block.blockNumber;

prevNonDiffBlock = checkpointBlock;
getDiffBlockNumber = checkpointBlockNumber;

// Update IPLD status map with the latest checkpoint info.
// Essential while importing state as checkpoint at the snapshot block is added by import-state CLI.
// (job-runner won't have the updated ipld status)
indexer.updateIPLDStatusMap(contractAddress, { checkpoint: checkpointBlockNumber });
} else {
// There should be an initial state at least.
const initBlock = await indexer.getLatestIPLDBlock(contractAddress, StateKind.Init);
assert(initBlock, 'No initial state found');

prevNonDiffBlock = initBlock;
// Take block number previous to initial state block to include any diff state at that block.
getDiffBlockNumber = initBlock.block.blockNumber - 1;
}

// Fetching all diff blocks after the latest 'checkpoint' | 'init'.
const diffBlocks = await indexer.getDiffIPLDBlocksByBlocknumber(contractAddress, getDiffBlockNumber);

const prevNonDiffBlockData = codec.decode(Buffer.from(prevNonDiffBlock.data)) as any;
const data = {
state: prevNonDiffBlockData.state
};

// Merge all diff blocks after previous checkpoint.
for (const diffBlock of diffBlocks) {
const diff = codec.decode(Buffer.from(diffBlock.data)) as any;
data.state = _.merge(data.state, diff.state);
}

// Check if Block entity exists.
if (data.state.Block) {
// Store only block entity at checkpoint height instead of all entities.
data.state.Block = {
[blockHash]: data.state.Block[blockHash]
};
}

await indexer.createStateCheckpoint(contractAddress, blockHash, data);

return true;
}

/**
Expand Down
23 changes: 15 additions & 8 deletions packages/eden-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import { Account } from './entity/Account';
import { Slash } from './entity/Slash';

const log = debug('vulcanize:indexer');
const JSONbigNative = JSONbig({ useNativeBigInt: true });

const KIND_EDENNETWORK = 'EdenNetwork';
const KIND_MERKLEDISTRIBUTOR = 'EdenNetworkDistribution';
Expand Down Expand Up @@ -176,8 +177,8 @@ export class Indexer implements IPLDIndexerInterface {

getResultEvent (event: Event): ResultEvent {
const block = event.block;
const eventFields = JSONbig.parse(event.eventInfo);
const { tx, eventSignature } = JSON.parse(event.extraInfo);
const eventFields = JSONbigNative.parse(event.eventInfo);
const { tx, eventSignature } = JSONbigNative.parse(event.extraInfo);

return {
block: {
Expand Down Expand Up @@ -287,6 +288,10 @@ export class Indexer implements IPLDIndexerInterface {
return this._baseIndexer.getIPLDBlockByCid(cid);
}

async getDiffIPLDBlocksByBlocknumber (contractAddress: string, blockNumber: number): Promise<IPLDBlock[]> {
return this._db.getDiffIPLDBlocksByBlocknumber(contractAddress, blockNumber);
}

getIPLDData (ipldBlock: IPLDBlock): any {
return this._baseIndexer.getIPLDData(ipldBlock);
}
Expand Down Expand Up @@ -370,13 +375,15 @@ export class Indexer implements IPLDIndexerInterface {
await this._baseIndexer.createInit(this, blockHash, blockNumber);

console.timeEnd('time:indexer#processBlock-init_state');
}

console.time('time:indexer#processBlock-mapping_code');
async processBlockAfterEvents (blockHash: string): Promise<void> {
console.time('time:indexer#processBlockAfterEvents-mapping_code');

// Call subgraph handler for block.
await this._graphWatcher.handleBlock(blockHash);

console.timeEnd('time:indexer#processBlock-mapping_code');
console.timeEnd('time:indexer#processBlockAfterEvents-mapping_code');
}

parseEventNameAndArgs (kind: string, logObj: any): any {
Expand Down Expand Up @@ -1013,10 +1020,10 @@ export class Indexer implements IPLDIndexerInterface {
txHash,
contract,
eventName,
eventInfo: JSONbig.stringify(eventInfo),
extraInfo: JSONbig.stringify(extraInfo),
proof: JSONbig.stringify({
data: JSONbig.stringify({
eventInfo: JSONbigNative.stringify(eventInfo),
extraInfo: JSONbigNative.stringify(extraInfo),
proof: JSONbigNative.stringify({
data: JSONbigNative.stringify({
blockHash,
receiptCID,
log: {
Expand Down
Loading

0 comments on commit 1a903fc

Please sign in to comment.