Skip to content

Commit

Permalink
feat: 增加优雅退出机制
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewshan committed Dec 14, 2021
1 parent 09c84d4 commit 2404b30
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 12 deletions.
2 changes: 1 addition & 1 deletion examples/quickstart/consumer/polaris.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
global:
serverConnector:
addresses:
- 9.134.15.118:8091
- 127.0.0.1:8091
2 changes: 1 addition & 1 deletion examples/quickstart/provider/polaris.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
global:
serverConnector:
addresses:
- 9.134.15.118:8091
- 127.0.0.1:8091
39 changes: 33 additions & 6 deletions resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (
"fmt"
"sync"

"google.golang.org/grpc/grpclog"

"github.com/polarismesh/polaris-go/pkg/model"

"github.com/polarismesh/polaris-go/api"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -106,10 +110,10 @@ func getNamespace(options *dialOptions) string {

const keyDialOptions = "options"

func (pr *polarisNamingResolver) lookup() (*resolver.State, error) {
func (pr *polarisNamingResolver) lookup() (*resolver.State, api.ConsumerAPI, error) {
sdkCtx, err := PolarisContext()
if nil != err {
return nil, err
return nil, nil, err
}
consumerAPI := api.NewConsumerAPIByContext(sdkCtx)
instancesRequest := &api.GetInstancesRequest{}
Expand All @@ -125,7 +129,7 @@ func (pr *polarisNamingResolver) lookup() (*resolver.State, error) {
}
resp, err := consumerAPI.GetInstances(instancesRequest)
if nil != err {
return nil, err
return nil, consumerAPI, err
}
state := &resolver.State{}
for _, instance := range resp.Instances {
Expand All @@ -134,23 +138,46 @@ func (pr *polarisNamingResolver) lookup() (*resolver.State, error) {
Attributes: attributes.New(keyDialOptions, pr.options),
})
}
return state, nil
return state, consumerAPI, nil
}

func (pr *polarisNamingResolver) doWatch(
consumerAPI api.ConsumerAPI) (model.ServiceKey, <-chan model.SubScribeEvent, error) {
watchRequest := &api.WatchServiceRequest{}
watchRequest.Key = model.ServiceKey{
Namespace: getNamespace(pr.options),
Service: pr.target.Authority,
}
resp, err := consumerAPI.WatchService(watchRequest)
if nil != err {
return watchRequest.Key, nil, err
}
return watchRequest.Key, resp.EventChannel, nil
}

func (pr *polarisNamingResolver) watcher() {
defer pr.wg.Done()
var consumerAPI api.ConsumerAPI
var eventChan <-chan model.SubScribeEvent
for {
select {
case <-pr.ctx.Done():
return
case <-pr.rn:
case <-eventChan:
}

state, err := pr.lookup()
var state *resolver.State
var err error
state, consumerAPI, err = pr.lookup()
if err != nil {
pr.cc.ReportError(err)
} else {
pr.cc.UpdateState(*state)
var svcKey model.ServiceKey
svcKey, eventChan, err = pr.doWatch(consumerAPI)
if nil != err {
grpclog.Errorf("fail to do watch for service %s: %v", svcKey, err)
}
}
}
}
Expand Down
22 changes: 18 additions & 4 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ import (
"context"
"fmt"
"net"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -166,6 +169,7 @@ func parsePort(addr string) (int, error) {
}

func deregisterServices(registerContext *RegisterContext) {
fmt.Printf("invoke deregisterServices\n")
registerContext.cancel()
if nil != registerContext.healthCheckWait {
grpclog.Infof("[Polaris]start to wait heartbeat finish")
Expand Down Expand Up @@ -217,7 +221,7 @@ func (s *Server) startHeartbeat(ctx context.Context,
for {
select {
case <-ctx.Done():
grpclog.Infof("[Polaris]heartbeat ticker %d has stopped")
grpclog.Infof("[Polaris]heartbeat ticker has stopped")
wg.Done()
return
case <-ticker.C:
Expand All @@ -228,8 +232,8 @@ func (s *Server) startHeartbeat(ctx context.Context,
hbRequest.Port = registerRequest.Port
err := providerAPI.Heartbeat(hbRequest)
if nil != err {
grpclog.Errorf("[Polaris]fail to heartbeat %s:%d to service %s(%s)",
hbRequest.Host, hbRequest.Port, hbRequest.Service, hbRequest.Namespace)
grpclog.Errorf("[Polaris]fail to heartbeat %s:%d to service %s(%s): %v",
hbRequest.Host, hbRequest.Port, hbRequest.Service, hbRequest.Namespace, err)
}
}
}
Expand All @@ -247,7 +251,6 @@ func (s *Server) Serve(lis net.Listener) error {
registerContext := &RegisterContext{
cancel: cancel,
}
defer deregisterServices(registerContext)
if len(svcInfos) > 0 {
polarisCtx, err := PolarisContext()
if nil != err {
Expand Down Expand Up @@ -286,6 +289,7 @@ func (s *Server) Serve(lis net.Listener) error {
registerContext.registerRequests = append(registerContext.registerRequests, registerRequest)
resp, err := registerContext.providerAPI.Register(registerRequest)
if nil != err {
deregisterServices(registerContext)
return fmt.Errorf("fail to register service %s: %v", name, err)
}
grpclog.Infof("[Polaris]success to register %s:%d to service %s(%s), id %s",
Expand All @@ -294,5 +298,15 @@ func (s *Server) Serve(lis net.Listener) error {
registerContext.healthCheckWait =
s.startHeartbeat(ctx, registerContext.providerAPI, registerContext.registerRequests)
}
go s.scheduleDeregister(registerContext)
return s.gRPCServer.Serve(lis)
}

func (s *Server) scheduleDeregister(registerContext *RegisterContext) {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
sig := <-c
grpclog.Infof("[Polaris]receive quit signal %v", sig)
deregisterServices(registerContext)
s.gRPCServer.GracefulStop()
}

0 comments on commit 2404b30

Please sign in to comment.