From 235c2beae36620b469cecac18ad1898fa203038a Mon Sep 17 00:00:00 2001 From: Francisco de Borja Aranda Castillejo Date: Fri, 10 Jan 2025 17:29:12 +0100 Subject: [PATCH] feat: make blocktracker aware of block hashes (#400) Changelog -- - Make `blockTracker` aware of block hashes. This is the first step to enhance the indexer. - Minor format fixes. ## Summary by CodeRabbit Based on the comprehensive summary, here are the updated release notes: - **New Features** - Enhanced block tracking with block hash support across multiple components. - Added new utility functions for generating and converting block hashes. - **Improvements** - Updated database schema to include block hash tracking. - Standardized error message capitalization across various packages. - Refined block indexing and tracking mechanisms. - **Chores** - Updated configuration files for local network contracts. - Modified mock interfaces to support new block tracking functionality. --- .gitignore | 3 + pkg/blockchain/registryAdmin.go | 4 +- pkg/blockchain/registryAdmin_test.go | 2 +- pkg/blockchain/signer.go | 4 +- pkg/db/queries.sql | 12 ++-- pkg/db/queries/models.go | 1 + pkg/db/queries/queries.sql.go | 26 ++++--- pkg/indexer/blockTracker.go | 71 +++++++++++++++----- pkg/indexer/blockTracker_test.go | 69 ++++++++++++++----- pkg/indexer/indexer.go | 12 ++-- pkg/indexer/indexer_test.go | 7 +- pkg/indexer/interface.go | 8 ++- pkg/migrations/00003_add-latest-block.up.sql | 3 +- pkg/mocks/indexer/mock_IBlockTracker.go | 43 +++++++----- pkg/sync/syncWorker.go | 8 +-- pkg/testutils/blockchain.go | 11 +++ pkg/testutils/random.go | 5 ++ 17 files changed, 207 insertions(+), 82 deletions(-) create mode 100644 pkg/testutils/blockchain.go diff --git a/.gitignore b/.gitignore index 99fbdda4..56b642d6 100644 --- a/.gitignore +++ b/.gitignore @@ -12,6 +12,9 @@ __debug_* build/ bin/ +# contracts development config +contracts/config/anvil_localnet/ + # Test binary, built with `go test -c` *.test diff --git a/pkg/blockchain/registryAdmin.go b/pkg/blockchain/registryAdmin.go index d99b1f51..922ecb05 100644 --- a/pkg/blockchain/registryAdmin.go +++ b/pkg/blockchain/registryAdmin.go @@ -57,14 +57,14 @@ func (n *NodeRegistryAdmin) AddNode( httpAddress string, ) error { if !common.IsHexAddress(owner) { - return fmt.Errorf("Invalid owner address provided %s", owner) + return fmt.Errorf("invalid owner address provided %s", owner) } ownerAddress := common.HexToAddress(owner) signingKey := crypto.FromECDSAPub(signingKeyPub) if n.signer == nil { - return fmt.Errorf("No signer provided") + return fmt.Errorf("no signer provided") } tx, err := n.contract.AddNode(&bind.TransactOpts{ Context: ctx, diff --git a/pkg/blockchain/registryAdmin_test.go b/pkg/blockchain/registryAdmin_test.go index c14848c6..128222aa 100644 --- a/pkg/blockchain/registryAdmin_test.go +++ b/pkg/blockchain/registryAdmin_test.go @@ -55,7 +55,7 @@ func TestAddNodeBadOwner(t *testing.T) { owner := testutils.RandomString(10) err := registry.AddNode(ctx, owner, &privateKey.PublicKey, httpAddress) - require.ErrorContains(t, err, "Invalid owner address") + require.ErrorContains(t, err, "invalid owner address provided") } func TestAddNodeUnauthorized(t *testing.T) { diff --git a/pkg/blockchain/signer.go b/pkg/blockchain/signer.go index bb309e8e..aa4fc568 100644 --- a/pkg/blockchain/signer.go +++ b/pkg/blockchain/signer.go @@ -32,7 +32,7 @@ func NewPrivateKeySigner(privateKeyString string, chainID int) (*PrivateKeySigne publicKey := privateKey.Public() publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey) if !ok { - return nil, fmt.Errorf("Failed to cast to ECDSA public key %v", err) + return nil, fmt.Errorf("failed to cast to ECDSA public key %v", err) } fromAddress := crypto.PubkeyToAddress(*publicKeyECDSA) @@ -42,7 +42,7 @@ func NewPrivateKeySigner(privateKeyString string, chainID int) (*PrivateKeySigne big.NewInt(int64(chainID)), ) if err != nil { - return nil, fmt.Errorf("Failed to create transactor: %v", err) + return nil, fmt.Errorf("failed to create transactor: %v", err) } return &PrivateKeySigner{ diff --git a/pkg/db/queries.sql b/pkg/db/queries.sql index b6431cd6..7d8c4d9b 100644 --- a/pkg/db/queries.sql +++ b/pkg/db/queries.sql @@ -104,17 +104,19 @@ WHERE originator_node_id = @originator_node_id; -- name: SetLatestBlock :exec -INSERT INTO latest_block(contract_address, block_number) - VALUES (@contract_address, @block_number) +INSERT INTO latest_block(contract_address, block_number, block_hash) + VALUES (@contract_address, @block_number, @block_hash) ON CONFLICT (contract_address) DO UPDATE SET - block_number = @block_number + block_number = @block_number, block_hash = @block_hash WHERE - @block_number > latest_block.block_number; + @block_number > latest_block.block_number + AND @block_hash != latest_block.block_hash; -- name: GetLatestBlock :one SELECT - block_number + block_number, + block_hash FROM latest_block WHERE diff --git a/pkg/db/queries/models.go b/pkg/db/queries/models.go index eee1890c..6579fe5a 100644 --- a/pkg/db/queries/models.go +++ b/pkg/db/queries/models.go @@ -27,6 +27,7 @@ type GatewayEnvelope struct { type LatestBlock struct { ContractAddress string BlockNumber int64 + BlockHash []byte } type NodeInfo struct { diff --git a/pkg/db/queries/queries.sql.go b/pkg/db/queries/queries.sql.go index e9c2c443..a1f2e4b2 100644 --- a/pkg/db/queries/queries.sql.go +++ b/pkg/db/queries/queries.sql.go @@ -77,18 +77,24 @@ func (q *Queries) GetAddressLogs(ctx context.Context, addresses []string) ([]Get const getLatestBlock = `-- name: GetLatestBlock :one SELECT - block_number + block_number, + block_hash FROM latest_block WHERE contract_address = $1 ` -func (q *Queries) GetLatestBlock(ctx context.Context, contractAddress string) (int64, error) { +type GetLatestBlockRow struct { + BlockNumber int64 + BlockHash []byte +} + +func (q *Queries) GetLatestBlock(ctx context.Context, contractAddress string) (GetLatestBlockRow, error) { row := q.db.QueryRowContext(ctx, getLatestBlock, contractAddress) - var block_number int64 - err := row.Scan(&block_number) - return block_number, err + var i GetLatestBlockRow + err := row.Scan(&i.BlockNumber, &i.BlockHash) + return i, err } const getLatestSequenceId = `-- name: GetLatestSequenceId :one @@ -380,21 +386,23 @@ func (q *Queries) SelectVectorClock(ctx context.Context) ([]SelectVectorClockRow } const setLatestBlock = `-- name: SetLatestBlock :exec -INSERT INTO latest_block(contract_address, block_number) - VALUES ($1, $2) +INSERT INTO latest_block(contract_address, block_number, block_hash) + VALUES ($1, $2, $3) ON CONFLICT (contract_address) DO UPDATE SET - block_number = $2 + block_number = $2, block_hash = $3 WHERE $2 > latest_block.block_number + AND $3 != latest_block.block_hash ` type SetLatestBlockParams struct { ContractAddress string BlockNumber int64 + BlockHash []byte } func (q *Queries) SetLatestBlock(ctx context.Context, arg SetLatestBlockParams) error { - _, err := q.db.ExecContext(ctx, setLatestBlock, arg.ContractAddress, arg.BlockNumber) + _, err := q.db.ExecContext(ctx, setLatestBlock, arg.ContractAddress, arg.BlockNumber, arg.BlockHash) return err } diff --git a/pkg/indexer/blockTracker.go b/pkg/indexer/blockTracker.go index 859a05de..6e78946e 100644 --- a/pkg/indexer/blockTracker.go +++ b/pkg/indexer/blockTracker.go @@ -8,6 +8,7 @@ import ( "sync" "sync/atomic" + "github.com/ethereum/go-ethereum/common" "github.com/xmtp/xmtpd/pkg/db/queries" ) @@ -18,12 +19,21 @@ and allows the user to increase the value. * */ type BlockTracker struct { - latestBlock atomic.Uint64 + latestBlock *Block contractAddress string queries *queries.Queries mu sync.Mutex } +type Block struct { + number atomic.Uint64 + hash common.Hash +} + +var ( + ErrEmptyBlockHash = errors.New("block hash is empty") +) + // Return a new BlockTracker initialized to the latest block from the DB func NewBlockTracker( ctx context.Context, @@ -39,18 +49,24 @@ func NewBlockTracker( if err != nil { return nil, err } - bt.latestBlock.Store(latestBlock) + bt.latestBlock = latestBlock return bt, nil } -func (bt *BlockTracker) GetLatestBlock() uint64 { - return bt.latestBlock.Load() +func (bt *BlockTracker) GetLatestBlock() (uint64, []byte) { + bt.mu.Lock() + defer bt.mu.Unlock() + return bt.latestBlock.number.Load(), bt.latestBlock.hash.Bytes() } -func (bt *BlockTracker) UpdateLatestBlock(ctx context.Context, block uint64) error { +func (bt *BlockTracker) UpdateLatestBlock( + ctx context.Context, + block uint64, + hashBytes []byte, +) error { // Quick check without lock - if block <= bt.latestBlock.Load() { + if block <= bt.latestBlock.number.Load() { return nil } @@ -58,22 +74,35 @@ func (bt *BlockTracker) UpdateLatestBlock(ctx context.Context, block uint64) err defer bt.mu.Unlock() // Re-check after acquiring lock - if block <= bt.latestBlock.Load() { + if block <= bt.latestBlock.number.Load() { return nil } - if err := bt.updateDB(ctx, block); err != nil { + newHash := common.Hash(hashBytes) + + if newHash == (common.Hash{}) { + return ErrEmptyBlockHash + } + + if newHash == bt.latestBlock.hash { + return nil + } + + if err := bt.updateDB(ctx, block, newHash.Bytes()); err != nil { return err } - bt.latestBlock.Store(block) + bt.latestBlock.number.Store(block) + bt.latestBlock.hash = newHash + return nil } -func (bt *BlockTracker) updateDB(ctx context.Context, block uint64) error { +func (bt *BlockTracker) updateDB(ctx context.Context, block uint64, hash []byte) error { return bt.queries.SetLatestBlock(ctx, queries.SetLatestBlockParams{ ContractAddress: bt.contractAddress, BlockNumber: int64(block), + BlockHash: hash, }) } @@ -81,22 +110,30 @@ func loadLatestBlock( ctx context.Context, contractAddress string, querier *queries.Queries, -) (uint64, error) { +) (*Block, error) { + block := &Block{ + number: atomic.Uint64{}, + hash: common.Hash{}, + } + latestBlock, err := querier.GetLatestBlock(ctx, contractAddress) if err != nil { if errors.Is(err, sql.ErrNoRows) { - return 0, nil + return block, nil } - return 0, err + return block, err } - if latestBlock < 0 { - return 0, fmt.Errorf( + if latestBlock.BlockNumber < 0 { + return block, fmt.Errorf( "invalid block number %d for contract %s", - latestBlock, + latestBlock.BlockNumber, contractAddress, ) } - return uint64(latestBlock), nil + block.number.Store(uint64(latestBlock.BlockNumber)) + block.hash = common.BytesToHash(latestBlock.BlockHash) + + return block, nil } diff --git a/pkg/indexer/blockTracker_test.go b/pkg/indexer/blockTracker_test.go index 7ee9e9be..261dd3e5 100644 --- a/pkg/indexer/blockTracker_test.go +++ b/pkg/indexer/blockTracker_test.go @@ -5,6 +5,7 @@ import ( "sync" "testing" + "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/require" "github.com/xmtp/xmtpd/pkg/db/queries" "github.com/xmtp/xmtpd/pkg/testutils" @@ -20,9 +21,11 @@ func TestInitialize(t *testing.T) { querier := queries.New(db) tracker, err := NewBlockTracker(ctx, CONTRACT_ADDRESS, querier) + blockNumber, blockHash := tracker.GetLatestBlock() require.NoError(t, err) require.NotNil(t, tracker) - require.Equal(t, uint64(0), tracker.GetLatestBlock()) + require.Equal(t, uint64(0), blockNumber) + require.Equal(t, common.Hash{}.Bytes(), blockHash) } func TestUpdateLatestBlock(t *testing.T) { @@ -35,25 +38,36 @@ func TestUpdateLatestBlock(t *testing.T) { tracker, err := NewBlockTracker(ctx, CONTRACT_ADDRESS, querier) require.NoError(t, err) + blockHigh := testutils.Int64ToHash(100).Bytes() + blockLow := testutils.Int64ToHash(50).Bytes() + // Test updating to a higher block - err = tracker.UpdateLatestBlock(ctx, 100) + err = tracker.UpdateLatestBlock(ctx, 100, blockHigh) + blockNumber, blockHash := tracker.GetLatestBlock() require.NoError(t, err) - require.Equal(t, uint64(100), tracker.GetLatestBlock()) + require.Equal(t, uint64(100), blockNumber) + require.Equal(t, blockHigh, blockHash) // Test updating to a lower block (should not update) - err = tracker.UpdateLatestBlock(ctx, 50) + err = tracker.UpdateLatestBlock(ctx, 50, blockLow) require.NoError(t, err) - require.Equal(t, uint64(100), tracker.GetLatestBlock()) + blockNumber, blockHash = tracker.GetLatestBlock() + require.Equal(t, uint64(100), blockNumber) + require.Equal(t, blockHigh, blockHash) // Test updating to the same block (should not update) - err = tracker.UpdateLatestBlock(ctx, 100) + err = tracker.UpdateLatestBlock(ctx, 100, blockHigh) require.NoError(t, err) - require.Equal(t, uint64(100), tracker.GetLatestBlock()) + blockNumber, blockHash = tracker.GetLatestBlock() + require.Equal(t, uint64(100), blockNumber) + require.Equal(t, blockHigh, blockHash) // Verify persistence newTracker, err := NewBlockTracker(ctx, CONTRACT_ADDRESS, querier) require.NoError(t, err) - require.Equal(t, uint64(100), newTracker.GetLatestBlock()) + blockNumber, blockHash = newTracker.GetLatestBlock() + require.Equal(t, uint64(100), blockNumber) + require.Equal(t, blockHigh, blockHash) } func TestConcurrentUpdates(t *testing.T) { @@ -77,7 +91,11 @@ func TestConcurrentUpdates(t *testing.T) { defer wg.Done() for j := 0; j < updatesPerGoroutine; j++ { blockNum := uint64(startBlock + j) - err := tracker.UpdateLatestBlock(ctx, blockNum) + err := tracker.UpdateLatestBlock( + ctx, + blockNum, + testutils.Int64ToHash(int64(blockNum)).Bytes(), + ) require.NoError(t, err) } }(i * updatesPerGoroutine) @@ -87,12 +105,18 @@ func TestConcurrentUpdates(t *testing.T) { // The final block number should be the highest one attempted expectedFinalBlock := uint64((numGoroutines-1)*updatesPerGoroutine + (updatesPerGoroutine - 1)) - require.Equal(t, expectedFinalBlock, tracker.GetLatestBlock()) + blockNumber, blockHash := tracker.GetLatestBlock() + require.Equal(t, expectedFinalBlock, blockNumber) + + expectedFinalHash := testutils.Int64ToHash(int64(expectedFinalBlock)).Bytes() + require.Equal(t, expectedFinalHash, blockHash) // Verify persistence newTracker, err := NewBlockTracker(ctx, CONTRACT_ADDRESS, querier) require.NoError(t, err) - require.Equal(t, expectedFinalBlock, newTracker.GetLatestBlock()) + blockNumber, blockHash = newTracker.GetLatestBlock() + require.Equal(t, expectedFinalBlock, blockNumber) + require.Equal(t, expectedFinalHash, blockHash) } func TestMultipleContractAddresses(t *testing.T) { @@ -110,15 +134,22 @@ func TestMultipleContractAddresses(t *testing.T) { tracker2, err := NewBlockTracker(ctx, address2, querier) require.NoError(t, err) + blockHash1 := testutils.Int64ToHash(100).Bytes() + blockHash2 := testutils.Int64ToHash(200).Bytes() + // Update trackers independently - err = tracker1.UpdateLatestBlock(ctx, 100) + err = tracker1.UpdateLatestBlock(ctx, 100, blockHash1) require.NoError(t, err) - err = tracker2.UpdateLatestBlock(ctx, 200) + err = tracker2.UpdateLatestBlock(ctx, 200, blockHash2) require.NoError(t, err) // Verify different addresses maintain separate block numbers - require.Equal(t, uint64(100), tracker1.GetLatestBlock()) - require.Equal(t, uint64(200), tracker2.GetLatestBlock()) + blockNumber, blockHash := tracker1.GetLatestBlock() + require.Equal(t, uint64(100), blockNumber) + require.Equal(t, blockHash1, blockHash) + blockNumber, blockHash = tracker2.GetLatestBlock() + require.Equal(t, uint64(200), blockNumber) + require.Equal(t, blockHash2, blockHash) // Verify persistence for both addresses newTracker1, err := NewBlockTracker(ctx, address1, querier) @@ -126,6 +157,10 @@ func TestMultipleContractAddresses(t *testing.T) { newTracker2, err := NewBlockTracker(ctx, address2, querier) require.NoError(t, err) - require.Equal(t, uint64(100), newTracker1.GetLatestBlock()) - require.Equal(t, uint64(200), newTracker2.GetLatestBlock()) + blockNumber, blockHash = newTracker1.GetLatestBlock() + require.Equal(t, uint64(100), blockNumber) + require.Equal(t, blockHash1, blockHash) + blockNumber, blockHash = newTracker2.GetLatestBlock() + require.Equal(t, uint64(200), blockNumber) + require.Equal(t, blockHash2, blockHash) } diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index c98e2859..4f12a9cd 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -33,7 +33,6 @@ type Indexer struct { func NewIndexer( ctx context.Context, log *zap.Logger, - ) *Indexer { ctx, cancel := context.WithCancel(ctx) return &Indexer{ @@ -57,7 +56,8 @@ func (s *Indexer) Close() { func (i *Indexer) StartIndexer( db *sql.DB, cfg config.ContractsOptions, - validationService mlsvalidate.MLSValidationService) error { + validationService mlsvalidate.MLSValidationService, +) error { client, err := blockchain.NewClient(i.ctx, cfg.RpcUrl) if err != nil { return err @@ -146,8 +146,9 @@ func configureLogStream( return nil, err } + latestBlockNumber, _ := messagesTracker.GetLatestBlock() messagesChannel := builder.ListenForContractEvent( - messagesTracker.GetLatestBlock(), + latestBlockNumber, common.HexToAddress(cfg.MessagesContractAddress), []common.Hash{messagesTopic}, cfg.MaxChainDisconnectTime, @@ -163,8 +164,9 @@ func configureLogStream( return nil, err } + latestBlockNumber, _ = identityUpdatesTracker.GetLatestBlock() identityUpdatesChannel := builder.ListenForContractEvent( - identityUpdatesTracker.GetLatestBlock(), + latestBlockNumber, common.HexToAddress(cfg.IdentityUpdatesContractAddress), []common.Hash{identityUpdatesTopic}, cfg.MaxChainDisconnectTime, @@ -212,7 +214,7 @@ func indexLogs( } } else { logger.Info("Stored log", zap.Uint64("blockNumber", event.BlockNumber)) - if trackerErr := blockTracker.UpdateLatestBlock(ctx, event.BlockNumber); trackerErr != nil { + if trackerErr := blockTracker.UpdateLatestBlock(ctx, event.BlockNumber, event.BlockHash.Bytes()); trackerErr != nil { logger.Error("error updating block tracker", zap.Error(trackerErr)) } } diff --git a/pkg/indexer/indexer_test.go b/pkg/indexer/indexer_test.go index 609da982..753862c8 100644 --- a/pkg/indexer/indexer_test.go +++ b/pkg/indexer/indexer_test.go @@ -19,10 +19,15 @@ func TestIndexLogsSuccess(t *testing.T) { channel := make(chan types.Log, 10) defer close(channel) newBlockNumber := uint64(10) + newBlockHash := common.HexToHash( + "0x0000000000000000000000000000000000000000000000000000000000000000", + ) logStorer := storerMocks.NewMockLogStorer(t) blockTracker := indexerMocks.NewMockIBlockTracker(t) - blockTracker.EXPECT().UpdateLatestBlock(mock.Anything, newBlockNumber).Return(nil) + blockTracker.EXPECT(). + UpdateLatestBlock(mock.Anything, newBlockNumber, newBlockHash.Bytes()). + Return(nil) event := types.Log{ Address: common.HexToAddress("0x123"), diff --git a/pkg/indexer/interface.go b/pkg/indexer/interface.go index e6bc088c..80cd7004 100644 --- a/pkg/indexer/interface.go +++ b/pkg/indexer/interface.go @@ -1,8 +1,10 @@ package indexer -import "context" +import ( + "context" +) type IBlockTracker interface { - GetLatestBlock() uint64 - UpdateLatestBlock(ctx context.Context, block uint64) error + GetLatestBlock() (uint64, []byte) + UpdateLatestBlock(ctx context.Context, block uint64, hash []byte) error } diff --git a/pkg/migrations/00003_add-latest-block.up.sql b/pkg/migrations/00003_add-latest-block.up.sql index 855d9a3b..07a8589f 100644 --- a/pkg/migrations/00003_add-latest-block.up.sql +++ b/pkg/migrations/00003_add-latest-block.up.sql @@ -1,5 +1,6 @@ CREATE TABLE latest_block( contract_address TEXT NOT NULL PRIMARY KEY, - block_number BIGINT NOT NULL + block_number BIGINT NOT NULL, + block_hash BYTEA NOT NULL ); diff --git a/pkg/mocks/indexer/mock_IBlockTracker.go b/pkg/mocks/indexer/mock_IBlockTracker.go index 82fd0bde..2168ef91 100644 --- a/pkg/mocks/indexer/mock_IBlockTracker.go +++ b/pkg/mocks/indexer/mock_IBlockTracker.go @@ -22,7 +22,7 @@ func (_m *MockIBlockTracker) EXPECT() *MockIBlockTracker_Expecter { } // GetLatestBlock provides a mock function with given fields: -func (_m *MockIBlockTracker) GetLatestBlock() uint64 { +func (_m *MockIBlockTracker) GetLatestBlock() (uint64, []byte) { ret := _m.Called() if len(ret) == 0 { @@ -30,13 +30,25 @@ func (_m *MockIBlockTracker) GetLatestBlock() uint64 { } var r0 uint64 + var r1 []byte + if rf, ok := ret.Get(0).(func() (uint64, []byte)); ok { + return rf() + } if rf, ok := ret.Get(0).(func() uint64); ok { r0 = rf() } else { r0 = ret.Get(0).(uint64) } - return r0 + if rf, ok := ret.Get(1).(func() []byte); ok { + r1 = rf() + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).([]byte) + } + } + + return r0, r1 } // MockIBlockTracker_GetLatestBlock_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestBlock' @@ -56,27 +68,27 @@ func (_c *MockIBlockTracker_GetLatestBlock_Call) Run(run func()) *MockIBlockTrac return _c } -func (_c *MockIBlockTracker_GetLatestBlock_Call) Return(_a0 uint64) *MockIBlockTracker_GetLatestBlock_Call { - _c.Call.Return(_a0) +func (_c *MockIBlockTracker_GetLatestBlock_Call) Return(_a0 uint64, _a1 []byte) *MockIBlockTracker_GetLatestBlock_Call { + _c.Call.Return(_a0, _a1) return _c } -func (_c *MockIBlockTracker_GetLatestBlock_Call) RunAndReturn(run func() uint64) *MockIBlockTracker_GetLatestBlock_Call { +func (_c *MockIBlockTracker_GetLatestBlock_Call) RunAndReturn(run func() (uint64, []byte)) *MockIBlockTracker_GetLatestBlock_Call { _c.Call.Return(run) return _c } -// UpdateLatestBlock provides a mock function with given fields: ctx, block -func (_m *MockIBlockTracker) UpdateLatestBlock(ctx context.Context, block uint64) error { - ret := _m.Called(ctx, block) +// UpdateLatestBlock provides a mock function with given fields: ctx, block, hash +func (_m *MockIBlockTracker) UpdateLatestBlock(ctx context.Context, block uint64, hash []byte) error { + ret := _m.Called(ctx, block, hash) if len(ret) == 0 { panic("no return value specified for UpdateLatestBlock") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64) error); ok { - r0 = rf(ctx, block) + if rf, ok := ret.Get(0).(func(context.Context, uint64, []byte) error); ok { + r0 = rf(ctx, block, hash) } else { r0 = ret.Error(0) } @@ -92,13 +104,14 @@ type MockIBlockTracker_UpdateLatestBlock_Call struct { // UpdateLatestBlock is a helper method to define mock.On call // - ctx context.Context // - block uint64 -func (_e *MockIBlockTracker_Expecter) UpdateLatestBlock(ctx interface{}, block interface{}) *MockIBlockTracker_UpdateLatestBlock_Call { - return &MockIBlockTracker_UpdateLatestBlock_Call{Call: _e.mock.On("UpdateLatestBlock", ctx, block)} +// - hash []byte +func (_e *MockIBlockTracker_Expecter) UpdateLatestBlock(ctx interface{}, block interface{}, hash interface{}) *MockIBlockTracker_UpdateLatestBlock_Call { + return &MockIBlockTracker_UpdateLatestBlock_Call{Call: _e.mock.On("UpdateLatestBlock", ctx, block, hash)} } -func (_c *MockIBlockTracker_UpdateLatestBlock_Call) Run(run func(ctx context.Context, block uint64)) *MockIBlockTracker_UpdateLatestBlock_Call { +func (_c *MockIBlockTracker_UpdateLatestBlock_Call) Run(run func(ctx context.Context, block uint64, hash []byte)) *MockIBlockTracker_UpdateLatestBlock_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint64)) + run(args[0].(context.Context), args[1].(uint64), args[2].([]byte)) }) return _c } @@ -108,7 +121,7 @@ func (_c *MockIBlockTracker_UpdateLatestBlock_Call) Return(_a0 error) *MockIBloc return _c } -func (_c *MockIBlockTracker_UpdateLatestBlock_Call) RunAndReturn(run func(context.Context, uint64) error) *MockIBlockTracker_UpdateLatestBlock_Call { +func (_c *MockIBlockTracker_UpdateLatestBlock_Call) RunAndReturn(run func(context.Context, uint64, []byte) error) *MockIBlockTracker_UpdateLatestBlock_Call { _c.Call.Return(run) return _c } diff --git a/pkg/sync/syncWorker.go b/pkg/sync/syncWorker.go index c1d623bc..68e14360 100644 --- a/pkg/sync/syncWorker.go +++ b/pkg/sync/syncWorker.go @@ -265,7 +265,7 @@ func (s *syncWorker) connectToNode(node registry.Node) (*grpc.ClientConn, error) zap.String("peer", node.HttpAddress), zap.Error(err), ) - return nil, fmt.Errorf("Failed to connect to peer at %s: %v", node.HttpAddress, err) + return nil, fmt.Errorf("failed to connect to peer at %s: %v", node.HttpAddress, err) } s.log.Debug(fmt.Sprintf("Successfully connected to peer at %s", node.HttpAddress)) @@ -302,7 +302,7 @@ func (s *syncWorker) setupStream( ) if err != nil { return nil, fmt.Errorf( - "Failed to batch subscribe to peer: %v", + "failed to batch subscribe to peer: %v", err, ) } @@ -326,11 +326,11 @@ func (s *syncWorker) listenToStream( // Recv() is a blocking operation that can only be interrupted by cancelling ctx envs, err := originatorStream.stream.Recv() if err == io.EOF { - return fmt.Errorf("Stream closed with EOF") + return fmt.Errorf("stream closed with EOF") } if err != nil { return fmt.Errorf( - "Stream closed with error: %v", + "stream closed with error: %v", err) } s.log.Debug("Received envelopes", zap.Any("numEnvelopes", len(envs.Envelopes))) diff --git a/pkg/testutils/blockchain.go b/pkg/testutils/blockchain.go new file mode 100644 index 00000000..81df94fa --- /dev/null +++ b/pkg/testutils/blockchain.go @@ -0,0 +1,11 @@ +package testutils + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" +) + +func Int64ToHash(x int64) common.Hash { + return common.BigToHash(big.NewInt(x)) +} diff --git a/pkg/testutils/random.go b/pkg/testutils/random.go index 8654f92b..e12dbbb8 100644 --- a/pkg/testutils/random.go +++ b/pkg/testutils/random.go @@ -62,3 +62,8 @@ func RandomPrivateKey(t *testing.T) *ecdsa.PrivateKey { return key } + +func RandomBlockHash() common.Hash { + bytes := RandomBytes(32) + return common.BytesToHash(bytes) +}