Skip to content

Commit

Permalink
feat: support triple group/version division for a certain Interface (a…
Browse files Browse the repository at this point in the history
…pache#2532)

* feat: support triple group/version division for a certain Interface

* add unit test

* import format

* fix some comments
  • Loading branch information
DMwangnima authored Dec 8, 2023
1 parent 90e7f69 commit 5465486
Show file tree
Hide file tree
Showing 15 changed files with 711 additions and 208 deletions.
4 changes: 2 additions & 2 deletions config/provider_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ import (

"github.com/dubbogo/gost/log/logger"

perrors "github.com/pkg/errors"

tripleConstant "github.com/dubbogo/triple/pkg/common/constant"

perrors "github.com/pkg/errors"
)

import (
Expand Down
18 changes: 11 additions & 7 deletions protocol/triple/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,33 +119,37 @@ func newClientManager(url *common.URL) (*clientManager, error) {
// If global trace instance was set, it means trace function enabled.
// If not, will return NoopTracer.
// tracer := opentracing.GlobalTracer()
var triClientOpts []tri.ClientOption
var cliOpts []tri.ClientOption

// set max send and recv msg size
maxCallRecvMsgSize := constant.DefaultMaxCallRecvMsgSize
if recvMsgSize, err := humanize.ParseBytes(url.GetParam(constant.MaxCallRecvMsgSize, "")); err == nil && recvMsgSize > 0 {
maxCallRecvMsgSize = int(recvMsgSize)
}
triClientOpts = append(triClientOpts, tri.WithReadMaxBytes(maxCallRecvMsgSize))
cliOpts = append(cliOpts, tri.WithReadMaxBytes(maxCallRecvMsgSize))
maxCallSendMsgSize := constant.DefaultMaxCallSendMsgSize
if sendMsgSize, err := humanize.ParseBytes(url.GetParam(constant.MaxCallSendMsgSize, "")); err == nil && sendMsgSize > 0 {
maxCallSendMsgSize = int(sendMsgSize)
}
triClientOpts = append(triClientOpts, tri.WithSendMaxBytes(maxCallSendMsgSize))
cliOpts = append(cliOpts, tri.WithSendMaxBytes(maxCallSendMsgSize))

// set serialization
serialization := url.GetParam(constant.SerializationKey, constant.ProtobufSerialization)
switch serialization {
case constant.ProtobufSerialization:
case constant.JSONSerialization:
triClientOpts = append(triClientOpts, tri.WithProtoJSON())
cliOpts = append(cliOpts, tri.WithProtoJSON())
default:
panic(fmt.Sprintf("Unsupported serialization: %s", serialization))
}

// set timeout
timeout := url.GetParamDuration(constant.TimeoutKey, "")
triClientOpts = append(triClientOpts, tri.WithTimeout(timeout))
cliOpts = append(cliOpts, tri.WithTimeout(timeout))

group := url.GetParam(constant.GroupKey, "")
version := url.GetParam(constant.VersionKey, "")
cliOpts = append(cliOpts, tri.WithGroup(group), tri.WithVersion(version))

// dialOpts = append(dialOpts,
//
Expand Down Expand Up @@ -187,7 +191,7 @@ func newClientManager(url *common.URL) (*clientManager, error) {
transport = &http.Transport{
TLSClientConfig: cfg,
}
triClientOpts = append(triClientOpts, tri.WithTriple())
cliOpts = append(cliOpts, tri.WithTriple())
case constant.CallHTTP2:
if tlsFlag {
transport = &http2.Transport{
Expand Down Expand Up @@ -222,7 +226,7 @@ func newClientManager(url *common.URL) (*clientManager, error) {
if err != nil {
return nil, fmt.Errorf("JoinPath failed for base %s, interface %s, method %s", baseTriURL, url.Interface(), method)
}
triClient := tri.NewClient(httpClient, triURL, triClientOpts...)
triClient := tri.NewClient(httpClient, triURL, cliOpts...)
triClients[method] = triClient
}

Expand Down
4 changes: 3 additions & 1 deletion protocol/triple/health/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (
"testing"
"time"
)

import (
"github.com/stretchr/testify/assert"

// If there is a conflict between the healthCheck of Dubbo and the healthCheck of gRPC, an error will occur.
_ "google.golang.org/grpc/health/grpc_health_v1"
)

Expand All @@ -35,6 +35,8 @@ import (

const testService = "testService"

// If there is a conflict between the healthCheck of Dubbo and the healthCheck of gRPC, an error will occur.

func TestSetServingStatus(t *testing.T) {
s := NewServer()
s.SetServingStatus(testService, healthpb.HealthCheckResponse_SERVING)
Expand Down
56 changes: 56 additions & 0 deletions protocol/triple/internal/dubbo3_server/api/greet_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,59 @@ func (srv *GreetDubbo3Server) GreetServerStream(req *proto.GreetServerStreamRequ
}
return nil
}

const (
GroupVersionIdentifier = "g1v1"
)

type GreetDubbo3ServerGroup1Version1 struct {
greet.UnimplementedGreetServiceServer
}

func (srv *GreetDubbo3ServerGroup1Version1) Greet(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
return &proto.GreetResponse{Greeting: GroupVersionIdentifier + req.Name}, nil
}

func (srv *GreetDubbo3ServerGroup1Version1) GreetStream(stream greet.GreetService_GreetStreamServer) error {
for {
req, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return fmt.Errorf("dubbo3 Bidistream recv error: %s", err)
}
if err := stream.Send(&proto.GreetStreamResponse{Greeting: GroupVersionIdentifier + req.Name}); err != nil {
return fmt.Errorf("dubbo3 Bidistream send error: %s", err)
}
}
return nil
}

func (srv *GreetDubbo3ServerGroup1Version1) GreetClientStream(stream greet.GreetService_GreetClientStreamServer) error {
var reqs []string
for {
req, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
return fmt.Errorf("dubbo3 ClientStream recv error: %s", err)
}
reqs = append(reqs, GroupVersionIdentifier+req.Name)
}

resp := &proto.GreetClientStreamResponse{
Greeting: strings.Join(reqs, ","),
}
return stream.SendAndClose(resp)
}

func (srv *GreetDubbo3ServerGroup1Version1) GreetServerStream(req *proto.GreetServerStreamRequest, stream greet.GreetService_GreetServerStreamServer) error {
for i := 0; i < 5; i++ {
if err := stream.Send(&proto.GreetServerStreamResponse{Greeting: GroupVersionIdentifier + req.Name}); err != nil {
return fmt.Errorf("dubbo3 ServerStream send error: %s", err)
}
}
return nil
}
54 changes: 52 additions & 2 deletions protocol/triple/internal/server/api/greet_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ import (
triple "dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
)

type GreetTripleServer struct {
}
type GreetTripleServer struct{}

func (srv *GreetTripleServer) Greet(ctx context.Context, req *greet.GreetRequest) (*greet.GreetResponse, error) {
resp := &greet.GreetResponse{Greeting: req.Name}
Expand Down Expand Up @@ -76,3 +75,54 @@ func (srv *GreetTripleServer) GreetServerStream(ctx context.Context, req *greet.
}
return nil
}

const (
GroupVersionIdentifier = "g1v1"
)

type GreetTripleServerGroup1Version1 struct{}

func (g *GreetTripleServerGroup1Version1) Greet(ctx context.Context, req *greet.GreetRequest) (*greet.GreetResponse, error) {
resp := &greet.GreetResponse{Greeting: GroupVersionIdentifier + req.Name}
return resp, nil
}

func (g *GreetTripleServerGroup1Version1) GreetStream(ctx context.Context, stream greettriple.GreetService_GreetStreamServer) error {
for {
req, err := stream.Recv()
if err != nil {
if triple.IsEnded(err) {
break
}
return fmt.Errorf("triple BidiStream recv error: %s", err)
}
if err := stream.Send(&greet.GreetStreamResponse{Greeting: GroupVersionIdentifier + req.Name}); err != nil {
return fmt.Errorf("triple BidiStream send error: %s", err)
}
}
return nil
}

func (g *GreetTripleServerGroup1Version1) GreetClientStream(ctx context.Context, stream greettriple.GreetService_GreetClientStreamServer) (*greet.GreetClientStreamResponse, error) {
var reqs []string
for stream.Recv() {
reqs = append(reqs, GroupVersionIdentifier+stream.Msg().Name)
}
if stream.Err() != nil && !triple.IsEnded(stream.Err()) {
return nil, fmt.Errorf("triple ClientStream recv err: %s", stream.Err())
}
resp := &greet.GreetClientStreamResponse{
Greeting: strings.Join(reqs, ","),
}

return resp, nil
}

func (g *GreetTripleServerGroup1Version1) GreetServerStream(ctx context.Context, req *greet.GreetServerStreamRequest, stream greettriple.GreetService_GreetServerStreamServer) error {
for i := 0; i < 5; i++ {
if err := stream.Send(&greet.GreetServerStreamResponse{Greeting: GroupVersionIdentifier + req.Name}); err != nil {
return fmt.Errorf("triple ServerStream send err: %s", err)
}
}
return nil
}
Loading

0 comments on commit 5465486

Please sign in to comment.