From 5aad813f2908a04ebb976dcd7f095d5ef73a3e1d Mon Sep 17 00:00:00 2001 From: phr3nzy Date: Fri, 31 May 2024 13:08:59 +0400 Subject: [PATCH] feat: v2 - adds strategies to allow sequential and concurrent execution - adds a simple example - adds benchmarks for different execution strategies --- .gitignore | 32 ++++++++ examples/scraper.go | 131 ++++++++++++++++++++++++++++++ machine.go | 80 +++++------------- machine_test.go | 87 +++++++++++++++++--- plugin.go | 9 ++- runtime.go | 193 ++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 454 insertions(+), 78 deletions(-) create mode 100644 .gitignore create mode 100644 examples/scraper.go create mode 100644 runtime.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..fb17f47 --- /dev/null +++ b/.gitignore @@ -0,0 +1,32 @@ +# golang .gitignore paths + +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# HTML, JS, CSS, images +*.html +*.html~ +*.css +*.css~ +*.js +*.js~ +*.gif +*.png +*.jpg +*.jpeg + +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o diff --git a/examples/scraper.go b/examples/scraper.go new file mode 100644 index 0000000..f015a98 --- /dev/null +++ b/examples/scraper.go @@ -0,0 +1,131 @@ +package main + +import ( + "bufio" + "fmt" + "io" + "net/http" + "os" + "time" + + "github.com/phr3nzy/tango" +) + +func main() { + type state struct { + Attempts int + PluginState string + RequestBodyReader *bufio.Reader + Content []byte + } + + type services struct { + http *http.Client + } + + ctx := &tango.MachineContext[services, state]{ + Services: services{ + http: &http.Client{ + Timeout: time.Second * 10, + }, + }, + State: state{ + Attempts: 0, + PluginState: "plugin state", + }, + PreviousResult: nil, + } + + config := &tango.MachineConfig[services, state]{ + Log: true, + LogLevel: "info", + Plugins: []tango.Plugin[services, state]{}, + } + + steps := []tango.Step[services, state]{ + { + Name: "visit website", + Execute: func(ctx *tango.MachineContext[services, state]) (*tango.Response[services, state], error) { + ctx.State.Attempts++ + resp, err := ctx.Services.http.Get("https://google.com/") + if err != nil { + return ctx.Machine.Error(err), nil + } + defer resp.Body.Close() + + ctx.State.RequestBodyReader = bufio.NewReader(resp.Body) + ctx.State.Content, err = io.ReadAll(ctx.State.RequestBodyReader) + if err != nil { + return ctx.Machine.Error(err.Error()), nil + } + + if resp.StatusCode != 200 { + return ctx.Machine.Error(fmt.Sprintf( + "status code: %d, body: %s", + resp.StatusCode, + string(ctx.State.Content), + )), nil + } + + return ctx.Machine.Next("page visited"), nil + }, + Compensate: func(ctx *tango.MachineContext[services, state]) (*tango.Response[services, state], error) { + return ctx.Machine.Done("Compensate"), nil + }, + }, + { + Name: "write to disk", + Execute: func(ctx *tango.MachineContext[services, state]) (*tango.Response[services, state], error) { + // if the file exists, return an error + _, err := os.Stat("examples/output.html") + if err == nil { + return ctx.Machine.Error("file already exists"), nil + } + + f, err := os.Create("examples/output.html") + if err != nil { + return ctx.Machine.Error(err.Error()), nil + } + defer f.Close() + + _, err = f.Write(ctx.State.Content) + if err != nil { + return ctx.Machine.Error(err.Error()), nil + } + + return ctx.Machine.Done("wrote the file to disk"), nil + }, + Compensate: func(ctx *tango.MachineContext[services, state]) (*tango.Response[services, state], error) { + _, err := os.Stat("examples/output.html") + if err == nil { + fmt.Println("removing file") + err = os.Remove("examples/output.html") + if err != nil { + return ctx.Machine.Error(err.Error()), nil + } + } + + return ctx.Machine.Next("compensated"), nil + }, + }, + } + + machine := tango.NewMachine[services, state]( + "scraper", + steps, + ctx, + config, + &tango.ConcurrentStrategy[services, state]{ + Concurrency: 1, + }, + ) + ctx.Machine = machine + + response, err := machine.Run() + + if err != nil { + fmt.Println(err) + } + + fmt.Println(response) +} diff --git a/machine.go b/machine.go index 48df866..5dd14f0 100644 --- a/machine.go +++ b/machine.go @@ -29,6 +29,7 @@ type Machine[Services, State any] struct { InitialContext *MachineContext[Services, State] Config *MachineConfig[Services, State] mu sync.Mutex + Strategy ExecutionStrategy[Services, State] } // NewMachine creates a new machine. @@ -37,6 +38,7 @@ func NewMachine[Services, State any]( steps []Step[Services, State], initialContext *MachineContext[Services, State], config *MachineConfig[Services, State], + strategy ExecutionStrategy[Services, State], ) *Machine[Services, State] { m := &Machine[Services, State]{ Name: name, @@ -44,6 +46,7 @@ func NewMachine[Services, State any]( InitialContext: initialContext, Context: initialContext, Config: config, + Strategy: strategy, } m.Context.Machine = m return m @@ -67,61 +70,34 @@ func (m *Machine[Services, State]) Run() (*Response[Services, State], error) { return nil, fmt.Errorf("no steps to execute") } - for i := 0; i < len(m.Steps); i++ { - step := m.Steps[i] - - response, err := m.executeStep(step) - if err != nil { - return nil, err + for _, plugin := range m.Config.Plugins { + if err := plugin.Init(m.Context); err != nil { + return nil, fmt.Errorf("plugin setup error: %v", err) } - - m.mu.Lock() - m.ExecutedSteps = append(m.ExecutedSteps, step) - m.Context.PreviousResult = response - m.mu.Unlock() - - switch response.Status { - case NEXT: - continue - case DONE: - return response, nil - case ERROR: - cResponse, err := m.Compensate() - if err != nil { - return nil, fmt.Errorf("compensate error: %v", err) - } - return cResponse, fmt.Errorf("step %s failed: %v", step.Name, response.Result) - case SKIP: - i += response.SkipCount - case JUMP: - targetIndex := -1 - for index, s := range m.Steps { - if s.Name == response.JumpTarget { - targetIndex = index - break - } - } - if targetIndex >= 0 { - i = targetIndex - 1 - } else { - return nil, fmt.Errorf("jump target '%s' not found at %s", response.JumpTarget, step.Name) - } + newStrategy := plugin.ModifyExecutionStrategy(m) + if newStrategy != nil { + m.Strategy = newStrategy } } + response, err := m.Strategy.Execute(m) + if err != nil { + return nil, err + } + for _, plugin := range m.Config.Plugins { if err := plugin.Cleanup(m.Context); err != nil { return nil, fmt.Errorf("plugin cleanup error: %v", err) } } - return nil, nil + return response, nil } // executeStep runs the step and its before and after functions. func (m *Machine[Services, State]) executeStep(step Step[Services, State]) (*Response[Services, State], error) { if m.Config.Log { - fmt.Printf("Executing step: %s\n", step.Name) + fmt.Printf("executing step: %s\n", step.Name) } for _, plugin := range m.Config.Plugins { @@ -137,7 +113,7 @@ func (m *Machine[Services, State]) executeStep(step Step[Services, State]) (*Res } if step.Execute == nil { - return nil, fmt.Errorf("step %s has no Execute function", step.Name) + return nil, fmt.Errorf("step %s has no execute function", step.Name) } response, err := step.Execute(m.Context) @@ -156,27 +132,7 @@ func (m *Machine[Services, State]) executeStep(step Step[Services, State]) (*Res // Compensate runs the compensate functions of the executed steps. func (m *Machine[Services, State]) Compensate() (*Response[Services, State], error) { - m.Context = m.InitialContext - for i := len(m.ExecutedSteps) - 1; i >= 0; i-- { - step := m.ExecutedSteps[i] - if step.BeforeCompensate != nil { - if err := step.BeforeCompensate(m.Context); err != nil { - return nil, err - } - } - if step.Compensate == nil { - return nil, fmt.Errorf("step %s has no Compensate function", step.Name) - } - if _, err := step.Compensate(m.Context); err != nil { - return nil, err - } - if step.AfterCompensate != nil { - if err := step.AfterCompensate(m.Context); err != nil { - return nil, err - } - } - } - return nil, nil + return m.Strategy.Compensate(m) } // Result is an alias for any. diff --git a/machine_test.go b/machine_test.go index ae2b0ca..d01129c 100644 --- a/machine_test.go +++ b/machine_test.go @@ -51,7 +51,7 @@ func TestMachine_Run(t *testing.T) { t.Run(tt.name, func(t *testing.T) { m := tango.NewMachine("TestMachine", []tango.Step[Services, State]{}, &tango.MachineContext[Services, State]{}, &tango.MachineConfig[Services, State]{ Log: false, - }) + }, &tango.SequentialStrategy[Services, State]{}) for _, step := range tt.steps { m.AddStep(step) @@ -129,7 +129,7 @@ func TestMachine_Compensate(t *testing.T) { context := &tango.MachineContext[Services, State]{} m := tango.NewMachine("TestMachine", []tango.Step[Services, State]{}, context, &tango.MachineConfig[Services, State]{ Log: false, - }) + }, &tango.SequentialStrategy[Services, State]{}) context.Machine = m for _, step := range tt.steps { @@ -205,7 +205,7 @@ func TestMachine_Compensate_State(t *testing.T) { context := &tango.MachineContext[Services, State]{State: State{Counter: 0}} m := tango.NewMachine("TestMachine", []tango.Step[Services, State]{}, context, &tango.MachineConfig[Services, State]{ Log: false, - }) + }, &tango.SequentialStrategy[Services, State]{}) context.Machine = m for _, step := range tt.steps { @@ -274,7 +274,7 @@ func TestMachine_Reset(t *testing.T) { context := &tango.MachineContext[Services, State]{} m := tango.NewMachine("TestMachine", []tango.Step[Services, State]{}, context, &tango.MachineConfig[Services, State]{ Log: false, - }) + }, &tango.SequentialStrategy[Services, State]{}) context.Machine = m for _, step := range tt.steps { @@ -332,7 +332,7 @@ func TestMachine_Context_State(t *testing.T) { context := &tango.MachineContext[Services, State]{State: tt.initialState} m := tango.NewMachine("TestMachine", []tango.Step[Services, State]{}, context, &tango.MachineConfig[Services, State]{ Log: false, - }) + }, &tango.SequentialStrategy[Services, State]{}) context.Machine = m for _, step := range tt.steps { @@ -385,7 +385,7 @@ func TestMachine_Context_Services(t *testing.T) { context := &tango.MachineContext[Services, State]{Services: tt.initialServices} m := tango.NewMachine("TestMachine", []tango.Step[Services, State]{}, context, &tango.MachineConfig[Services, State]{ Log: false, - }) + }, &tango.SequentialStrategy[Services, State]{}) context.Machine = m for _, step := range tt.steps { @@ -445,7 +445,7 @@ func TestMachine_Step_Jump(t *testing.T) { context := &tango.MachineContext[Services, State]{} m := tango.NewMachine("TestMachine", []tango.Step[Services, State]{}, context, &tango.MachineConfig[Services, State]{ Log: false, - }) + }, &tango.SequentialStrategy[Services, State]{}) context.Machine = m for _, step := range tt.steps { @@ -525,7 +525,7 @@ func TestMachine_Step_Skip(t *testing.T) { context := &tango.MachineContext[Services, State]{} m := tango.NewMachine("TestMachine", []tango.Step[Services, State]{}, context, &tango.MachineConfig[Services, State]{ Log: false, - }) + }, &tango.SequentialStrategy[Services, State]{}) context.Machine = m for _, step := range tt.steps { @@ -554,12 +554,38 @@ func TestMachine_Step_Skip(t *testing.T) { } } -func BenchmarkMachine_Run(b *testing.B) { +func BenchmarkMachine_Run_Sequential(b *testing.B) { // Create a new machine m := tango.NewMachine("TestMachine", []tango.Step[Services, State]{}, &tango.MachineContext[Services, State]{}, &tango.MachineConfig[Services, State]{ Log: false, + }, &tango.SequentialStrategy[Services, State]{}) + + // Add some steps to the machine + m.NewStep(&tango.Step[Services, State]{ + Name: "Step1", + Execute: func(ctx *tango.MachineContext[Services, State]) (*tango.Response[Services, State], error) { + return m.Next("Next"), nil + }, + }) + m.NewStep(&tango.Step[Services, State]{ + Name: "Step2", + Execute: func(ctx *tango.MachineContext[Services, State]) (*tango.Response[Services, State], error) { + return m.Done("Done"), nil + }, }) + // Run the machine + for i := 0; i < b.N; i++ { + _, _ = m.Run() + } +} + +func BenchmarkMachine_Run_Concurrent(b *testing.B) { + // Create a new machine + m := tango.NewMachine("TestMachine", []tango.Step[Services, State]{}, &tango.MachineContext[Services, State]{}, &tango.MachineConfig[Services, State]{ + Log: false, + }, &tango.ConcurrentStrategy[Services, State]{}) + // Add some steps to the machine m.NewStep(&tango.Step[Services, State]{ Name: "Step1", @@ -584,7 +610,7 @@ func BenchmarkMachine_Compensate(b *testing.B) { // Create a new machine m := tango.NewMachine("TestMachine", []tango.Step[Services, State]{}, &tango.MachineContext[Services, State]{}, &tango.MachineConfig[Services, State]{ Log: false, - }) + }, &tango.SequentialStrategy[Services, State]{}) // Add some steps to the machine m.NewStep(&tango.Step[Services, State]{ @@ -616,7 +642,7 @@ func BenchmarkMachine_Reset(b *testing.B) { // Create a new machine m := tango.NewMachine("TestMachine", []tango.Step[Services, State]{}, &tango.MachineContext[Services, State]{}, &tango.MachineConfig[Services, State]{ Log: false, - }) + }, &tango.SequentialStrategy[Services, State]{}) // Add some steps to the machine m.NewStep(&tango.Step[Services, State]{ @@ -641,7 +667,7 @@ func BenchmarkMachine_Reset(b *testing.B) { } } -func BenchmarkMachine_100Steps_Run(b *testing.B) { +func BenchmarkMachine_100Steps_Run_Sequential(b *testing.B) { // Create a new machine m := tango.NewMachine( "TestMachine", @@ -649,7 +675,44 @@ func BenchmarkMachine_100Steps_Run(b *testing.B) { &tango.MachineContext[Services, State]{}, &tango.MachineConfig[Services, State]{ Log: false, + }, + &tango.SequentialStrategy[Services, State]{}, + ) + + // Add 100 steps to the machine + for i := 0; i < 100; i++ { + m.NewStep(&tango.Step[Services, State]{ + Name: fmt.Sprintf("Step%d", i), + Execute: func(ctx *tango.MachineContext[Services, State]) (*tango.Response[Services, State], error) { + return m.Next("Next"), nil + }, }) + } + + m.NewStep(&tango.Step[Services, State]{ + Name: "LastStep", + Execute: func(ctx *tango.MachineContext[Services, State]) (*tango.Response[Services, State], error) { + return m.Done("Done"), nil + }, + }) + + // Run the machine + for i := 0; i < b.N; i++ { + _, _ = m.Run() + } +} + +func BenchmarkMachine_100Steps_Run_Concurrent(b *testing.B) { + // Create a new machine + m := tango.NewMachine( + "TestMachine", + []tango.Step[Services, State]{}, + &tango.MachineContext[Services, State]{}, + &tango.MachineConfig[Services, State]{ + Log: false, + }, + &tango.ConcurrentStrategy[Services, State]{}, + ) // Add 100 steps to the machine for i := 0; i < 100; i++ { diff --git a/plugin.go b/plugin.go index 34ebcfd..0ff06e8 100644 --- a/plugin.go +++ b/plugin.go @@ -1,7 +1,8 @@ package tango -type Plugin[S, T any] interface { - Init(ctx *MachineContext[S, T]) error - Execute(ctx *MachineContext[S, T]) error - Cleanup(ctx *MachineContext[S, T]) error +type Plugin[Services, State any] struct { + Init func(ctx *MachineContext[Services, State]) error + Execute func(ctx *MachineContext[Services, State]) error + Cleanup func(ctx *MachineContext[Services, State]) error + ModifyExecutionStrategy func(m *Machine[Services, State]) ExecutionStrategy[Services, State] } diff --git a/runtime.go b/runtime.go new file mode 100644 index 0000000..41bb7f4 --- /dev/null +++ b/runtime.go @@ -0,0 +1,193 @@ +package tango + +import ( + "fmt" +) + +// ExecutionStrategy defines the interface for different execution strategies. +type ExecutionStrategy[Services, State any] interface { + Execute(m *Machine[Services, State]) (*Response[Services, State], error) + Compensate(m *Machine[Services, State]) (*Response[Services, State], error) +} + +// SequentialStrategy is a default implementation of ExecutionStrategy that runs steps sequentially. +type SequentialStrategy[Services, State any] struct{} + +func (s *SequentialStrategy[Services, State]) Execute(m *Machine[Services, State]) (*Response[Services, State], error) { + for i := 0; i < len(m.Steps); i++ { + step := m.Steps[i] + + response, err := m.executeStep(step) + if err != nil { + return nil, err + } + + m.mu.Lock() + m.ExecutedSteps = append(m.ExecutedSteps, step) + m.Context.PreviousResult = response + m.mu.Unlock() + + switch response.Status { + case NEXT: + continue + case DONE: + return response, nil + case ERROR: + cResponse, err := m.Compensate() + if err != nil { + return nil, fmt.Errorf("compensate error: %v", err) + } + return cResponse, fmt.Errorf("step %s failed: %v", step.Name, response.Result) + case SKIP: + i += response.SkipCount + case JUMP: + targetIndex := -1 + for index, s := range m.Steps { + if s.Name == response.JumpTarget { + targetIndex = index + break + } + } + if targetIndex >= 0 { + i = targetIndex - 1 + } else { + return nil, fmt.Errorf("jump target '%s' not found at %s", response.JumpTarget, step.Name) + } + } + } + + return nil, nil +} + +// Compensate runs the compensate functions of the executed steps. +func (s *SequentialStrategy[Services, State]) Compensate(m *Machine[Services, State]) (*Response[Services, State], error) { + m.Context = m.InitialContext + for i := len(m.ExecutedSteps) - 1; i >= 0; i-- { + step := m.ExecutedSteps[i] + if step.BeforeCompensate != nil { + if err := step.BeforeCompensate(m.Context); err != nil { + return nil, err + } + } + if step.Compensate == nil { + return nil, fmt.Errorf("step %s has no compensate function", step.Name) + } + if _, err := step.Compensate(m.Context); err != nil { + return nil, err + } + if step.AfterCompensate != nil { + if err := step.AfterCompensate(m.Context); err != nil { + return nil, err + } + } + } + return nil, nil +} + +// ConcurrentStrategy runs steps concurrently. +type ConcurrentStrategy[Services, State any] struct { + Concurrency int +} + +func (c *ConcurrentStrategy[Services, State]) Execute(m *Machine[Services, State]) (*Response[Services, State], error) { + if c.Concurrency <= 1 { + return (&SequentialStrategy[Services, State]{}).Execute(m) + } + + sem := make(chan struct{}, c.Concurrency) + responseChan := make(chan *Response[Services, State], len(m.Steps)) + errorChan := make(chan error, len(m.Steps)) + + for i := 0; i < len(m.Steps); i++ { + sem <- struct{}{} + go func(step Step[Services, State]) { + defer func() { <-sem }() + response, err := m.executeStep(step) + if err != nil { + errorChan <- err + return + } + responseChan <- response + m.mu.Lock() + m.ExecutedSteps = append(m.ExecutedSteps, step) + m.Context.PreviousResult = response + m.mu.Unlock() + }(m.Steps[i]) + } + + for i := 0; i < c.Concurrency; i++ { + sem <- struct{}{} + } + + close(responseChan) + close(errorChan) + + select { + case <-errorChan: + cResponse, err := m.Compensate() + if err != nil { + return nil, fmt.Errorf("compensate error: %v", err) + } + return cResponse, err + default: + } + + for response := range responseChan { + if response.Status == DONE { + return response, nil + } + } + + return nil, nil +} + +// Compensate runs the compensate functions of the executed steps. +func (c *ConcurrentStrategy[Services, State]) Compensate(m *Machine[Services, State]) (*Response[Services, State], error) { + if c.Concurrency <= 1 { + return (&SequentialStrategy[Services, State]{}).Compensate(m) + } + + sem := make(chan struct{}, c.Concurrency) + errorChan := make(chan error, len(m.ExecutedSteps)) + + for i := len(m.ExecutedSteps) - 1; i >= 0; i-- { + sem <- struct{}{} + go func(step Step[Services, State]) { + defer func() { <-sem }() + + if step.BeforeCompensate != nil { + if err := step.BeforeCompensate(m.Context); err != nil { + errorChan <- err + return + } + } + if step.Compensate == nil { + errorChan <- fmt.Errorf("step %s has no compensate function", step.Name) + return + } + if _, err := step.Compensate(m.Context); err != nil { + errorChan <- err + return + } + if step.AfterCompensate != nil { + if err := step.AfterCompensate(m.Context); err != nil { + errorChan <- err + return + } + } + }(m.ExecutedSteps[i]) + } + + for i := 0; i < c.Concurrency; i++ { + sem <- struct{}{} + } + + close(errorChan) + + select { + case err := <-errorChan: + return nil, err + default: + return nil, nil + } +}