Skip to content

Commit

Permalink
Add a heartbeatInterval parameter to SubscribeEvents and check that a…
Browse files Browse the repository at this point in the history
… response is received for each block on the indexer
  • Loading branch information
m-Peter committed Dec 19, 2023
1 parent 4d47834 commit 2b36693
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
15 changes: 13 additions & 2 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func runIndexer(store *storage.Store) {
indexer.EventFilter{
Contracts: []string{"A.7e60df042a9c0868.FlowToken"},
},
1,
)
if err != nil {
log.Fatalf("could not subscribe to execution data: %v", err)
Expand All @@ -91,19 +92,27 @@ func runIndexer(store *storage.Store) {
indexer.EventFilter{
Contracts: []string{"A.7e60df042a9c0868.FlowToken"},
},
1,
)
if err != nil {
log.Fatalf("could not subscribe to execution data: %v", err)
}
}

// track the most recently seen block height. we will use this when reconnecting
lastHeight := latestBlockHeader.Height
// the first response should be for latestBlockHeader.Height
lastHeight := latestBlockHeader.Height - 1
for {
select {
case <-ctx.Done():
return
case response, ok := <-sub.Channel():
if response.Height != lastHeight+1 {
log.Fatalf("missed events response for block %d", lastHeight+1)
reconnect(lastHeight)
continue
}

if !ok {
if sub.Err() != nil {
log.Fatalf("error in subscription: %v", sub.Err())
Expand All @@ -115,7 +124,9 @@ func runIndexer(store *storage.Store) {
}

log.Printf("block %d %s:", response.Height, response.BlockID)
store.StoreBlockHeight(ctx, response.Height)
if len(response.Events) > 0 {
store.StoreBlockHeight(ctx, response.Height)
}
for _, event := range response.Events {
log.Printf(" %s", event.Type)
}
Expand Down
2 changes: 2 additions & 0 deletions indexer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (c *ExecutionDataClient) SubscribeEvents(
startBlockID flow.Identifier,
startHeight uint64,
filter EventFilter,
heartbeatInterval uint64,
opts ...grpc.CallOption,
) (*Subscription[EventsResponse], error) {
if startBlockID != flow.ZeroID && startHeight > 0 {
Expand All @@ -149,6 +150,7 @@ func (c *ExecutionDataClient) SubscribeEvents(
Address: filter.Addresses,
Contract: filter.Contracts,
},
HeartbeatInterval: heartbeatInterval,
}
if startBlockID != flow.ZeroID {
req.StartBlockId = startBlockID[:]
Expand Down

0 comments on commit 2b36693

Please sign in to comment.