Skip to content

Commit

Permalink
Merge pull request #4 from phr3nzy/feat/v2
Browse files Browse the repository at this point in the history
feat: v2
  • Loading branch information
phr3nzy authored May 31, 2024
2 parents cc09249 + 5aad813 commit 949e87e
Show file tree
Hide file tree
Showing 6 changed files with 454 additions and 78 deletions.
32 changes: 32 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# golang .gitignore paths

# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib

# Test binary, built with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Dependency directories (remove the comment below to include it)
# vendor/

# HTML, JS, CSS, images
*.html
*.html~
*.css
*.css~
*.js
*.js~
*.gif
*.png
*.jpg
*.jpeg

# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
131 changes: 131 additions & 0 deletions examples/scraper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package main

import (
"bufio"
"fmt"
"io"
"net/http"
"os"
"time"

"github.com/phr3nzy/tango"
)

func main() {
type state struct {
Attempts int
PluginState string
RequestBodyReader *bufio.Reader
Content []byte
}

type services struct {
http *http.Client
}

ctx := &tango.MachineContext[services, state]{
Services: services{
http: &http.Client{
Timeout: time.Second * 10,
},
},
State: state{
Attempts: 0,
PluginState: "plugin state",
},
PreviousResult: nil,
}

config := &tango.MachineConfig[services, state]{
Log: true,
LogLevel: "info",
Plugins: []tango.Plugin[services, state]{},
}

steps := []tango.Step[services, state]{
{
Name: "visit website",
Execute: func(ctx *tango.MachineContext[services, state]) (*tango.Response[services, state], error) {
ctx.State.Attempts++
resp, err := ctx.Services.http.Get("https://google.com/")
if err != nil {
return ctx.Machine.Error(err), nil
}
defer resp.Body.Close()

ctx.State.RequestBodyReader = bufio.NewReader(resp.Body)
ctx.State.Content, err = io.ReadAll(ctx.State.RequestBodyReader)
if err != nil {
return ctx.Machine.Error(err.Error()), nil
}

if resp.StatusCode != 200 {
return ctx.Machine.Error(fmt.Sprintf(
"status code: %d, body: %s",
resp.StatusCode,
string(ctx.State.Content),
)), nil
}

return ctx.Machine.Next("page visited"), nil
},
Compensate: func(ctx *tango.MachineContext[services, state]) (*tango.Response[services, state], error) {
return ctx.Machine.Done("Compensate"), nil
},
},
{
Name: "write to disk",
Execute: func(ctx *tango.MachineContext[services, state]) (*tango.Response[services, state], error) {
// if the file exists, return an error
_, err := os.Stat("examples/output.html")
if err == nil {
return ctx.Machine.Error("file already exists"), nil
}

f, err := os.Create("examples/output.html")
if err != nil {
return ctx.Machine.Error(err.Error()), nil
}
defer f.Close()

_, err = f.Write(ctx.State.Content)
if err != nil {
return ctx.Machine.Error(err.Error()), nil
}

return ctx.Machine.Done("wrote the file to disk"), nil
},
Compensate: func(ctx *tango.MachineContext[services, state]) (*tango.Response[services, state], error) {
_, err := os.Stat("examples/output.html")
if err == nil {
fmt.Println("removing file")
err = os.Remove("examples/output.html")
if err != nil {
return ctx.Machine.Error(err.Error()), nil
}
}

return ctx.Machine.Next("compensated"), nil
},
},
}

machine := tango.NewMachine[services, state](
"scraper",
steps,
ctx,
config,
&tango.ConcurrentStrategy[services, state]{
Concurrency: 1,
},
)
ctx.Machine = machine

response, err := machine.Run()

if err != nil {
fmt.Println(err)
}

fmt.Println(response)
}
80 changes: 18 additions & 62 deletions machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Machine[Services, State any] struct {
InitialContext *MachineContext[Services, State]
Config *MachineConfig[Services, State]
mu sync.Mutex
Strategy ExecutionStrategy[Services, State]
}

// NewMachine creates a new machine.
Expand All @@ -37,13 +38,15 @@ func NewMachine[Services, State any](
steps []Step[Services, State],
initialContext *MachineContext[Services, State],
config *MachineConfig[Services, State],
strategy ExecutionStrategy[Services, State],
) *Machine[Services, State] {
m := &Machine[Services, State]{
Name: name,
Steps: steps,
InitialContext: initialContext,
Context: initialContext,
Config: config,
Strategy: strategy,
}
m.Context.Machine = m
return m
Expand All @@ -67,61 +70,34 @@ func (m *Machine[Services, State]) Run() (*Response[Services, State], error) {
return nil, fmt.Errorf("no steps to execute")
}

for i := 0; i < len(m.Steps); i++ {
step := m.Steps[i]

response, err := m.executeStep(step)
if err != nil {
return nil, err
for _, plugin := range m.Config.Plugins {
if err := plugin.Init(m.Context); err != nil {
return nil, fmt.Errorf("plugin setup error: %v", err)
}

m.mu.Lock()
m.ExecutedSteps = append(m.ExecutedSteps, step)
m.Context.PreviousResult = response
m.mu.Unlock()

switch response.Status {
case NEXT:
continue
case DONE:
return response, nil
case ERROR:
cResponse, err := m.Compensate()
if err != nil {
return nil, fmt.Errorf("compensate error: %v", err)
}
return cResponse, fmt.Errorf("step %s failed: %v", step.Name, response.Result)
case SKIP:
i += response.SkipCount
case JUMP:
targetIndex := -1
for index, s := range m.Steps {
if s.Name == response.JumpTarget {
targetIndex = index
break
}
}
if targetIndex >= 0 {
i = targetIndex - 1
} else {
return nil, fmt.Errorf("jump target '%s' not found at %s", response.JumpTarget, step.Name)
}
newStrategy := plugin.ModifyExecutionStrategy(m)
if newStrategy != nil {
m.Strategy = newStrategy
}
}

response, err := m.Strategy.Execute(m)
if err != nil {
return nil, err
}

for _, plugin := range m.Config.Plugins {
if err := plugin.Cleanup(m.Context); err != nil {
return nil, fmt.Errorf("plugin cleanup error: %v", err)
}
}

return nil, nil
return response, nil
}

// executeStep runs the step and its before and after functions.
func (m *Machine[Services, State]) executeStep(step Step[Services, State]) (*Response[Services, State], error) {
if m.Config.Log {
fmt.Printf("Executing step: %s\n", step.Name)
fmt.Printf("executing step: %s\n", step.Name)
}

for _, plugin := range m.Config.Plugins {
Expand All @@ -137,7 +113,7 @@ func (m *Machine[Services, State]) executeStep(step Step[Services, State]) (*Res
}

if step.Execute == nil {
return nil, fmt.Errorf("step %s has no Execute function", step.Name)
return nil, fmt.Errorf("step %s has no execute function", step.Name)
}

response, err := step.Execute(m.Context)
Expand All @@ -156,27 +132,7 @@ func (m *Machine[Services, State]) executeStep(step Step[Services, State]) (*Res

// Compensate runs the compensate functions of the executed steps.
func (m *Machine[Services, State]) Compensate() (*Response[Services, State], error) {
m.Context = m.InitialContext
for i := len(m.ExecutedSteps) - 1; i >= 0; i-- {
step := m.ExecutedSteps[i]
if step.BeforeCompensate != nil {
if err := step.BeforeCompensate(m.Context); err != nil {
return nil, err
}
}
if step.Compensate == nil {
return nil, fmt.Errorf("step %s has no Compensate function", step.Name)
}
if _, err := step.Compensate(m.Context); err != nil {
return nil, err
}
if step.AfterCompensate != nil {
if err := step.AfterCompensate(m.Context); err != nil {
return nil, err
}
}
}
return nil, nil
return m.Strategy.Compensate(m)
}

// Result is an alias for any.
Expand Down
Loading

0 comments on commit 949e87e

Please sign in to comment.