Skip to content

Commit

Permalink
remove next fsm event if transition fails
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Aug 12, 2024
1 parent 78d14b3 commit e70ecb5
Show file tree
Hide file tree
Showing 7 changed files with 396 additions and 14 deletions.
26 changes: 13 additions & 13 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1582,22 +1582,17 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call *
}

func (s *Service) finaliseAsyncCall(ctx context.Context, tx *dal.Tx, call *dal.AsyncCall, callResult either.Either[[]byte, string], isFinalResult bool) error {
if !isFinalResult {
// Will retry, do not propagate yet.
return nil
}

_, failed := callResult.(either.Right[[]byte, string])

// Allow for handling of completion based on origin
switch origin := call.Origin.(type) {
case dal.AsyncOriginFSM:
if err := s.onAsyncFSMCallCompletion(ctx, tx, origin, failed); err != nil {
if err := s.onAsyncFSMCallCompletion(ctx, tx, origin, failed, isFinalResult); err != nil {
return fmt.Errorf("failed to finalize FSM async call: %w", err)
}

case dal.AsyncOriginPubSub:
if err := s.pubSub.OnCallCompletion(ctx, tx, origin, failed); err != nil {
if err := s.pubSub.OnCallCompletion(ctx, tx, origin, failed, isFinalResult); err != nil {
return fmt.Errorf("failed to finalize pubsub async call: %w", err)
}

Expand All @@ -1607,9 +1602,19 @@ func (s *Service) finaliseAsyncCall(ctx context.Context, tx *dal.Tx, call *dal.A
return nil
}

func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.Tx, origin dal.AsyncOriginFSM, failed bool) error {
func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.Tx, origin dal.AsyncOriginFSM, failed bool, isFinalResult bool) error {
logger := log.FromContext(ctx).Scope(origin.FSM.String())

// retrieve the next fsm event and delete it
next, err := tx.PopNextFSMEvent(ctx, origin.FSM, origin.Key)
if err != nil {
return fmt.Errorf("%s: failed to get next FSM event: %w", origin, err)
}
if !isFinalResult {
// Will retry, so we only want the next FSM event to be removed
return nil
}

instance, err := tx.AcquireFSMInstance(ctx, origin.FSM, origin.Key)
if err != nil {
return fmt.Errorf("%s: could not acquire lock on FSM instance: %w", origin, err)
Expand Down Expand Up @@ -1653,11 +1658,6 @@ func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.Tx, orig
}

// If there's a next event enqueued, we immediately start it.
next, err := tx.PopNextFSMEvent(ctx, origin.FSM, origin.Key)
if err != nil {
return fmt.Errorf("%s: failed to get next FSM event: %w", origin, err)
}

if next, ok := next.Get(); ok {
return s.sendFSMEventInTx(ctx, tx, instance, fsm, next.RequestType, next.Request)
}
Expand Down
59 changes: 59 additions & 0 deletions backend/controller/dal/fsm_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"time"

in "github.com/TBD54566975/ftl/integration"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"
)

func TestFSM(t *testing.T) {
Expand Down Expand Up @@ -133,3 +135,60 @@ func TestFSMGoTests(t *testing.T) {
in.ExecModuleTest("fsm"),
)
}

// TestFSMNext deploys the fsmnext module and exercises scheduling of the next
// FSM event from inside a transition: a clean progression through every state,
// a transition that schedules the next event twice, and a transition that
// schedules the next event and then returns an error.
func TestFSMNext(t *testing.T) {
	// sendWithOptions builds an action that sends the first of states to the
	// given FSM instance and passes the remaining states along as the ones to
	// schedule next. With no states it yields a no-op action.
	sendWithOptions := func(instance string, nextAttempts int, maybeErr optional.Option[string], states ...string) in.Action {
		if len(states) == 0 {
			return func(t testing.TB, ic in.TestContext) {}
		}
		event := map[string]any{
			"instance":     instance,
			"nextStates":   states[1:],
			"nextAttempts": nextAttempts,
			"error":        maybeErr,
		}
		request := in.Obj{
			"state": states[0],
			"event": event,
		}
		return in.Call[in.Obj, in.Obj]("fsmnext", "sendOne", request, nil)
	}

	// send is sendWithOptions with a single fsm.Next attempt and no error.
	send := func(instance string, states ...string) in.Action {
		return sendWithOptions(instance, 1, optional.None[string](), states...)
	}

	// expectSuccess asserts that exactly one successful async call exists for
	// each listed state of the instance.
	expectSuccess := func(instance string, states ...string) in.Action {
		perState := slices.Map(states, func(state string) in.Action {
			query := fmt.Sprintf("SELECT COUNT(*) FROM async_calls WHERE origin = 'fsm:fsmnext.fsm:%s' AND verb = 'fsmnext.state%s' AND state = 'success'", instance, state)
			return in.QueryRow("ftl", query, int64(1))
		})
		return in.Chain(perState...)
	}

	// expectRepeatedError asserts that the first three errored async calls for
	// the instance/state all carry an error containing errorStr.
	expectRepeatedError := func(instance string, state string, errorStr string) in.Action {
		return func(t testing.TB, ic in.TestContext) {
			// make sure each retry got the same error
			for offset := 0; offset < 3; offset++ {
				query := fmt.Sprintf("SELECT error FROM async_calls WHERE origin = 'fsm:fsmnext.fsm:%s' AND verb = 'fsmnext.state%s' AND state = 'error' ORDER BY created_at LIMIT 1 OFFSET %d", instance, state, offset)
				row := in.GetRow(t, ic, "ftl", query, 1)
				message, isString := row[0].(string)
				assert.True(t, isString, "unexpected error type: %T", row[0])
				assert.Contains(t, message, errorStr, "unexpected error: %s", message)
			}
		}
	}

	in.Run(t, "",
		in.CopyModule("fsmnext"),
		in.Deploy("fsmnext"),

		// Simple progression through each state
		send("1", "A", "B", "C", "D"),

		// Bad progression where fsm.Next() is called twice
		sendWithOptions("2", 2, optional.None[string](), "A", "B"),

		// Schedule next and then error and retry. Each error should be the
		// expected error, not a failure to schedule the next state
		sendWithOptions("3", 1, optional.Some("computers are fun"), "A", "B"),

		expectSuccess("1", "A", "B", "C", "D"),
		expectRepeatedError("2", "A", "fsm instance already has its next state set"),
		expectRepeatedError("3", "A", "computers are fun"),
	)
}
122 changes: 122 additions & 0 deletions backend/controller/dal/testdata/go/fsmnext/fsmnext.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package fsmnext

import (
"context"
"errors"

"github.com/TBD54566975/ftl/go-runtime/ftl"
)

// fsm is the handle for the "fsm" state machine that init configures.
// The FSM only allows transitions that move forward through the alphabet.
// Each event also declares the next state(s) to schedule via Event.NextStates.
//
//ftl:retry 2 1s
var fsm *ftl.FSMHandle

// init registers the "fsm" state machine. It starts in StateA and every
// declared transition moves to a state later in the alphabet.
func init() {
fsm = ftl.FSM("fsm",
ftl.Start(StateA),
// From A any later state is reachable.
ftl.Transition(StateA, StateB),
ftl.Transition(StateA, StateC),
ftl.Transition(StateA, StateD),
// From B only C and D are reachable.
ftl.Transition(StateB, StateC),
ftl.Transition(StateB, StateD),
// From C only D is reachable; no transitions leave D.
ftl.Transition(StateC, StateD),
)
}

// State names one of the FSM's states. eventFor uses it to pick the matching
// EventA..EventD type.
type State string

// The four states, in alphabetical order.
const (
A State = "A"
B State = "B"
C State = "C"
D State = "D"
)

// Event is the payload shared by every state verb. It identifies the FSM
// instance and describes what handleEvent should schedule (and whether it
// should fail) when the state is entered.
type Event struct {
Instance string // key of the FSM instance the event is sent to
NextStates []State // will schedule fsm.Next with these states progressively
NextAttempts ftl.Option[int] // number of times to call fsm.Next; defaults to once when unset
Error ftl.Option[string] // if present, returns this error after calling fsm.Next() as needed
}

// EventA is the Event payload accepted by StateA.
//
//ftl:typealias
type EventA Event

// StateA is the FSM verb for state A; it delegates to handleEvent.
//
//ftl:verb
func StateA(ctx context.Context, in EventA) error {
	event := Event(in)
	return handleEvent(ctx, event)
}

// EventB is the Event payload accepted by StateB.
//
//ftl:typealias
type EventB Event

// StateB is the FSM verb for state B; it delegates to handleEvent.
//
//ftl:verb
func StateB(ctx context.Context, in EventB) error {
	event := Event(in)
	return handleEvent(ctx, event)
}

// EventC is the Event payload accepted by StateC.
//
//ftl:typealias
type EventC Event

// StateC is the FSM verb for state C; it delegates to handleEvent.
//
//ftl:verb
func StateC(ctx context.Context, in EventC) error {
	event := Event(in)
	return handleEvent(ctx, event)
}

// EventD is the Event payload accepted by StateD.
//
//ftl:typealias
type EventD Event

// StateD is the FSM verb for state D; it delegates to handleEvent.
//
//ftl:verb
func StateD(ctx context.Context, in EventD) error {
	event := Event(in)
	return handleEvent(ctx, event)
}

// Request is the payload of SendOne: the state to send and the event to
// deliver with it.
//
//ftl:data export
type Request struct {
State State // selects which EventA..EventD type the event is sent as
Event Event // event payload; Event.Instance selects the FSM instance
}

// SendOne converts in.Event to the event type matching in.State and sends it
// to the FSM instance named by in.Event.Instance.
//
//ftl:verb export
func SendOne(ctx context.Context, in Request) error {
	instance := in.Event.Instance
	event := eventFor(in.Event, in.State)
	return fsm.Send(ctx, instance, event)
}

// handleEvent is the shared body of every state verb.
//
// When in.NextStates is empty it does nothing. Otherwise it builds the event
// for the first remaining state and calls fsm.Next with it as many times as
// in.NextAttempts requests (once when unset), returning the first error from
// fsm.Next. After scheduling, it returns in.Error as an error if one is set.
func handleEvent(ctx context.Context, in Event) error {
	remaining := in.NextStates
	if len(remaining) == 0 {
		// Nothing left to schedule.
		return nil
	}

	// The follow-up event carries the rest of the states and the same attempt
	// count; the Error field is left unset on it.
	followUp := Event{
		Instance:     in.Instance,
		NextStates:   remaining[1:],
		NextAttempts: in.NextAttempts,
	}
	next := eventFor(followUp, remaining[0])

	total := in.NextAttempts.Default(1)
	for attempt := 1; attempt <= total; attempt++ {
		ftl.LoggerFromContext(ctx).Infof("scheduling next event for %s (%d/%d)", in.Instance, attempt, total)
		if err := fsm.Next(ctx, in.Instance, next); err != nil {
			return err
		}
	}

	errStr, failing := in.Error.Get()
	if !failing {
		return nil
	}
	return errors.New(errStr)
}

// eventFor converts event to the typed event (EventA..EventD) that matches
// state. It panics when state is not one of A, B, C or D.
func eventFor(event Event, state State) any {
	var typed any
	switch state {
	case A:
		typed = EventA(event)
	case B:
		typed = EventB(event)
	case C:
		typed = EventC(event)
	case D:
		typed = EventD(event)
	default:
		panic("unknown state")
	}
	return typed
}
2 changes: 2 additions & 0 deletions backend/controller/dal/testdata/go/fsmnext/ftl.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
module = "fsmnext"
language = "go"
47 changes: 47 additions & 0 deletions backend/controller/dal/testdata/go/fsmnext/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
module ftl/fsmnext

go 1.22.5

require github.com/TBD54566975/ftl v1.1.5

require (
connectrpc.com/connect v1.16.2 // indirect
connectrpc.com/grpcreflect v1.2.0 // indirect
connectrpc.com/otelconnect v0.7.1 // indirect
github.com/alecthomas/atomic v0.1.0-alpha2 // indirect
github.com/alecthomas/concurrency v0.0.2 // indirect
github.com/alecthomas/participle/v2 v2.1.1 // indirect
github.com/alecthomas/types v0.16.0 // indirect
github.com/alessio/shellescape v1.4.2 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/danieljoos/wincred v1.2.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/hashicorp/cronexpr v1.1.2 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.6.0 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/puzpuzpuz/xsync/v3 v3.4.0 // indirect
github.com/swaggest/jsonschema-go v0.3.72 // indirect
github.com/swaggest/refl v1.3.0 // indirect
github.com/zalando/go-keyring v0.2.5 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/exp v0.0.0-20240707233637-46b078467d37 // indirect
golang.org/x/mod v0.19.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
)

replace github.com/TBD54566975/ftl => ./../../../../../..
Loading

0 comments on commit e70ecb5

Please sign in to comment.