From bc7724346d4b4fc58b6eba48c45835798e306dcf Mon Sep 17 00:00:00 2001 From: Gabriel Paradiso Date: Fri, 13 Dec 2024 16:15:56 +0100 Subject: [PATCH] [CAPPL-324] rehydrate artifacts from db if they haven't changed (#15668) * fix: rehydrate artifacts from db if they haven't changed * feat: implement GetWorkflowSpecByID --- core/services/workflows/syncer/handler.go | 56 +++++++++++++------ core/services/workflows/syncer/mocks/orm.go | 59 +++++++++++++++++++++ core/services/workflows/syncer/orm.go | 19 +++++++ core/services/workflows/syncer/orm_test.go | 39 ++++++++++++++ 4 files changed, 157 insertions(+), 16 deletions(-) diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 4ef7f952249..534dfd57e7b 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -400,25 +400,13 @@ func (h *eventHandler) workflowRegisteredEvent( ctx context.Context, payload WorkflowRegistryWorkflowRegisteredV1, ) error { - // Download the contents of binaryURL, configURL and secretsURL and cache them locally. - binary, err := h.fetcher(ctx, payload.BinaryURL) + // Fetch the workflow artifacts from the database or download them from the specified URLs + decodedBinary, config, err := h.getWorkflowArtifacts(ctx, payload) if err != nil { - return fmt.Errorf("failed to fetch binary from %s : %w", payload.BinaryURL, err) - } - - decodedBinary, err := base64.StdEncoding.DecodeString(string(binary)) - if err != nil { - return fmt.Errorf("failed to decode binary: %w", err) - } - - var config []byte - if payload.ConfigURL != "" { - config, err = h.fetcher(ctx, payload.ConfigURL) - if err != nil { - return fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err) - } + return err } + // Always fetch secrets from the SecretsURL var secrets []byte if payload.SecretsURL != "" { secrets, err = h.fetcher(ctx, payload.SecretsURL) @@ -499,6 +487,42 @@ func (h *eventHandler) workflowRegisteredEvent( return nil } +// getWorkflowArtifacts retrieves the workflow artifacts from the database if they exist, +// or downloads them from the specified URLs if they are not found in the database. +func (h *eventHandler) getWorkflowArtifacts( + ctx context.Context, + payload WorkflowRegistryWorkflowRegisteredV1, +) ([]byte, []byte, error) { + spec, err := h.orm.GetWorkflowSpecByID(ctx, hex.EncodeToString(payload.WorkflowID[:])) + if err != nil { + binary, err2 := h.fetcher(ctx, payload.BinaryURL) + if err2 != nil { + return nil, nil, fmt.Errorf("failed to fetch binary from %s : %w", payload.BinaryURL, err) + } + + decodedBinary, err2 := base64.StdEncoding.DecodeString(string(binary)) + if err2 != nil { + return nil, nil, fmt.Errorf("failed to decode binary: %w", err) + } + + var config []byte + if payload.ConfigURL != "" { + config, err2 = h.fetcher(ctx, payload.ConfigURL) + if err2 != nil { + return nil, nil, fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err) + } + } + return decodedBinary, config, nil + } + + // there is no update in the BinaryURL or ConfigURL, lets decode the stored artifacts + decodedBinary, err := hex.DecodeString(spec.Workflow) + if err != nil { + return nil, nil, fmt.Errorf("failed to decode stored workflow spec: %w", err) + } + return decodedBinary, []byte(spec.Config), nil +} + func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner string, name string, config []byte, binary []byte) (services.Service, error) { moduleConfig := &host.ModuleConfig{Logger: h.lggr, Labeler: h.emitter} sdkSpec, err := host.GetWorkflowSpec(ctx, moduleConfig, binary, config) diff --git a/core/services/workflows/syncer/mocks/orm.go b/core/services/workflows/syncer/mocks/orm.go index da96f422361..09a543d65e3 100644 --- a/core/services/workflows/syncer/mocks/orm.go +++ b/core/services/workflows/syncer/mocks/orm.go @@ -540,6 +540,65 @@ func (_c *ORM_GetWorkflowSpec_Call) RunAndReturn(run func(context.Context, strin return _c } +// GetWorkflowSpecByID provides a mock function with given fields: ctx, id +func (_m *ORM) GetWorkflowSpecByID(ctx context.Context, id string) (*job.WorkflowSpec, error) { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for GetWorkflowSpecByID") + } + + var r0 *job.WorkflowSpec + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*job.WorkflowSpec, error)); ok { + return rf(ctx, id) + } + if rf, ok := ret.Get(0).(func(context.Context, string) *job.WorkflowSpec); ok { + r0 = rf(ctx, id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*job.WorkflowSpec) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ORM_GetWorkflowSpecByID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetWorkflowSpecByID' +type ORM_GetWorkflowSpecByID_Call struct { + *mock.Call +} + +// GetWorkflowSpecByID is a helper method to define mock.On call +// - ctx context.Context +// - id string +func (_e *ORM_Expecter) GetWorkflowSpecByID(ctx interface{}, id interface{}) *ORM_GetWorkflowSpecByID_Call { + return &ORM_GetWorkflowSpecByID_Call{Call: _e.mock.On("GetWorkflowSpecByID", ctx, id)} +} + +func (_c *ORM_GetWorkflowSpecByID_Call) Run(run func(ctx context.Context, id string)) *ORM_GetWorkflowSpecByID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *ORM_GetWorkflowSpecByID_Call) Return(_a0 *job.WorkflowSpec, _a1 error) *ORM_GetWorkflowSpecByID_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *ORM_GetWorkflowSpecByID_Call) RunAndReturn(run func(context.Context, string) (*job.WorkflowSpec, error)) *ORM_GetWorkflowSpecByID_Call { + _c.Call.Return(run) + return _c +} + // Update provides a mock function with given fields: ctx, secretsURL, contents func (_m *ORM) Update(ctx context.Context, secretsURL string, contents string) (int64, error) { ret := _m.Called(ctx, secretsURL, contents) diff --git a/core/services/workflows/syncer/orm.go b/core/services/workflows/syncer/orm.go index bd0501795e6..6289a7ff0b8 100644 --- a/core/services/workflows/syncer/orm.go +++ b/core/services/workflows/syncer/orm.go @@ -52,6 +52,9 @@ type WorkflowSpecsDS interface { // DeleteWorkflowSpec deletes the workflow spec for the given owner and name. DeleteWorkflowSpec(ctx context.Context, owner, name string) error + + // GetWorkflowSpecByID returns the workflow spec for the given workflowID. + GetWorkflowSpecByID(ctx context.Context, id string) (*job.WorkflowSpec, error) } type ORM interface { @@ -370,6 +373,22 @@ func (orm *orm) GetWorkflowSpec(ctx context.Context, owner, name string) (*job.W return &spec, nil } +func (orm *orm) GetWorkflowSpecByID(ctx context.Context, id string) (*job.WorkflowSpec, error) { + query := ` + SELECT * + FROM workflow_specs + WHERE workflow_id = $1 + ` + + var spec job.WorkflowSpec + err := orm.ds.GetContext(ctx, &spec, query, id) + if err != nil { + return nil, err + } + + return &spec, nil +} + func (orm *orm) DeleteWorkflowSpec(ctx context.Context, owner, name string) error { query := ` DELETE FROM workflow_specs diff --git a/core/services/workflows/syncer/orm_test.go b/core/services/workflows/syncer/orm_test.go index a94233e78a1..2e8ffac00df 100644 --- a/core/services/workflows/syncer/orm_test.go +++ b/core/services/workflows/syncer/orm_test.go @@ -197,6 +197,45 @@ func Test_GetWorkflowSpec(t *testing.T) { }) } +func Test_GetWorkflowSpecByID(t *testing.T) { + db := pgtest.NewSqlxDB(t) + ctx := testutils.Context(t) + lggr := logger.TestLogger(t) + orm := &orm{ds: db, lggr: lggr} + + t.Run("gets a workflow spec by ID", func(t *testing.T) { + spec := &job.WorkflowSpec{ + Workflow: "test_workflow", + Config: "test_config", + WorkflowID: "cid-123", + WorkflowOwner: "owner-123", + WorkflowName: "Test Workflow", + Status: job.WorkflowSpecStatusActive, + BinaryURL: "http://example.com/binary", + ConfigURL: "http://example.com/config", + CreatedAt: time.Now(), + SpecType: job.WASMFile, + } + + id, err := orm.UpsertWorkflowSpec(ctx, spec) + require.NoError(t, err) + require.NotZero(t, id) + + dbSpec, err := orm.GetWorkflowSpecByID(ctx, spec.WorkflowID) + require.NoError(t, err) + require.Equal(t, spec.Workflow, dbSpec.Workflow) + + err = orm.DeleteWorkflowSpec(ctx, spec.WorkflowOwner, spec.WorkflowName) + require.NoError(t, err) + }) + + t.Run("fails if no workflow spec exists", func(t *testing.T) { + dbSpec, err := orm.GetWorkflowSpecByID(ctx, "inexistent-workflow-id") + require.Error(t, err) + require.Nil(t, dbSpec) + }) +} + func Test_GetContentsByWorkflowID(t *testing.T) { db := pgtest.NewSqlxDB(t) ctx := testutils.Context(t)