diff --git a/access/handler.go b/access/handler.go index d4205d3d481..25316e7f3dd 100644 --- a/access/handler.go +++ b/access/handler.go @@ -1436,6 +1436,9 @@ func (h *Handler) SendAndSubscribeTransactionStatuses( return subscription.HandleSubscription(sub, func(txResults []*TransactionResult) error { for i := range txResults { index := messageIndex.Value() + if ok := messageIndex.Set(index + 1); !ok { + return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + } err = stream.Send(&access.SendAndSubscribeTransactionStatusesResponse{ TransactionResults: TransactionResultToMessage(txResults[i]), @@ -1445,9 +1448,6 @@ func (h *Handler) SendAndSubscribeTransactionStatuses( return rpc.ConvertError(err, "could not send response", codes.Internal) } - if ok := messageIndex.Set(index + 1); !ok { - return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) - } } return nil diff --git a/engine/access/state_stream/backend/handler.go b/engine/access/state_stream/backend/handler.go index 8ed2ca8051d..b2066440bb8 100644 --- a/engine/access/state_stream/backend/handler.go +++ b/engine/access/state_stream/backend/handler.go @@ -382,6 +382,9 @@ func (h *Handler) handleEventsResponse(send sendSubscribeEventsResponseFunc, hea } index := messageIndex.Value() + if ok := messageIndex.Set(index + 1); !ok { + return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + } err = send(&executiondata.SubscribeEventsResponse{ BlockHeight: resp.Height, @@ -394,10 +397,6 @@ func (h *Handler) handleEventsResponse(send sendSubscribeEventsResponseFunc, hea return rpc.ConvertError(err, "could not send response", codes.Internal) } - if ok := messageIndex.Set(index + 1); !ok { - return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) - } - return nil } } @@ -500,6 +499,9 @@ func (h *Handler) handleAccountStatusesResponse( } index := messageIndex.Value() + if ok := messageIndex.Set(index + 1); !ok { + return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) + } err = send(&executiondata.SubscribeAccountStatusesResponse{ BlockId: convert.IdentifierToMessage(resp.BlockID), @@ -511,10 +513,6 @@ func (h *Handler) handleAccountStatusesResponse( return rpc.ConvertError(err, "could not send response", codes.Internal) } - if ok := messageIndex.Set(index + 1); !ok { - return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value()) - } - return nil } }