Skip to content

Commit

Permalink
improve logics for triple server
Browse files Browse the repository at this point in the history
  • Loading branch information
DMwangnima committed Dec 30, 2023
1 parent c8f9a2d commit 4f649c4
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 195 deletions.
38 changes: 5 additions & 33 deletions protocol/triple/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,8 @@ func (cm *clientManager) close() error {
return nil
}

// newClientManager extracts configurations from url and builds clientManager
func newClientManager(url *common.URL) (*clientManager, error) {
// If global trace instance was set, it means trace function enabled.
// If not, will return NoopTracer.
// tracer := opentracing.GlobalTracer()
var cliOpts []tri.ClientOption

// set max send and recv msg size
Expand Down Expand Up @@ -154,42 +152,16 @@ func newClientManager(url *common.URL) (*clientManager, error) {
timeout := url.GetParamDuration(constant.TimeoutKey, "")
cliOpts = append(cliOpts, tri.WithTimeout(timeout))

// set service group and version
group := url.GetParam(constant.GroupKey, "")
version := url.GetParam(constant.VersionKey, "")
cliOpts = append(cliOpts, tri.WithGroup(group), tri.WithVersion(version))

// dialOpts = append(dialOpts,
//
// grpc.WithBlock(),
// // todo config tracing
// grpc.WithTimeout(time.Second*3),
// grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(tracer, otgrpc.LogPayloads())),
// grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(tracer, otgrpc.LogPayloads())),
// grpc.WithDefaultCallOptions(
// grpc.CallContentSubtype(clientConf.ContentSubType),
// grpc.MaxCallRecvMsgSize(maxCallRecvMsgSize),
// grpc.MaxCallSendMsgSize(maxCallSendMsgSize),
// ),
//
// )
// todo(DMwangnima): support opentracing

// todo(DMwangnima): support TLS in an ideal way
var cfg *tls.Config
var tlsFlag bool
//var err error

// todo: think about a more elegant way to configure tls
//if tlsConfig := config.GetRootConfig().TLSConfig; tlsConfig != nil {
// cfg, err = config.GetClientTlsConfig(&config.TLSConfig{
// CACertFile: tlsConfig.CACertFile,
// TLSCertFile: tlsConfig.TLSCertFile,
// TLSKeyFile: tlsConfig.TLSKeyFile,
// TLSServerName: tlsConfig.TLSServerName,
// })
// if err != nil {
// return nil, err
// }
// logger.Infof("TRIPLE clientManager initialized the TLSConfig configuration successfully")
// tlsFlag = true
//}

var transport http.RoundTripper
callType := url.GetParam(constant.CallHTTPTypeKey, constant.CallHTTP2)
Expand Down
77 changes: 0 additions & 77 deletions protocol/triple/codec.go

This file was deleted.

126 changes: 42 additions & 84 deletions protocol/triple/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"reflect"
"sync"
"time"
)

import (
Expand All @@ -45,14 +44,16 @@ import (
"dubbo.apache.org/dubbo-go/v3/server"
)

// Server is TRIPLE server
// Server is TRIPLE adaptation layer representation. It makes use of tri.Server to
// provide functionality.
type Server struct {
triServer *tri.Server
services map[string]grpc.ServiceInfo
mu sync.RWMutex
services map[string]grpc.ServiceInfo
}

// NewServer creates a new TRIPLE server
// NewServer creates a new TRIPLE server.
// triServer would not be initialized since we could not get configurations here.
func NewServer() *Server {
return &Server{
services: make(map[string]grpc.ServiceInfo),
Expand All @@ -61,13 +62,11 @@ func NewServer() *Server {

// Start TRIPLE server
func (s *Server) Start(invoker protocol.Invoker, info *server.ServiceInfo) {
var (
addr string
URL *common.URL
)
URL = invoker.GetURL()
addr = URL.Location
URL := invoker.GetURL()
addr := URL.Location
// initialize tri.Server
s.triServer = tri.NewServer(addr)

serialization := URL.GetParam(constant.SerializationKey, constant.ProtobufSerialization)
switch serialization {
case constant.ProtobufSerialization:
Expand All @@ -77,41 +76,21 @@ func (s *Server) Start(invoker protocol.Invoker, info *server.ServiceInfo) {
default:
panic(fmt.Sprintf("Unsupported serialization: %s", serialization))
}
// todo: implement interceptor
// If global trace instance was set, then server tracer instance
// can be get. If not, will return NoopTracer.
//tracer := opentracing.GlobalTracer()
//serverOpts = append(serverOpts,
// grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)),
// grpc.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(tracer)),
// grpc.MaxRecvMsgSize(maxServerRecvMsgSize),
// grpc.MaxSendMsgSize(maxServerSendMsgSize),
//)
//var cfg *tls.Config
// todo: support opentracing interceptor

// todo(DMwangnima): think about a more elegant way to configure tls
//tlsConfig := config.GetRootConfig().TLSConfig
//if tlsConfig != nil {
// cfg, err = config.GetServerTlsConfig(&config.TLSConfig{
// CACertFile: tlsConfig.CACertFile,
// TLSCertFile: tlsConfig.TLSCertFile,
// TLSKeyFile: tlsConfig.TLSKeyFile,
// TLSServerName: tlsConfig.TLSServerName,
// })
// if err != nil {
// return
// }
// logger.Infof("Triple Server initialized the TLSConfig configuration")
//}
//srv.TLSConfig = cfg

// todo:// move tls config to handleService

hanOpts := getHanOpts(URL)
intfName := URL.Interface()
if info != nil {
// new triple idl mode
s.handleServiceWithInfo(intfName, invoker, info, hanOpts...)
s.saveServiceInfo(intfName, info)
} else {
s.compatHandleService(intfName, hanOpts...)
// old triple idl mode and non-idl mode
s.compatHandleService(intfName, URL.Group(), URL.Version(), hanOpts...)
}
reflection.Register(s)

Expand All @@ -125,11 +104,7 @@ func (s *Server) Start(invoker protocol.Invoker, info *server.ServiceInfo) {
// todo(DMwangnima): extract a common function
// RefreshService refreshes Triple Service
func (s *Server) RefreshService(invoker protocol.Invoker, info *server.ServiceInfo) {
var (
URL *common.URL
hanOpts []tri.HandlerOption
)
URL = invoker.GetURL()
URL := invoker.GetURL()
serialization := URL.GetParam(constant.SerializationKey, constant.ProtobufSerialization)
switch serialization {
case constant.ProtobufSerialization:
Expand All @@ -139,13 +114,13 @@ func (s *Server) RefreshService(invoker protocol.Invoker, info *server.ServiceIn
default:
panic(fmt.Sprintf("Unsupported serialization: %s", serialization))
}
hanOpts = getHanOpts(URL)
hanOpts := getHanOpts(URL)
intfName := URL.Interface()
if info != nil {
s.handleServiceWithInfo(intfName, invoker, info, hanOpts...)
s.saveServiceInfo(intfName, info)
} else {
s.compatHandleService(intfName, hanOpts...)
s.compatHandleService(intfName, URL.Group(), URL.Version(), hanOpts...)
}
}

Expand All @@ -164,65 +139,32 @@ func getHanOpts(url *common.URL) (hanOpts []tri.HandlerOption) {
hanOpts = append(hanOpts, tri.WithSendMaxBytes(maxServerSendMsgSize))

// todo:// open tracing
hanOpts = append(hanOpts, tri.WithInterceptors())

group := url.GetParam(constant.GroupKey, "")
version := url.GetParam(constant.VersionKey, "")
hanOpts = append(hanOpts, tri.WithGroup(group), tri.WithVersion(version))
return hanOpts
}

// getSyncMapLen gets sync map len
func getSyncMapLen(m *sync.Map) int {
length := 0

m.Range(func(_, _ interface{}) bool {
length++
return true
})
return length
}

// waitTripleExporter wait until len(providerServices) = len(ExporterMap)
func waitTripleExporter(providerServices map[string]*config.ServiceConfig) {
t := time.NewTicker(50 * time.Millisecond)
defer t.Stop()
pLen := len(providerServices)
ta := time.NewTimer(10 * time.Second)
defer ta.Stop()

for {
select {
case <-t.C:
mLen := getSyncMapLen(tripleProtocol.ExporterMap())
if pLen == mLen {
return
}
case <-ta.C:
panic("wait Triple exporter timeout when start GRPC_NEW server")
}
}
}

// *Important*, this function is responsible for being compatible with old triple-gen code
// *Important*, this function is responsible for being compatible with old triple-gen code and non-idl code
// compatHandleService registers handler based on ServiceConfig and provider service.
func (s *Server) compatHandleService(interfaceName string, opts ...tri.HandlerOption) {
func (s *Server) compatHandleService(interfaceName string, group, version string, opts ...tri.HandlerOption) {
providerServices := config.GetProviderConfig().Services
if len(providerServices) == 0 {
logger.Info("Provider service map is null")
logger.Info("Provider service map is null, please register ProviderServices")
return
}
//waitTripleExporter(providerServices)
for key, providerService := range providerServices {
if providerService.Interface != interfaceName {
if providerService.Interface != interfaceName || providerService.Group != group || providerService.Version != version {
continue
}
// todo(DMwangnima): judge protocol type
service := config.GetProviderService(key)
serviceKey := common.ServiceKey(providerService.Interface, providerService.Group, providerService.Version)
exporter, _ := tripleProtocol.ExporterMap().Load(serviceKey)
if exporter == nil {
logger.Warnf("no exporter found for serviceKey: %v", serviceKey)
continue
//panic(fmt.Sprintf("no exporter found for servicekey: %v", serviceKey))
}
invoker := exporter.(protocol.Exporter).GetInvoker()
if invoker == nil {
Expand Down Expand Up @@ -282,20 +224,26 @@ func (s *Server) handleServiceWithInfo(interfaceName string, invoker protocol.In
func(ctx context.Context, req *tri.Request) (*tri.Response, error) {
var args []interface{}
if argsRaw, ok := req.Msg.([]interface{}); ok {
// non-idl mode, req.Msg consists of many arguments
for _, argRaw := range argsRaw {
// refer to createServiceInfoWithReflection, in ReqInitFunc, argRaw is a pointer to real arg.
// so we have to invoke Elem to get the real arg.
args = append(args, reflect.ValueOf(argRaw).Elem().Interface())
}
} else {
// triple idl mode and old triple idl mode
args = append(args, req.Msg)
}
// todo: inject method.Meta to attachments
invo := invocation.NewRPCInvocation(m.Name, args, nil)
res := invoker.Invoke(ctx, invo)
// todo(DMwangnima): modify InfoInvoker to get a unified processing logic
// please refer to server/InfoInvoker.Invoke()
if triResp, ok := res.Result().(*tri.Response); ok {
return triResp, res.Error()
}
// please refer to proxy/proxy_factory/ProxyInvoker.Invoke
triResp := tri.NewResponse([]interface{}{res.Result()})
// todo(DMwangnima): if we do not use MethodInfo.MethodFunc, create Response manually
return triResp, res.Error()
},
opts...,
Expand Down Expand Up @@ -366,6 +314,7 @@ func (s *Server) saveServiceInfo(interfaceName string, info *server.ServiceInfo)
ret.Metadata = info
s.mu.Lock()
defer s.mu.Unlock()
// todo(DMwangnima): using interfaceName is not enough, we need to consider group and version
s.services[interfaceName] = ret
}

Expand All @@ -389,6 +338,9 @@ func (s *Server) GracefulStop() {
_ = s.triServer.GracefulStop(context.Background())
}

// createServiceInfoWithReflection is for non-idl scenario.
// It makes use of reflection to extract method parameters information and create ServiceInfo.
// As a result, Server could use this ServiceInfo to register.
func createServiceInfoWithReflection(svc common.RPCService) *server.ServiceInfo {
var info server.ServiceInfo
val := reflect.ValueOf(svc)
Expand All @@ -401,13 +353,19 @@ func createServiceInfoWithReflection(svc common.RPCService) *server.ServiceInfo
continue
}
paramsNum := methodType.Type.NumIn()
// ignore ctx
// the first param is receiver itself, the second param is ctx
// just ignore them
if paramsNum <= 2 {
logger.Fatalf("TRIPLE does not support %s method that only has ctx parameter or does not have any parameter", methodType.Name)
continue
}
paramsTypes := make([]reflect.Type, paramsNum-2)
for j := 2; j < paramsNum; j++ {
paramsTypes[j-2] = methodType.Type.In(j)
}
methodInfo := server.MethodInfo{
Name: methodType.Name,
// only support Unary invocation now
Type: constant.CallUnary,
ReqInitFunc: func() interface{} {
params := make([]interface{}, len(paramsTypes))
Expand Down
Loading

0 comments on commit 4f649c4

Please sign in to comment.