Skip to content

Commit

Permalink
add method GetLatestBlock and update interface
Browse files Browse the repository at this point in the history
  • Loading branch information
fbac committed Jan 10, 2025
1 parent 792bfc7 commit b14e1de
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 92 deletions.
2 changes: 1 addition & 1 deletion pkg/db/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ ON CONFLICT (contract_address)
block_number = @block_number, block_hash = @block_hash
WHERE
@block_number > latest_block.block_number
AND @block_hash != latest_block.block_hash);
AND @block_hash != latest_block.block_hash;

-- name: GetLatestBlock :one
SELECT
Expand Down
1 change: 1 addition & 0 deletions pkg/db/queries/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 2 additions & 6 deletions pkg/indexer/blockTracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,10 @@ func NewBlockTracker(
return bt, nil
}

func (bt *BlockTracker) GetLatestBlockNumber() uint64 {
return bt.latestBlock.number.Load()
}

func (bt *BlockTracker) GetLatestBlockHash() []byte {
func (bt *BlockTracker) GetLatestBlock() (uint64, []byte) {
bt.mu.Lock()
defer bt.mu.Unlock()
return bt.latestBlock.hash.Bytes()
return bt.latestBlock.number.Load(), bt.latestBlock.hash.Bytes()
}

func (bt *BlockTracker) UpdateLatestBlock(
Expand Down
55 changes: 34 additions & 21 deletions pkg/indexer/blockTracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/testutils"
Expand All @@ -20,9 +21,11 @@ func TestInitialize(t *testing.T) {
querier := queries.New(db)

tracker, err := NewBlockTracker(ctx, CONTRACT_ADDRESS, querier)
blockNumber, blockHash := tracker.GetLatestBlock()
require.NoError(t, err)
require.NotNil(t, tracker)
require.Equal(t, uint64(0), tracker.GetLatestBlockNumber())
require.Equal(t, uint64(0), blockNumber)
require.Equal(t, common.Hash{}.Bytes(), blockHash)
}

func TestUpdateLatestBlock(t *testing.T) {
Expand All @@ -40,27 +43,31 @@ func TestUpdateLatestBlock(t *testing.T) {

// Test updating to a higher block
err = tracker.UpdateLatestBlock(ctx, 100, blockHigh)
blockNumber, blockHash := tracker.GetLatestBlock()
require.NoError(t, err)
require.Equal(t, uint64(100), tracker.GetLatestBlockNumber())
require.Equal(t, blockHigh, tracker.GetLatestBlockHash())
require.Equal(t, uint64(100), blockNumber)
require.Equal(t, blockHigh, blockHash)

// Test updating to a lower block (should not update)
err = tracker.UpdateLatestBlock(ctx, 50, blockLow)
require.NoError(t, err)
require.Equal(t, uint64(100), tracker.GetLatestBlockNumber())
require.Equal(t, blockHigh, tracker.GetLatestBlockHash())
blockNumber, blockHash = tracker.GetLatestBlock()
require.Equal(t, uint64(100), blockNumber)
require.Equal(t, blockHigh, blockHash)

// Test updating to the same block (should not update)
err = tracker.UpdateLatestBlock(ctx, 100, blockHigh)
require.NoError(t, err)
require.Equal(t, uint64(100), tracker.GetLatestBlockNumber())
require.Equal(t, blockHigh, tracker.GetLatestBlockHash())
blockNumber, blockHash = tracker.GetLatestBlock()
require.Equal(t, uint64(100), blockNumber)
require.Equal(t, blockHigh, blockHash)

// Verify persistence
newTracker, err := NewBlockTracker(ctx, CONTRACT_ADDRESS, querier)
require.NoError(t, err)
require.Equal(t, uint64(100), newTracker.GetLatestBlockNumber())
require.Equal(t, blockHigh, newTracker.GetLatestBlockHash())
blockNumber, blockHash = newTracker.GetLatestBlock()
require.Equal(t, uint64(100), blockNumber)
require.Equal(t, blockHigh, blockHash)
}

func TestConcurrentUpdates(t *testing.T) {
Expand Down Expand Up @@ -98,16 +105,18 @@ func TestConcurrentUpdates(t *testing.T) {

// The final block number should be the highest one attempted
expectedFinalBlock := uint64((numGoroutines-1)*updatesPerGoroutine + (updatesPerGoroutine - 1))
require.Equal(t, expectedFinalBlock, tracker.GetLatestBlockNumber())
blockNumber, blockHash := tracker.GetLatestBlock()
require.Equal(t, expectedFinalBlock, blockNumber)

expectedFinalHash := testutils.IntToHash(int64(expectedFinalBlock)).Bytes()
require.Equal(t, expectedFinalHash, tracker.GetLatestBlockHash())
require.Equal(t, expectedFinalHash, blockHash)

// Verify persistence
newTracker, err := NewBlockTracker(ctx, CONTRACT_ADDRESS, querier)
require.NoError(t, err)
require.Equal(t, expectedFinalBlock, newTracker.GetLatestBlockNumber())
require.Equal(t, expectedFinalHash, newTracker.GetLatestBlockHash())
blockNumber, blockHash = newTracker.GetLatestBlock()
require.Equal(t, expectedFinalBlock, blockNumber)
require.Equal(t, expectedFinalHash, blockHash)
}

func TestMultipleContractAddresses(t *testing.T) {
Expand Down Expand Up @@ -135,19 +144,23 @@ func TestMultipleContractAddresses(t *testing.T) {
require.NoError(t, err)

// Verify different addresses maintain separate block numbers
require.Equal(t, uint64(100), tracker1.GetLatestBlockNumber())
require.Equal(t, blockHash1, tracker1.GetLatestBlockHash())
require.Equal(t, uint64(200), tracker2.GetLatestBlockNumber())
require.Equal(t, blockHash2, tracker2.GetLatestBlockHash())
blockNumber, blockHash := tracker1.GetLatestBlock()
require.Equal(t, uint64(100), blockNumber)
require.Equal(t, blockHash1, blockHash)
blockNumber, blockHash = tracker2.GetLatestBlock()
require.Equal(t, uint64(200), blockNumber)
require.Equal(t, blockHash2, blockHash)

// Verify persistence for both addresses
newTracker1, err := NewBlockTracker(ctx, address1, querier)
require.NoError(t, err)
newTracker2, err := NewBlockTracker(ctx, address2, querier)
require.NoError(t, err)

require.Equal(t, uint64(100), newTracker1.GetLatestBlockNumber())
require.Equal(t, blockHash1, newTracker1.GetLatestBlockHash())
require.Equal(t, uint64(200), newTracker2.GetLatestBlockNumber())
require.Equal(t, blockHash2, newTracker2.GetLatestBlockHash())
blockNumber, blockHash = newTracker1.GetLatestBlock()
require.Equal(t, uint64(100), blockNumber)
require.Equal(t, blockHash1, blockHash)
blockNumber, blockHash = newTracker2.GetLatestBlock()
require.Equal(t, uint64(200), blockNumber)
require.Equal(t, blockHash2, blockHash)
}
6 changes: 4 additions & 2 deletions pkg/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ func configureLogStream(
return nil, err
}

latestBlockNumber, _ := messagesTracker.GetLatestBlock()
messagesChannel := builder.ListenForContractEvent(
messagesTracker.GetLatestBlockNumber(),
latestBlockNumber,
common.HexToAddress(cfg.MessagesContractAddress),
[]common.Hash{messagesTopic},
cfg.MaxChainDisconnectTime,
Expand All @@ -163,8 +164,9 @@ func configureLogStream(
return nil, err
}

latestBlockNumber, _ = identityUpdatesTracker.GetLatestBlock()
identityUpdatesChannel := builder.ListenForContractEvent(
identityUpdatesTracker.GetLatestBlockNumber(),
latestBlockNumber,
common.HexToAddress(cfg.IdentityUpdatesContractAddress),
[]common.Hash{identityUpdatesTopic},
cfg.MaxChainDisconnectTime,
Expand Down
3 changes: 1 addition & 2 deletions pkg/indexer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
)

type IBlockTracker interface {
GetLatestBlockNumber() uint64
GetLatestBlockHash() []byte
GetLatestBlock() (uint64, []byte)
UpdateLatestBlock(ctx context.Context, block uint64, hash []byte) error
}
85 changes: 25 additions & 60 deletions pkg/mocks/indexer/mock_IBlockTracker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit b14e1de

Please sign in to comment.