Skip to content

Commit

Permalink
fix up new Ref types
Browse files Browse the repository at this point in the history
  • Loading branch information
wesbillman committed Mar 14, 2024
1 parent cd12047 commit 632022e
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 62 deletions.
4 changes: 2 additions & 2 deletions buildengine/build_go_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func TestGenerateGoModule(t *testing.T) {
&schema.Data{Name: "SinkReq"},
&schema.Verb{
Name: "sink",
Request: &schema.DataRef{Name: "SinkReq"},
Request: &schema.Ref{Name: "SinkReq"},
Response: &schema.Unit{},
},
&schema.Data{Name: "SourceResp"},
&schema.Verb{
Name: "source",
Request: &schema.Unit{},
Response: &schema.DataRef{Name: "SourceResp"},
Response: &schema.Ref{Name: "SourceResp"},
},
&schema.Verb{
Name: "nothing",
Expand Down
4 changes: 2 additions & 2 deletions buildengine/build_kotlin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func TestGenerateSourcesAndSinks(t *testing.T) {
}},
&schema.Verb{
Name: "sink",
Request: &schema.DataRef{Name: "SinkReq"},
Request: &schema.Ref{Name: "SinkReq"},
Response: &schema.Unit{},
},
&schema.Data{
Expand All @@ -334,7 +334,7 @@ func TestGenerateSourcesAndSinks(t *testing.T) {
&schema.Verb{
Name: "source",
Request: &schema.Unit{},
Response: &schema.DataRef{Name: "SourceResp"},
Response: &schema.Ref{Name: "SourceResp"},
},
&schema.Verb{
Name: "nothing",
Expand Down
47 changes: 13 additions & 34 deletions go-runtime/ftl/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/TBD54566975/ftl/internal/rpc"
)

func call[Req, Resp any](ctx context.Context, callee *schemapb.VerbRef, req Req) (resp Resp, err error) {
func call[Req, Resp any](ctx context.Context, callee *schemapb.Ref, req Req) (resp Resp, err error) {
client := rpc.ClientFromContext[ftlv1connect.VerbServiceClient](ctx)
reqData, err := encoding.Marshal(req)
if err != nil {
Expand All @@ -44,58 +44,37 @@ func call[Req, Resp any](ctx context.Context, callee *schemapb.VerbRef, req Req)
}

// Call a Verb through the FTL Controller.
func Call[Req, Resp any](ctx context.Context, verb Verb[Req, Resp], req Req) (resp Resp, err error) {
callee := VerbToRef(verb)
return call[Req, Resp](ctx, callee.ToProto(), req)
func Call[Req, Resp any](ctx context.Context, verb Verb[Req, Resp], req Req) (Resp, error) {
return call[Req, Resp](ctx, CallToSchemaRef(verb), req)
}

// CallSink calls a Sink through the FTL controller.
func CallSink[Req any](ctx context.Context, sink Sink[Req], req Req) error {
callee := SinkToRef(sink)
verbRef := &schemapb.VerbRef{Module: callee.Module, Name: callee.Name}
_, err := call[Req, Unit](ctx, verbRef, req)
_, err := call[Req, Unit](ctx, CallToSchemaRef(sink), req)
return err
}

// CallSource calls a Source through the FTL controller.
func CallSource[Resp any](ctx context.Context, source Source[Resp]) (Resp, error) {
callee := SourceToRef(source)
verbRef := &schemapb.VerbRef{Module: callee.Module, Name: callee.Name}
fmt.Printf("source ref2: %v\n", verbRef)
return call[Unit, Resp](ctx, verbRef, Unit{})
return call[Unit, Resp](ctx, CallToSchemaRef(source), Unit{})
}

// CallEmpty calls a Verb with no request or response through the FTL controller.
func CallEmpty(ctx context.Context, empty Empty) error {
callee := EmptyToRef(empty)
verbRef := &schemapb.VerbRef{Module: callee.Module, Name: callee.Name}

_, err := call[Unit, Unit](ctx, verbRef, Unit{})
_, err := call[Unit, Unit](ctx, CallToSchemaRef(empty), Unit{})
return err
}

// VerbToRef returns the FTL reference for a Verb.
func VerbToRef[Req, Resp any](verb Verb[Req, Resp]) Ref {
ref := runtime.FuncForPC(reflect.ValueOf(verb).Pointer()).Name()
return goRefToFTLRef(ref)
}

// SinkToRef returns the FTL reference for a Sink.
func SinkToRef[Req any](sink Sink[Req]) Ref {
ref := runtime.FuncForPC(reflect.ValueOf(sink).Pointer()).Name()
// CallToRef returns the Ref for a Verb, Sink, Source, or Empty.
func CallToRef(call any) Ref {
ref := runtime.FuncForPC(reflect.ValueOf(call).Pointer()).Name()
return goRefToFTLRef(ref)
}

// SourceToRef returns the FTL reference for a Source.
func SourceToRef[Resp any](source Source[Resp]) Ref {
ref := runtime.FuncForPC(reflect.ValueOf(source).Pointer()).Name()
return goRefToFTLRef(ref)
}

// EmptyToRef returns the FTL reference for an Empty.
func EmptyToRef(empty Empty) Ref {
ref := runtime.FuncForPC(reflect.ValueOf(empty).Pointer()).Name()
return goRefToFTLRef(ref)
// CallToSchemaRef returns the Ref for a Verb, Sink, Source, or Empty as a Schema Ref.
func CallToSchemaRef(call any) *schemapb.Ref {
ref := CallToRef(call)
return ref.ToProto()
}

func goRefToFTLRef(ref string) Ref {
Expand Down
5 changes: 4 additions & 1 deletion go-runtime/ftl/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,7 @@ type Verb[Req, Resp any] func(context.Context, Req) (Resp, error)
type Sink[Req any] func(context.Context, Req) error

// A Source is a function that does not accept input but returns output.
type Source[Req any] func(context.Context, Req) error
type Source[Resp any] func(context.Context) (Resp, error)

// An Empty is a function that does not accept input or return output.
type Empty func(context.Context) error
29 changes: 8 additions & 21 deletions go-runtime/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type Handler struct {
fn func(ctx context.Context, req []byte) ([]byte, error)
}

func handler[Req, Resp any](ref ftl.VerbRef, verb func(ctx context.Context, req Req) (Resp, error)) Handler {
func handler[Req, Resp any](ref ftl.Ref, verb func(ctx context.Context, req Req) (Resp, error)) Handler {
return Handler{
ref: ref,
fn: func(ctx context.Context, reqdata []byte) ([]byte, error) {
Expand Down Expand Up @@ -94,43 +94,30 @@ func handler[Req, Resp any](ref ftl.VerbRef, verb func(ctx context.Context, req

// HandleCall creates a Handler from a Verb.
func HandleCall[Req, Resp any](verb func(ctx context.Context, req Req) (Resp, error)) Handler {
return handler(ftl.VerbToRef(verb), verb)
return handler(ftl.CallToRef(verb), verb)
}

// HandleSink creates a Handler from a Sink with no response.
func HandleSink[Req any](sink func(ctx context.Context, req Req) error) Handler {
ref := ftl.SinkToRef(sink)

verb := func(ctx context.Context, req Req) (ftl.Unit, error) {
return handler(ftl.CallToRef(sink), func(ctx context.Context, req Req) (ftl.Unit, error) {
err := sink(ctx, req)
return ftl.Unit{}, err
}

return handler(ftl.VerbRef{Module: ref.Module, Name: ref.Name}, verb)
})
}

// HandleSource creates a Handler from a Source with no request.
func HandleSource[Resp any](source func(ctx context.Context) (Resp, error)) Handler {
ref := ftl.SourceToRef(source)

fmt.Printf("source ref: %v\n", ref)
verb := func(ctx context.Context, _ ftl.Unit) (Resp, error) {
return handler(ftl.CallToRef(source), func(ctx context.Context, _ ftl.Unit) (Resp, error) {
return source(ctx)
}

return handler(ftl.VerbRef{Module: ref.Module, Name: ref.Name}, verb)
})
}

// HandleEmpty creates a Handler from a Verb with no request or response.
func HandleEmpty(empty func(ctx context.Context) error) Handler {
ref := ftl.EmptyToRef(empty)

verb := func(ctx context.Context, _ ftl.Unit) (ftl.Unit, error) {
return handler(ftl.CallToRef(empty), func(ctx context.Context, _ ftl.Unit) (ftl.Unit, error) {
err := empty(ctx)
return ftl.Unit{}, err
}

return handler(ftl.VerbRef{Module: ref.Module, Name: ref.Name}, verb)
})
}

var _ ftlv1connect.VerbServiceHandler = (*moduleServer)(nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import xyz.block.ftl.v1.schema.Ref
import xyz.block.ftl.v1.schema.StringValue
import xyz.block.ftl.v1.schema.Type
import xyz.block.ftl.v1.schema.TypeParameter
import xyz.block.ftl.v1.schema.Unit
import xyz.block.ftl.v1.schema.Value
import xyz.block.ftl.v1.schema.Verb

Expand Down Expand Up @@ -309,7 +310,7 @@ internal class ExtractSchemaRuleTest(private val env: KotlinCoreEnvironment) {
verb = Verb(
name = "sink",
request = Type(
dataRef = DataRef(
ref = Ref(
name = "Empty",
module = "builtin"
)
Expand All @@ -326,7 +327,7 @@ internal class ExtractSchemaRuleTest(private val env: KotlinCoreEnvironment) {
unit = Unit()
),
response = Type(
dataRef = DataRef(
ref = Ref(
name = "Empty",
module = "builtin"
)
Expand Down

0 comments on commit 632022e

Please sign in to comment.