From b21a6a526c3ba044278d4a7408bbd37a4f7bef7e Mon Sep 17 00:00:00 2001 From: andrewshan Date: Sun, 26 Dec 2021 17:02:46 +0800 Subject: [PATCH 1/2] feat #19: change to interface to provider --- balancer.go | 3 + examples/quickstart/provider/main.go | 33 ++++++-- server.go | 117 ++++++++++++++------------- 3 files changed, 90 insertions(+), 63 deletions(-) diff --git a/balancer.go b/balancer.go index 09f9937..e240d32 100644 --- a/balancer.go +++ b/balancer.go @@ -383,6 +383,9 @@ type resultReporter struct { func (r *resultReporter) report(info balancer.DoneInfo) { recvErr := info.Err + if !info.BytesReceived { + return + } callResult := &api.ServiceCallResult{} callResult.CalledInstance = r.instance var code uint32 diff --git a/examples/quickstart/provider/main.go b/examples/quickstart/provider/main.go index 1edf312..dcd0d74 100644 --- a/examples/quickstart/provider/main.go +++ b/examples/quickstart/provider/main.go @@ -19,13 +19,23 @@ package main import ( "context" + "fmt" "log" "net" + "os" + "os/signal" polaris "github.com/polarismesh/grpc-go-polaris" + + "google.golang.org/grpc" + "github.com/polarismesh/grpc-go-polaris/examples/quickstart/pb" ) +const ( + listenPort = 16010 +) + // EchoService gRPC echo service struct type EchoService struct{} @@ -35,16 +45,29 @@ func (h *EchoService) Echo(ctx context.Context, req *pb.EchoRequest) (*pb.EchoRe } func main() { - srv := polaris.NewServer(polaris.WithServerApplication("EchoServerGRPC")) - pb.RegisterEchoServerServer(srv.GRPCServer(), &EchoService{}) - // 监听端口 - address := "0.0.0.0:0" + srv := grpc.NewServer() + pb.RegisterEchoServerServer(srv, &EchoService{}) + address := fmt.Sprintf("0.0.0.0:%d", listenPort) listen, err := net.Listen("tcp", address) if err != nil { log.Fatalf("Failed to addr %s: %v", address, err) } - err = srv.Serve(listen) + // 执行北极星的注册命令 + pSrv, err := polaris.Register(srv, listen, polaris.WithServerApplication("EchoServerGRPC")) if nil != err { log.Fatal(err) } + go func() { + c := make(chan os.Signal) + signal.Notify(c) + s := <-c + log.Printf("receive quit signal: %v", s) + // 执行北极星的反注册命令 + pSrv.Deregister() + srv.GracefulStop() + }() + err = srv.Serve(listen) + if nil != err { + log.Printf("listen err: %v", err) + } } diff --git a/server.go b/server.go index 1edd891..0f73735 100644 --- a/server.go +++ b/server.go @@ -21,12 +21,9 @@ import ( "context" "fmt" "net" - "os" - "os/signal" "strconv" "strings" "sync" - "syscall" "time" "github.com/golang/protobuf/proto" @@ -37,8 +34,8 @@ import ( // Server encapsulated server with gRPC option type Server struct { - gRPCServer *grpc.Server - serverOptions serverOptions + serverOptions serverOptions + registerContext *RegisterContext } type serverOptions struct { @@ -130,22 +127,6 @@ func WithPort(port int) ServerOption { }) } -// NewServer initializer for gRPC server -func NewServer(opts ...ServerOption) *Server { - srv := &Server{} - for _, opt := range opts { - opt.apply(&srv.serverOptions) - } - srv.serverOptions.setDefault() - srv.gRPCServer = grpc.NewServer(srv.serverOptions.gRPCServerOptions...) - return srv -} - -// GRPCServer get the raw gRPC server -func (s *Server) GRPCServer() *grpc.Server { - return s.gRPCServer -} - func getLocalHost(serverAddr string) (string, error) { conn, err := net.Dial("tcp", serverAddr) if nil != err { @@ -206,6 +187,16 @@ type RegisterContext struct { const maxHeartbeatIntervalSec = 60 +func checkAddress(address string) bool { + conn, err := net.DialTimeout("tcp", address, 100*time.Millisecond) + if nil != err { + grpclog.Infof("[Polaris]fail to dial %s: %v", address, err) + return false + } + _ = conn.Close() + return true +} + func (s *Server) startHeartbeat(ctx context.Context, providerAPI api.ProviderAPI, registerRequests []*api.InstanceRegisterRequest) *sync.WaitGroup { heartbeatIntervalSec := s.serverOptions.ttl @@ -214,6 +205,7 @@ func (s *Server) startHeartbeat(ctx context.Context, } wg := &sync.WaitGroup{} wg.Add(len(registerRequests)) + dialResults := make(map[string]bool) for i, request := range registerRequests { go func(idx int, registerRequest *api.InstanceRegisterRequest) { ticker := time.NewTicker(time.Duration(heartbeatIntervalSec) * time.Second) @@ -225,15 +217,23 @@ func (s *Server) startHeartbeat(ctx context.Context, wg.Done() return case <-ticker.C: - hbRequest := &api.InstanceHeartbeatRequest{} - hbRequest.Namespace = registerRequest.Namespace - hbRequest.Service = registerRequest.Service - hbRequest.Host = registerRequest.Host - hbRequest.Port = registerRequest.Port - err := providerAPI.Heartbeat(hbRequest) - if nil != err { - grpclog.Errorf("[Polaris]fail to heartbeat %s:%d to service %s(%s): %v", - hbRequest.Host, hbRequest.Port, hbRequest.Service, hbRequest.Namespace, err) + address := fmt.Sprintf("%s:%d", registerRequest.Host, registerRequest.Port) + result, ok := dialResults[address] + if !ok { + result = checkAddress(address) + dialResults[address] = result + } + if result { + hbRequest := &api.InstanceHeartbeatRequest{} + hbRequest.Namespace = registerRequest.Namespace + hbRequest.Service = registerRequest.Service + hbRequest.Host = registerRequest.Host + hbRequest.Port = registerRequest.Port + err := providerAPI.Heartbeat(hbRequest) + if nil != err { + grpclog.Errorf("[Polaris]fail to heartbeat %s:%d to service %s(%s): %v", + hbRequest.Host, hbRequest.Port, hbRequest.Service, hbRequest.Namespace, err) + } } } } @@ -244,9 +244,14 @@ func (s *Server) startHeartbeat(ctx context.Context, return wg } -// Serve listen and accept connections -func (s *Server) Serve(lis net.Listener) error { - svcInfos := s.gRPCServer.GetServiceInfo() +// Register register server as polaris instances +func Register(gSrv *grpc.Server, lis net.Listener, opts ...ServerOption) (*Server, error) { + srv := &Server{} + for _, opt := range opts { + opt.apply(&srv.serverOptions) + } + srv.serverOptions.setDefault() + svcInfos := gSrv.GetServiceInfo() ctx, cancel := context.WithCancel(context.Background()) registerContext := &RegisterContext{ cancel: cancel, @@ -254,59 +259,55 @@ func (s *Server) Serve(lis net.Listener) error { if len(svcInfos) > 0 { polarisCtx, err := PolarisContext() if nil != err { - return err + return nil, err } - if len(s.serverOptions.host) == 0 { + if len(srv.serverOptions.host) == 0 { host, err := getLocalHost(polarisCtx.GetConfig().GetGlobal().GetServerConnector().GetAddresses()[0]) if nil != err { - return fmt.Errorf("error occur while fetching localhost: %v", err) + return nil, fmt.Errorf("error occur while fetching localhost: %v", err) } - s.serverOptions.host = host + srv.serverOptions.host = host } - if s.serverOptions.port == 0 { + if srv.serverOptions.port == 0 { port, err := parsePort(lis.Addr().String()) if nil != err { - return fmt.Errorf("error occur while parsing port from listener: %v", err) + return nil, fmt.Errorf("error occur while parsing port from listener: %v", err) } - s.serverOptions.port = port + srv.serverOptions.port = port } registerContext.registerRequests = make([]*api.InstanceRegisterRequest, 0, len(svcInfos)) registerContext.providerAPI = api.NewProviderAPIByContext(polarisCtx) for name := range svcInfos { var svcName = name - if len(s.serverOptions.application) > 0 { - svcName = s.serverOptions.application + if len(srv.serverOptions.application) > 0 { + svcName = srv.serverOptions.application } registerRequest := &api.InstanceRegisterRequest{} - registerRequest.Namespace = s.serverOptions.namespace + registerRequest.Namespace = srv.serverOptions.namespace registerRequest.Service = svcName - registerRequest.Host = s.serverOptions.host - registerRequest.Port = s.serverOptions.port - registerRequest.SetTTL(s.serverOptions.ttl) + registerRequest.Host = srv.serverOptions.host + registerRequest.Port = srv.serverOptions.port + registerRequest.SetTTL(srv.serverOptions.ttl) registerRequest.Protocol = proto.String(lis.Addr().Network()) - registerRequest.Metadata = s.serverOptions.metadata + registerRequest.Metadata = srv.serverOptions.metadata registerContext.registerRequests = append(registerContext.registerRequests, registerRequest) resp, err := registerContext.providerAPI.Register(registerRequest) if nil != err { deregisterServices(registerContext) - return fmt.Errorf("fail to register service %s: %v", name, err) + return nil, fmt.Errorf("fail to register service %s: %v", name, err) } grpclog.Infof("[Polaris]success to register %s:%d to service %s(%s), id %s", registerRequest.Host, registerRequest.Port, name, registerRequest.Namespace, resp.InstanceID) } registerContext.healthCheckWait = - s.startHeartbeat(ctx, registerContext.providerAPI, registerContext.registerRequests) + srv.startHeartbeat(ctx, registerContext.providerAPI, registerContext.registerRequests) } - go s.scheduleDeregister(registerContext) - return s.gRPCServer.Serve(lis) + srv.registerContext = registerContext + return srv, nil } -func (s *Server) scheduleDeregister(registerContext *RegisterContext) { - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) - sig := <-c - grpclog.Infof("[Polaris]receive quit signal %v", sig) - deregisterServices(registerContext) - s.gRPCServer.GracefulStop() +// Deregister deregister services from polaris +func (s *Server) Deregister() { + deregisterServices(s.registerContext) } From f669d334d4e53d7a2174b6e6e6e74aa666b869a9 Mon Sep 17 00:00:00 2001 From: andrewshan Date: Sun, 26 Dec 2021 17:10:46 +0800 Subject: [PATCH 2/2] feat: fix compile errors --- test/client_test.go | 11 ++++++++--- test/server_test.go | 13 ++++++++++--- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/test/client_test.go b/test/client_test.go index 9629f6e..00eb11c 100644 --- a/test/client_test.go +++ b/test/client_test.go @@ -54,15 +54,20 @@ func (s *clientTestingSuite) TearDownSuite(c *check.C) { } func (s *clientTestingSuite) TestClientCall(c *check.C) { - srv := polaris.NewServer( - polaris.WithServerApplication(serverSvc), polaris.WithServerNamespace(serverNamespace), polaris.WithTTL(2)) - hello.RegisterHelloServer(srv.GRPCServer(), &helloServer{}) + srv := grpc.NewServer() + hello.RegisterHelloServer(srv, &helloServer{}) listen, err := net.Listen("tcp", "0.0.0.0:0") if err != nil { log.Fatalf("Failed to listen: %v", err) } log.Printf("success to listen on %s\n", listen.Addr()) defer listen.Close() + pSrv, err := polaris.Register(srv, listen, + polaris.WithServerApplication(serverSvc), polaris.WithServerNamespace(serverNamespace), polaris.WithTTL(2)) + if nil != err { + log.Fatal(err) + } + defer pSrv.Deregister() go func() { err = srv.Serve(listen) if nil != err { diff --git a/test/server_test.go b/test/server_test.go index d7cfdf8..c1242e5 100644 --- a/test/server_test.go +++ b/test/server_test.go @@ -24,6 +24,8 @@ import ( "net" "time" + "google.golang.org/grpc" + polaris "github.com/polarismesh/grpc-go-polaris" "github.com/polarismesh/grpc-go-polaris/test/hello" "github.com/polarismesh/grpc-go-polaris/test/mock" @@ -67,9 +69,8 @@ func (t *helloServer) SayHello(ctx context.Context, request *hello.HelloRequest) } func (s *serverTestingSuite) TestRegister(c *check.C) { - srv := polaris.NewServer( - polaris.WithServerApplication(serverSvc), polaris.WithServerNamespace(serverNamespace), polaris.WithTTL(2)) - hello.RegisterHelloServer(srv.GRPCServer(), &helloServer{}) + srv := grpc.NewServer() + hello.RegisterHelloServer(srv, &helloServer{}) // 监听端口 address := "127.0.0.1:8988" listen, err := net.Listen("tcp", address) @@ -77,6 +78,12 @@ func (s *serverTestingSuite) TestRegister(c *check.C) { log.Fatalf("Failed to addr %s: %v", address, err) } defer listen.Close() + pSrv, err := polaris.Register(srv, listen, + polaris.WithServerApplication(serverSvc), polaris.WithServerNamespace(serverNamespace), polaris.WithTTL(2)) + if nil != err { + log.Fatal(err) + } + defer pSrv.Deregister() go func() { err = srv.Serve(listen) if nil != err {