diff --git a/client/batch.go b/client/batch.go new file mode 100644 index 00000000..1cc55283 --- /dev/null +++ b/client/batch.go @@ -0,0 +1,40 @@ +package client + +import ( + "fmt" + + rpcClient "github.com/gnolang/gno/tm2/pkg/bft/rpc/client" +) + +// Batch is the wrapper for HTTP batch requests +type Batch struct { + batch *rpcClient.BatchHTTP +} + +// AddBlockRequest adds a new block request (block fetch) to the batch +func (b *Batch) AddBlockRequest(blockNum int64) error { + if _, err := b.batch.Block(&blockNum); err != nil { + return fmt.Errorf("unable to add block request, %w", err) + } + + return nil +} + +// AddBlockResultsRequest adds a new block results request (block results fetch) to the batch +func (b *Batch) AddBlockResultsRequest(blockNum int64) error { + if _, err := b.batch.BlockResults(&blockNum); err != nil { + return fmt.Errorf("unable to add block results request, %w", err) + } + + return nil +} + +// Execute sends the batch off for processing by the node +func (b *Batch) Execute() ([]any, error) { + return b.batch.Send() +} + +// Count returns the number of requests in the batch +func (b *Batch) Count() int { + return b.batch.Count() +} diff --git a/client/http.go b/client/http.go index 28e953c1..ee4c71a2 100644 --- a/client/http.go +++ b/client/http.go @@ -5,11 +5,12 @@ import ( rpcClient "github.com/gnolang/gno/tm2/pkg/bft/rpc/client" core_types "github.com/gnolang/gno/tm2/pkg/bft/rpc/core/types" + clientTypes "github.com/gnolang/tx-indexer/client/types" ) // Client is the TM2 HTTP client type Client struct { - client rpcClient.Client + client *rpcClient.HTTP } // NewClient creates a new TM2 HTTP client @@ -19,6 +20,13 @@ func NewClient(remote string) *Client { } } +// CreateBatch creates a new request batch +func (c *Client) CreateBatch() clientTypes.Batch { + return &Batch{ + batch: c.client.NewBatch(), + } +} + func (c *Client) GetLatestBlockNumber() (int64, error) { status, err := c.client.Status() if err != nil { diff --git a/client/types/types.go b/client/types/types.go new file mode 100644 index 00000000..7eded635 --- /dev/null +++ b/client/types/types.go @@ -0,0 +1,16 @@ +package types + +// Batch defines the interface for the client batch +type Batch interface { + // AddBlockRequest adds a new block request (block fetch) to the batch + AddBlockRequest(int64) error + + // AddBlockResultsRequest adds a new block results request (block results fetch) to the batch + AddBlockResultsRequest(blockNum int64) error + + // Execute sends the batch off for processing by the node + Execute() ([]any, error) + + // Count returns the number of requests in the batch + Count() int +} diff --git a/cmd/start.go b/cmd/start.go index ba64f82a..5d052f00 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -8,6 +8,8 @@ import ( "github.com/gnolang/tx-indexer/client" "github.com/gnolang/tx-indexer/fetch" "github.com/gnolang/tx-indexer/serve" + "github.com/gnolang/tx-indexer/serve/handlers/block" + "github.com/gnolang/tx-indexer/serve/handlers/tx" "github.com/gnolang/tx-indexer/storage" "github.com/peterbourgon/ff/v3/ffcli" "go.uber.org/zap" @@ -22,6 +24,7 @@ type startCfg struct { listenAddress string remote string dbPath string + logLevel string } // newStartCmd creates the indexer start command @@ -65,15 +68,32 @@ func (c *startCfg) registerFlags(fs *flag.FlagSet) { defaultDBPath, "the absolute path for the indexer DB (embedded)", ) + + fs.StringVar( + &c.logLevel, + "log-level", + zap.InfoLevel.String(), + "the log level for the CLI output", + ) } // exec executes the indexer start command func (c *startCfg) exec(ctx context.Context) error { + // Parse the log level + logLevel, err := zap.ParseAtomicLevel(c.logLevel) + if err != nil { + return fmt.Errorf("unable to parse log level, %w", err) + } + + cfg := zap.NewDevelopmentConfig() + cfg.Level = logLevel + // Create a new logger - logger, err := zap.NewDevelopment() + logger, err := cfg.Build() if err != nil { return fmt.Errorf("unable to create logger, %w", err) } + defer logger.Sync() // Create a DB instance db, err := storage.New(c.dbPath) @@ -106,6 +126,20 @@ func (c *startCfg) exec(ctx context.Context) error { ), ) + txHandler := tx.NewHandler(db) + blockHandler := block.NewHandler(db) + + // Register handlers + j.RegisterHandler( + "getTx", + txHandler.GetTxHandler, + ) + + j.RegisterHandler( + "getBlock", + blockHandler.GetBlockHandler, + ) + // Create a new waiter w := newWaiter(ctx) diff --git a/fetch/fetch.go b/fetch/fetch.go index 0914b3e7..f4b4c47a 100644 --- a/fetch/fetch.go +++ b/fetch/fetch.go @@ -15,7 +15,7 @@ import ( const ( maxSlots = 100 - maxChunkSize = 50 + maxChunkSize = 100 ) type Fetcher struct { @@ -54,9 +54,9 @@ func (f *Fetcher) FetchTransactions(ctx context.Context) error { f.logger.Info("Fetcher service shut down") }() - collectorCh := make(chan *workerResponse, 10) + collectorCh := make(chan *workerResponse, maxSlots) - callback := func() error { + startRangeFetch := func() error { // Check if there are any free slots if f.chunkBuffer.Len() == maxSlots { // Currently no free slot exists @@ -112,7 +112,8 @@ func (f *Fetcher) FetchTransactions(ctx context.Context) error { ticker := time.NewTicker(f.queryInterval) defer ticker.Stop() - if err := callback(); err != nil { + // Execute the initial "catch up" with the chain + if err := startRangeFetch(); err != nil { return err } @@ -121,21 +122,10 @@ func (f *Fetcher) FetchTransactions(ctx context.Context) error { case <-ctx.Done(): return nil case <-ticker.C: - if err := callback(); err != nil { + if err := startRangeFetch(); err != nil { return err } case response := <-collectorCh: - pruneSlot := func(index int) { - // Prune the element - f.chunkBuffer.Queue = append( - f.chunkBuffer.Queue[:index], - f.chunkBuffer.Queue[index+1:]..., - ) - - // Fix the queue - f.chunkBuffer.Fix() - } - // Find the slot index index := sort.Search(f.chunkBuffer.Len(), func(i int) bool { return f.chunkBuffer.getSlot(i).chunkRange.from >= response.chunkRange.from @@ -144,31 +134,19 @@ func (f *Fetcher) FetchTransactions(ctx context.Context) error { if response.error != nil { f.logger.Error( "error encountered during chunk fetch", - zap.Error(response.error), + zap.String("error", response.error.Error()), ) - - // Prune the chunk as it is invalid - pruneSlot(index) - - continue } // Save the chunk f.chunkBuffer.setChunk(index, response.chunk) - // Fetch the latest store sequence - latestLocal, err := f.storage.GetLatestHeight() - if err != nil && !errors.Is(err, storageErrors.ErrNotFound) { - return fmt.Errorf("unable to fetch latest block height, %w", err) - } - for f.chunkBuffer.Len() > 0 { item := f.chunkBuffer.getSlot(0) - isConsecutive := item.chunkRange.from == latestLocal+1 isFetched := item.chunk != nil - if !isConsecutive || !isFetched { + if !isFetched { break } @@ -183,33 +161,35 @@ func (f *Fetcher) FetchTransactions(ctx context.Context) error { // have blocks / transactions that are no longer compatible // with latest "master" changes for Amino, so these blocks / txs are ignored, // as opposed to this error being a show-stopper for the fetcher - f.logger.Error("unable to save block", zap.String("err", err.Error())) + f.logger.Error("unable to save block", zap.String("err", saveErr.Error())) continue } - f.logger.Info("Saved block data", zap.Int64("number", block.Height)) + f.logger.Debug("Saved block data", zap.Int64("number", block.Height)) } - for _, results := range item.chunk.results { - for _, tx := range results { - if err := f.storage.SaveTx(tx); err != nil { - f.logger.Error("unable to save tx", zap.String("err", err.Error())) - - continue - } + for _, txResult := range item.chunk.results { + if err := f.storage.SaveTx(txResult); err != nil { + f.logger.Error("unable to save tx", zap.String("err", err.Error())) - f.logger.Info( - "Saved tx", - zap.String("hash", base64.StdEncoding.EncodeToString(tx.Tx.Hash())), - ) + continue } + + f.logger.Debug( + "Saved tx", + zap.String("hash", base64.StdEncoding.EncodeToString(txResult.Tx.Hash())), + ) } - latestLocal = item.chunkRange.to + f.logger.Info( + "Saved block and tx data for range", + zap.Int64("from", item.chunkRange.from), + zap.Int64("to", item.chunkRange.to), + ) // Save the latest height data - if err := f.storage.SaveLatestHeight(latestLocal); err != nil { + if err := f.storage.SaveLatestHeight(item.chunkRange.to); err != nil { return fmt.Errorf("unable to save latest height info, %w", err) } } diff --git a/fetch/mocks_test.go b/fetch/mocks_test.go index 86af3842..56b48294 100644 --- a/fetch/mocks_test.go +++ b/fetch/mocks_test.go @@ -3,6 +3,7 @@ package fetch import ( core_types "github.com/gnolang/gno/tm2/pkg/bft/rpc/core/types" "github.com/gnolang/gno/tm2/pkg/bft/types" + clientTypes "github.com/gnolang/tx-indexer/client/types" ) type ( @@ -55,12 +56,16 @@ type ( getLatestBlockNumberDelegate func() (int64, error) getBlockDelegate func(int64) (*core_types.ResultBlock, error) getBlockResultsDelegate func(int64) (*core_types.ResultBlockResults, error) + + createBatchDelegate func() clientTypes.Batch ) type mockClient struct { getLatestBlockNumberFn getLatestBlockNumberDelegate getBlockFn getBlockDelegate getBlockResultsFn getBlockResultsDelegate + + createBatchFn createBatchDelegate } func (m *mockClient) GetLatestBlockNumber() (int64, error) { @@ -86,3 +91,11 @@ func (m *mockClient) GetBlockResults(blockNum int64) (*core_types.ResultBlockRes return nil, nil } + +func (m *mockClient) CreateBatch() clientTypes.Batch { + if m.createBatchFn != nil { + return m.createBatchFn() + } + + return nil +} diff --git a/fetch/slots.go b/fetch/slots.go index 8043b30e..81d13a6a 100644 --- a/fetch/slots.go +++ b/fetch/slots.go @@ -7,7 +7,7 @@ import ( type chunk struct { blocks []*types.Block - results [][]*types.TxResult + results []*types.TxResult } // slot is a single chunk slot diff --git a/fetch/types.go b/fetch/types.go index 858fd4a9..209922b7 100644 --- a/fetch/types.go +++ b/fetch/types.go @@ -3,6 +3,7 @@ package fetch import ( core_types "github.com/gnolang/gno/tm2/pkg/bft/rpc/core/types" "github.com/gnolang/gno/tm2/pkg/bft/types" + clientTypes "github.com/gnolang/tx-indexer/client/types" ) // Storage defines the transaction storage interface @@ -31,4 +32,7 @@ type Client interface { // GetBlockResults returns the results of executing the transactions // for the specified block GetBlockResults(int64) (*core_types.ResultBlockResults, error) + + // CreateBatch creates a new client batch + CreateBatch() clientTypes.Batch } diff --git a/fetch/worker.go b/fetch/worker.go index f790b99a..2b82acc4 100644 --- a/fetch/worker.go +++ b/fetch/worker.go @@ -2,7 +2,10 @@ package fetch import ( "context" + "errors" + "fmt" + core_types "github.com/gnolang/gno/tm2/pkg/bft/rpc/core/types" "github.com/gnolang/gno/tm2/pkg/bft/types" ) @@ -25,54 +28,195 @@ func handleChunk( client Client, info *workerInfo, ) { + extractChunk := func() (*chunk, error) { + errs := make([]error, 0) + + // Get block data from the node + blocks, err := getBlocksFromBatch(info.chunkRange, client) + errs = append(errs, err) + + results, err := getTxResultFromBatch(blocks, client) + errs = append(errs, err) + + return &chunk{ + blocks: blocks, + results: results, + }, errors.Join(errs...) + } + + c, err := extractChunk() + + response := &workerResponse{ + error: err, + chunk: c, + chunkRange: info.chunkRange, + } + + select { + case <-ctx.Done(): + case info.resCh <- response: + } +} + +func getBlocksSequentially(chunkRange chunkRange, client Client) ([]*types.Block, error) { var ( - err error + errs = make([]error, 0) + blocks = make([]*types.Block, 0) + ) + + for blockNum := chunkRange.from; blockNum <= chunkRange.to; blockNum++ { + // Get block info from the chain + block, err := client.GetBlock(blockNum) + if err != nil { + errs = append(errs, fmt.Errorf("unable to get block %d, %w", blockNum, err)) - c = &chunk{ - blocks: make([]*types.Block, 0), - results: make([][]*types.TxResult, 0), + continue } + + blocks = append(blocks, block.Block) + } + + return blocks, errors.Join(errs...) +} + +func getBlocksFromBatch(chunkRange chunkRange, client Client) ([]*types.Block, error) { + var ( + batch = client.CreateBatch() + fetchedBlocks = make([]*types.Block, 0) ) - for blockNum := info.chunkRange.from; blockNum <= info.chunkRange.to; blockNum++ { - // Get block info from the chain - block, getErr := client.GetBlock(blockNum) - if getErr != nil { - break + // Add block requests to the batch + for blockNum := chunkRange.from; blockNum <= chunkRange.to; blockNum++ { + if err := batch.AddBlockRequest(blockNum); err != nil { + return nil, fmt.Errorf( + "unable to add block request for block %d, %w", + blockNum, + err, + ) } + } - results := make([]*types.TxResult, block.Block.NumTxs) + // Get the block results + blocksRaw, err := batch.Execute() + if err != nil { + // Try to fetch sequentially + return getBlocksSequentially(chunkRange, client) + } - if block.Block.NumTxs != 0 { - // Get the transaction execution results - txResults, resErr := client.GetBlockResults(blockNum) - if resErr != nil { - break - } + // Extract the blocks + for _, blockRaw := range blocksRaw { + block, ok := blockRaw.(*core_types.ResultBlock) + if !ok { + return nil, errors.New("unable to cast batch result into ResultBlock") + } + + // Save block + fetchedBlocks = append(fetchedBlocks, block.Block) + } + + return fetchedBlocks, nil +} + +func getTxResultsSequentially(blocks []*types.Block, client Client) ([]*types.TxResult, error) { + var ( + errs = make([]error, 0) + results = make([]*types.TxResult, 0) + ) + + for _, block := range blocks { + if block.NumTxs == 0 { + continue + } + + // Get the transaction execution results + txResults, err := client.GetBlockResults(block.Height) + if err != nil { + errs = append( + errs, + fmt.Errorf( + "unable to get block results for block %d, %w", + block.Height, + err, + ), + ) + + continue + } - // Save the transaction result to the storage - for index, tx := range block.Block.Txs { - results[index] = &types.TxResult{ - Height: block.Block.Height, - Index: uint32(index), - Tx: tx, - Response: txResults.Results.DeliverTxs[index], - } + // Save the transaction result to the storage + for index, tx := range block.Txs { + result := &types.TxResult{ + Height: block.Height, + Index: uint32(index), + Tx: tx, + Response: txResults.Results.DeliverTxs[index], } + + results = append(results, result) + } + } + + return results, errors.Join(errs...) +} + +func getTxResultFromBatch(blocks []*types.Block, client Client) ([]*types.TxResult, error) { + var ( + batch = client.CreateBatch() + fetchedResults = make([]*types.TxResult, 0) + ) + + // Create the results request batch + for _, block := range blocks { + if block.NumTxs == 0 { + // No need to request results + // for an empty block + continue } - c.blocks = append(c.blocks, block.Block) - c.results = append(c.results, results) + // Add the request to the batch + if err := batch.AddBlockResultsRequest(block.Height); err != nil { + return nil, fmt.Errorf( + "unable to add block results request for block %d, %w", + block.Height, + err, + ) + } } - response := &workerResponse{ - error: err, - chunk: c, - chunkRange: info.chunkRange, + // Check if there is anything to execute + if batch.Count() == 0 { + // Batch is empty, nothing to fetch + return fetchedResults, nil } - select { - case <-ctx.Done(): - case info.resCh <- response: + // Get the block results + blockResultsRaw, err := batch.Execute() + if err != nil { + // Try to fetch sequentially + return getTxResultsSequentially(blocks, client) } + + // Extract the results + for resultsIndex, resultsRaw := range blockResultsRaw { + results, ok := resultsRaw.(*core_types.ResultBlockResults) + if !ok { + return nil, errors.New("unable to cast batch result into ResultBlockResults") + } + + height := results.Height + deliverTxs := results.Results.DeliverTxs + + for txIndex, tx := range blocks[resultsIndex].Txs { + result := &types.TxResult{ + Height: height, + Index: uint32(txIndex), + Tx: tx, + Response: deliverTxs[txIndex], + } + + fetchedResults = append(fetchedResults, result) + } + } + + return fetchedResults, nil }