Skip to content

Commit

Permalink
polygon/heimdall: service test to cover both sequential and batch fet…
Browse files Browse the repository at this point in the history
…ch paths (#11865)
  • Loading branch information
taratorio authored Sep 4, 2024
1 parent 4085099 commit c6a47ad
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 36 deletions.
2 changes: 2 additions & 0 deletions polygon/heimdall/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ var (

const (
StateEventsFetchLimit = 50
SpansFetchLimit = 150
CheckpointsFetchLimit = 10_000

apiHeimdallTimeout = 10 * time.Second
retryBackOff = time.Second
Expand Down
4 changes: 2 additions & 2 deletions polygon/heimdall/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func newCheckpointFetcher(client HeimdallClient, logger log.Logger) entityFetche
client.FetchCheckpointCount,
client.FetchCheckpoint,
client.FetchCheckpoints,
10_000, // fetchEntitiesPageLimit
CheckpointsFetchLimit,
1,
logger,
)
Expand Down Expand Up @@ -157,7 +157,7 @@ func newSpanFetcher(client HeimdallClient, logger log.Logger) entityFetcher[*Spa
fetchLastEntityId,
fetchEntity,
client.FetchSpans,
150,
SpansFetchLimit,
0,
logger,
)
Expand Down
97 changes: 63 additions & 34 deletions polygon/heimdall/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ func (suite *ServiceTestSuite) SetupSuite() {
suite.eg.Go(func() error {
return suite.service.Run(suite.ctx)
})

err = suite.service.SynchronizeMilestones(suite.ctx)
require.NoError(suite.T(), err)
err = suite.service.SynchronizeCheckpoints(suite.ctx)
require.NoError(suite.T(), err)
err = suite.service.SynchronizeSpans(suite.ctx, math.MaxInt)
require.NoError(suite.T(), err)
}

func (suite *ServiceTestSuite) TearDownSuite() {
Expand All @@ -106,9 +113,6 @@ func (suite *ServiceTestSuite) TestMilestones() {
t := suite.T()
svc := suite.service

err := svc.SynchronizeMilestones(ctx)
require.NoError(t, err)

id, ok, err := svc.store.Milestones().LastEntityId(ctx)
require.NoError(t, err)
require.True(t, ok)
Expand All @@ -123,8 +127,6 @@ func (suite *ServiceTestSuite) TestMilestones() {
}

func (suite *ServiceTestSuite) TestRegisterMilestoneObserver() {
err := suite.service.SynchronizeMilestones(suite.ctx)
require.NoError(suite.T(), err)
require.Len(suite.T(), suite.observedMilestones, 100)
}

Expand All @@ -133,9 +135,6 @@ func (suite *ServiceTestSuite) TestCheckpoints() {
t := suite.T()
svc := suite.service

err := svc.SynchronizeCheckpoints(ctx)
require.NoError(t, err)

id, ok, err := svc.store.Checkpoints().LastEntityId(ctx)
require.NoError(t, err)
require.True(t, ok)
Expand All @@ -154,9 +153,6 @@ func (suite *ServiceTestSuite) TestSpans() {
t := suite.T()
svc := suite.service

err := svc.SynchronizeSpans(ctx, math.MaxInt)
require.NoError(t, err)

id, ok, err := svc.store.Spans().LastEntityId(ctx)
require.NoError(t, err)
require.True(t, ok)
Expand All @@ -171,18 +167,10 @@ func (suite *ServiceTestSuite) TestSpans() {
}

func (suite *ServiceTestSuite) TestRegisterSpanObserver() {
err := suite.service.SynchronizeSpans(suite.ctx, math.MaxInt)
require.NoError(suite.T(), err)
require.Len(suite.T(), suite.observedSpans, 1281)
}

func (suite *ServiceTestSuite) TestProducers() {
ctx := suite.ctx
t := suite.T()
svc := suite.service

err := svc.SynchronizeSpans(ctx, math.MaxInt)
require.NoError(t, err)
// span 0
suite.producersSubTest(1) // start
suite.producersSubTest(255) // end
Expand Down Expand Up @@ -245,27 +233,53 @@ func (suite *ServiceTestSuite) setupSpans() {
return cmp.Compare(idA, idB)
})

latestSpanFileName := files[len(files)-1].Name()
// leave few files for sequential fetch
sequentialFetchFileCount := 3
lastSpanFileNameForSequentialFetch := files[len(files)-1].Name()
lastSpanFileNameForBatchFetch := files[len(files)-1-sequentialFetchFileCount].Name()
batchFetchSpanFiles := files[:len(files)-sequentialFetchFileCount]

suite.client.EXPECT().
FetchLatestSpan(gomock.Any()).
DoAndReturn(func(ctx context.Context) (*Span, error) {
return readEntityFromFile[Span](suite.T(), fmt.Sprintf("%s/%s", spanTestDataDir, latestSpanFileName)), nil
}).
AnyTimes()
gomock.InOrder(
suite.client.EXPECT().
FetchLatestSpan(gomock.Any()).
DoAndReturn(func(ctx context.Context) (*Span, error) {
return readEntityFromFile[Span](
suite.T(),
fmt.Sprintf("%s/%s", spanTestDataDir, lastSpanFileNameForBatchFetch),
), nil
}).
Times(1),
suite.client.EXPECT().
FetchLatestSpan(gomock.Any()).
DoAndReturn(func(ctx context.Context) (*Span, error) {
return readEntityFromFile[Span](
suite.T(),
fmt.Sprintf("%s/%s", spanTestDataDir, lastSpanFileNameForSequentialFetch),
), nil
}).
AnyTimes(),
)

suite.client.EXPECT().
FetchSpans(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, page, limit uint64) ([]*Span, error) {
var spans []*Span
for i := (page - 1) * limit; i <= ((page-1)*limit)+limit-1 && i <= 1280; i++ {
spans := make([]*Span, 0, limit)
startIdx := (page - 1) * limit
endIdx := min(startIdx+limit, uint64(len(batchFetchSpanFiles)))
for i := startIdx; i < endIdx; i++ {
span := readEntityFromFile[Span](suite.T(), fmt.Sprintf("%s/span_%d.json", spanTestDataDir, i))
spans = append(spans, span)
}

return spans, nil
}).
AnyTimes()
Times(1 + (len(files)+len(files)%SpansFetchLimit)/SpansFetchLimit)

suite.client.EXPECT().
FetchSpan(gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, id uint64) (*Span, error) {
return readEntityFromFile[Span](suite.T(), fmt.Sprintf("%s/span_%d.json", spanTestDataDir, id)), nil
}).
Times(sequentialFetchFileCount)
}

func (suite *ServiceTestSuite) setupCheckpoints() {
Expand All @@ -282,10 +296,20 @@ func (suite *ServiceTestSuite) setupCheckpoints() {

sort.Sort(checkpoints)

suite.client.EXPECT().
FetchCheckpointCount(gomock.Any()).
Return(int64(len(files)), nil).
AnyTimes()
// leave few files for sequential fetch
sequentialFetchCheckpointsCount := 3
batchFetchCheckpointsCount := len(checkpoints) - sequentialFetchCheckpointsCount

gomock.InOrder(
suite.client.EXPECT().
FetchCheckpointCount(gomock.Any()).
Return(int64(batchFetchCheckpointsCount), nil).
Times(1),
suite.client.EXPECT().
FetchCheckpointCount(gomock.Any()).
Return(int64(len(files)), nil).
AnyTimes(),
)

gomock.InOrder(
suite.client.EXPECT().
Expand All @@ -297,6 +321,11 @@ func (suite *ServiceTestSuite) setupCheckpoints() {
Return(nil, nil).
Times(1),
)

suite.client.EXPECT().
FetchCheckpoint(gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, id int64) (*Checkpoint, error) { return checkpoints[id-1], nil }).
Times(sequentialFetchCheckpointsCount)
}

func (suite *ServiceTestSuite) setupMilestones() {
Expand Down

0 comments on commit c6a47ad

Please sign in to comment.