Skip to content

Commit

Permalink
Merge pull request #20 from andrewshan/main
Browse files Browse the repository at this point in the history
feat #19: change to interface to provider
  • Loading branch information
andrewshan authored Feb 14, 2022
2 parents 234b199 + f669d33 commit de34a6c
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 69 deletions.
3 changes: 3 additions & 0 deletions balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,9 @@ type resultReporter struct {

func (r *resultReporter) report(info balancer.DoneInfo) {
recvErr := info.Err
if !info.BytesReceived {
return
}
callResult := &api.ServiceCallResult{}
callResult.CalledInstance = r.instance
var code uint32
Expand Down
33 changes: 28 additions & 5 deletions examples/quickstart/provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,23 @@ package main

import (
"context"
"fmt"
"log"
"net"
"os"
"os/signal"

polaris "github.com/polarismesh/grpc-go-polaris"

"google.golang.org/grpc"

"github.com/polarismesh/grpc-go-polaris/examples/quickstart/pb"
)

const (
listenPort = 16010
)

// EchoService gRPC echo service struct
type EchoService struct{}

Expand All @@ -35,16 +45,29 @@ func (h *EchoService) Echo(ctx context.Context, req *pb.EchoRequest) (*pb.EchoRe
}

func main() {
srv := polaris.NewServer(polaris.WithServerApplication("EchoServerGRPC"))
pb.RegisterEchoServerServer(srv.GRPCServer(), &EchoService{})
// 监听端口
address := "0.0.0.0:0"
srv := grpc.NewServer()
pb.RegisterEchoServerServer(srv, &EchoService{})
address := fmt.Sprintf("0.0.0.0:%d", listenPort)
listen, err := net.Listen("tcp", address)
if err != nil {
log.Fatalf("Failed to addr %s: %v", address, err)
}
err = srv.Serve(listen)
// 执行北极星的注册命令
pSrv, err := polaris.Register(srv, listen, polaris.WithServerApplication("EchoServerGRPC"))
if nil != err {
log.Fatal(err)
}
go func() {
c := make(chan os.Signal)
signal.Notify(c)
s := <-c
log.Printf("receive quit signal: %v", s)
// 执行北极星的反注册命令
pSrv.Deregister()
srv.GracefulStop()
}()
err = srv.Serve(listen)
if nil != err {
log.Printf("listen err: %v", err)
}
}
117 changes: 59 additions & 58 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@ import (
"context"
"fmt"
"net"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"

"github.com/golang/protobuf/proto"
Expand All @@ -37,8 +34,8 @@ import (

// Server encapsulated server with gRPC option
type Server struct {
gRPCServer *grpc.Server
serverOptions serverOptions
serverOptions serverOptions
registerContext *RegisterContext
}

type serverOptions struct {
Expand Down Expand Up @@ -130,22 +127,6 @@ func WithPort(port int) ServerOption {
})
}

// NewServer initializer for gRPC server
func NewServer(opts ...ServerOption) *Server {
srv := &Server{}
for _, opt := range opts {
opt.apply(&srv.serverOptions)
}
srv.serverOptions.setDefault()
srv.gRPCServer = grpc.NewServer(srv.serverOptions.gRPCServerOptions...)
return srv
}

// GRPCServer get the raw gRPC server
func (s *Server) GRPCServer() *grpc.Server {
return s.gRPCServer
}

func getLocalHost(serverAddr string) (string, error) {
conn, err := net.Dial("tcp", serverAddr)
if nil != err {
Expand Down Expand Up @@ -206,6 +187,16 @@ type RegisterContext struct {

const maxHeartbeatIntervalSec = 60

func checkAddress(address string) bool {
conn, err := net.DialTimeout("tcp", address, 100*time.Millisecond)
if nil != err {
grpclog.Infof("[Polaris]fail to dial %s: %v", address, err)
return false
}
_ = conn.Close()
return true
}

func (s *Server) startHeartbeat(ctx context.Context,
providerAPI api.ProviderAPI, registerRequests []*api.InstanceRegisterRequest) *sync.WaitGroup {
heartbeatIntervalSec := s.serverOptions.ttl
Expand All @@ -214,6 +205,7 @@ func (s *Server) startHeartbeat(ctx context.Context,
}
wg := &sync.WaitGroup{}
wg.Add(len(registerRequests))
dialResults := make(map[string]bool)
for i, request := range registerRequests {
go func(idx int, registerRequest *api.InstanceRegisterRequest) {
ticker := time.NewTicker(time.Duration(heartbeatIntervalSec) * time.Second)
Expand All @@ -225,15 +217,23 @@ func (s *Server) startHeartbeat(ctx context.Context,
wg.Done()
return
case <-ticker.C:
hbRequest := &api.InstanceHeartbeatRequest{}
hbRequest.Namespace = registerRequest.Namespace
hbRequest.Service = registerRequest.Service
hbRequest.Host = registerRequest.Host
hbRequest.Port = registerRequest.Port
err := providerAPI.Heartbeat(hbRequest)
if nil != err {
grpclog.Errorf("[Polaris]fail to heartbeat %s:%d to service %s(%s): %v",
hbRequest.Host, hbRequest.Port, hbRequest.Service, hbRequest.Namespace, err)
address := fmt.Sprintf("%s:%d", registerRequest.Host, registerRequest.Port)
result, ok := dialResults[address]
if !ok {
result = checkAddress(address)
dialResults[address] = result
}
if result {
hbRequest := &api.InstanceHeartbeatRequest{}
hbRequest.Namespace = registerRequest.Namespace
hbRequest.Service = registerRequest.Service
hbRequest.Host = registerRequest.Host
hbRequest.Port = registerRequest.Port
err := providerAPI.Heartbeat(hbRequest)
if nil != err {
grpclog.Errorf("[Polaris]fail to heartbeat %s:%d to service %s(%s): %v",
hbRequest.Host, hbRequest.Port, hbRequest.Service, hbRequest.Namespace, err)
}
}
}
}
Expand All @@ -244,69 +244,70 @@ func (s *Server) startHeartbeat(ctx context.Context,
return wg
}

// Serve listen and accept connections
func (s *Server) Serve(lis net.Listener) error {
svcInfos := s.gRPCServer.GetServiceInfo()
// Register register server as polaris instances
func Register(gSrv *grpc.Server, lis net.Listener, opts ...ServerOption) (*Server, error) {
srv := &Server{}
for _, opt := range opts {
opt.apply(&srv.serverOptions)
}
srv.serverOptions.setDefault()
svcInfos := gSrv.GetServiceInfo()
ctx, cancel := context.WithCancel(context.Background())
registerContext := &RegisterContext{
cancel: cancel,
}
if len(svcInfos) > 0 {
polarisCtx, err := PolarisContext()
if nil != err {
return err
return nil, err
}
if len(s.serverOptions.host) == 0 {
if len(srv.serverOptions.host) == 0 {
host, err := getLocalHost(polarisCtx.GetConfig().GetGlobal().GetServerConnector().GetAddresses()[0])
if nil != err {
return fmt.Errorf("error occur while fetching localhost: %v", err)
return nil, fmt.Errorf("error occur while fetching localhost: %v", err)
}
s.serverOptions.host = host
srv.serverOptions.host = host
}
if s.serverOptions.port == 0 {
if srv.serverOptions.port == 0 {
port, err := parsePort(lis.Addr().String())
if nil != err {
return fmt.Errorf("error occur while parsing port from listener: %v", err)
return nil, fmt.Errorf("error occur while parsing port from listener: %v", err)
}
s.serverOptions.port = port
srv.serverOptions.port = port
}

registerContext.registerRequests = make([]*api.InstanceRegisterRequest, 0, len(svcInfos))
registerContext.providerAPI = api.NewProviderAPIByContext(polarisCtx)
for name := range svcInfos {
var svcName = name
if len(s.serverOptions.application) > 0 {
svcName = s.serverOptions.application
if len(srv.serverOptions.application) > 0 {
svcName = srv.serverOptions.application
}
registerRequest := &api.InstanceRegisterRequest{}
registerRequest.Namespace = s.serverOptions.namespace
registerRequest.Namespace = srv.serverOptions.namespace
registerRequest.Service = svcName
registerRequest.Host = s.serverOptions.host
registerRequest.Port = s.serverOptions.port
registerRequest.SetTTL(s.serverOptions.ttl)
registerRequest.Host = srv.serverOptions.host
registerRequest.Port = srv.serverOptions.port
registerRequest.SetTTL(srv.serverOptions.ttl)
registerRequest.Protocol = proto.String(lis.Addr().Network())
registerRequest.Metadata = s.serverOptions.metadata
registerRequest.Metadata = srv.serverOptions.metadata
registerContext.registerRequests = append(registerContext.registerRequests, registerRequest)
resp, err := registerContext.providerAPI.Register(registerRequest)
if nil != err {
deregisterServices(registerContext)
return fmt.Errorf("fail to register service %s: %v", name, err)
return nil, fmt.Errorf("fail to register service %s: %v", name, err)
}
grpclog.Infof("[Polaris]success to register %s:%d to service %s(%s), id %s",
registerRequest.Host, registerRequest.Port, name, registerRequest.Namespace, resp.InstanceID)
}
registerContext.healthCheckWait =
s.startHeartbeat(ctx, registerContext.providerAPI, registerContext.registerRequests)
srv.startHeartbeat(ctx, registerContext.providerAPI, registerContext.registerRequests)
}
go s.scheduleDeregister(registerContext)
return s.gRPCServer.Serve(lis)
srv.registerContext = registerContext
return srv, nil
}

func (s *Server) scheduleDeregister(registerContext *RegisterContext) {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
sig := <-c
grpclog.Infof("[Polaris]receive quit signal %v", sig)
deregisterServices(registerContext)
s.gRPCServer.GracefulStop()
// Deregister deregister services from polaris
func (s *Server) Deregister() {
deregisterServices(s.registerContext)
}
11 changes: 8 additions & 3 deletions test/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,20 @@ func (s *clientTestingSuite) TearDownSuite(c *check.C) {
}

func (s *clientTestingSuite) TestClientCall(c *check.C) {
srv := polaris.NewServer(
polaris.WithServerApplication(serverSvc), polaris.WithServerNamespace(serverNamespace), polaris.WithTTL(2))
hello.RegisterHelloServer(srv.GRPCServer(), &helloServer{})
srv := grpc.NewServer()
hello.RegisterHelloServer(srv, &helloServer{})
listen, err := net.Listen("tcp", "0.0.0.0:0")
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
log.Printf("success to listen on %s\n", listen.Addr())
defer listen.Close()
pSrv, err := polaris.Register(srv, listen,
polaris.WithServerApplication(serverSvc), polaris.WithServerNamespace(serverNamespace), polaris.WithTTL(2))
if nil != err {
log.Fatal(err)
}
defer pSrv.Deregister()
go func() {
err = srv.Serve(listen)
if nil != err {
Expand Down
13 changes: 10 additions & 3 deletions test/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"net"
"time"

"google.golang.org/grpc"

polaris "github.com/polarismesh/grpc-go-polaris"
"github.com/polarismesh/grpc-go-polaris/test/hello"
"github.com/polarismesh/grpc-go-polaris/test/mock"
Expand Down Expand Up @@ -67,16 +69,21 @@ func (t *helloServer) SayHello(ctx context.Context, request *hello.HelloRequest)
}

func (s *serverTestingSuite) TestRegister(c *check.C) {
srv := polaris.NewServer(
polaris.WithServerApplication(serverSvc), polaris.WithServerNamespace(serverNamespace), polaris.WithTTL(2))
hello.RegisterHelloServer(srv.GRPCServer(), &helloServer{})
srv := grpc.NewServer()
hello.RegisterHelloServer(srv, &helloServer{})
// 监听端口
address := "127.0.0.1:8988"
listen, err := net.Listen("tcp", address)
if err != nil {
log.Fatalf("Failed to addr %s: %v", address, err)
}
defer listen.Close()
pSrv, err := polaris.Register(srv, listen,
polaris.WithServerApplication(serverSvc), polaris.WithServerNamespace(serverNamespace), polaris.WithTTL(2))
if nil != err {
log.Fatal(err)
}
defer pSrv.Deregister()
go func() {
err = srv.Serve(listen)
if nil != err {
Expand Down

0 comments on commit de34a6c

Please sign in to comment.