From 84208295f0945c5a1188387c20ea4a1771459216 Mon Sep 17 00:00:00 2001 From: Street <5597260+MStreet3@users.noreply.github.com> Date: Mon, 9 Dec 2024 21:31:34 +0200 Subject: [PATCH] fix(workflows/syncer): skips fetching data for missing urls (#15519) --- core/services/workflows/syncer/handler.go | 18 +- .../services/workflows/syncer/handler_test.go | 235 ++++++++++++------ 2 files changed, 168 insertions(+), 85 deletions(-) diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 46dcd21ed90..077dbc0cedb 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -383,14 +383,20 @@ func (h *eventHandler) workflowRegisteredEvent( return fmt.Errorf("failed to fetch binary from %s : %w", payload.BinaryURL, err) } - config, err := h.fetcher(ctx, payload.ConfigURL) - if err != nil { - return fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err) + var config []byte + if payload.ConfigURL != "" { + config, err = h.fetcher(ctx, payload.ConfigURL) + if err != nil { + return fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err) + } } - secrets, err := h.fetcher(ctx, payload.SecretsURL) - if err != nil { - return fmt.Errorf("failed to fetch secrets from %s : %w", payload.SecretsURL, err) + var secrets []byte + if payload.SecretsURL != "" { + secrets, err = h.fetcher(ctx, payload.SecretsURL) + if err != nil { + return fmt.Errorf("failed to fetch secrets from %s : %w", payload.SecretsURL, err) + } } // Calculate the hash of the binary and config files diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go index bb0a61aea4d..7d11d347a02 100644 --- a/core/services/workflows/syncer/handler_test.go +++ b/core/services/workflows/syncer/handler_test.go @@ -173,59 +173,161 @@ const ( ) func Test_workflowRegisteredHandler(t *testing.T) { - t.Run("success with paused workflow registered", func(t *testing.T) { - var ( - ctx = testutils.Context(t) - lggr = logger.TestLogger(t) - db = pgtest.NewSqlxDB(t) - orm = NewWorkflowRegistryDS(db, lggr) - emitter = custmsg.NewLabeler() + var binaryURL = "http://example.com/binary" + var secretsURL = "http://example.com/secrets" + var configURL = "http://example.com/config" + var config = []byte("") + var wfOwner = []byte("0xOwner") + var binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t) + defaultValidationFn := func(t *testing.T, ctx context.Context, h *eventHandler, wfOwner []byte, wfName string, wfID string) { + // Verify the record is updated in the database + dbSpec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") + require.NoError(t, err) + require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner) + require.Equal(t, "workflow-name", dbSpec.WorkflowName) + require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status) - binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t) - config = []byte("") - secretsURL = "http://example.com" - binaryURL = "http://example.com/binary" - configURL = "http://example.com/config" - wfOwner = []byte("0xOwner") + // Verify the engine is started + engine, err := h.engineRegistry.Get(wfID) + require.NoError(t, err) + err = engine.Ready() + require.NoError(t, err) + } - fetcher = newMockFetcher(map[string]mockFetchResp{ + var tt = []testCase{ + { + Name: "success with active workflow registered", + fetcher: newMockFetcher(map[string]mockFetchResp{ binaryURL: {Body: binary, Err: nil}, configURL: {Body: config, Err: nil}, secretsURL: {Body: []byte("secrets"), Err: nil}, - }) - ) - - giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, binary, config, secretsURL) - require.NoError(t, err) - - paused := WorkflowRegistryWorkflowRegisteredV1{ - Status: uint8(1), - WorkflowID: giveWFID, - Owner: wfOwner, - WorkflowName: "workflow-name", - BinaryURL: binaryURL, - ConfigURL: configURL, - SecretsURL: secretsURL, - } + }), + GiveConfig: config, + ConfigURL: configURL, + SecretsURL: secretsURL, + BinaryURL: binaryURL, + GiveBinary: binary, + WFOwner: wfOwner, + Event: func(wfID []byte) WorkflowRegistryWorkflowRegisteredV1 { + return WorkflowRegistryWorkflowRegisteredV1{ + Status: uint8(0), + WorkflowID: [32]byte(wfID), + Owner: wfOwner, + WorkflowName: "workflow-name", + BinaryURL: binaryURL, + ConfigURL: configURL, + SecretsURL: secretsURL, + } + }, + validationFn: defaultValidationFn, + }, + { + Name: "success with paused workflow registered", + fetcher: newMockFetcher(map[string]mockFetchResp{ + binaryURL: {Body: binary, Err: nil}, + configURL: {Body: config, Err: nil}, + secretsURL: {Body: []byte("secrets"), Err: nil}, + }), + GiveConfig: config, + ConfigURL: configURL, + SecretsURL: secretsURL, + BinaryURL: binaryURL, + GiveBinary: binary, + WFOwner: wfOwner, + Event: func(wfID []byte) WorkflowRegistryWorkflowRegisteredV1 { + return WorkflowRegistryWorkflowRegisteredV1{ + Status: uint8(1), + WorkflowID: [32]byte(wfID), + Owner: wfOwner, + WorkflowName: "workflow-name", + BinaryURL: binaryURL, + ConfigURL: configURL, + SecretsURL: secretsURL, + } + }, + validationFn: func(t *testing.T, ctx context.Context, h *eventHandler, wfOwner []byte, wfName string, wfID string) { + // Verify the record is updated in the database + dbSpec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") + require.NoError(t, err) + require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner) + require.Equal(t, "workflow-name", dbSpec.WorkflowName) + require.Equal(t, job.WorkflowSpecStatusPaused, dbSpec.Status) + + // Verify there is no running engine + _, err = h.engineRegistry.Get(wfID) + require.Error(t, err) + }, + }, + { + Name: "skips fetch if config url is missing", + GiveConfig: make([]byte, 0), + ConfigURL: "", + SecretsURL: secretsURL, + BinaryURL: binaryURL, + GiveBinary: binary, + WFOwner: wfOwner, + fetcher: newMockFetcher(map[string]mockFetchResp{ + binaryURL: {Body: binary, Err: nil}, + secretsURL: {Body: []byte("secrets"), Err: nil}, + }), + validationFn: defaultValidationFn, + Event: func(wfID []byte) WorkflowRegistryWorkflowRegisteredV1 { + return WorkflowRegistryWorkflowRegisteredV1{ + Status: uint8(0), + WorkflowID: [32]byte(wfID), + Owner: wfOwner, + WorkflowName: "workflow-name", + BinaryURL: binaryURL, + SecretsURL: secretsURL, + } + }, + }, + { + Name: "skips fetch if secrets url is missing", + GiveConfig: config, + ConfigURL: configURL, + BinaryURL: binaryURL, + GiveBinary: binary, + WFOwner: wfOwner, + fetcher: newMockFetcher(map[string]mockFetchResp{ + binaryURL: {Body: binary, Err: nil}, + configURL: {Body: config, Err: nil}, + }), + validationFn: defaultValidationFn, + Event: func(wfID []byte) WorkflowRegistryWorkflowRegisteredV1 { + return WorkflowRegistryWorkflowRegisteredV1{ + Status: uint8(0), + WorkflowID: [32]byte(wfID), + Owner: wfOwner, + WorkflowName: "workflow-name", + BinaryURL: binaryURL, + ConfigURL: configURL, + } + }, + }, + } - h := &eventHandler{ - lggr: lggr, - orm: orm, - fetcher: fetcher, - emitter: emitter, - } - err = h.workflowRegisteredEvent(ctx, paused) - require.NoError(t, err) + for _, tc := range tt { + testRunningWorkflow(t, tc) + } +} - // Verify the record is updated in the database - dbSpec, err := orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") - require.NoError(t, err) - require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner) - require.Equal(t, "workflow-name", dbSpec.WorkflowName) - require.Equal(t, job.WorkflowSpecStatusPaused, dbSpec.Status) - }) +type testCase struct { + Name string + SecretsURL string + BinaryURL string + GiveBinary []byte + GiveConfig []byte + ConfigURL string + WFOwner []byte + fetcher FetcherFunc + Event func([]byte) WorkflowRegistryWorkflowRegisteredV1 + validationFn func(t *testing.T, ctx context.Context, h *eventHandler, wfOwner []byte, wfName string, wfID string) +} - t.Run("success with active workflow registered", func(t *testing.T) { +func testRunningWorkflow(t *testing.T, cmd testCase) { + t.Helper() + t.Run(cmd.Name, func(t *testing.T) { var ( ctx = testutils.Context(t) lggr = logger.TestLogger(t) @@ -233,34 +335,20 @@ func Test_workflowRegisteredHandler(t *testing.T) { orm = NewWorkflowRegistryDS(db, lggr) emitter = custmsg.NewLabeler() - binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t) - config = []byte("") - secretsURL = "http://example.com" - binaryURL = "http://example.com/binary" - configURL = "http://example.com/config" - wfOwner = []byte("0xOwner") + binary = cmd.GiveBinary + config = cmd.GiveConfig + secretsURL = cmd.SecretsURL + wfOwner = cmd.WFOwner - fetcher = newMockFetcher(map[string]mockFetchResp{ - binaryURL: {Body: binary, Err: nil}, - configURL: {Body: config, Err: nil}, - secretsURL: {Body: []byte("secrets"), Err: nil}, - }) + fetcher = cmd.fetcher ) giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, binary, config, secretsURL) require.NoError(t, err) - require.NoError(t, err) + wfID := hex.EncodeToString(giveWFID[:]) - active := WorkflowRegistryWorkflowRegisteredV1{ - Status: uint8(0), - WorkflowID: giveWFID, - Owner: wfOwner, - WorkflowName: "workflow-name", - BinaryURL: binaryURL, - ConfigURL: configURL, - SecretsURL: secretsURL, - } + event := cmd.Event(giveWFID[:]) er := newEngineRegistry() store := wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()) @@ -275,21 +363,10 @@ func Test_workflowRegisteredHandler(t *testing.T) { capRegistry: registry, workflowStore: store, } - err = h.workflowRegisteredEvent(ctx, active) - require.NoError(t, err) - - // Verify the record is updated in the database - dbSpec, err := orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") + err = h.workflowRegisteredEvent(ctx, event) require.NoError(t, err) - require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner) - require.Equal(t, "workflow-name", dbSpec.WorkflowName) - require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status) - // Verify the engine is started - engine, err := h.engineRegistry.Get(hex.EncodeToString(giveWFID[:])) - require.NoError(t, err) - err = engine.Ready() - require.NoError(t, err) + cmd.validationFn(t, ctx, h, wfOwner, "workflow-name", wfID) }) }