Skip to content

Commit

Permalink
feat: add lconn and mux client for tts
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Oct 10, 2024
1 parent 81607bc commit a20b541
Show file tree
Hide file tree
Showing 15 changed files with 254 additions and 73 deletions.
4 changes: 2 additions & 2 deletions codec/thrift/kitex_gen/echo/streamserver/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions codec/thrift/kitex_gen/echo/streamserver/server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion codec/thrift/kitex_gen/echo/streamserver/streamserver.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ require (
github.com/bytedance/gopkg v0.1.1
github.com/cloudfoundry/gosigar v1.3.3
github.com/cloudwego/fastpb v0.0.5
github.com/cloudwego/gopkg v0.1.2-0.20240910075652-f542979ecca4
github.com/cloudwego/kitex v0.11.2-0.20240918080835-1a73caeb97dd
github.com/cloudwego/gopkg v0.1.2-0.20240919030844-cb7123236682
github.com/cloudwego/kitex v0.11.2-0.20241010053736-f4746b81e7d9
github.com/gogo/protobuf v1.3.2
github.com/juju/ratelimit v1.0.1
github.com/lesismal/arpc v1.2.4
Expand Down Expand Up @@ -122,5 +122,3 @@ require (
)

replace github.com/apache/thrift => github.com/apache/thrift v0.13.0

replace github.com/cloudwego/kitex => ../kitex
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,12 @@ github.com/cloudwego/fastpb v0.0.5 h1:vYnBPsfbAtU5TVz5+f9UTlmSCixG9F9vRwaqE0mZPZ
github.com/cloudwego/fastpb v0.0.5/go.mod h1:Bho7aAKBUtT9RPD2cNVkTdx4yQumfSv3If7wYnm1izk=
github.com/cloudwego/frugal v0.2.0 h1:0ETSzQYoYqVvdl7EKjqJ9aJnDoG6TzvNKV3PMQiQTS8=
github.com/cloudwego/frugal v0.2.0/go.mod h1:cpnV6kdRMjN3ylxRo63RNbZ9rBK6oxs70Zk6QZ4Enj4=
github.com/cloudwego/gopkg v0.1.2-0.20240910075652-f542979ecca4 h1:SHw9GUBBcAnLWeK2MtPH7O6YQG9Q2ZZ8koD/4alpLvE=
github.com/cloudwego/gopkg v0.1.2-0.20240910075652-f542979ecca4/go.mod h1:WoNTdXDPdvL97cBmRUWXVGkh2l2UFmpd9BUvbW2r0Aw=
github.com/cloudwego/gopkg v0.1.2-0.20240919030844-cb7123236682 h1:hj/AhlEngERp5Tjt864veEvyK6RglXKcXpxkIOSRfug=
github.com/cloudwego/gopkg v0.1.2-0.20240919030844-cb7123236682/go.mod h1:WoNTdXDPdvL97cBmRUWXVGkh2l2UFmpd9BUvbW2r0Aw=
github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
github.com/cloudwego/kitex v0.11.2-0.20241010053736-f4746b81e7d9 h1:E0NJGsawD+Jpv9VL5Nvr4vgThpNqIti4vzFRbEzvHRw=
github.com/cloudwego/kitex v0.11.2-0.20241010053736-f4746b81e7d9/go.mod h1:xavkyMxJxzdHCLuSDk9r51V6z11eQVAL4UDow3DH0kM=
github.com/cloudwego/localsession v0.0.2 h1:N9/IDtCPj1fCL9bCTP+DbXx3f40YjVYWcwkJG0YhQkY=
github.com/cloudwego/localsession v0.0.2/go.mod h1:kiJxmvAcy4PLgKtEnPS5AXed3xCiXcs7Z+KBHP72Wv8=
github.com/cloudwego/netpoll v0.6.5-0.20240911104114-8a1f5597a920 h1:WT7vsDDb+ammyB7XLmNSS4vKGpPvM2JDl6h34Jj7mY4=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

grpcg "github.com/cloudwego/kitex-benchmark/codec/protobuf/grpc_gen"
"github.com/cloudwego/kitex-benchmark/runner"
Expand All @@ -37,7 +38,8 @@ func NewGrpcClient(opt *runner.Options) runner.Client {
return &grpcClient{
client: client,
streampool: &sync.Pool{New: func() interface{} {
stream, err := client.Echo(context.Background())
ctx := metadata.AppendToOutgoingContext(context.Background(), "header", "hello")
stream, err := client.Echo(ctx)
if err != nil {
log.Printf("client new stream failed: %v", err)
return nil
Expand Down Expand Up @@ -79,3 +81,7 @@ func (cli *grpcClient) Send(method, action, msg string) (err error) {
runner.ProcessResponse(resp.Action, resp.Msg)
return nil
}

func main() {
runner.Main("GRPC", NewGrpcClient)
}
26 changes: 0 additions & 26 deletions streaming/grpc/client/main.go

This file was deleted.

6 changes: 6 additions & 0 deletions streaming/grpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

grpcg "github.com/cloudwego/kitex-benchmark/codec/protobuf/grpc_gen"
"github.com/cloudwego/kitex-benchmark/perf"
Expand All @@ -40,6 +41,11 @@ type server struct {
}

func (s *server) Echo(stream grpcg.SEcho_EchoServer) error {
md, _ := metadata.FromIncomingContext(stream.Context())
if md == nil || len(md["header"]) == 0 || md["header"][0] != "hello" {
return fmt.Errorf("invalid header: %v", md)
}

for {
req, err := stream.Recv()
if err != nil {
Expand Down
26 changes: 0 additions & 26 deletions streaming/kitex/client/main.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/metadata"
"github.com/cloudwego/kitex/transport"

"github.com/cloudwego/kitex-benchmark/codec/protobuf/kitex_gen/echo"
Expand All @@ -42,7 +43,8 @@ func NewKClient(opt *runner.Options) runner.Client {
client: c,
streampool: &sync.Pool{
New: func() interface{} {
stream, err := c.Echo(context.Background())
ctx := metadata.AppendToOutgoingContext(context.Background(), "header", "hello")
stream, err := c.Echo(ctx)
if err != nil {
log.Printf("client new stream failed: %v", err)
return nil
Expand Down Expand Up @@ -87,3 +89,8 @@ func (cli *kClient) Send(method, action, msg string) error {
runner.ProcessResponse(resp.Action, resp.Msg)
return nil
}

// main is use for routing.
func main() {
runner.Main("KITEX_GRPC", NewKClient)
}
8 changes: 7 additions & 1 deletion streaming/kitex/main.go → streaming/kitex_grpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"log"
"net"

"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/metadata"
"github.com/cloudwego/kitex/server"

"github.com/cloudwego/kitex-benchmark/codec/protobuf/kitex_gen/echo"
Expand All @@ -34,14 +35,19 @@ const port = 8001
var (
_ echo.SEcho = &EchoImpl{}

recorder = perf.NewRecorder("KITEX@Server")
recorder = perf.NewRecorder("KITEX_GRPC@Server")
)

// EchoImpl implements the last service interface defined in the IDL.
type EchoImpl struct{}

// Echo implements the EchoImpl interface.
func (s *EchoImpl) Echo(stream echo.SEcho_EchoServer) error {
md, _ := metadata.FromIncomingContext(stream.Context())
if md == nil || len(md["header"]) == 0 || md["header"][0] != "hello" {
return fmt.Errorf("invalid header: %v", md)
}

for {
req, err := stream.Recv()
if err != nil {
Expand Down
112 changes: 112 additions & 0 deletions streaming/kitex_tts_lconn/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2021 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"context"
"errors"
"io"
"log"
"sync"

"github.com/bytedance/gopkg/cloud/metainfo"
"github.com/cloudwego/kitex-benchmark/codec/thrift/kitex_gen/echo"
"github.com/cloudwego/kitex-benchmark/codec/thrift/kitex_gen/echo/streamserver"
"github.com/cloudwego/kitex-benchmark/runner"
"github.com/cloudwego/kitex/client/streamxclient"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/streamx"
"github.com/cloudwego/kitex/pkg/streamx/provider/ttstream"
)

func NewKClient(opt *runner.Options) runner.Client {
klog.SetLevel(klog.LevelWarn)

cp, _ := ttstream.NewClientProvider(
streamserver.ServiceInfo,
ttstream.WithClientLongConnPool(ttstream.DefaultLongConnConfig),
)
c, err := streamserver.NewClient(
"test.echo.kitex",
streamxclient.WithHostPorts(opt.Address),
streamxclient.WithProvider(cp),
)
if err != nil {
log.Fatal(err)
}
cli := &kClient{
client: c,
streampool: &sync.Pool{
New: func() interface{} {
ctx := metainfo.WithValue(context.Background(), "header", "hello")
stream, err := c.Echo(ctx)
if err != nil {
log.Printf("client new stream failed: %v", err)
return nil
}
return stream
},
},
reqPool: &sync.Pool{
New: func() interface{} {
return &echo.Request{}
},
},
}
return cli
}

type kClient struct {
client streamserver.Client
streampool *sync.Pool
reqPool *sync.Pool
}

func (cli *kClient) Send(method, action, msg string) error {
req := cli.reqPool.Get().(*echo.Request)
defer cli.reqPool.Put(req)

st := cli.streampool.Get()
if st == nil {
return errors.New("new stream from streampool failed")
}
stream, ok := st.(streamx.BidiStreamingClient[echo.Request, echo.Response])
if !ok {
return errors.New("new stream error")
}
defer cli.streampool.Put(stream)

ctx := context.Background()
req.Action = action
req.Msg = msg
err := stream.Send(ctx, req)
if err != nil {
return err
}

resp, err := stream.Recv(ctx)
if err != nil && !errors.Is(err, io.EOF) {
return err
}
runner.ProcessResponse(resp.Action, resp.Msg)
return nil
}

// main is use for routing.
func main() {
runner.Main("KITEX_TTS_LCONN", NewKClient)
}
20 changes: 15 additions & 5 deletions streaming/kitex_tts/main.go → streaming/kitex_tts_lconn/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,30 @@ import (
"log"
"net"

"github.com/bytedance/gopkg/cloud/metainfo"
"github.com/cloudwego/kitex-benchmark/codec/thrift/kitex_gen/echo"
"github.com/cloudwego/kitex-benchmark/codec/thrift/kitex_gen/echo/streamserver"
"github.com/cloudwego/kitex-benchmark/perf"
"github.com/cloudwego/kitex-benchmark/runner"
"github.com/cloudwego/kitex/pkg/streamx"
"github.com/cloudwego/kitex/server"
)

const port = 8002

var (
_ streamserver.Server = &StreamServerImpl{}

recorder = perf.NewRecorder("KITEX_TTS@Server")
_ streamserver.Server = &StreamServerImpl{}
recorder = perf.NewRecorder("KITEX_TTS_LCONN@Server")
)

type StreamServerImpl struct{}

func (si *StreamServerImpl) Echo(ctx context.Context, stream streamx.BidiStreamingServer[echo.Request, echo.Response]) error {
v, _ := metainfo.GetValue(ctx, "header")
if v != "hello" {
return fmt.Errorf("invalid header: %v", v)
}

for {
req, err := stream.Recv(ctx)
if err == io.EOF {
Expand All @@ -49,9 +55,11 @@ func (si *StreamServerImpl) Echo(ctx context.Context, stream streamx.BidiStreami
if err != nil {
return err
}
action, msg := runner.ProcessRequest(recorder, req.Action, req.Msg)

resp := new(echo.Response)
resp.Msg = req.Msg
resp.Action = action
resp.Msg = msg
err = stream.Send(ctx, resp)
if err != nil {
return err
Expand All @@ -65,7 +73,9 @@ func main() {
perf.ServeMonitor(fmt.Sprintf(":%d", port+10000))
}()

svr := server.NewServer(server.WithServiceAddr(&net.TCPAddr{IP: net.IPv4zero, Port: port}))
svr := server.NewServer(
server.WithServiceAddr(&net.TCPAddr{IP: net.IPv4zero, Port: port}),
)
err := streamserver.RegisterService(svr, new(StreamServerImpl))
if err != nil {
panic(err)
Expand Down
Loading

0 comments on commit a20b541

Please sign in to comment.