From 81b191d47f2eb78da80200135c59faf96a29d803 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Sat, 22 Jun 2024 18:59:07 +0800 Subject: [PATCH] dep:upgrade polaris-go version --- balancer.go | 31 +++++++++--------------- go.mod | 1 + logger.go | 68 +++++++++++++++++++++++++++++++++++++++------------- ratelimit.go | 9 ++++--- resolver.go | 5 ++-- server.go | 17 ++++++------- 6 files changed, 78 insertions(+), 53 deletions(-) diff --git a/balancer.go b/balancer.go index cb7cd5e..f901117 100644 --- a/balancer.go +++ b/balancer.go @@ -33,7 +33,6 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/grpclog" "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" @@ -72,11 +71,11 @@ type ( // Build creates polaris balancer.Balancer implement func (bb *balancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { - grpclog.Infof("[Polaris][Balancer] start to build polaris balancer") + GetLogger().Info("[Polaris][Balancer] start to build polaris balancer") target := opts.Target host, _, err := parseHost(target.URL.Host) if err != nil { - grpclog.Errorln("[Polaris][Balancer] failed to create balancer: " + err.Error()) + GetLogger().Error("[Polaris][Balancer] failed to create balancer: " + err.Error()) return nil } return &polarisNamingBalancer{ @@ -174,7 +173,7 @@ func (p *polarisNamingBalancer) createSubConnection(addr resolver.Address) { // is a new address (not existing in b.subConns). sc, err := p.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true}) if err != nil { - grpclog.Warningf("[Polaris][Balancer] failed to create new SubConn: %v", err) + GetLogger().Warn("[Polaris][Balancer] failed to create new SubConn: %v", err) return } p.subConns[key] = sc @@ -198,9 +197,7 @@ func (p *polarisNamingBalancer) UpdateClientConnState(state balancer.ClientConnS if state.BalancerConfig != nil { p.lbCfg = state.BalancerConfig.(*LBConfig) } - if grpclog.V(2) { - grpclog.Infoln("[Polaris][Balancer] got new ClientConn state: ", state) - } + GetLogger().Debug("[Polaris][Balancer] got new ClientConn state: ", state) if len(state.ResolverState.Addresses) == 0 { p.ResolverError(errors.New("produced zero addresses")) return balancer.ErrBadResolverState @@ -257,17 +254,13 @@ func (p *polarisNamingBalancer) ResolverError(err error) { // UpdateSubConnState is called by gRPC when the state of a SubConn changes. func (p *polarisNamingBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { s := state.ConnectivityState - if grpclog.V(2) { - grpclog.Infof("[Polaris][Balancer] handle SubConn state change: %p, %v", sc, s) - } + GetLogger().Info("[Polaris][Balancer] handle SubConn state change: %p, %v", sc, s) oldS, quit := func() (connectivity.State, bool) { p.rwMutex.Lock() defer p.rwMutex.Unlock() oldS, ok := p.scStates[sc] if !ok { - if grpclog.V(2) { - grpclog.Infof("[Polaris][Balancer] got state changes for an unknown SubConn: %p, %v", sc, s) - } + GetLogger().Info("[Polaris][Balancer] got state changes for an unknown SubConn: %p, %v", sc, s) return connectivity.TransientFailure, true } if oldS == connectivity.TransientFailure && s == connectivity.Connecting { @@ -420,13 +413,11 @@ func (pnp *polarisNamingPicker) Pick(info balancer.PickInfo) (balancer.PickResul request.SourceService = *sourceService } else { if err := pnp.addTrafficLabels(info, request); err != nil { - grpclog.Errorf("[Polaris][Balancer] fetch traffic labels fail : %+v", err) + GetLogger().Error("[Polaris][Balancer] fetch traffic labels fail : %+v", err) } } - if grpclog.V(2) { - grpclog.Infof("[Polaris][Balancer] get one instance request : %+v", request) - } + GetLogger().Debug("[Polaris][Balancer] get one instance request : %+v", request) var err error resp, err = pnp.balancer.routerAPI.ProcessRouters(request) if err != nil { @@ -500,13 +491,13 @@ func (pnp *polarisNamingPicker) addTrafficLabels(info balancer.PickInfo, insReq engine := pnp.balancer.consumerAPI.SDKContext().GetEngine() resp, err := engine.SyncGetServiceRule(model.EventRouting, req) if err != nil { - grpclog.Errorf("[Polaris][Balancer] ns:%s svc:%s get route rule fail : %+v", + GetLogger().Error("[Polaris][Balancer] ns:%s svc:%s get route rule fail : %+v", req.GetNamespace(), req.GetService(), err) return err } if resp == nil || resp.GetValue() == nil { - grpclog.Errorf("[Polaris][Balancer] ns:%s svc:%s get route rule empty", req.GetNamespace(), req.GetService()) + GetLogger().Error("[Polaris][Balancer] ns:%s svc:%s get route rule empty", req.GetNamespace(), req.GetService()) return ErrorPolarisServiceRouteRuleEmpty } @@ -573,6 +564,6 @@ func (r *resultReporter) report(info balancer.DoneInfo) { callResult.SetDelay(time.Since(r.startTime)) callResult.SetRetCode(int32(code)) if err := r.consumerAPI.UpdateServiceCallResult(callResult); err != nil { - grpclog.Errorf("[Polaris][Balancer] report grpc call info fail : %+v", err) + GetLogger().Error("[Polaris][Balancer] report grpc call info fail : %+v", err) } } diff --git a/go.mod b/go.mod index bb38978..429125a 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/golang/protobuf v1.5.4 github.com/google/uuid v1.6.0 github.com/hashicorp/errwrap v1.1.0 // indirect + github.com/natefinch/lumberjack v2.0.0+incompatible github.com/polarismesh/polaris-go v1.6.0-beta.5 github.com/polarismesh/specification v1.5.1 github.com/prometheus/client_golang v1.19.1 // indirect diff --git a/logger.go b/logger.go index 234ebb4..324923f 100644 --- a/logger.go +++ b/logger.go @@ -18,8 +18,10 @@ package grpcpolaris import ( - "io" + "log" "sync/atomic" + + "github.com/natefinch/lumberjack" ) type LogLevel int @@ -32,40 +34,72 @@ const ( LogError ) +var _log Logger = newDefaultLogger() + +func SetLogger(logger Logger) { + _log = logger +} + +func GetLogger() Logger { + return _log +} + type Logger interface { - SetWriter(io.WriteCloser) - SetLevel() - Debug(format string, args interface{}) - Info(format string, args interface{}) - Warn(format string, args interface{}) - Error(format string, args interface{}) + SetLevel(LogLevel) + Debug(format string, args ...interface{}) + Info(format string, args ...interface{}) + Warn(format string, args ...interface{}) + Error(format string, args ...interface{}) } type defaultLogger struct { - writerRef atomic.Value - levelRef atomic.Value + writer *log.Logger + levelRef atomic.Value } -func (l *defaultLogger) SetWriter(writer io.WriteCloser) { - l.writerRef.Store(writer) +func newDefaultLogger() *defaultLogger { + lumberJackLogger := &lumberjack.Logger{ + Filename: "./logs/grpc-go-polaris.log", // 文件位置 + MaxSize: 100, // 进行切割之前,日志文件的最大大小(MB为单位) + MaxAge: 7, // 保留旧文件的最大天数 + MaxBackups: 100, // 保留旧文件的最大个数 + Compress: true, // 是否压缩/归档旧文件 + } + + levelRef := atomic.Value{} + + levelRef.Store(LogInfo) + return &defaultLogger{ + writer: log.New(lumberJackLogger, "", log.Llongfile|log.Ldate|log.Ltime), + levelRef: levelRef, + } } func (l *defaultLogger) SetLevel(level LogLevel) { l.levelRef.Store(level) } -func (l *defaultLogger) Debug(format string, args interface{}) { - +func (l *defaultLogger) Debug(format string, args ...interface{}) { + l.printf(LogDebug, format, args...) } -func (l *defaultLogger) Info(format string, args interface{}) { +func (l *defaultLogger) Info(format string, args ...interface{}) { + l.printf(LogInfo, format, args...) } -func (l *defaultLogger) Warn(format string, args interface{}) { - +func (l *defaultLogger) Warn(format string, args ...interface{}) { + l.printf(LogWarn, format, args...) } -func (l *defaultLogger) Error(format string, args interface{}) { +func (l *defaultLogger) Error(format string, args ...interface{}) { + l.printf(LogError, format, args...) +} +func (l *defaultLogger) printf(expectLevel LogLevel, format string, args ...interface{}) { + curLevel := l.levelRef.Load().(LogLevel) + if curLevel > expectLevel { + return + } + l.writer.Printf(format, args...) } diff --git a/ratelimit.go b/ratelimit.go index 0845479..13e6bb1 100644 --- a/ratelimit.go +++ b/ratelimit.go @@ -29,7 +29,6 @@ import ( "github.com/polarismesh/specification/source/go/api/v1/traffic_manage" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/grpclog" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/status" @@ -80,7 +79,7 @@ func (p *RateLimitInterceptor) UnaryInterceptor(ctx context.Context, req interfa future, err := p.limitAPI.GetQuota(quotaReq) if nil != err { - grpclog.Errorf("[Polaris][RateLimit] fail to get quota %#v: %v", quotaReq, err) + GetLogger().Error("[Polaris][RateLimit] fail to get quota %#v: %v", quotaReq, err) return handler(ctx, req) } @@ -167,21 +166,21 @@ func (p *RateLimitInterceptor) fetchArguments(req *model.QuotaRequestImpl) ([]*t } if err := engine.SyncGetResources(getRuleReq); err != nil { - grpclog.Errorf("[Polaris][RateLimit] ns:%s svc:%s get RateLimit Rule fail : %+v", + GetLogger().Error("[Polaris][RateLimit] ns:%s svc:%s get RateLimit Rule fail : %+v", req.GetNamespace(), req.GetService(), err) return nil, false } svcRule := getRuleReq.RateLimitRule if svcRule == nil || svcRule.GetValue() == nil { - grpclog.Warningf("[Polaris][RateLimit] ns:%s svc:%s get RateLimit Rule is nil", + GetLogger().Warn("[Polaris][RateLimit] ns:%s svc:%s get RateLimit Rule is nil", req.GetNamespace(), req.GetService()) return nil, false } rules, ok := svcRule.GetValue().(*traffic_manage.RateLimit) if !ok { - grpclog.Errorf("[Polaris][RateLimit] ns:%s svc:%s get RateLimit Rule invalid", + GetLogger().Error("[Polaris][RateLimit] ns:%s svc:%s get RateLimit Rule invalid", req.GetNamespace(), req.GetService()) return nil, false } diff --git a/resolver.go b/resolver.go index 35c48ea..e6cf926 100644 --- a/resolver.go +++ b/resolver.go @@ -31,7 +31,6 @@ import ( "github.com/polarismesh/polaris-go/api" "github.com/polarismesh/polaris-go/pkg/model" "google.golang.org/grpc/attributes" - "google.golang.org/grpc/grpclog" "google.golang.org/grpc/resolver" ) @@ -280,12 +279,12 @@ func (pr *polarisNamingResolver) watcher() { continue } if err = pr.cc.UpdateState(*state); nil != err { - grpclog.Errorf("fail to do update service %s: %v", pr.target.URL.Host, err) + GetLogger().Error("fail to do update service %s: %v", pr.target.URL.Host, err) } var svcKey model.ServiceKey svcKey, eventChan, err = pr.doWatch(consumerAPI) if nil != err { - grpclog.Errorf("fail to do watch for service %s: %v", svcKey, err) + GetLogger().Error("fail to do watch for service %s: %v", svcKey, err) } } } diff --git a/server.go b/server.go index 71e9499..0858a9e 100644 --- a/server.go +++ b/server.go @@ -20,7 +20,6 @@ package grpcpolaris import ( "context" "fmt" - "log" "net" "os" "os/signal" @@ -99,11 +98,13 @@ func (srv *Server) doRegister(lis net.Listener) error { } srv.serverOptions.host = host } - port, err := parsePort(lis.Addr().String()) - if nil != err { - return fmt.Errorf("error occur while parsing port from listener: %w", err) + if srv.serverOptions.port == 0 { + port, err := parsePort(lis.Addr().String()) + if nil != err { + return fmt.Errorf("error occur while parsing port from listener: %w", err) + } + srv.serverOptions.port = port } - srv.serverOptions.port = port svcInfos := buildServiceNames(srv.Server, srv) for _, name := range svcInfos { @@ -130,7 +131,7 @@ func (srv *Server) Serve(lis net.Listener) error { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGSEGV, syscall.SIGINT, syscall.SIGTERM) s := <-c - log.Printf("[Polaris][Naming] receive quit signal: %v", s) + GetLogger().Info("[Polaris][Naming] receive quit signal: %v", s) signal.Stop(c) srv.Stop() }() @@ -170,14 +171,14 @@ func (srv *Server) Deregister() { func Serve(gSrv *grpc.Server, lis net.Listener, opts ...ServerOption) error { pSrv, err := Register(gSrv, lis, opts...) if err != nil { - log.Fatalf("polaris register err: %v", err) + GetLogger().Error("polaris register err: %v", err) } go func() { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) s := <-c - log.Printf("[Polaris][Naming] receive quit signal: %v", s) + GetLogger().Info("[Polaris][Naming] receive quit signal: %v", s) signal.Stop(c) pSrv.Stop() }()