Skip to content

Commit

Permalink
fix(go-sdk): prefix action names with workflow name
Browse files Browse the repository at this point in the history
  • Loading branch information
steebchen committed Jun 27, 2024
1 parent 31cf5be commit e602855
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 29 deletions.
29 changes: 13 additions & 16 deletions pkg/client/types/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,15 @@ type Action struct {
// Required. The verb to perform.
Verb string

// Optional. A way to unique identify the step.
Subresource string
// The workflow name. Optional for compatibility reasons.
Workflow string
}

func (o Action) String() string {
if o.Subresource != "" {
return fmt.Sprintf("%s:%s:%s", o.Service, o.Verb, o.Subresource)
if o.Workflow != "" {
return fmt.Sprintf("%s:%s:%s", o.Workflow, o.Service, o.Verb)
}

return o.IntegrationVerbString()
}

func (o Action) IntegrationVerbString() string {
return fmt.Sprintf("%s:%s", o.Service, o.Verb)
}

Expand All @@ -39,18 +35,19 @@ func ParseActionID(actionID string) (Action, error) {
return Action{}, fmt.Errorf("invalid action id %s, must have at least 2 strings separated : (colon)", actionID)
}

Service := firstToLower(parts[0])
verb := strings.ToLower(parts[1])

var subresource string
var workflow string
if numParts == 3 {
subresource = firstToLower(parts[2])
workflow = firstToLower(parts[0])
parts = parts[1:]
}

service := firstToLower(parts[0])
verb := strings.ToLower(parts[1])

return Action{
Service: Service,
Verb: verb,
Subresource: subresource,
Service: service,
Verb: verb,
Workflow: workflow,
}, nil
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/repository/prisma/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"log"

"github.com/google/uuid"
"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -592,6 +593,7 @@ func (r *workflowEngineRepository) createWorkflowVersionTxs(ctx context.Context,
// create concurrency group
if opts.Concurrency != nil {
// upsert the action
log.Printf("3 upserting action id %s", opts.Concurrency.Action)
action, err := r.queries.UpsertAction(
ctx,
tx,
Expand Down Expand Up @@ -795,6 +797,7 @@ func (r *workflowEngineRepository) createJobTx(ctx context.Context, tx pgx.Tx, t
}

// upsert the action
log.Printf("4 upserting action id %s", stepOpts.Action)
_, err := r.queries.UpsertAction(
ctx,
tx,
Expand Down
4 changes: 2 additions & 2 deletions pkg/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (s *Service) On(t triggerConverter, workflow workflowConverter) error {
}
}

err = s.worker.registerAction(parsedAction.Service, parsedAction.Verb, fn)
err = s.worker.registerAction(apiWorkflow.Name, parsedAction.Service, parsedAction.Verb, fn)

if err != nil {
return err
Expand Down Expand Up @@ -89,7 +89,7 @@ func (s *Service) RegisterAction(fn any, opts ...RegisterActionOpt) error {
fnOpts.name = getFnName(fn)
}

return s.worker.registerAction(s.Name, fnOpts.name, fn)
return s.worker.registerAction("none", s.Name, fnOpts.name, fn)
}

func (s *Service) Call(verb string) *WorkflowStep {
Expand Down
12 changes: 8 additions & 4 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"reflect"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -197,7 +198,7 @@ func NewWorker(fs ...WorkerOpt) (*Worker, error) {
for _, integrationAction := range actions {
action := fmt.Sprintf("%s:%s", integrationId, integrationAction)

err := w.registerAction(integrationId, action, integration.ActionHandler(integrationAction))
err := w.registerAction("integration", integrationId, action, integration.ActionHandler(integrationAction))

if err != nil {
return nil, fmt.Errorf("could not register integration action %s: %w", action, err)
Expand Down Expand Up @@ -266,11 +267,14 @@ func (w *Worker) RegisterAction(actionId string, method any) error {
return fmt.Errorf("could not parse action id: %w", err)
}

return w.registerAction(action.Service, action.Verb, method)
return w.registerAction("none", action.Service, action.Verb, method)
}

func (w *Worker) registerAction(service, verb string, method any) error {
actionId := fmt.Sprintf("%s:%s", service, verb)
func (w *Worker) registerAction(wf, service, verb string, method any) error {
wf = strings.ToLower(wf) // TODO
wf = strings.ReplaceAll(wf, " ", "-") // TODO

actionId := fmt.Sprintf("%s:%s:%s", wf, service, verb)

// if the service is "concurrency", then this is a special action
if service == "concurrency" {
Expand Down
18 changes: 11 additions & 7 deletions pkg/worker/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (j *WorkflowJob) ToWorkflowJob(svcName string, namespace string) (*types.Wo

for i := range j.Steps {

newStep, err := j.Steps[i].ToWorkflowStep(svcName, i, namespace)
newStep, err := j.Steps[i].ToWorkflowStep(j.Name, svcName, i, namespace)

if err != nil {
return nil, err
Expand All @@ -245,7 +245,7 @@ func (j *WorkflowJob) ToActionMap(svcName string) map[string]any {
res := map[string]any{}

for i, step := range j.Steps {
actionId := step.GetActionId(svcName, i)
actionId := step.GetActionId(j.Name, svcName, i)

res[actionId] = step.Function
}
Expand Down Expand Up @@ -348,7 +348,7 @@ func (w *WorkflowStep) ToActionMap(svcName string) map[string]any {
step := *w

return map[string]any{
step.GetActionId(svcName, 0): w.Function,
step.GetActionId(w.Name, svcName, 0): w.Function,
}
}

Expand All @@ -364,7 +364,7 @@ type Step struct {
APIStep types.WorkflowStep
}

func (w *WorkflowStep) ToWorkflowStep(svcName string, index int, namespace string) (*Step, error) {
func (w *WorkflowStep) ToWorkflowStep(wfName, svcName string, index int, namespace string) (*Step, error) {
fnType := reflect.TypeOf(w.Function)

res := &Step{}
Expand All @@ -375,7 +375,7 @@ func (w *WorkflowStep) ToWorkflowStep(svcName string, index int, namespace strin
Name: res.Id,
ID: w.GetStepId(index),
Timeout: w.Timeout,
ActionID: w.GetActionId(svcName, index),
ActionID: w.GetActionId(wfName, svcName, index),
Parents: []string{},
Retries: w.Retries,
}
Expand Down Expand Up @@ -443,10 +443,14 @@ func (w *WorkflowStep) GetStepId(index int) string {
return stepId
}

func (w *WorkflowStep) GetActionId(svcName string, index int) string {
func (w *WorkflowStep) GetActionId(wfName, svcName string, index int) string {
stepId := w.GetStepId(index)

return fmt.Sprintf("%s:%s", svcName, stepId)
wf := wfName
wf = strings.ToLower(wf)
wf = strings.ReplaceAll(wf, " ", "-")

return fmt.Sprintf("%s:%s:%s", wf, svcName, stepId)
}

func getFnName(fn any) string {
Expand Down

0 comments on commit e602855

Please sign in to comment.