Skip to content

Commit

Permalink
feat:add internal service extension (apache#2577)
Browse files Browse the repository at this point in the history
  • Loading branch information
FinalT authored Jan 26, 2024
1 parent df969fe commit 2e54f26
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 29 deletions.
10 changes: 10 additions & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package constant

import "math"

type DubboCtxKey string

const (
Expand Down Expand Up @@ -439,3 +441,11 @@ const (
DefaultMetaFileName = "dubbo.metadata."
DefaultEntrySize = 100
)

// priority
const (
DefaultPriority = 0
HighestPriority = math.MinInt32
// LowestPriority for metadata service
LowestPriority = math.MaxInt32
)
18 changes: 11 additions & 7 deletions protocol/triple/health/healthServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,18 @@ func (srv *HealthTripleServer) Resume() {

func init() {
healthServer = NewServer()

internal.HealthSetServingStatusServing = SetServingStatusServing

server.SetProServices(&server.ServiceDefinition{
Handler: healthServer,
Info: &triple_health.Health_ServiceInfo,
Opts: []server.ServiceOption{server.WithNotRegister(),
server.WithInterface(constant.HealthCheckServiceInterface)},
server.SetProServices(&server.InternalService{
Name: "healthCheck",
Init: func(options *server.ServiceOptions) (*server.ServiceDefinition, bool) {
return &server.ServiceDefinition{
Handler: healthServer,
Info: &triple_health.Health_ServiceInfo,
Opts: []server.ServiceOption{server.WithNotRegister(),
server.WithInterface(constant.HealthCheckServiceInterface)},
}, true
},
Priority: constant.DefaultPriority,
})

// In order to adapt config.Load
Expand Down
16 changes: 11 additions & 5 deletions protocol/triple/reflection/serverreflection.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,17 @@ var reflectionServer *ReflectionServer
func init() {
reflectionServer = NewServer()
internal.ReflectionRegister = Register
server.SetProServices(&server.ServiceDefinition{
Handler: reflectionServer,
Info: &rpb.ServerReflection_ServiceInfo,
Opts: []server.ServiceOption{server.WithNotRegister(),
server.WithInterface(constant.ReflectionServiceInterface)},
server.SetProServices(&server.InternalService{
Name: "reflection",
Init: func(options *server.ServiceOptions) (*server.ServiceDefinition, bool) {
return &server.ServiceDefinition{
Handler: reflectionServer,
Info: &rpb.ServerReflection_ServiceInfo,
Opts: []server.ServiceOption{server.WithNotRegister(),
server.WithInterface(constant.ReflectionServiceInterface)},
}, true
},
Priority: constant.DefaultPriority,
})
// In order to adapt config.Load
// Plans for future removal
Expand Down
117 changes: 100 additions & 17 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package server
import (
"context"
"fmt"
"sort"
"sync"
)

Expand All @@ -38,7 +39,8 @@ import (
)

// proServices are for internal services
var proServices = map[string]*ServiceDefinition{}
var proServices = make([]*InternalService, 0, 16)
var proLock sync.Mutex

type Server struct {
invoker protocol.Invoker
Expand Down Expand Up @@ -123,8 +125,17 @@ func newInfoInvoker(url *common.URL, info *ServiceInfo, svc common.RPCService) p

// Register assemble invoker chains like ProviderConfig.Load, init a service per call
func (s *Server) Register(handler interface{}, info *ServiceInfo, opts ...ServiceOption) error {
newSvcOpts, err := s.genSvcOpts(handler, opts...)
if err != nil {
return err
}
s.svcOptsMap.Store(newSvcOpts, info)
return nil
}

func (s *Server) genSvcOpts(handler interface{}, opts ...ServiceOption) (*ServiceOptions, error) {
if s.cfg == nil {
return errors.New("Server has not been initialized, please use NewServer() to create Server")
return nil, errors.New("Server has not been initialized, please use NewServer() to create Server")
}
var svcOpts []ServiceOption
appCfg := s.cfg.Application
Expand Down Expand Up @@ -153,16 +164,13 @@ func (s *Server) Register(handler interface{}, info *ServiceInfo, opts ...Servic
SetRegistries(regsCfg),
)
}

// options passed by users have higher priority
svcOpts = append(svcOpts, opts...)
if err := newSvcOpts.init(s, svcOpts...); err != nil {
return err
return nil, err
}
newSvcOpts.Implement(handler)
s.svcOptsMap.Store(newSvcOpts, info)

return nil
return newSvcOpts, nil
}

func (s *Server) exportServices() (err error) {
Expand All @@ -187,11 +195,88 @@ func (s *Server) Serve() error {
if err := s.exportServices(); err != nil {
return err
}
if err := s.exportInternalServices(); err != nil {
return err
}
metadata.ExportMetadataService()
registry_exposed.RegisterServiceInstance(s.cfg.Application.Name, s.cfg.Application.Tag, s.cfg.Application.MetadataType)
select {}
}

// In order to expose internal services
func (s *Server) exportInternalServices() error {
cfg := &ServiceOptions{}
cfg.Application = s.cfg.Application
cfg.Provider = s.cfg.Provider
cfg.Protocols = s.cfg.Protocols
cfg.Registries = s.cfg.Registries

services := make([]*InternalService, 0, len(proServices))

proLock.Lock()
defer proLock.Unlock()
for _, service := range proServices {
if service.Init == nil {
return errors.New("[internal service]internal service init func is empty, please set the init func correctly")
}
sd, ok := service.Init(cfg)
if !ok {
logger.Infof("[internal service]%s service will not expose", service.Name)
continue
}
newSvcOpts, err := s.genSvcOpts(sd.Handler, sd.Opts...)
if err != nil {
return err
}
service.svcOpts = newSvcOpts
service.info = sd.Info
services = append(services, service)
}

sort.Slice(services, func(i, j int) bool {
return services[i].Priority < services[j].Priority
})

for _, service := range services {
if service.BeforeExport != nil {
service.BeforeExport(service.svcOpts)
}
err := service.svcOpts.ExportWithInfo(service.info)
if service.AfterExport != nil {
service.AfterExport(service.svcOpts, err)
}
if err != nil {
logger.Errorf("[internal service]export %s service failed, err: %s", service.Name, err)
return err
}
}

return nil
}

// InternalService for dubbo internal services
type InternalService struct {
// This is required
// internal service name
Name string
svcOpts *ServiceOptions
info *ServiceInfo
// This is required
// This options is service configuration
// Return serviceDefinition and bool, where bool indicates whether it is exported
Init func(options *ServiceOptions) (*ServiceDefinition, bool)
// This options is InternalService.svcOpts itself
BeforeExport func(options *ServiceOptions)
// This options is InternalService.svcOpts itself
AfterExport func(options *ServiceOptions, err error)
// Priority of service exposure
// Lower numbers have the higher priority
// The default priority is 0
// The metadata service is exposed at the end
// If you have no requirements for the order of service exposure, you can use the default priority or not set
Priority int
}

type MethodInfo struct {
Name string
Type string
Expand All @@ -210,17 +295,15 @@ func NewServer(opts ...ServerOption) (*Server, error) {
srv := &Server{
cfg: newSrvOpts,
}

for _, service := range proServices {
err := srv.Register(service.Handler, service.Info, service.Opts...)
if err != nil {
return nil, err
}
}

return srv, nil
}

func SetProServices(sd *ServiceDefinition) {
proServices[sd.Info.InterfaceName] = sd
func SetProServices(sd *InternalService) {
if sd.Name == "" {
logger.Warnf("[internal service]internal name is empty, please set internal name")
return
}
proLock.Lock()
defer proLock.Unlock()
proServices = append(proServices, sd)
}

0 comments on commit 2e54f26

Please sign in to comment.