diff --git a/internal/blockchain/common/common.go b/internal/blockchain/common/common.go index d15e0b8c4..bb5d952cd 100644 --- a/internal/blockchain/common/common.go +++ b/internal/blockchain/common/common.go @@ -392,9 +392,17 @@ func (s *subscriptions) GetSubscription(subID string) *SubscriptionInfo { } // Common function for handling receipts from blockchain connectors. -func HandleReceipt(ctx context.Context, plugin core.Named, reply *BlockchainReceiptNotification, callbacks BlockchainCallbacks) error { +func HandleReceipt(ctx context.Context, namespace string, plugin core.Named, reply *BlockchainReceiptNotification, callbacks BlockchainCallbacks) error { l := log.L(ctx) + if namespace != "" { + opNamespace, _, _ := core.ParseNamespacedOpID(ctx, reply.Headers.ReceiptID) + if opNamespace != namespace { + l.Debugf("Ignoring operation update from other namespace: request=%s tx=%s message=%s", reply.Headers.ReceiptID, reply.TxHash, reply.Message) + return nil + } + } + if reply.Headers.ReceiptID == "" || reply.Headers.ReplyType == "" { return fmt.Errorf("reply cannot be processed - missing fields: %+v", reply) } @@ -409,7 +417,7 @@ func HandleReceipt(ctx context.Context, plugin core.Named, reply *BlockchainRece updateType = core.OpStatusFailed } - // Slightly upgly conversion from ReceiptFromBlockchain -> JSONObject which the generic OperationUpdate() function requires + // Slightly ugly conversion from ReceiptFromBlockchain -> JSONObject which the generic OperationUpdate() function requires var output fftypes.JSONObject obj, err := json.Marshal(reply) if err != nil { diff --git a/internal/blockchain/common/common_test.go b/internal/blockchain/common/common_test.go index 402a40339..8461bcc5e 100644 --- a/internal/blockchain/common/common_test.go +++ b/internal/blockchain/common/common_test.go @@ -342,15 +342,15 @@ func TestGoodSuccessReceipt(t *testing.T) { cb.SetHandler("ns1", mcb) mcb.On("OperationUpdate", "ns1", mock.Anything).Return() - err := HandleReceipt(context.Background(), nil, &reply, cb) + err := HandleReceipt(context.Background(), "", nil, &reply, cb) assert.NoError(t, err) reply.Headers.ReplyType = "TransactionUpdate" - err = HandleReceipt(context.Background(), nil, &reply, cb) + err = HandleReceipt(context.Background(), "", nil, &reply, cb) assert.NoError(t, err) reply.Headers.ReplyType = "TransactionFailed" - err = HandleReceipt(context.Background(), nil, &reply, cb) + err = HandleReceipt(context.Background(), "", nil, &reply, cb) assert.NoError(t, err) } @@ -365,7 +365,7 @@ func TestReceiptMarshallingError(t *testing.T) { cb.SetHandler("ns1", mcb) mcb.On("OperationUpdate", "ns1", mock.Anything).Return() - err := HandleReceipt(context.Background(), nil, &reply, cb) + err := HandleReceipt(context.Background(), "", nil, &reply, cb) assert.Error(t, err) assert.Regexp(t, ".*[^n]marshalling error.*", err) } @@ -384,10 +384,19 @@ func TestBadReceipt(t *testing.T) { data := fftypes.JSONAnyPtr(`{}`) err := json.Unmarshal(data.Bytes(), &reply) assert.NoError(t, err) - err = HandleReceipt(context.Background(), nil, &reply, nil) + err = HandleReceipt(context.Background(), "", nil, &reply, nil) assert.Error(t, err) } +func TestWrongNamespaceReceipt(t *testing.T) { + var reply BlockchainReceiptNotification + data := fftypes.JSONAnyPtr(`{}`) + err := json.Unmarshal(data.Bytes(), &reply) + assert.NoError(t, err) + err = HandleReceipt(context.Background(), "wrong", nil, &reply, nil) + assert.NoError(t, err) +} + func TestErrorWrappingConflict(t *testing.T) { ctx := context.Background() res := &resty.Response{ diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index afa3c2c06..6ecfa283c 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -513,7 +513,7 @@ func (e *Ethereum) eventLoop(namespace string, wsconn wsclient.WSClient, closed if !isBatch { var receipt common.BlockchainReceiptNotification _ = json.Unmarshal(msgBytes, &receipt) - err := common.HandleReceipt(ctx, e, &receipt, e.callbacks) + err := common.HandleReceipt(ctx, namespace, e, &receipt, e.callbacks) if err != nil { l.Errorf("Failed to process receipt: %+v", msgTyped) } @@ -1223,7 +1223,7 @@ func (e *Ethereum) GetTransactionStatus(ctx context.Context, operation *core.Ope TxHash: statusResponse.GetString("transactionHash"), Message: statusResponse.GetString("errorMessage"), ProtocolID: receiptInfo.GetString("protocolId")} - err := common.HandleReceipt(ctx, e, receipt, e.callbacks) + err := common.HandleReceipt(ctx, operation.Namespace, e, receipt, e.callbacks) if err != nil { log.L(ctx).Warnf("Failed to handle receipt") } diff --git a/internal/blockchain/ethereum/ethereum_test.go b/internal/blockchain/ethereum/ethereum_test.go index e9c036f3c..739a3e298 100644 --- a/internal/blockchain/ethereum/ethereum_test.go +++ b/internal/blockchain/ethereum/ethereum_test.go @@ -1835,7 +1835,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) { err := json.Unmarshal(data.Bytes(), &reply) assert.NoError(t, err) - common.HandleReceipt(context.Background(), e, &reply, e.callbacks) + common.HandleReceipt(context.Background(), "ns1", e, &reply, e.callbacks) em.AssertExpectations(t) } @@ -1922,7 +1922,7 @@ func TestHandleReceiptTXUpdateEVMConnect(t *testing.T) { assert.NoError(t, err) expectedReceiptId := "ns1:" + operationID.String() assert.Equal(t, reply.Headers.ReceiptID, expectedReceiptId) - common.HandleReceipt(context.Background(), e, &reply, e.callbacks) + common.HandleReceipt(context.Background(), "", e, &reply, e.callbacks) em.AssertExpectations(t) } @@ -1987,7 +1987,7 @@ func TestHandleMsgBatchBadData(t *testing.T) { data := fftypes.JSONAnyPtr(`{}`) err := json.Unmarshal(data.Bytes(), &reply) assert.NoError(t, err) - common.HandleReceipt(context.Background(), e, &reply, e.callbacks) + common.HandleReceipt(context.Background(), "", e, &reply, e.callbacks) } func TestFormatNil(t *testing.T) { diff --git a/internal/blockchain/fabric/fabric.go b/internal/blockchain/fabric/fabric.go index 574e5d8e9..a854aceee 100644 --- a/internal/blockchain/fabric/fabric.go +++ b/internal/blockchain/fabric/fabric.go @@ -540,7 +540,7 @@ func (f *Fabric) eventLoop(namespace string, wsconn wsclient.WSClient, closed ch var receipt common.BlockchainReceiptNotification _ = json.Unmarshal(msgBytes, &receipt) - err := common.HandleReceipt(ctx, f, &receipt, f.callbacks) + err := common.HandleReceipt(ctx, namespace, f, &receipt, f.callbacks) if err != nil { l.Errorf("Failed to process receipt: %+v", msgTyped) } diff --git a/internal/blockchain/fabric/fabric_test.go b/internal/blockchain/fabric/fabric_test.go index a839fa9ee..2453ba5d4 100644 --- a/internal/blockchain/fabric/fabric_test.go +++ b/internal/blockchain/fabric/fabric_test.go @@ -1815,7 +1815,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) { err := json.Unmarshal(data, &reply) assert.NoError(t, err) - common.HandleReceipt(context.Background(), e, &reply, e.callbacks) + common.HandleReceipt(context.Background(), "ns1", e, &reply, e.callbacks) em.AssertExpectations(t) } @@ -1836,7 +1836,7 @@ func TestHandleReceiptNoRequestID(t *testing.T) { data := []byte(`{}`) err := json.Unmarshal(data, &reply) assert.NoError(t, err) - common.HandleReceipt(context.Background(), e, &reply, e.callbacks) + common.HandleReceipt(context.Background(), "", e, &reply, e.callbacks) } func TestHandleReceiptFailedTx(t *testing.T) { @@ -1876,7 +1876,7 @@ func TestHandleReceiptFailedTx(t *testing.T) { err := json.Unmarshal(data, &reply) assert.NoError(t, err) - common.HandleReceipt(context.Background(), e, &reply, e.callbacks) + common.HandleReceipt(context.Background(), "", e, &reply, e.callbacks) em.AssertExpectations(t) } diff --git a/internal/blockchain/tezos/tezos.go b/internal/blockchain/tezos/tezos.go index 8dd1fb8d4..a2103ca4c 100644 --- a/internal/blockchain/tezos/tezos.go +++ b/internal/blockchain/tezos/tezos.go @@ -591,7 +591,7 @@ func (t *Tezos) GetTransactionStatus(ctx context.Context, operation *core.Operat TxHash: statusResponse.GetString("transactionHash"), Message: statusResponse.GetString("errorMessage"), ProtocolID: receiptInfo.GetString("protocolId")} - err := common.HandleReceipt(ctx, t, receipt, t.callbacks) + err := common.HandleReceipt(ctx, operation.Namespace, t, receipt, t.callbacks) if err != nil { log.L(ctx).Warnf("Failed to handle receipt") } @@ -822,7 +822,7 @@ func (t *Tezos) eventLoop() { var receipt common.BlockchainReceiptNotification _ = json.Unmarshal(msgBytes, &receipt) - err := common.HandleReceipt(ctx, t, &receipt, t.callbacks) + err := common.HandleReceipt(ctx, "", t, &receipt, t.callbacks) // TODO: should be specific to a namespace if err != nil { l.Errorf("Failed to process receipt: %+v", msgTyped) } diff --git a/internal/blockchain/tezos/tezos_test.go b/internal/blockchain/tezos/tezos_test.go index 9b545a4ca..6ed39ae65 100644 --- a/internal/blockchain/tezos/tezos_test.go +++ b/internal/blockchain/tezos/tezos_test.go @@ -695,7 +695,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) { err := json.Unmarshal(data.Bytes(), &reply) assert.NoError(t, err) - common.HandleReceipt(context.Background(), tz, &reply, tz.callbacks) + common.HandleReceipt(context.Background(), "", tz, &reply, tz.callbacks) tm.AssertExpectations(t) } @@ -780,7 +780,7 @@ func TestHandleReceiptTXUpdateTezosConnect(t *testing.T) { assert.NoError(t, err) expectedReceiptId := "ns1:" + operationID.String() assert.Equal(t, reply.Headers.ReceiptID, expectedReceiptId) - common.HandleReceipt(context.Background(), tz, &reply, tz.callbacks) + common.HandleReceipt(context.Background(), "", tz, &reply, tz.callbacks) tm.AssertExpectations(t) } @@ -797,7 +797,7 @@ func TestHandleMsgBatchBadData(t *testing.T) { data := fftypes.JSONAnyPtr(`{}`) err := json.Unmarshal(data.Bytes(), &reply) assert.NoError(t, err) - common.HandleReceipt(context.Background(), tz, &reply, tz.callbacks) + common.HandleReceipt(context.Background(), "", tz, &reply, tz.callbacks) } func TestAddSubscription(t *testing.T) {