Skip to content

Commit

Permalink
Merge pull request onflow#6011 from onflow/leo/increase-guarantee-buffer
Browse files Browse the repository at this point in the history
[Execution] Increase guarantee buffer
  • Loading branch information
zhangchiqing authored May 31, 2024
2 parents a6e3115 + 02fc53d commit 7029607
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion engine/execution/ingestion/fetcher/access_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,23 @@ func NewAccessCollectionFetcher(
return nil, fmt.Errorf("failed to connect to collection rpc: %w", err)
}

lg.Info().Msgf("connected to access rpc at %s", accessAddress)

// make a large enough buffer so that it is able to hold all the guarantees
// on startup and not block the main thread.
// this case would only happen if there are lots of un-executed finalized blocks.
// making sure the --enable-new-ingestion-engine=true flag is on to make use
// of the new ingestion engine for catching up, which loads less un-executed blocks
// during startup.
bufferSize := 100_000
noopHandler := func(flow.Identifier, flow.Entity) {}
e := &AccessCollectionFetcher{
log: lg,
handler: noopHandler,
client: access.NewAccessAPIClient(collectionRPCConn),
chain: chain,
originID: nodeID,
guaranteeInfos: make(chan guaranteeInfo, 100),
guaranteeInfos: make(chan guaranteeInfo, bufferSize),
}

builder := component.NewComponentManagerBuilder().AddWorker(e.launchWorker)
Expand All @@ -98,6 +107,9 @@ func convertAccessAddrFromState(address string) string {
}

func (f *AccessCollectionFetcher) FetchCollection(blockID flow.Identifier, height uint64, guarantee *flow.CollectionGuarantee) error {
f.log.Debug().Hex("blockID", blockID[:]).Uint64("height", height).Hex("col_id", guarantee.CollectionID[:]).
Msgf("fetching collection guarantee")

f.guaranteeInfos <- guaranteeInfo{
blockID: blockID,
height: height,
Expand All @@ -117,6 +129,8 @@ func (f *AccessCollectionFetcher) WithHandle(handler requester.HandleFunc) {
func (f *AccessCollectionFetcher) launchWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
ready()

f.log.Info().Msg("launching collection fetcher worker")

for {
select {
case <-ctx.Done():
Expand Down

0 comments on commit 7029607

Please sign in to comment.