Skip to content

Commit

Permalink
feat(workflows/handler): adds all event handlers (#15400)
Browse files Browse the repository at this point in the history
* feat(workflows/handler): adds all event handlers

* refactor(workflows/handlers): try to pop without failing

* chore(registry): removes unused method
  • Loading branch information
MStreet3 authored Nov 26, 2024
1 parent a677d28 commit 65ee497
Show file tree
Hide file tree
Showing 4 changed files with 446 additions and 13 deletions.
12 changes: 12 additions & 0 deletions core/services/workflows/syncer/engine_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ func (r *engineRegistry) Get(id string) (*workflows.Engine, error) {
return engine, nil
}

// IsRunning is true if the engine exists and is ready.
func (r *engineRegistry) IsRunning(id string) bool {
r.mu.RLock()
defer r.mu.RUnlock()
engine, found := r.engines[id]
if !found {
return false
}

return engine.Ready() == nil
}

// Pop removes an engine from the registry and returns the engine if found.
func (r *engineRegistry) Pop(id string) (*workflows.Engine, error) {
r.mu.Lock()
Expand Down
194 changes: 181 additions & 13 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,82 @@ func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent)
h.lggr.Debugf("workflow 0x%x registered and started", wfID)
return nil
case WorkflowUpdatedEvent:
return h.workflowUpdatedEvent(ctx, event)
payload, ok := event.Data.(WorkflowRegistryWorkflowUpdatedV1)
if !ok {
return fmt.Errorf("invalid data type %T for event", event.Data)
}

newWorkflowID := hex.EncodeToString(payload.NewWorkflowID[:])
cma := h.emitter.With(
platform.KeyWorkflowID, newWorkflowID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

if err := h.workflowUpdatedEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow updated event: %v", err), h.lggr)
return err
}

return nil
case WorkflowPausedEvent:
return h.workflowPausedEvent(ctx, event)
payload, ok := event.Data.(WorkflowRegistryWorkflowPausedV1)
if !ok {
return fmt.Errorf("invalid data type %T for event", event.Data)
}

wfID := hex.EncodeToString(payload.WorkflowID[:])

cma := h.emitter.With(
platform.KeyWorkflowID, wfID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

if err := h.workflowPausedEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow paused event: %v", err), h.lggr)
return err
}
return nil
case WorkflowActivatedEvent:
return h.workflowActivatedEvent(ctx, event)
payload, ok := event.Data.(WorkflowRegistryWorkflowActivatedV1)
if !ok {
return fmt.Errorf("invalid data type %T for event", event.Data)
}

wfID := hex.EncodeToString(payload.WorkflowID[:])

cma := h.emitter.With(
platform.KeyWorkflowID, wfID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)
if err := h.workflowActivatedEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow activated event: %v", err), h.lggr)
return err
}

return nil
case WorkflowDeletedEvent:
payload, ok := event.Data.(WorkflowRegistryWorkflowDeletedV1)
if !ok {
return fmt.Errorf("invalid data type %T for event", event.Data)
}

wfID := hex.EncodeToString(payload.WorkflowID[:])

cma := h.emitter.With(
platform.KeyWorkflowID, wfID,
platform.KeyWorkflowName, payload.WorkflowName,
platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner),
)

if err := h.workflowDeletedEvent(ctx, payload); err != nil {
logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow deleted event: %v", err), h.lggr)
return err
}

return nil
default:
return fmt.Errorf("event type unsupported: %v", event.EventType)
}
Expand Down Expand Up @@ -284,28 +355,107 @@ func (h *eventHandler) workflowRegisteredEvent(
return nil
}

// workflowUpdatedEvent handles the WorkflowUpdatedEvent event type.
// workflowUpdatedEvent handles the WorkflowUpdatedEvent event type by first finding the
// current workflow engine, stopping it, and then starting a new workflow engine with the
// updated workflow spec.
func (h *eventHandler) workflowUpdatedEvent(
_ context.Context,
_ WorkflowRegistryEvent,
ctx context.Context,
payload WorkflowRegistryWorkflowUpdatedV1,
) error {
return ErrNotImplemented
// Remove the old workflow engine from the local registry if it exists
if err := h.tryEngineCleanup(hex.EncodeToString(payload.OldWorkflowID[:])); err != nil {
return err
}

registeredEvent := WorkflowRegistryWorkflowRegisteredV1{
WorkflowID: payload.NewWorkflowID,
WorkflowOwner: payload.WorkflowOwner,
DonID: payload.DonID,
Status: 0,
WorkflowName: payload.WorkflowName,
BinaryURL: payload.BinaryURL,
ConfigURL: payload.ConfigURL,
SecretsURL: payload.SecretsURL,
}

return h.workflowRegisteredEvent(ctx, registeredEvent)
}

// workflowPausedEvent handles the WorkflowPausedEvent event type.
func (h *eventHandler) workflowPausedEvent(
_ context.Context,
_ WorkflowRegistryEvent,
ctx context.Context,
payload WorkflowRegistryWorkflowPausedV1,
) error {
return ErrNotImplemented
// Remove the workflow engine from the local registry if it exists
if err := h.tryEngineCleanup(hex.EncodeToString(payload.WorkflowID[:])); err != nil {
return err
}

// get existing workflow spec from DB
spec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName)
if err != nil {
return fmt.Errorf("failed to get workflow spec: %w", err)
}

// update the status of the workflow spec
spec.Status = job.WorkflowSpecStatusPaused
if _, err := h.orm.UpsertWorkflowSpec(ctx, spec); err != nil {
return fmt.Errorf("failed to update workflow spec: %w", err)
}

return nil
}

// workflowActivatedEvent handles the WorkflowActivatedEvent event type.
func (h *eventHandler) workflowActivatedEvent(
_ context.Context,
_ WorkflowRegistryEvent,
ctx context.Context,
payload WorkflowRegistryWorkflowActivatedV1,
) error {
// fetch the workflow spec from the DB
spec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName)
if err != nil {
return fmt.Errorf("failed to get workflow spec: %w", err)
}

// Do nothing if the workflow is already active
if spec.Status == job.WorkflowSpecStatusActive && h.engineRegistry.IsRunning(hex.EncodeToString(payload.WorkflowID[:])) {
return nil
}

// get the secrets url by the secrets id
secretsURL, err := h.orm.GetSecretsURLByID(ctx, spec.SecretsID.Int64)
if err != nil {
return fmt.Errorf("failed to get secrets URL by ID: %w", err)
}

// start a new workflow engine
registeredEvent := WorkflowRegistryWorkflowRegisteredV1{
WorkflowID: payload.WorkflowID,
WorkflowOwner: payload.WorkflowOwner,
DonID: payload.DonID,
Status: 0,
WorkflowName: payload.WorkflowName,
BinaryURL: spec.BinaryURL,
ConfigURL: spec.ConfigURL,
SecretsURL: secretsURL,
}

return h.workflowRegisteredEvent(ctx, registeredEvent)
}

// workflowDeletedEvent handles the WorkflowDeletedEvent event type.
func (h *eventHandler) workflowDeletedEvent(
ctx context.Context,
payload WorkflowRegistryWorkflowDeletedV1,
) error {
return ErrNotImplemented
if err := h.tryEngineCleanup(hex.EncodeToString(payload.WorkflowID[:])); err != nil {
return err
}

if err := h.orm.DeleteWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName); err != nil {
return fmt.Errorf("failed to delete workflow spec: %w", err)
}
return nil
}

// forceUpdateSecretsEvent handles the ForceUpdateSecretsEvent event type.
Expand Down Expand Up @@ -335,6 +485,24 @@ func (h *eventHandler) forceUpdateSecretsEvent(
return nil
}

// tryEngineCleanup attempts to stop the workflow engine for the given workflow ID. Does nothing if the
// workflow engine is not running.
func (h *eventHandler) tryEngineCleanup(wfID string) error {
if h.engineRegistry.IsRunning(wfID) {
// Remove the engine from the registry
e, err := h.engineRegistry.Pop(wfID)
if err != nil {
return fmt.Errorf("failed to get workflow engine: %w", err)
}

// Stop the engine
if err := e.Close(); err != nil {
return fmt.Errorf("failed to close workflow engine: %w", err)
}
}
return nil
}

// workflowID returns a hex encoded sha256 hash of the wasm, config and secretsURL.
func workflowID(wasm, config, secretsURL []byte) string {
sum := sha256.New()
Expand Down
Loading

0 comments on commit 65ee497

Please sign in to comment.