Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix:修复注册port不支持自定义 #66

Merged
merged 1 commit into from
Jun 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 11 additions & 20 deletions balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
Expand Down Expand Up @@ -72,11 +71,11 @@ type (

// Build creates polaris balancer.Balancer implement
func (bb *balancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
grpclog.Infof("[Polaris][Balancer] start to build polaris balancer")
GetLogger().Info("[Polaris][Balancer] start to build polaris balancer")
target := opts.Target
host, _, err := parseHost(target.URL.Host)
if err != nil {
grpclog.Errorln("[Polaris][Balancer] failed to create balancer: " + err.Error())
GetLogger().Error("[Polaris][Balancer] failed to create balancer: " + err.Error())
return nil
}
return &polarisNamingBalancer{
Expand Down Expand Up @@ -174,7 +173,7 @@ func (p *polarisNamingBalancer) createSubConnection(addr resolver.Address) {
// is a new address (not existing in b.subConns).
sc, err := p.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
if err != nil {
grpclog.Warningf("[Polaris][Balancer] failed to create new SubConn: %v", err)
GetLogger().Warn("[Polaris][Balancer] failed to create new SubConn: %v", err)
return
}
p.subConns[key] = sc
Expand All @@ -198,9 +197,7 @@ func (p *polarisNamingBalancer) UpdateClientConnState(state balancer.ClientConnS
if state.BalancerConfig != nil {
p.lbCfg = state.BalancerConfig.(*LBConfig)
}
if grpclog.V(2) {
grpclog.Infoln("[Polaris][Balancer] got new ClientConn state: ", state)
}
GetLogger().Debug("[Polaris][Balancer] got new ClientConn state: ", state)
if len(state.ResolverState.Addresses) == 0 {
p.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
Expand Down Expand Up @@ -257,17 +254,13 @@ func (p *polarisNamingBalancer) ResolverError(err error) {
// UpdateSubConnState is called by gRPC when the state of a SubConn changes.
func (p *polarisNamingBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
if grpclog.V(2) {
grpclog.Infof("[Polaris][Balancer] handle SubConn state change: %p, %v", sc, s)
}
GetLogger().Info("[Polaris][Balancer] handle SubConn state change: %p, %v", sc, s)
oldS, quit := func() (connectivity.State, bool) {
p.rwMutex.Lock()
defer p.rwMutex.Unlock()
oldS, ok := p.scStates[sc]
if !ok {
if grpclog.V(2) {
grpclog.Infof("[Polaris][Balancer] got state changes for an unknown SubConn: %p, %v", sc, s)
}
GetLogger().Info("[Polaris][Balancer] got state changes for an unknown SubConn: %p, %v", sc, s)
return connectivity.TransientFailure, true
}
if oldS == connectivity.TransientFailure && s == connectivity.Connecting {
Expand Down Expand Up @@ -420,13 +413,11 @@ func (pnp *polarisNamingPicker) Pick(info balancer.PickInfo) (balancer.PickResul
request.SourceService = *sourceService
} else {
if err := pnp.addTrafficLabels(info, request); err != nil {
grpclog.Errorf("[Polaris][Balancer] fetch traffic labels fail : %+v", err)
GetLogger().Error("[Polaris][Balancer] fetch traffic labels fail : %+v", err)
}
}

if grpclog.V(2) {
grpclog.Infof("[Polaris][Balancer] get one instance request : %+v", request)
}
GetLogger().Debug("[Polaris][Balancer] get one instance request : %+v", request)
var err error
resp, err = pnp.balancer.routerAPI.ProcessRouters(request)
if err != nil {
Expand Down Expand Up @@ -500,13 +491,13 @@ func (pnp *polarisNamingPicker) addTrafficLabels(info balancer.PickInfo, insReq
engine := pnp.balancer.consumerAPI.SDKContext().GetEngine()
resp, err := engine.SyncGetServiceRule(model.EventRouting, req)
if err != nil {
grpclog.Errorf("[Polaris][Balancer] ns:%s svc:%s get route rule fail : %+v",
GetLogger().Error("[Polaris][Balancer] ns:%s svc:%s get route rule fail : %+v",
req.GetNamespace(), req.GetService(), err)
return err
}

if resp == nil || resp.GetValue() == nil {
grpclog.Errorf("[Polaris][Balancer] ns:%s svc:%s get route rule empty", req.GetNamespace(), req.GetService())
GetLogger().Error("[Polaris][Balancer] ns:%s svc:%s get route rule empty", req.GetNamespace(), req.GetService())
return ErrorPolarisServiceRouteRuleEmpty
}

Expand Down Expand Up @@ -573,6 +564,6 @@ func (r *resultReporter) report(info balancer.DoneInfo) {
callResult.SetDelay(time.Since(r.startTime))
callResult.SetRetCode(int32(code))
if err := r.consumerAPI.UpdateServiceCallResult(callResult); err != nil {
grpclog.Errorf("[Polaris][Balancer] report grpc call info fail : %+v", err)
GetLogger().Error("[Polaris][Balancer] report grpc call info fail : %+v", err)
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/golang/protobuf v1.5.4
github.com/google/uuid v1.6.0
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/polarismesh/polaris-go v1.6.0-beta.5
github.com/polarismesh/specification v1.5.1
github.com/prometheus/client_golang v1.19.1 // indirect
Expand Down
68 changes: 51 additions & 17 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,54 +18,88 @@
package grpcpolaris

import (
"io"
"log"
"sync/atomic"

"github.com/natefinch/lumberjack"
)

type LogLevel int

Check failure on line 27 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported type LogLevel should have comment or be unexported

Check failure on line 27 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.17)

exported type LogLevel should have comment or be unexported

Check failure on line 27 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.18)

exported type LogLevel should have comment or be unexported

Check failure on line 27 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.15)

exported type LogLevel should have comment or be unexported

const (
_ LogLevel = iota
LogDebug

Check failure on line 31 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported const LogDebug should have comment (or a comment on this block) or be unexported

Check failure on line 31 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.17)

exported const LogDebug should have comment (or a comment on this block) or be unexported

Check failure on line 31 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.18)

exported const LogDebug should have comment (or a comment on this block) or be unexported

Check failure on line 31 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.15)

exported const LogDebug should have comment (or a comment on this block) or be unexported
LogInfo
LogWarn
LogError
)

var _log Logger = newDefaultLogger()

func SetLogger(logger Logger) {

Check failure on line 39 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported function SetLogger should have comment or be unexported

Check failure on line 39 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.17)

exported function SetLogger should have comment or be unexported

Check failure on line 39 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.18)

exported function SetLogger should have comment or be unexported

Check failure on line 39 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.15)

exported function SetLogger should have comment or be unexported
_log = logger
}

func GetLogger() Logger {

Check failure on line 43 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported function GetLogger should have comment or be unexported

Check failure on line 43 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.17)

exported function GetLogger should have comment or be unexported

Check failure on line 43 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.18)

exported function GetLogger should have comment or be unexported

Check failure on line 43 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.15)

exported function GetLogger should have comment or be unexported
return _log
}

type Logger interface {

Check failure on line 47 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported type Logger should have comment or be unexported

Check failure on line 47 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.17)

exported type Logger should have comment or be unexported

Check failure on line 47 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.18)

exported type Logger should have comment or be unexported

Check failure on line 47 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.15)

exported type Logger should have comment or be unexported
SetWriter(io.WriteCloser)
SetLevel()
Debug(format string, args interface{})
Info(format string, args interface{})
Warn(format string, args interface{})
Error(format string, args interface{})
SetLevel(LogLevel)
Debug(format string, args ...interface{})
Info(format string, args ...interface{})
Warn(format string, args ...interface{})
Error(format string, args ...interface{})
}

type defaultLogger struct {
writerRef atomic.Value
levelRef atomic.Value
writer *log.Logger
levelRef atomic.Value
}

func (l *defaultLogger) SetWriter(writer io.WriteCloser) {
l.writerRef.Store(writer)
func newDefaultLogger() *defaultLogger {
lumberJackLogger := &lumberjack.Logger{
Filename: "./logs/grpc-go-polaris.log", // 文件位置
MaxSize: 100, // 进行切割之前,日志文件的最大大小(MB为单位)
MaxAge: 7, // 保留旧文件的最大天数
MaxBackups: 100, // 保留旧文件的最大个数
Compress: true, // 是否压缩/归档旧文件
}

levelRef := atomic.Value{}

levelRef.Store(LogInfo)
return &defaultLogger{
writer: log.New(lumberJackLogger, "", log.Llongfile|log.Ldate|log.Ltime),
levelRef: levelRef,
}
}

func (l *defaultLogger) SetLevel(level LogLevel) {
l.levelRef.Store(level)
}

func (l *defaultLogger) Debug(format string, args interface{}) {

func (l *defaultLogger) Debug(format string, args ...interface{}) {
l.printf(LogDebug, format, args...)
}

func (l *defaultLogger) Info(format string, args interface{}) {
func (l *defaultLogger) Info(format string, args ...interface{}) {
l.printf(LogInfo, format, args...)

}

func (l *defaultLogger) Warn(format string, args interface{}) {

func (l *defaultLogger) Warn(format string, args ...interface{}) {
l.printf(LogWarn, format, args...)
}

func (l *defaultLogger) Error(format string, args interface{}) {
func (l *defaultLogger) Error(format string, args ...interface{}) {
l.printf(LogError, format, args...)
}

func (l *defaultLogger) printf(expectLevel LogLevel, format string, args ...interface{}) {
curLevel := l.levelRef.Load().(LogLevel)
if curLevel > expectLevel {
return
}
l.writer.Printf(format, args...)
}
9 changes: 4 additions & 5 deletions ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
"github.com/polarismesh/specification/source/go/api/v1/traffic_manage"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -65,7 +64,7 @@
return p
}

func (p *RateLimitInterceptor) StreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {

Check failure on line 67 in ratelimit.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.18.x)

line is 158 characters (lll)

Check failure on line 67 in ratelimit.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.16.x)

line is 158 characters (lll)

Check failure on line 67 in ratelimit.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.17.x)

line is 158 characters (lll)

Check failure on line 67 in ratelimit.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.15.x)

line is 158 characters (lll)

Check failure on line 67 in ratelimit.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported method RateLimitInterceptor.StreamServerInterceptor should have comment or be unexported

Check failure on line 67 in ratelimit.go

View workflow job for this annotation

GitHub Actions / build (1.17)

exported method RateLimitInterceptor.StreamServerInterceptor should have comment or be unexported

Check failure on line 67 in ratelimit.go

View workflow job for this annotation

GitHub Actions / build (1.18)

exported method RateLimitInterceptor.StreamServerInterceptor should have comment or be unexported

Check failure on line 67 in ratelimit.go

View workflow job for this annotation

GitHub Actions / build (1.15)

exported method RateLimitInterceptor.StreamServerInterceptor should have comment or be unexported
return handler(srv, ss)
}

Expand All @@ -80,7 +79,7 @@

future, err := p.limitAPI.GetQuota(quotaReq)
if nil != err {
grpclog.Errorf("[Polaris][RateLimit] fail to get quota %#v: %v", quotaReq, err)
GetLogger().Error("[Polaris][RateLimit] fail to get quota %#v: %v", quotaReq, err)
return handler(ctx, req)
}

Expand Down Expand Up @@ -167,21 +166,21 @@
}

if err := engine.SyncGetResources(getRuleReq); err != nil {
grpclog.Errorf("[Polaris][RateLimit] ns:%s svc:%s get RateLimit Rule fail : %+v",
GetLogger().Error("[Polaris][RateLimit] ns:%s svc:%s get RateLimit Rule fail : %+v",
req.GetNamespace(), req.GetService(), err)
return nil, false
}

svcRule := getRuleReq.RateLimitRule
if svcRule == nil || svcRule.GetValue() == nil {
grpclog.Warningf("[Polaris][RateLimit] ns:%s svc:%s get RateLimit Rule is nil",
GetLogger().Warn("[Polaris][RateLimit] ns:%s svc:%s get RateLimit Rule is nil",
req.GetNamespace(), req.GetService())
return nil, false
}

rules, ok := svcRule.GetValue().(*traffic_manage.RateLimit)
if !ok {
grpclog.Errorf("[Polaris][RateLimit] ns:%s svc:%s get RateLimit Rule invalid",
GetLogger().Error("[Polaris][RateLimit] ns:%s svc:%s get RateLimit Rule invalid",
req.GetNamespace(), req.GetService())
return nil, false
}
Expand Down
5 changes: 2 additions & 3 deletions resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@
"github.com/polarismesh/polaris-go/api"
"github.com/polarismesh/polaris-go/pkg/model"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
)

type ResolverContext struct {

Check failure on line 37 in resolver.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported type ResolverContext should have comment or be unexported

Check failure on line 37 in resolver.go

View workflow job for this annotation

GitHub Actions / build (1.17)

exported type ResolverContext should have comment or be unexported

Check failure on line 37 in resolver.go

View workflow job for this annotation

GitHub Actions / build (1.18)

exported type ResolverContext should have comment or be unexported

Check failure on line 37 in resolver.go

View workflow job for this annotation

GitHub Actions / build (1.15)

exported type ResolverContext should have comment or be unexported
Target resolver.Target
Host string
Port int
Expand All @@ -49,7 +48,7 @@

var resolverInterceptors []ResolverInterceptor

// RegisterResolverInterceptor

Check failure on line 51 in resolver.go

View workflow job for this annotation

GitHub Actions / build (1.16)

comment on exported function RegisterResolverInterceptor should be of the form "RegisterResolverInterceptor ..."

Check failure on line 51 in resolver.go

View workflow job for this annotation

GitHub Actions / build (1.17)

comment on exported function RegisterResolverInterceptor should be of the form "RegisterResolverInterceptor ..."

Check failure on line 51 in resolver.go

View workflow job for this annotation

GitHub Actions / build (1.18)

comment on exported function RegisterResolverInterceptor should be of the form "RegisterResolverInterceptor ..."

Check failure on line 51 in resolver.go

View workflow job for this annotation

GitHub Actions / build (1.15)

comment on exported function RegisterResolverInterceptor should be of the form "RegisterResolverInterceptor ..."
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), and is not thread-safe. If multiple RegisterInterceptor are
// registered with the same name, the one registered last will take effect.
Expand Down Expand Up @@ -280,12 +279,12 @@
continue
}
if err = pr.cc.UpdateState(*state); nil != err {
grpclog.Errorf("fail to do update service %s: %v", pr.target.URL.Host, err)
GetLogger().Error("fail to do update service %s: %v", pr.target.URL.Host, err)
}
var svcKey model.ServiceKey
svcKey, eventChan, err = pr.doWatch(consumerAPI)
if nil != err {
grpclog.Errorf("fail to do watch for service %s: %v", svcKey, err)
GetLogger().Error("fail to do watch for service %s: %v", svcKey, err)
}
}
}
Expand Down
17 changes: 9 additions & 8 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import (
"context"
"fmt"
"log"
"net"
"os"
"os/signal"
Expand Down Expand Up @@ -74,7 +73,7 @@
for _, opt := range opts {
opt.apply(&srv.serverOptions)
}
srv.serverOptions.setDefault()

Check failure on line 76 in server.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.18.x)

Error return value of `srv.serverOptions.setDefault` is not checked (errcheck)

Check failure on line 76 in server.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.16.x)

Error return value of `srv.serverOptions.setDefault` is not checked (errcheck)

Check failure on line 76 in server.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.17.x)

Error return value of `srv.serverOptions.setDefault` is not checked (errcheck)

Check failure on line 76 in server.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.15.x)

Error return value of `srv.serverOptions.setDefault` is not checked (errcheck)

if *srv.serverOptions.delayRegisterEnable {
delayStrategy := srv.serverOptions.delayRegisterStrategy
Expand All @@ -99,11 +98,13 @@
}
srv.serverOptions.host = host
}
port, err := parsePort(lis.Addr().String())
if nil != err {
return fmt.Errorf("error occur while parsing port from listener: %w", err)
if srv.serverOptions.port == 0 {
port, err := parsePort(lis.Addr().String())
if nil != err {
return fmt.Errorf("error occur while parsing port from listener: %w", err)
}
srv.serverOptions.port = port
}
srv.serverOptions.port = port
svcInfos := buildServiceNames(srv.Server, srv)

for _, name := range svcInfos {
Expand All @@ -130,7 +131,7 @@
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGSEGV, syscall.SIGINT, syscall.SIGTERM)
s := <-c
log.Printf("[Polaris][Naming] receive quit signal: %v", s)
GetLogger().Info("[Polaris][Naming] receive quit signal: %v", s)
signal.Stop(c)
srv.Stop()
}()
Expand Down Expand Up @@ -170,14 +171,14 @@
func Serve(gSrv *grpc.Server, lis net.Listener, opts ...ServerOption) error {
pSrv, err := Register(gSrv, lis, opts...)
if err != nil {
log.Fatalf("polaris register err: %v", err)
GetLogger().Error("polaris register err: %v", err)
}

go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
s := <-c
log.Printf("[Polaris][Naming] receive quit signal: %v", s)
GetLogger().Info("[Polaris][Naming] receive quit signal: %v", s)
signal.Stop(c)
pSrv.Stop()
}()
Expand Down
Loading