Skip to content

Commit

Permalink
remove next fsm event if transition fails
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Aug 12, 2024
1 parent 78d14b3 commit 3845ebd
Show file tree
Hide file tree
Showing 7 changed files with 399 additions and 14 deletions.
26 changes: 13 additions & 13 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1582,22 +1582,17 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call *
}

func (s *Service) finaliseAsyncCall(ctx context.Context, tx *dal.Tx, call *dal.AsyncCall, callResult either.Either[[]byte, string], isFinalResult bool) error {
if !isFinalResult {
// Will retry, do not propagate yet.
return nil
}

_, failed := callResult.(either.Right[[]byte, string])

// Allow for handling of completion based on origin
switch origin := call.Origin.(type) {
case dal.AsyncOriginFSM:
if err := s.onAsyncFSMCallCompletion(ctx, tx, origin, failed); err != nil {
if err := s.onAsyncFSMCallCompletion(ctx, tx, origin, failed, isFinalResult); err != nil {
return fmt.Errorf("failed to finalize FSM async call: %w", err)
}

case dal.AsyncOriginPubSub:
if err := s.pubSub.OnCallCompletion(ctx, tx, origin, failed); err != nil {
if err := s.pubSub.OnCallCompletion(ctx, tx, origin, failed, isFinalResult); err != nil {
return fmt.Errorf("failed to finalize pubsub async call: %w", err)
}

Expand All @@ -1607,9 +1602,19 @@ func (s *Service) finaliseAsyncCall(ctx context.Context, tx *dal.Tx, call *dal.A
return nil
}

func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.Tx, origin dal.AsyncOriginFSM, failed bool) error {
func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.Tx, origin dal.AsyncOriginFSM, failed bool, isFinalResult bool) error {
logger := log.FromContext(ctx).Scope(origin.FSM.String())

// retrieve the next fsm event and delete it
next, err := tx.PopNextFSMEvent(ctx, origin.FSM, origin.Key)
if err != nil {
return fmt.Errorf("%s: failed to get next FSM event: %w", origin, err)
}
if !isFinalResult {
// Will retry, so we only want next fsm to be removed
return nil
}

instance, err := tx.AcquireFSMInstance(ctx, origin.FSM, origin.Key)
if err != nil {
return fmt.Errorf("%s: could not acquire lock on FSM instance: %w", origin, err)
Expand Down Expand Up @@ -1653,11 +1658,6 @@ func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.Tx, orig
}

// If there's a next event enqueued, we immediately start it.
next, err := tx.PopNextFSMEvent(ctx, origin.FSM, origin.Key)
if err != nil {
return fmt.Errorf("%s: failed to get next FSM event: %w", origin, err)
}

if next, ok := next.Get(); ok {
return s.sendFSMEventInTx(ctx, tx, instance, fsm, next.RequestType, next.Request)
}
Expand Down
62 changes: 62 additions & 0 deletions backend/controller/dal/fsm_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"time"

in "github.com/TBD54566975/ftl/integration"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"
)

func TestFSM(t *testing.T) {
Expand Down Expand Up @@ -133,3 +135,63 @@ func TestFSMGoTests(t *testing.T) {
in.ExecModuleTest("fsm"),
)
}

// TestFSMNext deploys the fsmnext module and exercises fsm.Next() scheduling
// in three scenarios: a clean walk through every state, a handler that calls
// fsm.Next() twice, and a handler that schedules the next event and then
// returns an error so that it is retried.
func TestFSMNext(t *testing.T) {
	// sendWithOptions sends an event for states[0] to the given instance. The
	// remaining states are passed along in the event payload as "nextStates".
	sendWithOptions := func(instance string, nextAttempts int, maybeErr optional.Option[string], states ...string) in.Action {
		if len(states) > 0 {
			first, rest := states[0], states[1:]
			event := map[string]any{
				"instance":     instance,
				"nextStates":   rest,
				"nextAttempts": nextAttempts,
				"error":        maybeErr,
			}
			return in.Call[in.Obj, in.Obj]("fsmnext", "sendOne", in.Obj{
				"state": first,
				"event": event,
			}, nil)
		}
		// No states requested: nothing to send.
		return func(t testing.TB, ic in.TestContext) {}
	}

	// send is sendWithOptions with a single fsm.Next() attempt and no error.
	send := func(instance string, states ...string) in.Action {
		return sendWithOptions(instance, 1, optional.None[string](), states...)
	}

	// expectSucceeded asserts that exactly one successful async call was
	// recorded for each of the listed states of the instance.
	expectSucceeded := func(instance string, states ...string) in.Action {
		checks := make([]in.Action, 0, len(states))
		for _, state := range states {
			query := fmt.Sprintf("SELECT COUNT(*) FROM async_calls WHERE origin = 'fsm:fsmnext.fsm:%s' AND verb = 'fsmnext.state%s' AND state = 'success'", instance, state)
			checks = append(checks, in.QueryRow("ftl", query, int64(1)))
		}
		return in.Chain(checks...)
	}

	// expectRepeatedError asserts that the first three failed async calls for
	// the given instance and state all carry the expected error text.
	expectRepeatedError := func(instance string, state string, errorStr string) in.Action {
		return func(t testing.TB, ic in.TestContext) {
			// make sure each retry got the same error
			for offset := 0; offset < 3; offset++ {
				query := fmt.Sprintf("SELECT error FROM async_calls WHERE origin = 'fsm:fsmnext.fsm:%s' AND verb = 'fsmnext.state%s' AND state = 'error' ORDER BY created_at LIMIT 1 OFFSET %d", instance, state, offset)
				row := in.GetRow(t, ic, "ftl", query, 1)
				message, isString := row[0].(string)
				assert.True(t, isString, "unexpected error type: %T", row[0])
				assert.Contains(t, message, errorStr, "unexpected error: %s", message)
			}
		}
	}

	in.Run(t, "",
		in.CopyModule("fsmnext"),
		in.Deploy("fsmnext"),

		// Simple progression through each state
		send("1", "A", "B", "C", "D"),

		// Bad progression where fsm.Next() is called twice
		sendWithOptions("2", 2, optional.None[string](), "A", "B"),

		// Schedule next and then error and retry. Each error should be the
		// expected error, not a failure to schedule the next state
		sendWithOptions("3", 1, optional.Some("computers are fun"), "A", "B"),

		in.Sleep(4*time.Second),

		expectSucceeded("1", "A", "B", "C", "D"),
		expectRepeatedError("2", "A", "fsm instance already has its next state set"),
		// will get "fsm instance already has its next state set" if next event is not cleared properly
		expectRepeatedError("3", "A", "computers are fun"),
	)
}
122 changes: 122 additions & 0 deletions backend/controller/dal/testdata/go/fsmnext/fsmnext.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package fsmnext

import (
"context"
"errors"

"github.com/TBD54566975/ftl/go-runtime/ftl"
)

// fsm is a state machine whose transitions only move forward through the
// alphabet: it starts at A and may step (or skip ahead) to any later state up
// to D. Each state handler may also schedule the next transition itself via
// fsm.Next, driven by the NextStates carried in the event.
//
//ftl:retry 2 1s
var fsm *ftl.FSMHandle

// init registers the FSM named "fsm": StateA is the start state, and every
// transition goes from an earlier state to a later one (A->B, A->C, A->D,
// B->C, B->D, C->D).
func init() {
fsm = ftl.FSM("fsm",
ftl.Start(StateA),
ftl.Transition(StateA, StateB),
ftl.Transition(StateA, StateC),
ftl.Transition(StateA, StateD),
ftl.Transition(StateB, StateC),
ftl.Transition(StateB, StateD),
ftl.Transition(StateC, StateD),
)
}

// State names one of the FSM states. eventFor uses it to select which
// state-specific event type (EventA..EventD) to build.
type State string

// The four states of the FSM, in alphabetical (forward) order.
const (
A State = "A"
B State = "B"
C State = "C"
D State = "D"
)

// Event is the payload shared by every state of the FSM. Besides identifying
// the instance, it tells the handling verb which follow-up transitions to
// schedule and whether to fail after scheduling them.
type Event struct {
Instance string // FSM instance key passed to fsm.Send and fsm.Next
NextStates []State // remaining states to visit; the handler schedules the first via fsm.Next and forwards the rest
NextAttempts ftl.Option[int] // number of times the handler calls fsm.Next; defaults to 1 when unset
Error ftl.Option[string] // if present, the handler returns this error after making its fsm.Next call(s)
}

// EventA is the Event payload accepted by StateA.
//
//ftl:typealias
type EventA Event

// StateA handles events for state A by converting the payload back to a plain
// Event and delegating to handleEvent.
//
//ftl:verb
func StateA(ctx context.Context, in EventA) error {
	event := Event(in)
	return handleEvent(ctx, event)
}

// EventB is the Event payload accepted by StateB.
//
//ftl:typealias
type EventB Event

// StateB handles events for state B by converting the payload back to a plain
// Event and delegating to handleEvent.
//
//ftl:verb
func StateB(ctx context.Context, in EventB) error {
	event := Event(in)
	return handleEvent(ctx, event)
}

// EventC is the Event payload accepted by StateC.
//
//ftl:typealias
type EventC Event

// StateC handles events for state C by converting the payload back to a plain
// Event and delegating to handleEvent.
//
//ftl:verb
func StateC(ctx context.Context, in EventC) error {
	event := Event(in)
	return handleEvent(ctx, event)
}

// EventD is the Event payload accepted by StateD.
//
//ftl:typealias
type EventD Event

// StateD handles events for state D by converting the payload back to a plain
// Event and delegating to handleEvent.
//
//ftl:verb
func StateD(ctx context.Context, in EventD) error {
	event := Event(in)
	return handleEvent(ctx, event)
}

// Request is the input to SendOne: the state whose event type should be sent,
// and the event payload to send to the FSM.
//
//ftl:data export
type Request struct {
State State
Event Event
}

// SendOne sends a single event to the FSM instance named in the request. The
// payload is wrapped in the event type matching the requested state.
//
//ftl:verb export
func SendOne(ctx context.Context, in Request) error {
	event := eventFor(in.Event, in.State)
	return fsm.Send(ctx, in.Event.Instance, event)
}

// handleEvent is the shared implementation behind every state verb.
//
// If the event lists no further states it returns nil immediately. Otherwise
// it builds the follow-up event for the first remaining state, calls fsm.Next
// with it NextAttempts times (once when unset), and finally returns the
// event's Error as an error if one was supplied. An error from fsm.Next is
// returned as-is and stops any remaining attempts.
func handleEvent(ctx context.Context, in Event) error {
	remaining := in.NextStates
	if len(remaining) == 0 {
		// Last state on the path: nothing further to schedule.
		return nil
	}

	// The follow-up event carries the rest of the path and the attempt count;
	// its Error field is left unset.
	followUp := Event{
		Instance:     in.Instance,
		NextStates:   remaining[1:],
		NextAttempts: in.NextAttempts,
	}
	next := eventFor(followUp, remaining[0])

	total := in.NextAttempts.Default(1)
	for attempt := 1; attempt <= total; attempt++ {
		ftl.LoggerFromContext(ctx).Infof("scheduling next event for %s (%d/%d)", in.Instance, attempt, total)
		err := fsm.Next(ctx, in.Instance, next)
		if err != nil {
			return err
		}
	}

	// Fail only after the next event has been scheduled.
	errStr, hasErr := in.Error.Get()
	if !hasErr {
		return nil
	}
	return errors.New(errStr)
}

// eventFor converts event into the alias type that corresponds to state
// (EventA for A, EventB for B, and so on), returned as any so it can be passed
// to fsm.Send or fsm.Next. It panics if state is not one of A, B, C or D.
func eventFor(event Event, state State) any {
	wrappers := map[State]func(Event) any{
		A: func(e Event) any { return EventA(e) },
		B: func(e Event) any { return EventB(e) },
		C: func(e Event) any { return EventC(e) },
		D: func(e Event) any { return EventD(e) },
	}

	wrap, known := wrappers[state]
	if !known {
		panic("unknown state")
	}
	return wrap(event)
}
2 changes: 2 additions & 0 deletions backend/controller/dal/testdata/go/fsmnext/ftl.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
module = "fsmnext"
language = "go"
47 changes: 47 additions & 0 deletions backend/controller/dal/testdata/go/fsmnext/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
module ftl/fsmnext

go 1.22.5

require github.com/TBD54566975/ftl v1.1.5

require (
connectrpc.com/connect v1.16.2 // indirect
connectrpc.com/grpcreflect v1.2.0 // indirect
connectrpc.com/otelconnect v0.7.1 // indirect
github.com/alecthomas/atomic v0.1.0-alpha2 // indirect
github.com/alecthomas/concurrency v0.0.2 // indirect
github.com/alecthomas/participle/v2 v2.1.1 // indirect
github.com/alecthomas/types v0.16.0 // indirect
github.com/alessio/shellescape v1.4.2 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/danieljoos/wincred v1.2.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/hashicorp/cronexpr v1.1.2 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.6.0 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/puzpuzpuz/xsync/v3 v3.4.0 // indirect
github.com/swaggest/jsonschema-go v0.3.72 // indirect
github.com/swaggest/refl v1.3.0 // indirect
github.com/zalando/go-keyring v0.2.5 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/exp v0.0.0-20240707233637-46b078467d37 // indirect
golang.org/x/mod v0.19.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
)

replace github.com/TBD54566975/ftl => ./../../../../../..
Loading

0 comments on commit 3845ebd

Please sign in to comment.