Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove register/unregister remote implementation #15788

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 0 additions & 58 deletions core/capabilities/remote/executable/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink-common/pkg/values"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/executable"
Expand All @@ -31,7 +30,6 @@ const (
)

func Test_Client_DonTopologies(t *testing.T) {
tests.SkipFlakey(t, "https://smartcontract-it.atlassian.net/browse/CAPPL-363")
ctx := testutils.Context(t)

transmissionSchedule, err := values.NewMap(map[string]any{
Expand Down Expand Up @@ -59,18 +57,6 @@ func Test_Client_DonTopologies(t *testing.T) {
executeMethod(ctx, caller, transmissionSchedule, executeInputs, responseTest, t)
})

methods = append(methods, func(caller commoncap.ExecutableCapability) {
registerToWorkflowMethod(ctx, caller, transmissionSchedule, func(t *testing.T, responseError error) {
require.NoError(t, responseError)
}, t)
})

methods = append(methods, func(caller commoncap.ExecutableCapability) {
unregisterFromWorkflowMethod(ctx, caller, transmissionSchedule, func(t *testing.T, responseError error) {
require.NoError(t, responseError)
}, t)
})

for _, method := range methods {
testClient(t, 1, responseTimeOut, 1, 0,
capability, method)
Expand All @@ -90,7 +76,6 @@ func Test_Client_DonTopologies(t *testing.T) {
}

func Test_Client_TransmissionSchedules(t *testing.T) {
tests.SkipFlakey(t, "https://smartcontract-it.atlassian.net/browse/CAPPL-363")
ctx := testutils.Context(t)

responseTest := func(t *testing.T, response commoncap.CapabilityResponse, responseError error) {
Expand Down Expand Up @@ -264,34 +249,6 @@ func testClient(t *testing.T, numWorkflowPeers int, workflowNodeResponseTimeout
wg.Wait()
}

func registerToWorkflowMethod(ctx context.Context, caller commoncap.ExecutableCapability, transmissionSchedule *values.Map,
responseTest func(t *testing.T, responseError error), t *testing.T) {
err := caller.RegisterToWorkflow(ctx, commoncap.RegisterToWorkflowRequest{
Metadata: commoncap.RegistrationMetadata{
WorkflowID: workflowID1,
ReferenceID: stepReferenceID1,
WorkflowOwner: workflowOwnerID,
},
Config: transmissionSchedule,
})

responseTest(t, err)
}

func unregisterFromWorkflowMethod(ctx context.Context, caller commoncap.ExecutableCapability, transmissionSchedule *values.Map,
responseTest func(t *testing.T, responseError error), t *testing.T) {
err := caller.UnregisterFromWorkflow(ctx, commoncap.UnregisterFromWorkflowRequest{
Metadata: commoncap.RegistrationMetadata{
WorkflowID: workflowID1,
ReferenceID: stepReferenceID1,
WorkflowOwner: workflowOwnerID,
},
Config: transmissionSchedule,
})

responseTest(t, err)
}

func executeMethod(ctx context.Context, caller commoncap.ExecutableCapability, transmissionSchedule *values.Map,
executeInputs *values.Map, responseTest func(t *testing.T, responseCh commoncap.CapabilityResponse, responseError error), t *testing.T) {
responseCh, err := caller.Execute(ctx,
Expand Down Expand Up @@ -362,21 +319,6 @@ func (t *clientTestServer) Receive(_ context.Context, msg *remotetypes.MessageBo
resp, responseErr := t.executableCapability.Execute(context.Background(), capabilityRequest)
payload, marshalErr := pb.MarshalCapabilityResponse(resp)
t.sendResponse(messageID, responseErr, payload, marshalErr)

case remotetypes.MethodRegisterToWorkflow:
registerRequest, err := pb.UnmarshalRegisterToWorkflowRequest(msg.Payload)
if err != nil {
panic(err)
}
responseErr := t.executableCapability.RegisterToWorkflow(context.Background(), registerRequest)
t.sendResponse(messageID, responseErr, nil, nil)
case remotetypes.MethodUnregisterFromWorkflow:
unregisterRequest, err := pb.UnmarshalUnregisterFromWorkflowRequest(msg.Payload)
if err != nil {
panic(err)
}
responseErr := t.executableCapability.UnregisterFromWorkflow(context.Background(), unregisterRequest)
t.sendResponse(messageID, responseErr, nil, nil)
default:
panic("unknown method")
}
Expand Down
48 changes: 0 additions & 48 deletions core/capabilities/remote/executable/request/client_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,54 +48,6 @@ type ClientRequest struct {
wg *sync.WaitGroup
}

func NewClientRegisterToWorkflowRequest(ctx context.Context, lggr logger.Logger, req commoncap.RegisterToWorkflowRequest,
remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, dispatcher types.Dispatcher,
requestTimeout time.Duration) (*ClientRequest, error) {
rawRequest, err := proto.MarshalOptions{Deterministic: true}.Marshal(pb.RegisterToWorkflowRequestToProto(req))
if err != nil {
return nil, fmt.Errorf("failed to marshal register to workflow request: %w", err)
}

workflowID := req.Metadata.WorkflowID
if err := validation.ValidateWorkflowOrExecutionID(workflowID); err != nil {
return nil, fmt.Errorf("workflow ID is invalid: %w", err)
}

requestID := types.MethodRegisterToWorkflow + ":" + workflowID

tc := transmission.TransmissionConfig{
Schedule: transmission.Schedule_AllAtOnce,
DeltaStage: 0,
}

return newClientRequest(ctx, lggr, requestID, remoteCapabilityInfo, localDonInfo, dispatcher, requestTimeout,
tc, types.MethodRegisterToWorkflow, rawRequest)
}

func NewClientUnregisterFromWorkflowRequest(ctx context.Context, lggr logger.Logger, req commoncap.UnregisterFromWorkflowRequest,
remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, dispatcher types.Dispatcher,
requestTimeout time.Duration) (*ClientRequest, error) {
rawRequest, err := proto.MarshalOptions{Deterministic: true}.Marshal(pb.UnregisterFromWorkflowRequestToProto(req))
if err != nil {
return nil, fmt.Errorf("failed to marshal unregister from workflow request: %w", err)
}

workflowID := req.Metadata.WorkflowID
if err := validation.ValidateWorkflowOrExecutionID(workflowID); err != nil {
return nil, fmt.Errorf("workflow ID is invalid: %w", err)
}

requestID := types.MethodUnregisterFromWorkflow + ":" + workflowID

tc := transmission.TransmissionConfig{
Schedule: transmission.Schedule_AllAtOnce,
DeltaStage: 0,
}

return newClientRequest(ctx, lggr, requestID, remoteCapabilityInfo, localDonInfo, dispatcher, requestTimeout,
tc, types.MethodUnregisterFromWorkflow, rawRequest)
}

func NewClientExecuteRequest(ctx context.Context, lggr logger.Logger, req commoncap.CapabilityRequest,
remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo capabilities.DON, dispatcher types.Dispatcher,
requestTimeout time.Duration) (*ClientRequest, error) {
Expand Down
163 changes: 0 additions & 163 deletions core/capabilities/remote/executable/request/client_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,6 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {
Config: transmissionSchedule,
}

registerToWorkflowRequest := commoncap.RegisterToWorkflowRequest{
Metadata: commoncap.RegistrationMetadata{
WorkflowID: workflowID1,
WorkflowOwner: "0xaa",
ReferenceID: "refID",
},
Config: transmissionSchedule,
}

m, err := values.NewMap(map[string]any{"response": "response1"})
require.NoError(t, err)
capabilityResponse := commoncap.CapabilityResponse{
Expand Down Expand Up @@ -317,160 +308,6 @@ func Test_ClientRequest_MessageValidation(t *testing.T) {

assert.Equal(t, resp, values.NewString("response1"))
})

t.Run("Register To Workflow Request", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)}
request, err := request.NewClientRegisterToWorkflowRequest(ctx, lggr, registerToWorkflowRequest, capInfo,
workflowDonInfo, dispatcher, 10*time.Minute)
require.NoError(t, err)
defer request.Cancel(errors.New("test end"))

<-dispatcher.msgs
<-dispatcher.msgs
assert.Empty(t, dispatcher.msgs)

msg := &types.MessageBody{
CapabilityId: capInfo.ID,
CapabilityDonId: capDonInfo.ID,
CallerDonId: workflowDonInfo.ID,
Method: types.MethodRegisterToWorkflow,
Payload: nil,
MessageId: []byte("messageID"),
}

msg.Sender = capabilityPeers[0][:]
err = request.OnMessage(ctx, msg)
require.NoError(t, err)

msg.Sender = capabilityPeers[1][:]
err = request.OnMessage(ctx, msg)
require.NoError(t, err)

response := <-request.ResponseChan()
require.Nil(t, response.Result)
require.NoError(t, response.Err)
})

t.Run("Register To Workflow Request with error", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)}
request, err := request.NewClientRegisterToWorkflowRequest(ctx, lggr, registerToWorkflowRequest, capInfo,
workflowDonInfo, dispatcher, 10*time.Minute)
require.NoError(t, err)
defer request.Cancel(errors.New("test end"))

<-dispatcher.msgs
<-dispatcher.msgs
assert.Empty(t, dispatcher.msgs)

msg := &types.MessageBody{
CapabilityId: capInfo.ID,
CapabilityDonId: capDonInfo.ID,
CallerDonId: workflowDonInfo.ID,
Method: types.MethodRegisterToWorkflow,
Payload: nil,
MessageId: []byte("messageID"),
Error: types.Error_INTERNAL_ERROR,
ErrorMsg: "an error",
}

msg.Sender = capabilityPeers[0][:]
err = request.OnMessage(ctx, msg)
require.NoError(t, err)

msg.Sender = capabilityPeers[1][:]
err = request.OnMessage(ctx, msg)
require.NoError(t, err)

response := <-request.ResponseChan()
require.Nil(t, response.Result)
assert.Equal(t, "an error", response.Err.Error())
})

t.Run("Unregister From Workflow Request", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)}
request, err := request.NewClientUnregisterFromWorkflowRequest(ctx, lggr, commoncap.UnregisterFromWorkflowRequest{
Metadata: commoncap.RegistrationMetadata{
WorkflowID: workflowID1,
},
}, capInfo, workflowDonInfo, dispatcher, 10*time.Minute)
require.NoError(t, err)
defer request.Cancel(errors.New("test end"))

<-dispatcher.msgs
<-dispatcher.msgs
assert.Empty(t, dispatcher.msgs)

msg := &types.MessageBody{
CapabilityId: capInfo.ID,
CapabilityDonId: capDonInfo.ID,
CallerDonId: workflowDonInfo.ID,
Method: types.MethodUnregisterFromWorkflow,
Payload: nil,
MessageId: []byte("messageID"),
}

msg.Sender = capabilityPeers[0][:]
err = request.OnMessage(ctx, msg)
require.NoError(t, err)

msg.Sender = capabilityPeers[1][:]
err = request.OnMessage(ctx, msg)
require.NoError(t, err)

response := <-request.ResponseChan()
require.Nil(t, response.Result)
require.NoError(t, response.Err)
})

t.Run("Unregister From Workflow Request with error", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dispatcher := &clientRequestTestDispatcher{msgs: make(chan *types.MessageBody, 100)}
request, err := request.NewClientUnregisterFromWorkflowRequest(ctx, lggr, commoncap.UnregisterFromWorkflowRequest{
Metadata: commoncap.RegistrationMetadata{
WorkflowID: workflowID1,
},
}, capInfo, workflowDonInfo, dispatcher, 10*time.Minute)
require.NoError(t, err)
defer request.Cancel(errors.New("test end"))

<-dispatcher.msgs
<-dispatcher.msgs
assert.Empty(t, dispatcher.msgs)

msg := &types.MessageBody{
CapabilityId: capInfo.ID,
CapabilityDonId: capDonInfo.ID,
CallerDonId: workflowDonInfo.ID,
Method: types.MethodUnregisterFromWorkflow,
Payload: nil,
MessageId: []byte("messageID"),
Error: types.Error_INTERNAL_ERROR,
ErrorMsg: "an error",
}

msg.Sender = capabilityPeers[0][:]
err = request.OnMessage(ctx, msg)
require.NoError(t, err)

msg.Sender = capabilityPeers[1][:]
err = request.OnMessage(ctx, msg)
require.NoError(t, err)

response := <-request.ResponseChan()
require.Nil(t, response.Result)
assert.Equal(t, "an error", response.Err.Error())
})
}

type clientRequestTestDispatcher struct {
Expand Down
34 changes: 0 additions & 34 deletions core/capabilities/remote/executable/request/server_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,6 @@ func (e *ServerRequest) OnMessage(ctx context.Context, msg *types.MessageBody) e
switch e.method {
case types.MethodExecute:
e.executeRequest(ctx, msg.Payload, executeCapabilityRequest)
case types.MethodRegisterToWorkflow:
e.executeRequest(ctx, msg.Payload, registerToWorkflow)
case types.MethodUnregisterFromWorkflow:
e.executeRequest(ctx, msg.Payload, unregisterFromWorkflow)
default:
e.setError(types.Error_INTERNAL_ERROR, "unknown method %s"+e.method)
}
Expand Down Expand Up @@ -252,33 +248,3 @@ func executeCapabilityRequest(ctx context.Context, lggr logger.Logger, capabilit
lggr.Debugw("received execution results", "workflowExecutionID", capabilityRequest.Metadata.WorkflowExecutionID)
return responsePayload, nil
}

func registerToWorkflow(ctx context.Context, _ logger.Logger, capability capabilities.ExecutableCapability,
payload []byte) ([]byte, error) {
registerRequest, err := pb.UnmarshalRegisterToWorkflowRequest(payload)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal register to workflow request: %w", err)
}

err = capability.RegisterToWorkflow(ctx, registerRequest)
if err != nil {
return nil, fmt.Errorf("failed to register to workflow: %w", err)
}

return nil, nil
}

func unregisterFromWorkflow(ctx context.Context, _ logger.Logger, capability capabilities.ExecutableCapability,
payload []byte) ([]byte, error) {
unregisterRequest, err := pb.UnmarshalUnregisterFromWorkflowRequest(payload)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal unregister from workflow request: %w", err)
}

err = capability.UnregisterFromWorkflow(ctx, unregisterRequest)
if err != nil {
return nil, fmt.Errorf("failed to unregister from workflow: %w", err)
}

return nil, nil
}
Loading
Loading