Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow catch verbs to receive request types as any #2398

Merged
merged 6 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1498,12 +1498,25 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call *
}
logger.Debugf("Catching async call %s with %s", call.Verb, catchVerb)

sch := s.schema.Load()

verb := &schema.Verb{}
if err := sch.ResolveToType(call.Verb.ToRef(), verb); err != nil {
logger.Warnf("Async call %s could not catch, could not resolve original verb: %s", call.Verb, err)
return fmt.Errorf("async call %s could not catch, could not resolve original verb: %w", call.Verb, err)
}

originalError := call.Error.Default("unknown error")
originalResult := either.RightOf[[]byte](originalError)

request := map[string]any{
"request": json.RawMessage(call.Request),
"error": originalError,
"verb": map[string]string{
"module": call.Verb.Module,
"name": call.Verb.Name,
},
"requestType": verb.Request.String(),
"request": json.RawMessage(call.Request),
"error": originalError,
}
body, err := json.Marshal(request)
if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions backend/controller/pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func TestRetry(t *testing.T) {
FROM async_calls
WHERE
state = 'error'
AND verb = 'subscriber.consumeButFailAndRetry'
AND catching = false
AND origin = '%s'
`, dal.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "doomedSubscription"}}.String()),
Expand All @@ -116,6 +117,7 @@ func TestRetry(t *testing.T) {
FROM async_calls
WHERE
state = 'error'
AND verb = 'subscriber.consumeButFailAndRetry'
AND error LIKE '%%subscriber.catch %%'
AND error LIKE '%%catching error%%'
AND catching = true
Expand All @@ -130,11 +132,27 @@ func TestRetry(t *testing.T) {
FROM async_calls
WHERE
state = 'success'
AND verb = 'subscriber.consumeButFailAndRetry'
AND error IS NULL
AND catching = true
AND origin = '%s'
`, dal.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "doomedSubscription"}}.String()),
1),

// check that there was one successful attempt to catchAny
// CatchRequest<Any> is not supported in java yet
in.IfLanguage("go", in.QueryRow("ftl",
fmt.Sprintf(`
SELECT COUNT(*)
FROM async_calls
WHERE
state = 'success'
AND verb = 'subscriber.consumeButFailAndCatchAny'
AND error IS NULL
AND catching = true
AND origin = '%s'
`, dal.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "subscriber", Name: "doomedSubscription2"}}.String()),
1)),
)
}

Expand Down
36 changes: 34 additions & 2 deletions backend/controller/pubsub/testdata/go/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
)

var _ = ftl.Subscription(publisher.TestTopic, "testTopicSubscription")
var _ = ftl.Subscription(publisher.Topic2, "doomedSubscription")
var _ = ftl.Subscription(publisher.Topic2, "doomedSubscription2")

var catchCount atomic.Value[int]

Expand All @@ -25,15 +27,20 @@ func Consume(ctx context.Context, req publisher.PubSubEvent) error {
return nil
}

var _ = ftl.Subscription(publisher.Topic2, "doomedSubscription")

//ftl:verb
//ftl:subscribe doomedSubscription
//ftl:retry 2 1s 1s catch catch
func ConsumeButFailAndRetry(ctx context.Context, req publisher.PubSubEvent) error {
return fmt.Errorf("always error: event %v", req.Time)
}

//ftl:verb
//ftl:subscribe doomedSubscription2
//ftl:retry 1 1s 1s catch catchAny
func ConsumeButFailAndCatchAny(ctx context.Context, req publisher.PubSubEvent) error {
return fmt.Errorf("always error: event %v", req.Time)
}

//ftl:verb
func PublishToExternalModule(ctx context.Context) error {
// Get around compile-time checks
Expand All @@ -57,3 +64,28 @@ func Catch(ctx context.Context, req builtin.CatchRequest[publisher.PubSubEvent])
// succeed after that
return nil
}

//ftl:verb
func CatchAny(ctx context.Context, req builtin.CatchRequest[any]) error {
if req.Verb.Module != "subscriber" {
return fmt.Errorf("unexpected verb module: %v", req.Verb.Module)
}
if req.Verb.Name != "consumeButFailAndCatchAny" {
return fmt.Errorf("unexpected verb name: %v", req.Verb.Name)
}
if req.RequestType != "publisher.PubSubEvent" {
return fmt.Errorf("unexpected request type: %v", req.RequestType)
}
requestMap, ok := req.Request.(map[string]any)
if !ok {
return fmt.Errorf("expected request to be a map[string]any: %T", req.Request)
}
timeValue, ok := requestMap["time"]
if !ok {
return fmt.Errorf("expected request to have a time key")
}
if _, ok := timeValue.(string); !ok {
return fmt.Errorf("expected request to have a time value of type string")
}
return nil
}
8 changes: 8 additions & 0 deletions backend/schema/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ import "github.com/TBD54566975/ftl/internal/reflect"
const BuiltinsSource = `
// Built-in types for FTL.
builtin module builtin {
// A reference to a declaration in the schema.
export data Ref {
module String
name String
}

// HTTP request structure used for HTTP ingress verbs.
export data HttpRequest<Body> {
method String
Expand All @@ -29,7 +35,9 @@ builtin module builtin {

// CatchRequest is a request structure for catch verbs.
export data CatchRequest<Req> {
verb builtin.Ref
request Req
requestType String
error String
}
}
Expand Down
24 changes: 24 additions & 0 deletions backend/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ func TestParsing(t *testing.T) {

verb consumesA(test.eventA) Unit
+subscribe subA1
+retry 1m5s 1h catch catchesAny

verb consumesB1(test.eventB) Unit
+subscribe subB
Expand All @@ -561,6 +562,8 @@ func TestParsing(t *testing.T) {
verb catchesA(builtin.CatchRequest<test.eventA>) Unit

verb catchesB(builtin.CatchRequest<test.eventB>) Unit

verb catchesAny(builtin.CatchRequest<Any>) Unit
}
`,
expected: &Schema{
Expand Down Expand Up @@ -626,6 +629,19 @@ func TestParsing(t *testing.T) {
Unit: true,
},
},
&Verb{
Name: "catchesAny",
Request: &Ref{
Module: "builtin",
Name: "CatchRequest",
TypeParameters: []Type{
&Any{},
},
},
Response: &Unit{
Unit: true,
},
},
&Verb{
Name: "catchesB",
Request: &Ref{
Expand Down Expand Up @@ -655,6 +671,14 @@ func TestParsing(t *testing.T) {
&MetadataSubscriber{
Name: "subA1",
},
&MetadataRetry{
MinBackoff: "1m5s",
MaxBackoff: "1h",
Catch: &Ref{
Module: "test",
Name: "catchesAny",
},
},
},
},
&Verb{
Expand Down
16 changes: 12 additions & 4 deletions backend/schema/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,14 +790,22 @@ func validateRetries(module *Module, retry *MetadataRetry, requestType optional.
merr = append(merr, errorf(retry, "expected catch to be a verb"))
return
}

hasValidCatchRequest := true
catchRequestRef, ok := catchVerb.Request.(*Ref)
if !ok {
merr = append(merr, errorf(retry, "catch verb must have a request type of builtin.CatchRequest<%s> but found %v", requestType, catchVerb.Request))
hasValidCatchRequest = false
} else if !strings.HasPrefix(catchRequestRef.String(), "builtin.CatchRequest") {
merr = append(merr, errorf(retry, "catch verb must have a request type of builtin.CatchRequest<%s> but found %v", requestType, catchRequestRef))
} else if len(catchRequestRef.TypeParameters) != 1 || catchRequestRef.TypeParameters[0].String() != req.String() {
merr = append(merr, errorf(retry, "catch verb must have a request type of builtin.CatchRequest<%s> but found %v", requestType, catchRequestRef))
hasValidCatchRequest = false
} else if len(catchRequestRef.TypeParameters) != 1 {
hasValidCatchRequest = false
} else if _, isAny := catchRequestRef.TypeParameters[0].(*Any); !isAny && catchRequestRef.TypeParameters[0].String() != req.String() {
hasValidCatchRequest = false
}
if !hasValidCatchRequest {
merr = append(merr, errorf(retry, "catch verb must have a request type of builtin.CatchRequest<%s> or builtin.CatchRequest<Any>, but found %v", requestType, catchVerb.Request))
}

if _, ok := catchVerb.Response.(*Unit); !ok {
merr = append(merr, errorf(retry, "catch verb must not have a response type but found %v", catchVerb.Response))
}
Expand Down
6 changes: 3 additions & 3 deletions backend/schema/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,10 +478,10 @@ func TestValidate(t *testing.T) {
}
`,
errs: []string{
"34:5-5: catch verb must have a request type of builtin.CatchRequest<test.EventA> but found builtin.CatchRequest<test.EventB>",
"34:5-5: catch verb must have a request type of builtin.CatchRequest<test.EventA> or builtin.CatchRequest<Any>, but found builtin.CatchRequest<test.EventB>",
"38:5-5: catch verb must not have a response type but found test.EventA",
"42:5-5: catch verb must have a request type of builtin.CatchRequest<test.EventB> but found Unit",
"46:5-5: catch verb must have a request type of builtin.CatchRequest<test.EventB> but found test.EventB",
"42:5-5: catch verb must have a request type of builtin.CatchRequest<test.EventB> or builtin.CatchRequest<Any>, but found Unit",
"46:5-5: catch verb must have a request type of builtin.CatchRequest<test.EventB> or builtin.CatchRequest<Any>, but found test.EventB",
"50:5-5: expected catch to be a verb",
},
},
Expand Down
Loading