Skip to content

Commit

Permalink
Merge pull request onflow#6598 from AndriiDiachuk/change-send-and-sub…
Browse files Browse the repository at this point in the history
…scribe-endoints-msg-index-start-with-zero

[Access] Change SendAndSubscribe endpoint's MessageIndex to start at 0
  • Loading branch information
peterargue authored Nov 6, 2024
2 parents df95497 + 299829d commit 43ee3c5
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 5 deletions.
8 changes: 6 additions & 2 deletions access/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1435,15 +1435,19 @@ func (h *Handler) SendAndSubscribeTransactionStatuses(
messageIndex := counters.NewMonotonousCounter(0)
return subscription.HandleSubscription(sub, func(txResults []*TransactionResult) error {
for i := range txResults {
value := messageIndex.Increment()
index := messageIndex.Value()
if ok := messageIndex.Set(index + 1); !ok {
return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value())
}

err = stream.Send(&access.SendAndSubscribeTransactionStatusesResponse{
TransactionResults: TransactionResultToMessage(txResults[i]),
MessageIndex: value,
MessageIndex: index,
})
if err != nil {
return rpc.ConvertError(err, "could not send response", codes.Internal)
}

}

return nil
Expand Down
10 changes: 8 additions & 2 deletions engine/access/state_stream/backend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,10 @@ func (h *Handler) handleEventsResponse(send sendSubscribeEventsResponseFunc, hea
return status.Errorf(codes.Internal, "could not convert events to entity: %v", err)
}

index := messageIndex.Increment()
index := messageIndex.Value()
if ok := messageIndex.Set(index + 1); !ok {
return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value())
}

err = send(&executiondata.SubscribeEventsResponse{
BlockHeight: resp.Height,
Expand Down Expand Up @@ -495,7 +498,10 @@ func (h *Handler) handleAccountStatusesResponse(
return err
}

index := messageIndex.Increment()
index := messageIndex.Value()
if ok := messageIndex.Set(index + 1); !ok {
return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value())
}

err = send(&executiondata.SubscribeAccountStatusesResponse{
BlockId: convert.IdentifierToMessage(resp.BlockID),
Expand Down
2 changes: 1 addition & 1 deletion integration/tests/access/cohort1/access_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (s *AccessAPISuite) TestSendAndSubscribeTransactionStatuses() {
})
s.Require().NoError(err)

expectedCounter := uint64(1)
expectedCounter := uint64(0)
lastReportedTxStatus := entities.TransactionStatus_UNKNOWN
var txID sdk.Identifier

Expand Down

0 comments on commit 43ee3c5

Please sign in to comment.