diff --git a/core/services/workflows/syncer/engine_registry.go b/core/services/workflows/syncer/engine_registry.go index 6dd54794e8b..809381c191c 100644 --- a/core/services/workflows/syncer/engine_registry.go +++ b/core/services/workflows/syncer/engine_registry.go @@ -36,6 +36,18 @@ func (r *engineRegistry) Get(id string) (*workflows.Engine, error) { return engine, nil } +// IsRunning is true if the engine exists and is ready. +func (r *engineRegistry) IsRunning(id string) bool { + r.mu.RLock() + defer r.mu.RUnlock() + engine, found := r.engines[id] + if !found { + return false + } + + return engine.Ready() == nil +} + // Pop removes an engine from the registry and returns the engine if found. func (r *engineRegistry) Pop(id string) (*workflows.Engine, error) { r.mu.Lock() diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 5ccb3f5e180..7004c740c97 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -180,11 +180,82 @@ func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) h.lggr.Debugf("workflow 0x%x registered and started", wfID) return nil case WorkflowUpdatedEvent: - return h.workflowUpdatedEvent(ctx, event) + payload, ok := event.Data.(WorkflowRegistryWorkflowUpdatedV1) + if !ok { + return fmt.Errorf("invalid data type %T for event", event.Data) + } + + newWorkflowID := hex.EncodeToString(payload.NewWorkflowID[:]) + cma := h.emitter.With( + platform.KeyWorkflowID, newWorkflowID, + platform.KeyWorkflowName, payload.WorkflowName, + platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), + ) + + if err := h.workflowUpdatedEvent(ctx, payload); err != nil { + logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow updated event: %v", err), h.lggr) + return err + } + + return nil case WorkflowPausedEvent: - return h.workflowPausedEvent(ctx, event) + payload, ok := event.Data.(WorkflowRegistryWorkflowPausedV1) + if !ok { + return fmt.Errorf("invalid data type %T for event", event.Data) + } + + wfID := hex.EncodeToString(payload.WorkflowID[:]) + + cma := h.emitter.With( + platform.KeyWorkflowID, wfID, + platform.KeyWorkflowName, payload.WorkflowName, + platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), + ) + + if err := h.workflowPausedEvent(ctx, payload); err != nil { + logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow paused event: %v", err), h.lggr) + return err + } + return nil case WorkflowActivatedEvent: - return h.workflowActivatedEvent(ctx, event) + payload, ok := event.Data.(WorkflowRegistryWorkflowActivatedV1) + if !ok { + return fmt.Errorf("invalid data type %T for event", event.Data) + } + + wfID := hex.EncodeToString(payload.WorkflowID[:]) + + cma := h.emitter.With( + platform.KeyWorkflowID, wfID, + platform.KeyWorkflowName, payload.WorkflowName, + platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), + ) + if err := h.workflowActivatedEvent(ctx, payload); err != nil { + logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow activated event: %v", err), h.lggr) + return err + } + + return nil + case WorkflowDeletedEvent: + payload, ok := event.Data.(WorkflowRegistryWorkflowDeletedV1) + if !ok { + return fmt.Errorf("invalid data type %T for event", event.Data) + } + + wfID := hex.EncodeToString(payload.WorkflowID[:]) + + cma := h.emitter.With( + platform.KeyWorkflowID, wfID, + platform.KeyWorkflowName, payload.WorkflowName, + platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), + ) + + if err := h.workflowDeletedEvent(ctx, payload); err != nil { + logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow deleted event: %v", err), h.lggr) + return err + } + + return nil default: return fmt.Errorf("event type unsupported: %v", event.EventType) } @@ -284,28 +355,107 @@ func (h *eventHandler) workflowRegisteredEvent( return nil } -// workflowUpdatedEvent handles the WorkflowUpdatedEvent event type. +// workflowUpdatedEvent handles the WorkflowUpdatedEvent event type by first finding the +// current workflow engine, stopping it, and then starting a new workflow engine with the +// updated workflow spec. func (h *eventHandler) workflowUpdatedEvent( - _ context.Context, - _ WorkflowRegistryEvent, + ctx context.Context, + payload WorkflowRegistryWorkflowUpdatedV1, ) error { - return ErrNotImplemented + // Remove the old workflow engine from the local registry if it exists + if err := h.tryEngineCleanup(hex.EncodeToString(payload.OldWorkflowID[:])); err != nil { + return err + } + + registeredEvent := WorkflowRegistryWorkflowRegisteredV1{ + WorkflowID: payload.NewWorkflowID, + WorkflowOwner: payload.WorkflowOwner, + DonID: payload.DonID, + Status: 0, + WorkflowName: payload.WorkflowName, + BinaryURL: payload.BinaryURL, + ConfigURL: payload.ConfigURL, + SecretsURL: payload.SecretsURL, + } + + return h.workflowRegisteredEvent(ctx, registeredEvent) } // workflowPausedEvent handles the WorkflowPausedEvent event type. func (h *eventHandler) workflowPausedEvent( - _ context.Context, - _ WorkflowRegistryEvent, + ctx context.Context, + payload WorkflowRegistryWorkflowPausedV1, ) error { - return ErrNotImplemented + // Remove the workflow engine from the local registry if it exists + if err := h.tryEngineCleanup(hex.EncodeToString(payload.WorkflowID[:])); err != nil { + return err + } + + // get existing workflow spec from DB + spec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName) + if err != nil { + return fmt.Errorf("failed to get workflow spec: %w", err) + } + + // update the status of the workflow spec + spec.Status = job.WorkflowSpecStatusPaused + if _, err := h.orm.UpsertWorkflowSpec(ctx, spec); err != nil { + return fmt.Errorf("failed to update workflow spec: %w", err) + } + + return nil } // workflowActivatedEvent handles the WorkflowActivatedEvent event type. func (h *eventHandler) workflowActivatedEvent( - _ context.Context, - _ WorkflowRegistryEvent, + ctx context.Context, + payload WorkflowRegistryWorkflowActivatedV1, +) error { + // fetch the workflow spec from the DB + spec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName) + if err != nil { + return fmt.Errorf("failed to get workflow spec: %w", err) + } + + // Do nothing if the workflow is already active + if spec.Status == job.WorkflowSpecStatusActive && h.engineRegistry.IsRunning(hex.EncodeToString(payload.WorkflowID[:])) { + return nil + } + + // get the secrets url by the secrets id + secretsURL, err := h.orm.GetSecretsURLByID(ctx, spec.SecretsID.Int64) + if err != nil { + return fmt.Errorf("failed to get secrets URL by ID: %w", err) + } + + // start a new workflow engine + registeredEvent := WorkflowRegistryWorkflowRegisteredV1{ + WorkflowID: payload.WorkflowID, + WorkflowOwner: payload.WorkflowOwner, + DonID: payload.DonID, + Status: 0, + WorkflowName: payload.WorkflowName, + BinaryURL: spec.BinaryURL, + ConfigURL: spec.ConfigURL, + SecretsURL: secretsURL, + } + + return h.workflowRegisteredEvent(ctx, registeredEvent) +} + +// workflowDeletedEvent handles the WorkflowDeletedEvent event type. +func (h *eventHandler) workflowDeletedEvent( + ctx context.Context, + payload WorkflowRegistryWorkflowDeletedV1, ) error { - return ErrNotImplemented + if err := h.tryEngineCleanup(hex.EncodeToString(payload.WorkflowID[:])); err != nil { + return err + } + + if err := h.orm.DeleteWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName); err != nil { + return fmt.Errorf("failed to delete workflow spec: %w", err) + } + return nil } // forceUpdateSecretsEvent handles the ForceUpdateSecretsEvent event type. @@ -335,6 +485,24 @@ func (h *eventHandler) forceUpdateSecretsEvent( return nil } +// tryEngineCleanup attempts to stop the workflow engine for the given workflow ID. Does nothing if the +// workflow engine is not running. +func (h *eventHandler) tryEngineCleanup(wfID string) error { + if h.engineRegistry.IsRunning(wfID) { + // Remove the engine from the registry + e, err := h.engineRegistry.Pop(wfID) + if err != nil { + return fmt.Errorf("failed to get workflow engine: %w", err) + } + + // Stop the engine + if err := e.Close(); err != nil { + return fmt.Errorf("failed to close workflow engine: %w", err) + } + } + return nil +} + // workflowID returns a hex encoded sha256 hash of the wasm, config and secretsURL. func workflowID(wasm, config, secretsURL []byte) string { sum := sha256.New() diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go index 42da3e8de9d..eb8b89ad7e1 100644 --- a/core/services/workflows/syncer/handler_test.go +++ b/core/services/workflows/syncer/handler_test.go @@ -264,6 +264,7 @@ func Test_workflowRegisteredHandler(t *testing.T) { er := newEngineRegistry() store := wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()) registry := capabilities.NewRegistry(lggr) + registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) h := &eventHandler{ lggr: lggr, orm: orm, @@ -290,3 +291,250 @@ func Test_workflowRegisteredHandler(t *testing.T) { require.NoError(t, err) }) } + +func Test_workflowDeletedHandler(t *testing.T) { + t.Run("success deleting existing engine and spec", func(t *testing.T) { + var ( + ctx = testutils.Context(t) + lggr = logger.TestLogger(t) + db = pgtest.NewSqlxDB(t) + orm = NewWorkflowRegistryDS(db, lggr) + emitter = custmsg.NewLabeler() + + binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t) + config = []byte("") + secretsURL = "http://example.com" + binaryURL = "http://example.com/binary" + configURL = "http://example.com/config" + wfOwner = []byte("0xOwner") + + fetcher = newMockFetcher(map[string]mockFetchResp{ + binaryURL: {Body: binary, Err: nil}, + configURL: {Body: config, Err: nil}, + secretsURL: {Body: []byte("secrets"), Err: nil}, + }) + ) + + giveWFID := workflowID(binary, config, []byte(secretsURL)) + + b, err := hex.DecodeString(giveWFID) + require.NoError(t, err) + wfID := make([]byte, 32) + copy(wfID, b) + + active := WorkflowRegistryWorkflowRegisteredV1{ + Status: uint8(0), + WorkflowID: [32]byte(wfID), + WorkflowOwner: wfOwner, + WorkflowName: "workflow-name", + BinaryURL: binaryURL, + ConfigURL: configURL, + SecretsURL: secretsURL, + } + + er := newEngineRegistry() + store := wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()) + registry := capabilities.NewRegistry(lggr) + registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) + h := &eventHandler{ + lggr: lggr, + orm: orm, + fetcher: fetcher, + emitter: emitter, + engineRegistry: er, + capRegistry: registry, + workflowStore: store, + } + err = h.workflowRegisteredEvent(ctx, active) + require.NoError(t, err) + + // Verify the record is updated in the database + dbSpec, err := orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") + require.NoError(t, err) + require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner) + require.Equal(t, "workflow-name", dbSpec.WorkflowName) + require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status) + + // Verify the engine is started + engine, err := h.engineRegistry.Get(giveWFID) + require.NoError(t, err) + err = engine.Ready() + require.NoError(t, err) + + deleteEvent := WorkflowRegistryWorkflowDeletedV1{ + WorkflowID: [32]byte(wfID), + WorkflowOwner: wfOwner, + WorkflowName: "workflow-name", + DonID: 1, + } + err = h.workflowDeletedEvent(ctx, deleteEvent) + require.NoError(t, err) + + // Verify the record is deleted in the database + _, err = orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") + require.Error(t, err) + + // Verify the engine is deleted + _, err = h.engineRegistry.Get(giveWFID) + require.Error(t, err) + }) +} + +func Test_workflowPausedActivatedUpdatedHandler(t *testing.T) { + t.Run("success pausing activating and updating existing engine and spec", func(t *testing.T) { + var ( + ctx = testutils.Context(t) + lggr = logger.TestLogger(t) + db = pgtest.NewSqlxDB(t) + orm = NewWorkflowRegistryDS(db, lggr) + emitter = custmsg.NewLabeler() + + binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t) + config = []byte("") + updateConfig = []byte("updated") + secretsURL = "http://example.com" + binaryURL = "http://example.com/binary" + configURL = "http://example.com/config" + newConfigURL = "http://example.com/new-config" + wfOwner = []byte("0xOwner") + + fetcher = newMockFetcher(map[string]mockFetchResp{ + binaryURL: {Body: binary, Err: nil}, + configURL: {Body: config, Err: nil}, + newConfigURL: {Body: updateConfig, Err: nil}, + secretsURL: {Body: []byte("secrets"), Err: nil}, + }) + ) + + giveWFID := workflowID(binary, config, []byte(secretsURL)) + updatedWFID := workflowID(binary, updateConfig, []byte(secretsURL)) + + b, err := hex.DecodeString(giveWFID) + require.NoError(t, err) + wfID := make([]byte, 32) + copy(wfID, b) + + b, err = hex.DecodeString(updatedWFID) + require.NoError(t, err) + newWFID := make([]byte, 32) + copy(newWFID, b) + + active := WorkflowRegistryWorkflowRegisteredV1{ + Status: uint8(0), + WorkflowID: [32]byte(wfID), + WorkflowOwner: wfOwner, + WorkflowName: "workflow-name", + BinaryURL: binaryURL, + ConfigURL: configURL, + SecretsURL: secretsURL, + } + + er := newEngineRegistry() + store := wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()) + registry := capabilities.NewRegistry(lggr) + registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) + h := &eventHandler{ + lggr: lggr, + orm: orm, + fetcher: fetcher, + emitter: emitter, + engineRegistry: er, + capRegistry: registry, + workflowStore: store, + } + err = h.workflowRegisteredEvent(ctx, active) + require.NoError(t, err) + + // Verify the record is updated in the database + dbSpec, err := orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") + require.NoError(t, err) + require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner) + require.Equal(t, "workflow-name", dbSpec.WorkflowName) + require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status) + + // Verify the engine is started + engine, err := h.engineRegistry.Get(giveWFID) + require.NoError(t, err) + err = engine.Ready() + require.NoError(t, err) + + // create a paused event + pauseEvent := WorkflowRegistryWorkflowPausedV1{ + WorkflowID: [32]byte(wfID), + WorkflowOwner: wfOwner, + WorkflowName: "workflow-name", + DonID: 1, + } + err = h.workflowPausedEvent(ctx, pauseEvent) + require.NoError(t, err) + + // Verify the record is updated in the database + dbSpec, err = orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") + require.NoError(t, err) + require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner) + require.Equal(t, "workflow-name", dbSpec.WorkflowName) + require.Equal(t, job.WorkflowSpecStatusPaused, dbSpec.Status) + + // Verify the engine is removed + _, err = h.engineRegistry.Get(giveWFID) + require.Error(t, err) + + // create an activated workflow event + activatedEvent := WorkflowRegistryWorkflowActivatedV1{ + WorkflowID: [32]byte(wfID), + WorkflowOwner: wfOwner, + WorkflowName: "workflow-name", + DonID: 1, + } + + err = h.workflowActivatedEvent(ctx, activatedEvent) + require.NoError(t, err) + + // Verify the record is updated in the database + dbSpec, err = orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") + require.NoError(t, err) + require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner) + require.Equal(t, "workflow-name", dbSpec.WorkflowName) + require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status) + + // Verify the engine is started + engine, err = h.engineRegistry.Get(giveWFID) + require.NoError(t, err) + err = engine.Ready() + require.NoError(t, err) + + // create an updated event + updatedEvent := WorkflowRegistryWorkflowUpdatedV1{ + OldWorkflowID: [32]byte(wfID), + NewWorkflowID: [32]byte(newWFID), + WorkflowOwner: wfOwner, + WorkflowName: "workflow-name", + BinaryURL: binaryURL, + ConfigURL: newConfigURL, + SecretsURL: secretsURL, + DonID: 1, + } + err = h.workflowUpdatedEvent(ctx, updatedEvent) + require.NoError(t, err) + + // Verify the record is updated in the database + dbSpec, err = orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") + require.NoError(t, err) + require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner) + require.Equal(t, "workflow-name", dbSpec.WorkflowName) + require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status) + require.Equal(t, hex.EncodeToString(newWFID), dbSpec.WorkflowID) + require.Equal(t, newConfigURL, dbSpec.ConfigURL) + require.Equal(t, string(updateConfig), dbSpec.Config) + + // old engine is no longer running + _, err = h.engineRegistry.Get(giveWFID) + require.Error(t, err) + + // new engine is started + engine, err = h.engineRegistry.Get(updatedWFID) + require.NoError(t, err) + err = engine.Ready() + require.NoError(t, err) + }) +} diff --git a/core/services/workflows/syncer/orm.go b/core/services/workflows/syncer/orm.go index 16612b9a9c6..d1f2d55a3a1 100644 --- a/core/services/workflows/syncer/orm.go +++ b/core/services/workflows/syncer/orm.go @@ -235,6 +235,11 @@ func (orm *orm) UpsertWorkflowSpecWithSecrets( txErr := tx.QueryRowxContext(ctx, `INSERT INTO workflow_secrets (secrets_url, secrets_url_hash, contents) VALUES ($1, $2, $3) + ON CONFLICT (secrets_url_hash) DO UPDATE + SET + secrets_url_hash = EXCLUDED.secrets_url_hash, + contents = EXCLUDED.contents, + secrets_url = EXCLUDED.secrets_url RETURNING id`, url, hash, contents, ).Scan(&sid)