From 5b0425aa514224a5c9b637161f3e18f3d02f88a9 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Fri, 20 Dec 2024 11:10:32 +0000 Subject: [PATCH] remove register/unregister --- .../remote/executable/client_test.go | 58 ------- .../executable/request/client_request.go | 48 ------ .../executable/request/client_request_test.go | 163 ------------------ .../executable/request/server_request.go | 34 ---- .../executable/request/server_request_test.go | 101 ----------- core/capabilities/remote/executable/server.go | 3 +- .../remote/executable/server_test.go | 139 --------------- core/capabilities/remote/types/types.go | 10 +- 8 files changed, 6 insertions(+), 550 deletions(-) diff --git a/core/capabilities/remote/executable/client_test.go b/core/capabilities/remote/executable/client_test.go index db351b75eb3..4e7d940a3b9 100644 --- a/core/capabilities/remote/executable/client_test.go +++ b/core/capabilities/remote/executable/client_test.go @@ -12,7 +12,6 @@ import ( commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" - "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/executable" @@ -31,7 +30,6 @@ const ( ) func Test_Client_DonTopologies(t *testing.T) { - tests.SkipFlakey(t, "https://smartcontract-it.atlassian.net/browse/CAPPL-363") ctx := testutils.Context(t) transmissionSchedule, err := values.NewMap(map[string]any{ @@ -59,18 +57,6 @@ func Test_Client_DonTopologies(t *testing.T) { executeMethod(ctx, caller, transmissionSchedule, executeInputs, responseTest, t) }) - methods = append(methods, func(caller commoncap.ExecutableCapability) { - registerToWorkflowMethod(ctx, caller, transmissionSchedule, func(t *testing.T, responseError error) { - require.NoError(t, responseError) - }, t) - }) - - methods = append(methods, func(caller commoncap.ExecutableCapability) { - unregisterFromWorkflowMethod(ctx, caller, transmissionSchedule, func(t *testing.T, responseError error) { - require.NoError(t, responseError) - }, t) - }) - for _, method := range methods { testClient(t, 1, responseTimeOut, 1, 0, capability, method) @@ -90,7 +76,6 @@ func Test_Client_DonTopologies(t *testing.T) { } func Test_Client_TransmissionSchedules(t *testing.T) { - tests.SkipFlakey(t, "https://smartcontract-it.atlassian.net/browse/CAPPL-363") ctx := testutils.Context(t) responseTest := func(t *testing.T, response commoncap.CapabilityResponse, responseError error) { @@ -264,34 +249,6 @@ func testClient(t *testing.T, numWorkflowPeers int, workflowNodeResponseTimeout wg.Wait() } -func registerToWorkflowMethod(ctx context.Context, caller commoncap.ExecutableCapability, transmissionSchedule *values.Map, - responseTest func(t *testing.T, responseError error), t *testing.T) { - err := caller.RegisterToWorkflow(ctx, commoncap.RegisterToWorkflowRequest{ - Metadata: commoncap.RegistrationMetadata{ - WorkflowID: workflowID1, - ReferenceID: stepReferenceID1, - WorkflowOwner: workflowOwnerID, - }, - Config: transmissionSchedule, - }) - - responseTest(t, err) -} - -func unregisterFromWorkflowMethod(ctx context.Context, caller commoncap.ExecutableCapability, transmissionSchedule *values.Map, - responseTest func(t *testing.T, responseError error), t *testing.T) { - err := caller.UnregisterFromWorkflow(ctx, commoncap.UnregisterFromWorkflowRequest{ - Metadata: commoncap.RegistrationMetadata{ - WorkflowID: workflowID1, - ReferenceID: stepReferenceID1, - WorkflowOwner: workflowOwnerID, - }, - Config: transmissionSchedule, - }) - - responseTest(t, err) -} - func executeMethod(ctx context.Context, caller commoncap.ExecutableCapability, transmissionSchedule *values.Map, executeInputs *values.Map, responseTest func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error), t *testing.T) { responseCh, err := caller.Execute(ctx, @@ -362,21 +319,6 @@ func (t *clientTestServer) Receive(_ context.Context, msg *remotetypes.MessageBo resp, responseErr := t.executableCapability.Execute(context.Background(), capabilityRequest) payload, marshalErr := pb.MarshalCapabilityResponse(resp) t.sendResponse(messageID, responseErr, payload, marshalErr) - - case remotetypes.MethodRegisterToWorkflow: - registerRequest, err := pb.UnmarshalRegisterToWorkflowRequest(msg.Payload) - if err != nil { - panic(err) - } - responseErr := t.executableCapability.RegisterToWorkflow(context.Background(), registerRequest) - t.sendResponse(messageID, responseErr, nil, nil) - case remotetypes.MethodUnregisterFromWorkflow: - unregisterRequest, err := pb.UnmarshalUnregisterFromWorkflowRequest(msg.Payload) - if err != nil { - panic(err) - } - responseErr := t.executableCapability.UnregisterFromWorkflow(context.Background(), unregisterRequest) - t.sendResponse(messageID, responseErr, nil, nil) default: panic("unknown method") } diff --git a/core/capabilities/remote/executable/request/client_request.go b/core/capabilities/remote/executable/request/client_request.go index ef4d0023773..1d98e3366f1 100644 --- a/core/capabilities/remote/executable/request/client_request.go +++ b/core/capabilities/remote/executable/request/client_request.go @@ -48,54 +48,6 @@ type ClientRequest struct { wg *sync.WaitGroup } -func NewClientRegisterToWorkflowRequest(ctx context.Context, lggr logger.Logger, req commoncap.RegisterToWorkflowRequest, - remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, dispatcher types.Dispatcher, - requestTimeout time.Duration) (*ClientRequest, error) { - rawRequest, err := proto.MarshalOptions{Deterministic: true}.Marshal(pb.RegisterToWorkflowRequestToProto(req)) - if err != nil { - return nil, fmt.Errorf("failed to marshal register to workflow request: %w", err) - } - - workflowID := req.Metadata.WorkflowID - if err := validation.ValidateWorkflowOrExecutionID(workflowID); err != nil { - return nil, fmt.Errorf("workflow ID is invalid: %w", err) - } - - requestID := types.MethodRegisterToWorkflow + ":" + workflowID - - tc := transmission.TransmissionConfig{ - Schedule: transmission.Schedule_AllAtOnce, - DeltaStage: 0, - } - - return newClientRequest(ctx, lggr, requestID, remoteCapabilityInfo, localDonInfo, dispatcher, requestTimeout, - tc, types.MethodRegisterToWorkflow, rawRequest) -} - -func NewClientUnregisterFromWorkflowRequest(ctx context.Context, lggr logger.Logger, req commoncap.UnregisterFromWorkflowRequest, - remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, dispatcher types.Dispatcher, - requestTimeout time.Duration) (*ClientRequest, error) { - rawRequest, err := proto.MarshalOptions{Deterministic: true}.Marshal(pb.UnregisterFromWorkflowRequestToProto(req)) - if err != nil { - return nil, fmt.Errorf("failed to marshal unregister from workflow request: %w", err) - } - - workflowID := req.Metadata.WorkflowID - if err := validation.ValidateWorkflowOrExecutionID(workflowID); err != nil { - return nil, fmt.Errorf("workflow ID is invalid: %w", err) - } - - requestID := types.MethodUnregisterFromWorkflow + ":" + workflowID - - tc := transmission.TransmissionConfig{ - Schedule: transmission.Schedule_AllAtOnce, - DeltaStage: 0, - } - - return newClientRequest(ctx, lggr, requestID, remoteCapabilityInfo, localDonInfo, dispatcher, requestTimeout, - tc, types.MethodUnregisterFromWorkflow, rawRequest) -} - func NewClientExecuteRequest(ctx context.Context, lggr logger.Logger, req commoncap.CapabilityRequest, remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, dispatcher types.Dispatcher, requestTimeout time.Duration) (*ClientRequest, error) { diff --git a/core/capabilities/remote/executable/request/client_request_test.go b/core/capabilities/remote/executable/request/client_request_test.go index 45e81fc70d8..212b7265e4f 100644 --- a/core/capabilities/remote/executable/request/client_request_test.go +++ b/core/capabilities/remote/executable/request/client_request_test.go @@ -80,15 +80,6 @@ func Test_ClientRequest_MessageValidation(t *testing.T) { Config: transmissionSchedule, } - registerToWorkflowRequest := commoncap.RegisterToWorkflowRequest{ - Metadata: commoncap.RegistrationMetadata{ - WorkflowID: workflowID1, - WorkflowOwner: "0xaa", - ReferenceID: "refID", - }, - Config: transmissionSchedule, - } - m, err := values.NewMap(map[string]any{"response": "response1"}) require.NoError(t, err) capabilityResponse := commoncap.CapabilityResponse{ @@ -317,160 +308,6 @@ func Test_ClientRequest_MessageValidation(t *testing.T) { assert.Equal(t, resp, values.NewString("response1")) }) - - t.Run("Register To Workflow Request", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} - request, err := request.NewClientRegisterToWorkflowRequest(ctx, lggr, registerToWorkflowRequest, capInfo, - workflowDonInfo, dispatcher, 10*time.Minute) - require.NoError(t, err) - defer request.Cancel(errors.New("test end")) - - <-dispatcher.msgs - <-dispatcher.msgs - assert.Empty(t, dispatcher.msgs) - - msg := &types.MessageBody{ - CapabilityId: capInfo.ID, - CapabilityDonId: capDonInfo.ID, - CallerDonId: workflowDonInfo.ID, - Method: types.MethodRegisterToWorkflow, - Payload: nil, - MessageId: []byte("messageID"), - } - - msg.Sender = capabilityPeers[0][:] - err = request.OnMessage(ctx, msg) - require.NoError(t, err) - - msg.Sender = capabilityPeers[1][:] - err = request.OnMessage(ctx, msg) - require.NoError(t, err) - - response := <-request.ResponseChan() - require.Nil(t, response.Result) - require.NoError(t, response.Err) - }) - - t.Run("Register To Workflow Request with error", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} - request, err := request.NewClientRegisterToWorkflowRequest(ctx, lggr, registerToWorkflowRequest, capInfo, - workflowDonInfo, dispatcher, 10*time.Minute) - require.NoError(t, err) - defer request.Cancel(errors.New("test end")) - - <-dispatcher.msgs - <-dispatcher.msgs - assert.Empty(t, dispatcher.msgs) - - msg := &types.MessageBody{ - CapabilityId: capInfo.ID, - CapabilityDonId: capDonInfo.ID, - CallerDonId: workflowDonInfo.ID, - Method: types.MethodRegisterToWorkflow, - Payload: nil, - MessageId: []byte("messageID"), - Error: types.Error_INTERNAL_ERROR, - ErrorMsg: "an error", - } - - msg.Sender = capabilityPeers[0][:] - err = request.OnMessage(ctx, msg) - require.NoError(t, err) - - msg.Sender = capabilityPeers[1][:] - err = request.OnMessage(ctx, msg) - require.NoError(t, err) - - response := <-request.ResponseChan() - require.Nil(t, response.Result) - assert.Equal(t, "an error", response.Err.Error()) - }) - - t.Run("Unregister From Workflow Request", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} - request, err := request.NewClientUnregisterFromWorkflowRequest(ctx, lggr, commoncap.UnregisterFromWorkflowRequest{ - Metadata: commoncap.RegistrationMetadata{ - WorkflowID: workflowID1, - }, - }, capInfo, workflowDonInfo, dispatcher, 10*time.Minute) - require.NoError(t, err) - defer request.Cancel(errors.New("test end")) - - <-dispatcher.msgs - <-dispatcher.msgs - assert.Empty(t, dispatcher.msgs) - - msg := &types.MessageBody{ - CapabilityId: capInfo.ID, - CapabilityDonId: capDonInfo.ID, - CallerDonId: workflowDonInfo.ID, - Method: types.MethodUnregisterFromWorkflow, - Payload: nil, - MessageId: []byte("messageID"), - } - - msg.Sender = capabilityPeers[0][:] - err = request.OnMessage(ctx, msg) - require.NoError(t, err) - - msg.Sender = capabilityPeers[1][:] - err = request.OnMessage(ctx, msg) - require.NoError(t, err) - - response := <-request.ResponseChan() - require.Nil(t, response.Result) - require.NoError(t, response.Err) - }) - - t.Run("Unregister From Workflow Request with error", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)} - request, err := request.NewClientUnregisterFromWorkflowRequest(ctx, lggr, commoncap.UnregisterFromWorkflowRequest{ - Metadata: commoncap.RegistrationMetadata{ - WorkflowID: workflowID1, - }, - }, capInfo, workflowDonInfo, dispatcher, 10*time.Minute) - require.NoError(t, err) - defer request.Cancel(errors.New("test end")) - - <-dispatcher.msgs - <-dispatcher.msgs - assert.Empty(t, dispatcher.msgs) - - msg := &types.MessageBody{ - CapabilityId: capInfo.ID, - CapabilityDonId: capDonInfo.ID, - CallerDonId: workflowDonInfo.ID, - Method: types.MethodUnregisterFromWorkflow, - Payload: nil, - MessageId: []byte("messageID"), - Error: types.Error_INTERNAL_ERROR, - ErrorMsg: "an error", - } - - msg.Sender = capabilityPeers[0][:] - err = request.OnMessage(ctx, msg) - require.NoError(t, err) - - msg.Sender = capabilityPeers[1][:] - err = request.OnMessage(ctx, msg) - require.NoError(t, err) - - response := <-request.ResponseChan() - require.Nil(t, response.Result) - assert.Equal(t, "an error", response.Err.Error()) - }) } type clientRequestTestDispatcher struct { diff --git a/core/capabilities/remote/executable/request/server_request.go b/core/capabilities/remote/executable/request/server_request.go index ee4e48fe1b9..a4f58194328 100644 --- a/core/capabilities/remote/executable/request/server_request.go +++ b/core/capabilities/remote/executable/request/server_request.go @@ -94,10 +94,6 @@ func (e *ServerRequest) OnMessage(ctx context.Context, msg *types.MessageBody) e switch e.method { case types.MethodExecute: e.executeRequest(ctx, msg.Payload, executeCapabilityRequest) - case types.MethodRegisterToWorkflow: - e.executeRequest(ctx, msg.Payload, registerToWorkflow) - case types.MethodUnregisterFromWorkflow: - e.executeRequest(ctx, msg.Payload, unregisterFromWorkflow) default: e.setError(types.Error_INTERNAL_ERROR, "unknown method %s"+e.method) } @@ -252,33 +248,3 @@ func executeCapabilityRequest(ctx context.Context, lggr logger.Logger, capabilit lggr.Debugw("received execution results", "workflowExecutionID", capabilityRequest.Metadata.WorkflowExecutionID) return responsePayload, nil } - -func registerToWorkflow(ctx context.Context, _ logger.Logger, capability capabilities.ExecutableCapability, - payload []byte) ([]byte, error) { - registerRequest, err := pb.UnmarshalRegisterToWorkflowRequest(payload) - if err != nil { - return nil, fmt.Errorf("failed to unmarshal register to workflow request: %w", err) - } - - err = capability.RegisterToWorkflow(ctx, registerRequest) - if err != nil { - return nil, fmt.Errorf("failed to register to workflow: %w", err) - } - - return nil, nil -} - -func unregisterFromWorkflow(ctx context.Context, _ logger.Logger, capability capabilities.ExecutableCapability, - payload []byte) ([]byte, error) { - unregisterRequest, err := pb.UnmarshalUnregisterFromWorkflowRequest(payload) - if err != nil { - return nil, fmt.Errorf("failed to unmarshal unregister from workflow request: %w", err) - } - - err = capability.UnregisterFromWorkflow(ctx, unregisterRequest) - if err != nil { - return nil, fmt.Errorf("failed to unregister from workflow: %w", err) - } - - return nil, nil -} diff --git a/core/capabilities/remote/executable/request/server_request_test.go b/core/capabilities/remote/executable/request/server_request_test.go index faf91be0690..d58b6edf3f2 100644 --- a/core/capabilities/remote/executable/request/server_request_test.go +++ b/core/capabilities/remote/executable/request/server_request_test.go @@ -165,107 +165,6 @@ func Test_ServerRequest_MessageValidation(t *testing.T) { assert.Equal(t, types.Error_OK, dispatcher.msgs[0].Error) assert.Equal(t, types.Error_OK, dispatcher.msgs[1].Error) }) - t.Run("Register to workflow request", func(t *testing.T) { - dispatcher := &testDispatcher{} - request := request.NewServerRequest(capability, types.MethodRegisterToWorkflow, "capabilityID", 2, - capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, lggr) - - err := sendValidRequest(request, workflowPeers, capabilityPeerID, rawRequest) - require.NoError(t, err) - - err = request.OnMessage(context.Background(), &types.MessageBody{ - Version: 0, - Sender: workflowPeers[1][:], - Receiver: capabilityPeerID[:], - MessageId: []byte("workflowID" + "workflowExecutionID"), - CapabilityId: "capabilityID", - CapabilityDonId: 2, - CallerDonId: 1, - Method: types.MethodRegisterToWorkflow, - Payload: rawRequest, - }) - require.NoError(t, err) - assert.Len(t, dispatcher.msgs, 2) - assert.Equal(t, types.Error_OK, dispatcher.msgs[0].Error) - assert.Equal(t, types.Error_OK, dispatcher.msgs[1].Error) - }) - t.Run("Register to workflow request errors", func(t *testing.T) { - dispatcher := &testDispatcher{} - req := request.NewServerRequest(TestErrorCapability{}, types.MethodRegisterToWorkflow, "capabilityID", 2, - capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, lggr) - - err := sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) - require.NoError(t, err) - - err = req.OnMessage(context.Background(), &types.MessageBody{ - Version: 0, - Sender: workflowPeers[1][:], - Receiver: capabilityPeerID[:], - MessageId: []byte("workflowID" + "workflowExecutionID"), - CapabilityId: "capabilityID", - CapabilityDonId: 2, - CallerDonId: 1, - Method: types.MethodRegisterToWorkflow, - Payload: rawRequest, - }) - require.NoError(t, err) - assert.Len(t, dispatcher.msgs, 2) - assert.Equal(t, types.Error_INTERNAL_ERROR, dispatcher.msgs[0].Error) - assert.Equal(t, "failed to register to workflow: an error", dispatcher.msgs[0].ErrorMsg) - assert.Equal(t, types.Error_INTERNAL_ERROR, dispatcher.msgs[1].Error) - assert.Equal(t, "failed to register to workflow: an error", dispatcher.msgs[1].ErrorMsg) - }) - t.Run("Unregister from workflow request", func(t *testing.T) { - dispatcher := &testDispatcher{} - request := request.NewServerRequest(capability, types.MethodUnregisterFromWorkflow, "capabilityID", 2, - capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, lggr) - - err := sendValidRequest(request, workflowPeers, capabilityPeerID, rawRequest) - require.NoError(t, err) - - err = request.OnMessage(context.Background(), &types.MessageBody{ - Version: 0, - Sender: workflowPeers[1][:], - Receiver: capabilityPeerID[:], - MessageId: []byte("workflowID" + "workflowExecutionID"), - CapabilityId: "capabilityID", - CapabilityDonId: 2, - CallerDonId: 1, - Method: types.MethodUnregisterFromWorkflow, - Payload: rawRequest, - }) - require.NoError(t, err) - assert.Len(t, dispatcher.msgs, 2) - assert.Equal(t, types.Error_OK, dispatcher.msgs[0].Error) - assert.Equal(t, types.Error_OK, dispatcher.msgs[1].Error) - }) - - t.Run("Unregister from workflow request errors", func(t *testing.T) { - dispatcher := &testDispatcher{} - req := request.NewServerRequest(TestErrorCapability{}, types.MethodUnregisterFromWorkflow, "capabilityID", 2, - capabilityPeerID, callingDon, "requestMessageID", dispatcher, 10*time.Minute, lggr) - - err := sendValidRequest(req, workflowPeers, capabilityPeerID, rawRequest) - require.NoError(t, err) - - err = req.OnMessage(context.Background(), &types.MessageBody{ - Version: 0, - Sender: workflowPeers[1][:], - Receiver: capabilityPeerID[:], - MessageId: []byte("workflowID" + "workflowExecutionID"), - CapabilityId: "capabilityID", - CapabilityDonId: 2, - CallerDonId: 1, - Method: types.MethodUnregisterFromWorkflow, - Payload: rawRequest, - }) - require.NoError(t, err) - assert.Len(t, dispatcher.msgs, 2) - assert.Equal(t, types.Error_INTERNAL_ERROR, dispatcher.msgs[0].Error) - assert.Equal(t, "failed to unregister from workflow: an error", dispatcher.msgs[0].ErrorMsg) - assert.Equal(t, types.Error_INTERNAL_ERROR, dispatcher.msgs[1].Error) - assert.Equal(t, "failed to unregister from workflow: an error", dispatcher.msgs[1].ErrorMsg) - }) } type serverRequest interface { diff --git a/core/capabilities/remote/executable/server.go b/core/capabilities/remote/executable/server.go index 0208572b0bd..5d6a0aff490 100644 --- a/core/capabilities/remote/executable/server.go +++ b/core/capabilities/remote/executable/server.go @@ -138,9 +138,10 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) { defer r.receiveLock.Unlock() switch msg.Method { - case types.MethodExecute, types.MethodRegisterToWorkflow, types.MethodUnregisterFromWorkflow: + case types.MethodExecute: default: r.lggr.Errorw("received request for unsupported method type", "method", remote.SanitizeLogString(msg.Method)) + return } messageID, err := GetMessageID(msg) diff --git a/core/capabilities/remote/executable/server_test.go b/core/capabilities/remote/executable/server_test.go index 1fb5c2dd413..c92a996f846 100644 --- a/core/capabilities/remote/executable/server_test.go +++ b/core/capabilities/remote/executable/server_test.go @@ -83,89 +83,6 @@ func Test_Server_Execute_RespondsAfterSufficientRequests(t *testing.T) { closeServices(t, srvcs) } -func Test_Server_RegisterToWorkflow_RespondsAfterSufficientRequests(t *testing.T) { - ctx := testutils.Context(t) - - numCapabilityPeers := 4 - - callers, srvcs := testRemoteExecutableCapabilityServer(ctx, t, &commoncap.RemoteExecutableConfig{}, &TestCapability{}, 10, 9, numCapabilityPeers, 3, 10*time.Minute) - - for _, caller := range callers { - err := caller.RegisterToWorkflow(context.Background(), commoncap.RegisterToWorkflowRequest{ - Metadata: commoncap.RegistrationMetadata{ - WorkflowID: workflowID1, - ReferenceID: stepReferenceID1, - WorkflowOwner: workflowOwnerID, - }, - }) - - require.NoError(t, err) - } - - for _, caller := range callers { - for i := 0; i < numCapabilityPeers; i++ { - msg := <-caller.receivedMessages - assert.Equal(t, remotetypes.Error_OK, msg.Error) - } - } - closeServices(t, srvcs) -} - -func Test_Server_RegisterToWorkflow_Error(t *testing.T) { - ctx := testutils.Context(t) - - numCapabilityPeers := 4 - - callers, srvcs := testRemoteExecutableCapabilityServer(ctx, t, &commoncap.RemoteExecutableConfig{}, &TestErrorCapability{}, 10, 9, numCapabilityPeers, 3, 10*time.Minute) - - for _, caller := range callers { - err := caller.RegisterToWorkflow(context.Background(), commoncap.RegisterToWorkflowRequest{ - Metadata: commoncap.RegistrationMetadata{ - WorkflowID: workflowID1, - ReferenceID: stepReferenceID1, - WorkflowOwner: workflowOwnerID, - }, - }) - - require.NoError(t, err) - } - - for _, caller := range callers { - for i := 0; i < numCapabilityPeers; i++ { - msg := <-caller.receivedMessages - assert.Equal(t, remotetypes.Error_INTERNAL_ERROR, msg.Error) - } - } - closeServices(t, srvcs) -} - -func Test_Server_UnregisterFromWorkflow_RespondsAfterSufficientRequests(t *testing.T) { - ctx := testutils.Context(t) - - numCapabilityPeers := 4 - - callers, srvcs := testRemoteExecutableCapabilityServer(ctx, t, &commoncap.RemoteExecutableConfig{}, &TestCapability{}, 10, 9, numCapabilityPeers, 3, 10*time.Minute) - - for _, caller := range callers { - err := caller.UnregisterFromWorkflow(context.Background(), commoncap.UnregisterFromWorkflowRequest{ - Metadata: commoncap.RegistrationMetadata{ - WorkflowID: workflowID1, - ReferenceID: stepReferenceID1, - WorkflowOwner: workflowOwnerID, - }, - }) - require.NoError(t, err) - } - - for _, caller := range callers { - for i := 0; i < numCapabilityPeers; i++ { - msg := <-caller.receivedMessages - assert.Equal(t, remotetypes.Error_OK, msg.Error) - } - } - closeServices(t, srvcs) -} - func Test_Server_InsufficientCallers(t *testing.T) { ctx := testutils.Context(t) @@ -319,62 +236,6 @@ func (r *serverTestClient) Info(ctx context.Context) (commoncap.CapabilityInfo, panic("not implemented") } -func (r *serverTestClient) RegisterToWorkflow(ctx context.Context, req commoncap.RegisterToWorkflowRequest) error { - rawRequest, err := pb.MarshalRegisterToWorkflowRequest(req) - if err != nil { - return err - } - - messageID := remotetypes.MethodRegisterToWorkflow + ":" + req.Metadata.WorkflowID - - for _, node := range r.capabilityDonInfo.Members { - message := &remotetypes.MessageBody{ - CapabilityId: "capability-id", - CapabilityDonId: 1, - CallerDonId: 2, - Method: remotetypes.MethodRegisterToWorkflow, - Payload: rawRequest, - MessageId: []byte(messageID), - Sender: r.peerID[:], - Receiver: node[:], - } - - if err = r.dispatcher.Send(node, message); err != nil { - return err - } - } - - return nil -} - -func (r *serverTestClient) UnregisterFromWorkflow(ctx context.Context, req commoncap.UnregisterFromWorkflowRequest) error { - rawRequest, err := pb.MarshalUnregisterFromWorkflowRequest(req) - if err != nil { - return err - } - - messageID := remotetypes.MethodUnregisterFromWorkflow + ":" + req.Metadata.WorkflowID - - for _, node := range r.capabilityDonInfo.Members { - message := &remotetypes.MessageBody{ - CapabilityId: "capability-id", - CapabilityDonId: 1, - CallerDonId: 2, - Method: remotetypes.MethodUnregisterFromWorkflow, - Payload: rawRequest, - MessageId: []byte(messageID), - Sender: r.peerID[:], - Receiver: node[:], - } - - if err = r.dispatcher.Send(node, message); err != nil { - return err - } - } - - return nil -} - func (r *serverTestClient) Execute(ctx context.Context, req commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) { rawRequest, err := pb.MarshalCapabilityRequest(req) if err != nil { diff --git a/core/capabilities/remote/types/types.go b/core/capabilities/remote/types/types.go index 188587bc7ac..983ac6a8ed2 100644 --- a/core/capabilities/remote/types/types.go +++ b/core/capabilities/remote/types/types.go @@ -13,12 +13,10 @@ import ( ) const ( - MethodRegisterTrigger = "RegisterTrigger" - MethodUnRegisterTrigger = "UnregisterTrigger" - MethodTriggerEvent = "TriggerEvent" - MethodExecute = "Execute" - MethodRegisterToWorkflow = "RegisterToWorkflow" - MethodUnregisterFromWorkflow = "UnregisterFromWorkflow" + MethodRegisterTrigger = "RegisterTrigger" + MethodUnRegisterTrigger = "UnregisterTrigger" + MethodTriggerEvent = "TriggerEvent" + MethodExecute = "Execute" ) type Dispatcher interface {