Skip to content

Commit

Permalink
fix:修复监控数据上报丢失方法 (#69)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun authored Jul 19, 2024
1 parent 0a9f3d0 commit b049688
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 36 deletions.
9 changes: 6 additions & 3 deletions balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (p *polarisNamingBalancer) createSubConnection(key string, addr resolver.Ad
}
// is a new address (not existing in b.subConns).
sc, err := p.cc.NewSubConn(
[]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
[]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: false})
if err != nil {
GetLogger().Error("[Polaris][Balancer] failed to create new SubConn: %v", err)
return
Expand Down Expand Up @@ -245,6 +245,8 @@ func (p *polarisNamingBalancer) ResolverError(err error) {
// report an error.
return
}
p.rwMutex.RLock()
defer p.rwMutex.RUnlock()
p.regeneratePicker(nil)
p.cc.UpdateState(balancer.State{
ConnectivityState: p.state,
Expand Down Expand Up @@ -308,8 +310,6 @@ func (p *polarisNamingBalancer) regeneratePicker(options *dialOptions) {
return
}
readySCs := make(map[string]balancer.SubConn)
p.rwMutex.RLock()
defer p.rwMutex.RUnlock()
// Filter out all ready SCs from full subConn map.
for addr, sc := range p.subConns {
if st, ok := p.scStates[sc]; ok && st == connectivity.Ready {
Expand Down Expand Up @@ -436,6 +436,7 @@ func (pnp *polarisNamingPicker) Pick(info balancer.PickInfo) (balancer.PickResul
subSc, ok := pnp.readySCs[addr]
if ok {
reporter := &resultReporter{
method: info.FullMethodName,
instance: targetInstance,
consumerAPI: pnp.balancer.consumerAPI,
startTime: time.Now(),
Expand Down Expand Up @@ -543,6 +544,7 @@ func collectRouteLabels(routings []*traffic_manage.Route) []string {
}

type resultReporter struct {
method string
instance model.Instance
consumerAPI polaris.ConsumerAPI
startTime time.Time
Expand All @@ -559,6 +561,7 @@ func (r *resultReporter) report(info balancer.DoneInfo) {
callResult.CalledInstance = r.instance
callResult.RetStatus = retStatus
callResult.SourceService = r.sourceService
callResult.SetMethod(r.method)
callResult.SetDelay(time.Since(r.startTime))
callResult.SetRetCode(int32(code))
if err := r.consumerAPI.UpdateServiceCallResult(callResult); err != nil {
Expand Down
34 changes: 27 additions & 7 deletions examples/quickstart/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,27 @@ import (
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"

"github.com/polarismesh/polaris-go/api"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"

polaris "github.com/polarismesh/grpc-go-polaris"
"github.com/polarismesh/grpc-go-polaris/examples/common/pb"
"github.com/polarismesh/polaris-go/api"
)

const (
listenPort = 16011
listenPort = 18080
)

func main() {
// grpc客户端连接获取
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
polaris.GetLogger().SetLevel(polaris.LogDebug)

conn, err := polaris.DialContext(ctx, "polaris://QuickStartEchoServerGRPC",
polaris.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())),
Expand All @@ -49,7 +52,6 @@ func main() {
if err != nil {
log.Fatal(err)
}
defer conn.Close()
echoClient := pb.NewEchoServerClient(conn)

indexHandler := func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -84,8 +86,26 @@ func main() {
_, _ = w.Write([]byte(resp.GetValue()))
}
http.HandleFunc("/echo", indexHandler)
if err := http.ListenAndServe(fmt.Sprintf(":%d", listenPort), nil); nil != err {
log.Fatal(err)
}
go func() {
if err := http.ListenAndServe(fmt.Sprintf(":%d", listenPort), nil); nil != err {
log.Fatal(err)
}
}()
runMainLoop(conn, cancel)
}

func runMainLoop(conn *grpc.ClientConn, cancel context.CancelFunc) {
ch := make(chan os.Signal, 1)
signal.Notify(ch, []os.Signal{
syscall.SIGINT, syscall.SIGTERM,
syscall.SIGSEGV,
}...)

for s := range ch {
log.Printf("catch signal(%+v), stop servers", s)
cancel()
conn.Close()
polaris.ClosePolarisContext()
return
}
}
16 changes: 15 additions & 1 deletion examples/quickstart/consumer/polaris.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
global:
serverConnector:
addresses:
- 127.0.0.1:8091
- 127.0.0.1:8091
statReporter:
#描述:是否将统计信息上报至monitor
#类型:bool
enable: true
#描述:启用的统计上报插件类型
#类型:list
#范围:已经注册的统计上报插件的名字
chain:
- prometheus
plugin:
prometheus:
type: push
address: 127.0.0.1:9091
interval: 10s
17 changes: 16 additions & 1 deletion global.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var (
// DefaultNamespace default namespace when namespace is not set
DefaultNamespace = "default"
// DefaultTTL default ttl value when ttl is not set
DefaultTTL = 20
DefaultTTL = 5
// DefaultGracefulStopMaxWaitDuration default stop max wait duration when not set
DefaultGracefulStopMaxWaitDuration = 30 * time.Second
// DefaultDelayStopWaitDuration default delay time before stop
Expand All @@ -57,16 +57,31 @@ const (
)

var (
ctxRef = 0
polarisContext api.SDKContext
polarisConfig config.Configuration
mutexPolarisContext sync.Mutex
oncePolarisConfig sync.Once
)

func ClosePolarisContext() {

Check failure on line 67 in global.go

View workflow job for this annotation

GitHub Actions / build (1.15)

exported function ClosePolarisContext should have comment or be unexported

Check failure on line 67 in global.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported function ClosePolarisContext should have comment or be unexported

Check failure on line 67 in global.go

View workflow job for this annotation

GitHub Actions / build (1.17)

exported function ClosePolarisContext should have comment or be unexported

Check failure on line 67 in global.go

View workflow job for this annotation

GitHub Actions / build (1.18)

exported function ClosePolarisContext should have comment or be unexported
mutexPolarisContext.Lock()
defer mutexPolarisContext.Unlock()
if nil == polarisContext {
return
}
ctxRef--
if ctxRef == 0 {
polarisContext.Destroy()
polarisContext = nil
}
}

// PolarisContext get or init the global polaris context
func PolarisContext() (api.SDKContext, error) {
mutexPolarisContext.Lock()
defer mutexPolarisContext.Unlock()
ctxRef++
if nil != polarisContext {
return polarisContext, nil
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ require (
github.com/google/uuid v1.6.0
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/polarismesh/polaris-go v1.6.0-beta.5
github.com/polarismesh/polaris-go v1.6.0-alpha.8
github.com/polarismesh/specification v1.5.1
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/common v0.54.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
go.uber.org/zap v1.27.0
golang.org/x/net v0.26.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4 // indirect
google.golang.org/grpc v1.64.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1695,8 +1695,8 @@ github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZ
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/polarismesh/polaris-go v1.6.0-beta.5 h1:llucvfydWlFWTNeABHbbuVL2ijR7AITx8UG02tx0c/Y=
github.com/polarismesh/polaris-go v1.6.0-beta.5/go.mod h1:CuXO9bhHGjSoOIMWr4NXf3bJAkRBp5YoM7ibBzENC+c=
github.com/polarismesh/polaris-go v1.6.0-alpha.8 h1:KzANbn7gumZLfbJEA1KavDiFBqlDKxeMVS3eTxZXFR0=
github.com/polarismesh/polaris-go v1.6.0-alpha.8/go.mod h1:CuXO9bhHGjSoOIMWr4NXf3bJAkRBp5YoM7ibBzENC+c=
github.com/polarismesh/specification v1.4.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU=
github.com/polarismesh/specification v1.5.1 h1:cJ2m0RBepdopGo/e3UpKdsab3NpDZnw5IsVTB1sFc5I=
github.com/polarismesh/specification v1.5.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU=
Expand Down
75 changes: 63 additions & 12 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ package grpcpolaris

import (
"fmt"
"log"
"runtime"
"sync/atomic"
"time"

"github.com/natefinch/lumberjack"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

type LogLevel int

Check failure on line 31 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.15)

exported type LogLevel should have comment or be unexported

Check failure on line 31 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported type LogLevel should have comment or be unexported

Check failure on line 31 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.17)

exported type LogLevel should have comment or be unexported

Check failure on line 31 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.18)

exported type LogLevel should have comment or be unexported
Expand All @@ -35,6 +38,21 @@ const (
LogError
)

func (l LogLevel) String() string {
switch l {
case LogDebug:
return "[debug]"
case LogInfo:
return "[info]"
case LogWarn:
return "[warn]"
case LogError:
return "[error]"
default:
return ""
}
}

var _log Logger = newDefaultLogger()

func SetLogger(logger Logger) {

Check failure on line 58 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.15)

exported function SetLogger should have comment or be unexported

Check failure on line 58 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported function SetLogger should have comment or be unexported

Check failure on line 58 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.17)

exported function SetLogger should have comment or be unexported

Check failure on line 58 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.18)

exported function SetLogger should have comment or be unexported
Expand All @@ -54,25 +72,35 @@ type Logger interface {
}

type defaultLogger struct {
writer *log.Logger
writer zapcore.Core
levelRef atomic.Value
}

func newDefaultLogger() *defaultLogger {
lumberJackLogger := &lumberjack.Logger{
encoderCfg := zapcore.EncoderConfig{
MessageKey: "msg",
LevelKey: "level",
TimeKey: "time",
NameKey: "name",
CallerKey: "caller",
StacktraceKey: "stacktrace",
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: zapcore.LowercaseLevelEncoder,
EncodeTime: zapcore.ISO8601TimeEncoder,
EncodeDuration: zapcore.StringDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
}
w := zapcore.AddSync(&lumberjack.Logger{
Filename: "./logs/grpc-go-polaris.log", // 文件位置
MaxSize: 100, // 进行切割之前,日志文件的最大大小(MB为单位)
MaxAge: 7, // 保留旧文件的最大天数
MaxBackups: 100, // 保留旧文件的最大个数
Compress: true, // 是否压缩/归档旧文件
}
})

levelRef := atomic.Value{}

levelRef.Store(LogInfo)
core := zapcore.NewCore(zapcore.NewConsoleEncoder(encoderCfg), w, zap.InfoLevel)
return &defaultLogger{
writer: log.New(lumberJackLogger, "", log.Lshortfile|log.Ldate|log.Ltime),
levelRef: levelRef,
writer: core,
}
}

Expand All @@ -98,9 +126,32 @@ func (l *defaultLogger) Error(format string, args ...interface{}) {
}

func (l *defaultLogger) printf(expectLevel LogLevel, format string, args ...interface{}) {
curLevel := l.levelRef.Load().(LogLevel)
if curLevel > expectLevel {
zapL := func() zapcore.Level {
switch expectLevel {
case LogDebug:
return zapcore.DebugLevel
case LogInfo:
return zapcore.InfoLevel
case LogWarn:
return zapcore.WarnLevel
case LogError:
return zapcore.ErrorLevel
default:
return zapcore.InfoLevel
}
}()

if !l.writer.Enabled(zapL) {
return
}
_ = l.writer.Output(3, fmt.Sprintf(format, args...))

msg := fmt.Sprintf(format, args...)
e := zapcore.Entry{
Message: msg,
Level: zapL,
Time: time.Now(),
}

e.Caller = zapcore.NewEntryCaller(runtime.Caller(2))
_ = l.writer.Write(e, nil)
}
26 changes: 18 additions & 8 deletions resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,14 @@ func RegisterResolverInterceptor(i ResolverInterceptor) {
resolverInterceptors = append(resolverInterceptors, i)
}

func NewResolver(ctx api.SDKContext) *resolverBuilder {

Check failure on line 58 in resolver.go

View workflow job for this annotation

GitHub Actions / build (1.15)

exported function NewResolver should have comment or be unexported

Check failure on line 58 in resolver.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported function NewResolver should have comment or be unexported

Check failure on line 58 in resolver.go

View workflow job for this annotation

GitHub Actions / build (1.17)

exported function NewResolver should have comment or be unexported

Check failure on line 58 in resolver.go

View workflow job for this annotation

GitHub Actions / build (1.18)

exported function NewResolver should have comment or be unexported
return &resolverBuilder{
sdkCtx: ctx,
}
}

type resolverBuilder struct {
sdkCtx api.SDKContext
}

// Scheme polaris scheme
Expand Down Expand Up @@ -104,12 +111,14 @@ func (rb *resolverBuilder) Build(
return nil, err
}

sdkCtx, err := PolarisContext()
if nil != err {
return nil, err
if rb.sdkCtx == nil {
sdkCtx, err := PolarisContext()
if nil != err {
return nil, err
}
rb.sdkCtx = sdkCtx
}

options.SDKContext = sdkCtx
options.SDKContext = rb.sdkCtx

ctx, cancel := context.WithCancel(context.Background())
d := &polarisNamingResolver{
Expand All @@ -120,7 +129,7 @@ func (rb *resolverBuilder) Build(
options: options,
host: host,
port: port,
consumer: api.NewConsumerAPIByContext(sdkCtx),
consumer: api.NewConsumerAPIByContext(rb.sdkCtx),
eventCh: make(chan struct{}, 1),
}
go d.watcher()
Expand Down Expand Up @@ -247,13 +256,14 @@ func (pr *polarisNamingResolver) watcher() {
for {
select {
case <-pr.ctx.Done():
GetLogger().Info("[Polaris][Resolver] exist watch instance change event for namespace=%s service=%s: %v",
GetLogger().Info("[Polaris][Resolver] exit watch instance change event for namespace=%s service=%s: %v",
pr.options.Namespace, pr.host)
return
case <-pr.eventCh:
pr.doRefresh()
case <-ticker.C:
pr.doRefresh()
}
pr.doRefresh()
}
}

Expand Down

0 comments on commit b049688

Please sign in to comment.