From a20b5410809bc8a1866ec42589951ba54141360e Mon Sep 17 00:00:00 2001 From: Zhuowei Wang Date: Thu, 10 Oct 2024 14:18:34 +0800 Subject: [PATCH] feat: add lconn and mux client for tts --- .../kitex_gen/echo/streamserver/client.go | 4 +- .../kitex_gen/echo/streamserver/server.go | 4 +- .../echo/streamserver/streamserver.go | 2 +- go.mod | 6 +- go.sum | 6 +- .../grpc/client/{grpc_client.go => client.go} | 8 +- streaming/grpc/client/main.go | 26 ---- streaming/grpc/main.go | 6 + streaming/kitex/client/main.go | 26 ---- .../client/client.go} | 9 +- streaming/{kitex => kitex_grpc}/main.go | 8 +- streaming/kitex_tts_lconn/client/client.go | 112 ++++++++++++++++++ .../{kitex_tts => kitex_tts_lconn}/main.go | 20 +++- .../client/client.go | 4 +- streaming/kitex_tts_mux/main.go | 86 ++++++++++++++ 15 files changed, 254 insertions(+), 73 deletions(-) rename streaming/grpc/client/{grpc_client.go => client.go} (90%) delete mode 100644 streaming/grpc/client/main.go delete mode 100644 streaming/kitex/client/main.go rename streaming/{kitex/client/kitex_client.go => kitex_grpc/client/client.go} (89%) rename streaming/{kitex => kitex_grpc}/main.go (85%) create mode 100644 streaming/kitex_tts_lconn/client/client.go rename streaming/{kitex_tts => kitex_tts_lconn}/main.go (75%) rename streaming/{kitex_tts => kitex_tts_mux}/client/client.go (95%) create mode 100644 streaming/kitex_tts_mux/main.go diff --git a/codec/thrift/kitex_gen/echo/streamserver/client.go b/codec/thrift/kitex_gen/echo/streamserver/client.go index f893863..0ab5c1c 100644 --- a/codec/thrift/kitex_gen/echo/streamserver/client.go +++ b/codec/thrift/kitex_gen/echo/streamserver/client.go @@ -20,13 +20,13 @@ type Client interface { func NewClient(destService string, opts ...streamxclient.Option) (Client, error) { var options []streamxclient.Option options = append(options, streamxclient.WithDestService(destService)) - cp, err := ttstream.NewClientProvider(svcInfo) + cp, err := ttstream.NewClientProvider(ServiceInfo) if err != nil { return nil, err } options = append(options, streamxclient.WithProvider(cp)) options = append(options, opts...) - cli, err := streamxclient.NewClient(svcInfo, options...) + cli, err := streamxclient.NewClient(ServiceInfo, options...) if err != nil { return nil, err } diff --git a/codec/thrift/kitex_gen/echo/streamserver/server.go b/codec/thrift/kitex_gen/echo/streamserver/server.go index 16c9923..98f956a 100644 --- a/codec/thrift/kitex_gen/echo/streamserver/server.go +++ b/codec/thrift/kitex_gen/echo/streamserver/server.go @@ -15,7 +15,7 @@ type Server interface { } func RegisterService(svr server.Server, handler Server, opts ...server.RegisterOption) error { - sp, err := ttstream.NewServerProvider(svcInfo) + sp, err := ttstream.NewServerProvider(ServiceInfo) if err != nil { return err } @@ -23,5 +23,5 @@ func RegisterService(svr server.Server, handler Server, opts ...server.RegisterO streamxserver.WithProvider(sp), } nopts = append(nopts, opts...) - return svr.RegisterService(svcInfo, handler, nopts...) + return svr.RegisterService(ServiceInfo, handler, nopts...) } diff --git a/codec/thrift/kitex_gen/echo/streamserver/streamserver.go b/codec/thrift/kitex_gen/echo/streamserver/streamserver.go index 8a6c3e3..1c5d1af 100644 --- a/codec/thrift/kitex_gen/echo/streamserver/streamserver.go +++ b/codec/thrift/kitex_gen/echo/streamserver/streamserver.go @@ -10,7 +10,7 @@ import ( "github.com/cloudwego/kitex/server/streamxserver" ) -var svcInfo = &serviceinfo.ServiceInfo{ +var ServiceInfo = &serviceinfo.ServiceInfo{ ServiceName: "StreamServer", Methods: map[string]serviceinfo.MethodInfo{ "Echo": serviceinfo.NewMethodInfo( diff --git a/go.mod b/go.mod index 1517458..3d2f2ae 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( github.com/bytedance/gopkg v0.1.1 github.com/cloudfoundry/gosigar v1.3.3 github.com/cloudwego/fastpb v0.0.5 - github.com/cloudwego/gopkg v0.1.2-0.20240910075652-f542979ecca4 - github.com/cloudwego/kitex v0.11.2-0.20240918080835-1a73caeb97dd + github.com/cloudwego/gopkg v0.1.2-0.20240919030844-cb7123236682 + github.com/cloudwego/kitex v0.11.2-0.20241010053736-f4746b81e7d9 github.com/gogo/protobuf v1.3.2 github.com/juju/ratelimit v1.0.1 github.com/lesismal/arpc v1.2.4 @@ -122,5 +122,3 @@ require ( ) replace github.com/apache/thrift => github.com/apache/thrift v0.13.0 - -replace github.com/cloudwego/kitex => ../kitex diff --git a/go.sum b/go.sum index d7fda4c..c6d5bc6 100644 --- a/go.sum +++ b/go.sum @@ -67,10 +67,12 @@ github.com/cloudwego/fastpb v0.0.5 h1:vYnBPsfbAtU5TVz5+f9UTlmSCixG9F9vRwaqE0mZPZ github.com/cloudwego/fastpb v0.0.5/go.mod h1:Bho7aAKBUtT9RPD2cNVkTdx4yQumfSv3If7wYnm1izk= github.com/cloudwego/frugal v0.2.0 h1:0ETSzQYoYqVvdl7EKjqJ9aJnDoG6TzvNKV3PMQiQTS8= github.com/cloudwego/frugal v0.2.0/go.mod h1:cpnV6kdRMjN3ylxRo63RNbZ9rBK6oxs70Zk6QZ4Enj4= -github.com/cloudwego/gopkg v0.1.2-0.20240910075652-f542979ecca4 h1:SHw9GUBBcAnLWeK2MtPH7O6YQG9Q2ZZ8koD/4alpLvE= -github.com/cloudwego/gopkg v0.1.2-0.20240910075652-f542979ecca4/go.mod h1:WoNTdXDPdvL97cBmRUWXVGkh2l2UFmpd9BUvbW2r0Aw= +github.com/cloudwego/gopkg v0.1.2-0.20240919030844-cb7123236682 h1:hj/AhlEngERp5Tjt864veEvyK6RglXKcXpxkIOSRfug= +github.com/cloudwego/gopkg v0.1.2-0.20240919030844-cb7123236682/go.mod h1:WoNTdXDPdvL97cBmRUWXVGkh2l2UFmpd9BUvbW2r0Aw= github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= +github.com/cloudwego/kitex v0.11.2-0.20241010053736-f4746b81e7d9 h1:E0NJGsawD+Jpv9VL5Nvr4vgThpNqIti4vzFRbEzvHRw= +github.com/cloudwego/kitex v0.11.2-0.20241010053736-f4746b81e7d9/go.mod h1:xavkyMxJxzdHCLuSDk9r51V6z11eQVAL4UDow3DH0kM= github.com/cloudwego/localsession v0.0.2 h1:N9/IDtCPj1fCL9bCTP+DbXx3f40YjVYWcwkJG0YhQkY= github.com/cloudwego/localsession v0.0.2/go.mod h1:kiJxmvAcy4PLgKtEnPS5AXed3xCiXcs7Z+KBHP72Wv8= github.com/cloudwego/netpoll v0.6.5-0.20240911104114-8a1f5597a920 h1:WT7vsDDb+ammyB7XLmNSS4vKGpPvM2JDl6h34Jj7mY4= diff --git a/streaming/grpc/client/grpc_client.go b/streaming/grpc/client/client.go similarity index 90% rename from streaming/grpc/client/grpc_client.go rename to streaming/grpc/client/client.go index 8c4cd60..99976d4 100644 --- a/streaming/grpc/client/grpc_client.go +++ b/streaming/grpc/client/client.go @@ -23,6 +23,7 @@ import ( "sync" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" grpcg "github.com/cloudwego/kitex-benchmark/codec/protobuf/grpc_gen" "github.com/cloudwego/kitex-benchmark/runner" @@ -37,7 +38,8 @@ func NewGrpcClient(opt *runner.Options) runner.Client { return &grpcClient{ client: client, streampool: &sync.Pool{New: func() interface{} { - stream, err := client.Echo(context.Background()) + ctx := metadata.AppendToOutgoingContext(context.Background(), "header", "hello") + stream, err := client.Echo(ctx) if err != nil { log.Printf("client new stream failed: %v", err) return nil @@ -79,3 +81,7 @@ func (cli *grpcClient) Send(method, action, msg string) (err error) { runner.ProcessResponse(resp.Action, resp.Msg) return nil } + +func main() { + runner.Main("GRPC", NewGrpcClient) +} diff --git a/streaming/grpc/client/main.go b/streaming/grpc/client/main.go deleted file mode 100644 index bfd47e3..0000000 --- a/streaming/grpc/client/main.go +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2021 CloudWeGo Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package main - -import ( - "github.com/cloudwego/kitex-benchmark/runner" -) - -// main is use for routing. -func main() { - runner.Main("GRPC", NewGrpcClient) -} diff --git a/streaming/grpc/main.go b/streaming/grpc/main.go index 096738a..6f073cc 100644 --- a/streaming/grpc/main.go +++ b/streaming/grpc/main.go @@ -23,6 +23,7 @@ import ( "net" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" grpcg "github.com/cloudwego/kitex-benchmark/codec/protobuf/grpc_gen" "github.com/cloudwego/kitex-benchmark/perf" @@ -40,6 +41,11 @@ type server struct { } func (s *server) Echo(stream grpcg.SEcho_EchoServer) error { + md, _ := metadata.FromIncomingContext(stream.Context()) + if md == nil || len(md["header"]) == 0 || md["header"][0] != "hello" { + return fmt.Errorf("invalid header: %v", md) + } + for { req, err := stream.Recv() if err != nil { diff --git a/streaming/kitex/client/main.go b/streaming/kitex/client/main.go deleted file mode 100644 index b3b3e0a..0000000 --- a/streaming/kitex/client/main.go +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2021 CloudWeGo Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package main - -import ( - "github.com/cloudwego/kitex-benchmark/runner" -) - -// main is use for routing. -func main() { - runner.Main("KITEX", NewKClient) -} diff --git a/streaming/kitex/client/kitex_client.go b/streaming/kitex_grpc/client/client.go similarity index 89% rename from streaming/kitex/client/kitex_client.go rename to streaming/kitex_grpc/client/client.go index c4866fa..7ebb07b 100644 --- a/streaming/kitex/client/kitex_client.go +++ b/streaming/kitex_grpc/client/client.go @@ -24,6 +24,7 @@ import ( "github.com/cloudwego/kitex/client" "github.com/cloudwego/kitex/pkg/klog" + "github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/metadata" "github.com/cloudwego/kitex/transport" "github.com/cloudwego/kitex-benchmark/codec/protobuf/kitex_gen/echo" @@ -42,7 +43,8 @@ func NewKClient(opt *runner.Options) runner.Client { client: c, streampool: &sync.Pool{ New: func() interface{} { - stream, err := c.Echo(context.Background()) + ctx := metadata.AppendToOutgoingContext(context.Background(), "header", "hello") + stream, err := c.Echo(ctx) if err != nil { log.Printf("client new stream failed: %v", err) return nil @@ -87,3 +89,8 @@ func (cli *kClient) Send(method, action, msg string) error { runner.ProcessResponse(resp.Action, resp.Msg) return nil } + +// main is use for routing. +func main() { + runner.Main("KITEX_GRPC", NewKClient) +} diff --git a/streaming/kitex/main.go b/streaming/kitex_grpc/main.go similarity index 85% rename from streaming/kitex/main.go rename to streaming/kitex_grpc/main.go index edf87c2..0ede550 100644 --- a/streaming/kitex/main.go +++ b/streaming/kitex_grpc/main.go @@ -21,6 +21,7 @@ import ( "log" "net" + "github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/metadata" "github.com/cloudwego/kitex/server" "github.com/cloudwego/kitex-benchmark/codec/protobuf/kitex_gen/echo" @@ -34,7 +35,7 @@ const port = 8001 var ( _ echo.SEcho = &EchoImpl{} - recorder = perf.NewRecorder("KITEX@Server") + recorder = perf.NewRecorder("KITEX_GRPC@Server") ) // EchoImpl implements the last service interface defined in the IDL. @@ -42,6 +43,11 @@ type EchoImpl struct{} // Echo implements the EchoImpl interface. func (s *EchoImpl) Echo(stream echo.SEcho_EchoServer) error { + md, _ := metadata.FromIncomingContext(stream.Context()) + if md == nil || len(md["header"]) == 0 || md["header"][0] != "hello" { + return fmt.Errorf("invalid header: %v", md) + } + for { req, err := stream.Recv() if err != nil { diff --git a/streaming/kitex_tts_lconn/client/client.go b/streaming/kitex_tts_lconn/client/client.go new file mode 100644 index 0000000..6aaff54 --- /dev/null +++ b/streaming/kitex_tts_lconn/client/client.go @@ -0,0 +1,112 @@ +/* + * Copyright 2021 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "errors" + "io" + "log" + "sync" + + "github.com/bytedance/gopkg/cloud/metainfo" + "github.com/cloudwego/kitex-benchmark/codec/thrift/kitex_gen/echo" + "github.com/cloudwego/kitex-benchmark/codec/thrift/kitex_gen/echo/streamserver" + "github.com/cloudwego/kitex-benchmark/runner" + "github.com/cloudwego/kitex/client/streamxclient" + "github.com/cloudwego/kitex/pkg/klog" + "github.com/cloudwego/kitex/pkg/streamx" + "github.com/cloudwego/kitex/pkg/streamx/provider/ttstream" +) + +func NewKClient(opt *runner.Options) runner.Client { + klog.SetLevel(klog.LevelWarn) + + cp, _ := ttstream.NewClientProvider( + streamserver.ServiceInfo, + ttstream.WithClientLongConnPool(ttstream.DefaultLongConnConfig), + ) + c, err := streamserver.NewClient( + "test.echo.kitex", + streamxclient.WithHostPorts(opt.Address), + streamxclient.WithProvider(cp), + ) + if err != nil { + log.Fatal(err) + } + cli := &kClient{ + client: c, + streampool: &sync.Pool{ + New: func() interface{} { + ctx := metainfo.WithValue(context.Background(), "header", "hello") + stream, err := c.Echo(ctx) + if err != nil { + log.Printf("client new stream failed: %v", err) + return nil + } + return stream + }, + }, + reqPool: &sync.Pool{ + New: func() interface{} { + return &echo.Request{} + }, + }, + } + return cli +} + +type kClient struct { + client streamserver.Client + streampool *sync.Pool + reqPool *sync.Pool +} + +func (cli *kClient) Send(method, action, msg string) error { + req := cli.reqPool.Get().(*echo.Request) + defer cli.reqPool.Put(req) + + st := cli.streampool.Get() + if st == nil { + return errors.New("new stream from streampool failed") + } + stream, ok := st.(streamx.BidiStreamingClient[echo.Request, echo.Response]) + if !ok { + return errors.New("new stream error") + } + defer cli.streampool.Put(stream) + + ctx := context.Background() + req.Action = action + req.Msg = msg + err := stream.Send(ctx, req) + if err != nil { + return err + } + + resp, err := stream.Recv(ctx) + if err != nil && !errors.Is(err, io.EOF) { + return err + } + runner.ProcessResponse(resp.Action, resp.Msg) + return nil +} + +// main is use for routing. +func main() { + runner.Main("KITEX_TTS_LCONN", NewKClient) +} diff --git a/streaming/kitex_tts/main.go b/streaming/kitex_tts_lconn/main.go similarity index 75% rename from streaming/kitex_tts/main.go rename to streaming/kitex_tts_lconn/main.go index db8dac0..1481e66 100644 --- a/streaming/kitex_tts/main.go +++ b/streaming/kitex_tts_lconn/main.go @@ -23,9 +23,11 @@ import ( "log" "net" + "github.com/bytedance/gopkg/cloud/metainfo" "github.com/cloudwego/kitex-benchmark/codec/thrift/kitex_gen/echo" "github.com/cloudwego/kitex-benchmark/codec/thrift/kitex_gen/echo/streamserver" "github.com/cloudwego/kitex-benchmark/perf" + "github.com/cloudwego/kitex-benchmark/runner" "github.com/cloudwego/kitex/pkg/streamx" "github.com/cloudwego/kitex/server" ) @@ -33,14 +35,18 @@ import ( const port = 8002 var ( - _ streamserver.Server = &StreamServerImpl{} - - recorder = perf.NewRecorder("KITEX_TTS@Server") + _ streamserver.Server = &StreamServerImpl{} + recorder = perf.NewRecorder("KITEX_TTS_LCONN@Server") ) type StreamServerImpl struct{} func (si *StreamServerImpl) Echo(ctx context.Context, stream streamx.BidiStreamingServer[echo.Request, echo.Response]) error { + v, _ := metainfo.GetValue(ctx, "header") + if v != "hello" { + return fmt.Errorf("invalid header: %v", v) + } + for { req, err := stream.Recv(ctx) if err == io.EOF { @@ -49,9 +55,11 @@ func (si *StreamServerImpl) Echo(ctx context.Context, stream streamx.BidiStreami if err != nil { return err } + action, msg := runner.ProcessRequest(recorder, req.Action, req.Msg) resp := new(echo.Response) - resp.Msg = req.Msg + resp.Action = action + resp.Msg = msg err = stream.Send(ctx, resp) if err != nil { return err @@ -65,7 +73,9 @@ func main() { perf.ServeMonitor(fmt.Sprintf(":%d", port+10000)) }() - svr := server.NewServer(server.WithServiceAddr(&net.TCPAddr{IP: net.IPv4zero, Port: port})) + svr := server.NewServer( + server.WithServiceAddr(&net.TCPAddr{IP: net.IPv4zero, Port: port}), + ) err := streamserver.RegisterService(svr, new(StreamServerImpl)) if err != nil { panic(err) diff --git a/streaming/kitex_tts/client/client.go b/streaming/kitex_tts_mux/client/client.go similarity index 95% rename from streaming/kitex_tts/client/client.go rename to streaming/kitex_tts_mux/client/client.go index 79d7b67..b82088d 100644 --- a/streaming/kitex_tts/client/client.go +++ b/streaming/kitex_tts_mux/client/client.go @@ -43,7 +43,7 @@ func NewKClient(opt *runner.Options) runner.Client { client: c, streampool: &sync.Pool{ New: func() interface{} { - ctx := metainfo.WithValue(context.Background(), "headerkey", "headerval") + ctx := metainfo.WithValue(context.Background(), "header", "hello") stream, err := c.Echo(ctx) if err != nil { log.Printf("client new stream failed: %v", err) @@ -99,5 +99,5 @@ func (cli *kClient) Send(method, action, msg string) error { // main is use for routing. func main() { - runner.Main("KITEX_TTS", NewKClient) + runner.Main("KITEX_TTS_MUX", NewKClient) } diff --git a/streaming/kitex_tts_mux/main.go b/streaming/kitex_tts_mux/main.go new file mode 100644 index 0000000..f8729db --- /dev/null +++ b/streaming/kitex_tts_mux/main.go @@ -0,0 +1,86 @@ +/* + * Copyright 2021 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "fmt" + "io" + "log" + "net" + + "github.com/bytedance/gopkg/cloud/metainfo" + "github.com/cloudwego/kitex-benchmark/codec/thrift/kitex_gen/echo" + "github.com/cloudwego/kitex-benchmark/codec/thrift/kitex_gen/echo/streamserver" + "github.com/cloudwego/kitex-benchmark/perf" + "github.com/cloudwego/kitex-benchmark/runner" + "github.com/cloudwego/kitex/pkg/streamx" + "github.com/cloudwego/kitex/server" +) + +const port = 8003 + +var ( + _ streamserver.Server = &StreamServerImpl{} + recorder = perf.NewRecorder("KITEX_TTS_MUX@Server") +) + +type StreamServerImpl struct{} + +func (si *StreamServerImpl) Echo(ctx context.Context, stream streamx.BidiStreamingServer[echo.Request, echo.Response]) error { + v, _ := metainfo.GetValue(ctx, "header") + if v != "hello" { + return fmt.Errorf("invalid header: %v", v) + } + + for { + req, err := stream.Recv(ctx) + if err == io.EOF { + return nil + } + if err != nil { + return err + } + action, msg := runner.ProcessRequest(recorder, req.Action, req.Msg) + + resp := new(echo.Response) + resp.Action = action + resp.Msg = msg + err = stream.Send(ctx, resp) + if err != nil { + return err + } + } +} + +func main() { + // start pprof server + go func() { + perf.ServeMonitor(fmt.Sprintf(":%d", port+10000)) + }() + + svr := server.NewServer( + server.WithServiceAddr(&net.TCPAddr{IP: net.IPv4zero, Port: port}), + ) + err := streamserver.RegisterService(svr, new(StreamServerImpl)) + if err != nil { + panic(err) + } + if err := svr.Run(); err != nil { + log.Println(err.Error()) + } +}