Skip to content

Commit

Permalink
Use batches instead of single requests for the fetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
zivkovicmilos committed Jan 3, 2024
1 parent 977917d commit 34b9f9d
Show file tree
Hide file tree
Showing 9 changed files with 320 additions and 81 deletions.
40 changes: 40 additions & 0 deletions client/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package client

import (
"fmt"

rpcClient "github.com/gnolang/gno/tm2/pkg/bft/rpc/client"
)

// Batch is the wrapper for HTTP batch requests
type Batch struct {
batch *rpcClient.BatchHTTP
}

// AddBlockRequest adds a new block request (block fetch) to the batch
func (b *Batch) AddBlockRequest(blockNum int64) error {
if _, err := b.batch.Block(&blockNum); err != nil {
return fmt.Errorf("unable to add block request, %w", err)
}

return nil
}

// AddBlockResultsRequest adds a new block results request (block results fetch) to the batch
func (b *Batch) AddBlockResultsRequest(blockNum int64) error {
if _, err := b.batch.BlockResults(&blockNum); err != nil {
return fmt.Errorf("unable to add block results request, %w", err)
}

return nil
}

// Execute sends the batch off for processing by the node
func (b *Batch) Execute() ([]any, error) {
return b.batch.Send()
}

// Count returns the number of requests in the batch
func (b *Batch) Count() int {
return b.batch.Count()
}
10 changes: 9 additions & 1 deletion client/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (

rpcClient "github.com/gnolang/gno/tm2/pkg/bft/rpc/client"
core_types "github.com/gnolang/gno/tm2/pkg/bft/rpc/core/types"
clientTypes "github.com/gnolang/tx-indexer/client/types"
)

// Client is the TM2 HTTP client
type Client struct {
client rpcClient.Client
client *rpcClient.HTTP
}

// NewClient creates a new TM2 HTTP client
Expand All @@ -19,6 +20,13 @@ func NewClient(remote string) *Client {
}
}

// CreateBatch creates a new request batch
func (c *Client) CreateBatch() clientTypes.Batch {
return &Batch{
batch: c.client.NewBatch(),
}
}

func (c *Client) GetLatestBlockNumber() (int64, error) {
status, err := c.client.Status()
if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions client/types/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package types

// Batch defines the interface for the client batch
type Batch interface {
// AddBlockRequest adds a new block request (block fetch) to the batch
AddBlockRequest(int64) error

// AddBlockResultsRequest adds a new block results request (block results fetch) to the batch
AddBlockResultsRequest(blockNum int64) error

// Execute sends the batch off for processing by the node
Execute() ([]any, error)

// Count returns the number of requests in the batch
Count() int
}
36 changes: 35 additions & 1 deletion cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/gnolang/tx-indexer/client"
"github.com/gnolang/tx-indexer/fetch"

Check failure on line 9 in cmd/start.go

View workflow job for this annotation

GitHub Actions / Go Linter / lint

could not import github.com/gnolang/tx-indexer/fetch (-: # github.com/gnolang/tx-indexer/fetch

Check failure on line 9 in cmd/start.go

View workflow job for this annotation

GitHub Actions / Go Linter / lint

could not import github.com/gnolang/tx-indexer/fetch (-: # github.com/gnolang/tx-indexer/fetch

Check failure on line 9 in cmd/start.go

View workflow job for this annotation

GitHub Actions / Go Linter / lint

could not import github.com/gnolang/tx-indexer/fetch (-: # github.com/gnolang/tx-indexer/fetch

Check failure on line 9 in cmd/start.go

View workflow job for this annotation

GitHub Actions / Go Linter / lint

could not import github.com/gnolang/tx-indexer/fetch (-: # github.com/gnolang/tx-indexer/fetch

Check failure on line 9 in cmd/start.go

View workflow job for this annotation

GitHub Actions / Go Linter / lint

could not import github.com/gnolang/tx-indexer/fetch (-: # github.com/gnolang/tx-indexer/fetch
"github.com/gnolang/tx-indexer/serve"
"github.com/gnolang/tx-indexer/serve/handlers/block"
"github.com/gnolang/tx-indexer/serve/handlers/tx"
"github.com/gnolang/tx-indexer/storage"
"github.com/peterbourgon/ff/v3/ffcli"
"go.uber.org/zap"
Expand All @@ -22,6 +24,7 @@ type startCfg struct {
listenAddress string
remote string
dbPath string
logLevel string
}

// newStartCmd creates the indexer start command
Expand Down Expand Up @@ -65,15 +68,32 @@ func (c *startCfg) registerFlags(fs *flag.FlagSet) {
defaultDBPath,
"the absolute path for the indexer DB (embedded)",
)

fs.StringVar(
&c.logLevel,
"log-level",
zap.InfoLevel.String(),
"the log level for the CLI output",
)
}

// exec executes the indexer start command
func (c *startCfg) exec(ctx context.Context) error {
// Parse the log level
logLevel, err := zap.ParseAtomicLevel(c.logLevel)
if err != nil {
return fmt.Errorf("unable to parse log level, %w", err)
}

cfg := zap.NewDevelopmentConfig()
cfg.Level = logLevel

// Create a new logger
logger, err := zap.NewDevelopment()
logger, err := cfg.Build()
if err != nil {
return fmt.Errorf("unable to create logger, %w", err)
}
defer logger.Sync()

// Create a DB instance
db, err := storage.New(c.dbPath)
Expand Down Expand Up @@ -106,6 +126,20 @@ func (c *startCfg) exec(ctx context.Context) error {
),
)

txHandler := tx.NewHandler(db)
blockHandler := block.NewHandler(db)

// Register handlers
j.RegisterHandler(
"getTx",
txHandler.GetTxHandler,
)

j.RegisterHandler(
"getBlock",
blockHandler.GetBlockHandler,
)

// Create a new waiter
w := newWaiter(ctx)

Expand Down
70 changes: 25 additions & 45 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

const (
maxSlots = 100
maxChunkSize = 50
maxChunkSize = 100
)

type Fetcher struct {
Expand Down Expand Up @@ -54,9 +54,9 @@ func (f *Fetcher) FetchTransactions(ctx context.Context) error {
f.logger.Info("Fetcher service shut down")
}()

collectorCh := make(chan *workerResponse, 10)
collectorCh := make(chan *workerResponse, maxSlots)

callback := func() error {
startRangeFetch := func() error {
// Check if there are any free slots
if f.chunkBuffer.Len() == maxSlots {
// Currently no free slot exists
Expand Down Expand Up @@ -112,7 +112,8 @@ func (f *Fetcher) FetchTransactions(ctx context.Context) error {
ticker := time.NewTicker(f.queryInterval)
defer ticker.Stop()

if err := callback(); err != nil {
// Execute the initial "catch up" with the chain
if err := startRangeFetch(); err != nil {
return err
}

Expand All @@ -121,21 +122,10 @@ func (f *Fetcher) FetchTransactions(ctx context.Context) error {
case <-ctx.Done():
return nil
case <-ticker.C:
if err := callback(); err != nil {
if err := startRangeFetch(); err != nil {
return err
}
case response := <-collectorCh:
pruneSlot := func(index int) {
// Prune the element
f.chunkBuffer.Queue = append(
f.chunkBuffer.Queue[:index],
f.chunkBuffer.Queue[index+1:]...,
)

// Fix the queue
f.chunkBuffer.Fix()
}

// Find the slot index
index := sort.Search(f.chunkBuffer.Len(), func(i int) bool {
return f.chunkBuffer.getSlot(i).chunkRange.from >= response.chunkRange.from
Expand All @@ -144,31 +134,19 @@ func (f *Fetcher) FetchTransactions(ctx context.Context) error {
if response.error != nil {
f.logger.Error(
"error encountered during chunk fetch",
zap.Error(response.error),
zap.String("error", response.error.Error()),
)

// Prune the chunk as it is invalid
pruneSlot(index)

continue
}

// Save the chunk
f.chunkBuffer.setChunk(index, response.chunk)

// Fetch the latest store sequence
latestLocal, err := f.storage.GetLatestHeight()
if err != nil && !errors.Is(err, storageErrors.ErrNotFound) {
return fmt.Errorf("unable to fetch latest block height, %w", err)
}

for f.chunkBuffer.Len() > 0 {
item := f.chunkBuffer.getSlot(0)

isConsecutive := item.chunkRange.from == latestLocal+1
isFetched := item.chunk != nil

if !isConsecutive || !isFetched {
if !isFetched {
break
}

Expand All @@ -183,33 +161,35 @@ func (f *Fetcher) FetchTransactions(ctx context.Context) error {
// have blocks / transactions that are no longer compatible
// with latest "master" changes for Amino, so these blocks / txs are ignored,
// as opposed to this error being a show-stopper for the fetcher
f.logger.Error("unable to save block", zap.String("err", err.Error()))
f.logger.Error("unable to save block", zap.String("err", saveErr.Error()))

continue
}

f.logger.Info("Saved block data", zap.Int64("number", block.Height))
f.logger.Debug("Saved block data", zap.Int64("number", block.Height))
}

for _, results := range item.chunk.results {
for _, tx := range results {
if err := f.storage.SaveTx(tx); err != nil {
f.logger.Error("unable to save tx", zap.String("err", err.Error()))

continue
}
for _, txResult := range item.chunk.results {
if err := f.storage.SaveTx(txResult); err != nil {
f.logger.Error("unable to save tx", zap.String("err", err.Error()))

f.logger.Info(
"Saved tx",
zap.String("hash", base64.StdEncoding.EncodeToString(tx.Tx.Hash())),
)
continue
}

f.logger.Debug(
"Saved tx",
zap.String("hash", base64.StdEncoding.EncodeToString(txResult.Tx.Hash())),
)
}

latestLocal = item.chunkRange.to
f.logger.Info(
"Saved block and tx data for range",
zap.Int64("from", item.chunkRange.from),
zap.Int64("to", item.chunkRange.to),
)

// Save the latest height data
if err := f.storage.SaveLatestHeight(latestLocal); err != nil {
if err := f.storage.SaveLatestHeight(item.chunkRange.to); err != nil {
return fmt.Errorf("unable to save latest height info, %w", err)
}
}
Expand Down
13 changes: 13 additions & 0 deletions fetch/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package fetch
import (
core_types "github.com/gnolang/gno/tm2/pkg/bft/rpc/core/types"
"github.com/gnolang/gno/tm2/pkg/bft/types"
clientTypes "github.com/gnolang/tx-indexer/client/types"
)

type (
Expand Down Expand Up @@ -55,12 +56,16 @@ type (
getLatestBlockNumberDelegate func() (int64, error)
getBlockDelegate func(int64) (*core_types.ResultBlock, error)
getBlockResultsDelegate func(int64) (*core_types.ResultBlockResults, error)

createBatchDelegate func() clientTypes.Batch
)

type mockClient struct {
getLatestBlockNumberFn getLatestBlockNumberDelegate
getBlockFn getBlockDelegate
getBlockResultsFn getBlockResultsDelegate

createBatchFn createBatchDelegate
}

func (m *mockClient) GetLatestBlockNumber() (int64, error) {
Expand All @@ -86,3 +91,11 @@ func (m *mockClient) GetBlockResults(blockNum int64) (*core_types.ResultBlockRes

return nil, nil
}

func (m *mockClient) CreateBatch() clientTypes.Batch {
if m.createBatchFn != nil {
return m.createBatchFn()
}

return nil
}
2 changes: 1 addition & 1 deletion fetch/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

type chunk struct {
blocks []*types.Block
results [][]*types.TxResult
results []*types.TxResult
}

// slot is a single chunk slot
Expand Down
4 changes: 4 additions & 0 deletions fetch/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package fetch
import (
core_types "github.com/gnolang/gno/tm2/pkg/bft/rpc/core/types"
"github.com/gnolang/gno/tm2/pkg/bft/types"
clientTypes "github.com/gnolang/tx-indexer/client/types"
)

// Storage defines the transaction storage interface
Expand Down Expand Up @@ -31,4 +32,7 @@ type Client interface {
// GetBlockResults returns the results of executing the transactions
// for the specified block
GetBlockResults(int64) (*core_types.ResultBlockResults, error)

// CreateBatch creates a new client batch
CreateBatch() clientTypes.Batch
}
Loading

0 comments on commit 34b9f9d

Please sign in to comment.