From 968a8a17b7b03f48ffa1ee55008eea76d5df48f3 Mon Sep 17 00:00:00 2001 From: Gabriel Paradiso Date: Tue, 26 Nov 2024 12:41:22 -0300 Subject: [PATCH 1/2] feat: implement FetchFunc --- .../gateway/handlers/capabilities/handler.go | 7 +- core/services/workflows/syncer/fetcher.go | 59 ++++++++++ .../services/workflows/syncer/fetcher_test.go | 102 ++++++++++++++++++ 3 files changed, 165 insertions(+), 3 deletions(-) create mode 100644 core/services/workflows/syncer/fetcher.go create mode 100644 core/services/workflows/syncer/fetcher_test.go diff --git a/core/services/gateway/handlers/capabilities/handler.go b/core/services/gateway/handlers/capabilities/handler.go index 904a64c8896..90bc2065edd 100644 --- a/core/services/gateway/handlers/capabilities/handler.go +++ b/core/services/gateway/handlers/capabilities/handler.go @@ -20,9 +20,10 @@ import ( const ( // NOTE: more methods will go here. HTTP trigger/action/target; etc. - MethodWebAPITarget = "web_api_target" - MethodWebAPITrigger = "web_api_trigger" - MethodComputeAction = "compute_action" + MethodWebAPITarget = "web_api_target" + MethodWebAPITrigger = "web_api_trigger" + MethodComputeAction = "compute_action" + MethodWorkflowSyncer = "workflow_syncer" ) type handler struct { diff --git a/core/services/workflows/syncer/fetcher.go b/core/services/workflows/syncer/fetcher.go new file mode 100644 index 00000000000..c1ddd3530da --- /dev/null +++ b/core/services/workflows/syncer/fetcher.go @@ -0,0 +1,59 @@ +package syncer + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/smartcontractkit/chainlink/v2/core/capabilities/validation" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi" + "github.com/smartcontractkit/chainlink/v2/core/logger" + ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities" +) + +func NewFetcherFunc( + ctx context.Context, + lggr logger.Logger, + och *webapi.OutgoingConnectorHandler, + workflowID, workflowExecutionID string, + idGenerator func() string) FetcherFunc { + return func(ctx context.Context, url string) ([]byte, error) { + if err := validation.ValidateWorkflowOrExecutionID(workflowID); err != nil { + return nil, fmt.Errorf("workflow ID %q is invalid: %w", workflowID, err) + } + if err := validation.ValidateWorkflowOrExecutionID(workflowExecutionID); err != nil { + return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", workflowExecutionID, err) + } + + messageID := strings.Join([]string{ + workflowID, + workflowExecutionID, + ghcapabilities.MethodWorkflowSyncer, + idGenerator(), + }, "/") + + payloadBytes, err := json.Marshal(ghcapabilities.Request{ + URL: url, + Method: http.MethodGet, + }) + if err != nil { + return nil, fmt.Errorf("failed to marshal fetch request: %w", err) + } + + resp, err := och.HandleSingleNodeRequest(ctx, messageID, payloadBytes) + if err != nil { + return nil, err + } + + lggr.Debugw("received gateway response", "resp", resp) + var payload ghcapabilities.Response + err = json.Unmarshal(resp.Body.Payload, &payload) + if err != nil { + return nil, err + } + + return payload.Body, nil + } +} diff --git a/core/services/workflows/syncer/fetcher_test.go b/core/services/workflows/syncer/fetcher_test.go new file mode 100644 index 00000000000..55997a96fb7 --- /dev/null +++ b/core/services/workflows/syncer/fetcher_test.go @@ -0,0 +1,102 @@ +package syncer + +import ( + "context" + "encoding/json" + "strings" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" + gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks" + ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" +) + +func TestNewFetcherFunc(t *testing.T) { + ctx := context.Background() + lggr := logger.TestLogger(t) + + config := webapi.ServiceConfig{ + RateLimiter: common.RateLimiterConfig{ + GlobalRPS: 100.0, + GlobalBurst: 100, + PerSenderRPS: 100.0, + PerSenderBurst: 100, + }, + } + + connector := gcmocks.NewGatewayConnector(t) + och, err := webapi.NewOutgoingConnectorHandler(connector, config, ghcapabilities.MethodComputeAction, lggr) + require.NoError(t, err) + + workflowID := "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0" + workflowExecutionID := "25c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce1" + idGenerator := func() string { return "uniqueID" } + url := "http://example.com" + + msgID := strings.Join([]string{ + workflowID, + workflowExecutionID, + ghcapabilities.MethodWorkflowSyncer, + idGenerator(), + }, "/") + + t.Run("OK-valid_request", func(t *testing.T) { + gatewayResp := gatewayResponse(t, msgID) + connector.EXPECT().SignAndSendToGateway(mock.Anything, "gateway1", mock.Anything).Run(func(ctx context.Context, gatewayID string, msg *api.MessageBody) { + och.HandleGatewayMessage(ctx, "gateway1", gatewayResp) + }).Return(nil).Times(1) + connector.EXPECT().DonID().Return("don-id") + connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"}) + + fetcher := NewFetcherFunc(ctx, lggr, och, workflowID, workflowExecutionID, idGenerator) + + payload, err := fetcher(ctx, url) + require.NoError(t, err) + + expectedPayload := []byte("response body") + require.Equal(t, expectedPayload, payload) + }) + + t.Run("NOK-invalid_workflow_id", func(t *testing.T) { + invalidWorkflowID := "invalidWorkflowID" + fetcher := NewFetcherFunc(ctx, lggr, och, invalidWorkflowID, workflowExecutionID, idGenerator) + + _, err := fetcher(ctx, url) + require.Error(t, err) + require.Contains(t, err.Error(), "workflow ID") + }) + + t.Run("NOK-invalid_workflow_execution_id", func(t *testing.T) { + invalidWorkflowExecutionID := "invalidWorkflowExecutionID" + fetcher := NewFetcherFunc(ctx, lggr, och, workflowID, invalidWorkflowExecutionID, idGenerator) + + _, err := fetcher(ctx, url) + require.Error(t, err) + require.Contains(t, err.Error(), "workflow execution ID") + }) +} + +func gatewayResponse(t *testing.T, msgID string) *api.Message { + headers := map[string]string{"Content-Type": "application/json"} + body := []byte("response body") + responsePayload, err := json.Marshal(ghcapabilities.Response{ + StatusCode: 200, + Headers: headers, + Body: body, + ExecutionError: false, + }) + require.NoError(t, err) + return &api.Message{ + Body: api.MessageBody{ + MessageId: msgID, + Method: ghcapabilities.MethodWebAPITarget, + Payload: responsePayload, + }, + } +} From 2a26934025b25159789a57b687eebc46f93f5daa Mon Sep 17 00:00:00 2001 From: Gabriel Paradiso Date: Tue, 26 Nov 2024 15:24:39 -0300 Subject: [PATCH 2/2] fix: generate ID based on the secretsURL --- core/services/workflows/syncer/fetcher.go | 20 ++----------- .../services/workflows/syncer/fetcher_test.go | 30 ++----------------- 2 files changed, 4 insertions(+), 46 deletions(-) diff --git a/core/services/workflows/syncer/fetcher.go b/core/services/workflows/syncer/fetcher.go index c1ddd3530da..ed815a240ba 100644 --- a/core/services/workflows/syncer/fetcher.go +++ b/core/services/workflows/syncer/fetcher.go @@ -7,7 +7,6 @@ import ( "net/http" "strings" - "github.com/smartcontractkit/chainlink/v2/core/capabilities/validation" "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi" "github.com/smartcontractkit/chainlink/v2/core/logger" ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities" @@ -16,24 +15,8 @@ import ( func NewFetcherFunc( ctx context.Context, lggr logger.Logger, - och *webapi.OutgoingConnectorHandler, - workflowID, workflowExecutionID string, - idGenerator func() string) FetcherFunc { + och *webapi.OutgoingConnectorHandler) FetcherFunc { return func(ctx context.Context, url string) ([]byte, error) { - if err := validation.ValidateWorkflowOrExecutionID(workflowID); err != nil { - return nil, fmt.Errorf("workflow ID %q is invalid: %w", workflowID, err) - } - if err := validation.ValidateWorkflowOrExecutionID(workflowExecutionID); err != nil { - return nil, fmt.Errorf("workflow execution ID %q is invalid: %w", workflowExecutionID, err) - } - - messageID := strings.Join([]string{ - workflowID, - workflowExecutionID, - ghcapabilities.MethodWorkflowSyncer, - idGenerator(), - }, "/") - payloadBytes, err := json.Marshal(ghcapabilities.Request{ URL: url, Method: http.MethodGet, @@ -42,6 +25,7 @@ func NewFetcherFunc( return nil, fmt.Errorf("failed to marshal fetch request: %w", err) } + messageID := strings.Join([]string{ghcapabilities.MethodWorkflowSyncer, url}, "/") resp, err := och.HandleSingleNodeRequest(ctx, messageID, payloadBytes) if err != nil { return nil, err diff --git a/core/services/workflows/syncer/fetcher_test.go b/core/services/workflows/syncer/fetcher_test.go index 55997a96fb7..846a9186b5a 100644 --- a/core/services/workflows/syncer/fetcher_test.go +++ b/core/services/workflows/syncer/fetcher_test.go @@ -34,17 +34,9 @@ func TestNewFetcherFunc(t *testing.T) { och, err := webapi.NewOutgoingConnectorHandler(connector, config, ghcapabilities.MethodComputeAction, lggr) require.NoError(t, err) - workflowID := "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0" - workflowExecutionID := "25c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce1" - idGenerator := func() string { return "uniqueID" } url := "http://example.com" - msgID := strings.Join([]string{ - workflowID, - workflowExecutionID, - ghcapabilities.MethodWorkflowSyncer, - idGenerator(), - }, "/") + msgID := strings.Join([]string{ghcapabilities.MethodWorkflowSyncer, url}, "/") t.Run("OK-valid_request", func(t *testing.T) { gatewayResp := gatewayResponse(t, msgID) @@ -54,7 +46,7 @@ func TestNewFetcherFunc(t *testing.T) { connector.EXPECT().DonID().Return("don-id") connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"}) - fetcher := NewFetcherFunc(ctx, lggr, och, workflowID, workflowExecutionID, idGenerator) + fetcher := NewFetcherFunc(ctx, lggr, och) payload, err := fetcher(ctx, url) require.NoError(t, err) @@ -62,24 +54,6 @@ func TestNewFetcherFunc(t *testing.T) { expectedPayload := []byte("response body") require.Equal(t, expectedPayload, payload) }) - - t.Run("NOK-invalid_workflow_id", func(t *testing.T) { - invalidWorkflowID := "invalidWorkflowID" - fetcher := NewFetcherFunc(ctx, lggr, och, invalidWorkflowID, workflowExecutionID, idGenerator) - - _, err := fetcher(ctx, url) - require.Error(t, err) - require.Contains(t, err.Error(), "workflow ID") - }) - - t.Run("NOK-invalid_workflow_execution_id", func(t *testing.T) { - invalidWorkflowExecutionID := "invalidWorkflowExecutionID" - fetcher := NewFetcherFunc(ctx, lggr, och, workflowID, invalidWorkflowExecutionID, idGenerator) - - _, err := fetcher(ctx, url) - require.Error(t, err) - require.Contains(t, err.Error(), "workflow execution ID") - }) } func gatewayResponse(t *testing.T, msgID string) *api.Message {