Skip to content

Commit

Permalink
[ISSUE #102] 修复 circuitbreaker 熔断指标上报 prometheus 出现 nil 导致 panic (#104)
Browse files Browse the repository at this point in the history
* feat: 添加就近路由支持文档

* rebase upstream/master

* fix issue #102

* fix: change countervec to gaugevec

* fix:fix review issue

* 更新

* fix:修复route/nearby/consumer编译失败问题

* fix:调整限流规则demo的规则图片
  • Loading branch information
chuntaojun authored Oct 25, 2022
1 parent a917801 commit 13c16b4
Show file tree
Hide file tree
Showing 16 changed files with 110 additions and 61 deletions.
2 changes: 1 addition & 1 deletion examples/circuitbreaker/consumer/polaris.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
global:
serverConnector:
addresses:
- 172.18.0.1:8091
- 127.0.0.1:8091
statReporter:
enable: true
chain:
Expand Down
2 changes: 1 addition & 1 deletion examples/circuitbreaker/provider/polaris.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
global:
serverConnector:
addresses:
- 172.18.0.1:8091
- 127.0.0.1:8091
statReporter:
enable: true
chain:
Expand Down
17 changes: 0 additions & 17 deletions examples/quickstart/provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"os/signal"
"strings"
"syscall"
"time"

"github.com/polarismesh/polaris-go"
)
Expand Down Expand Up @@ -121,22 +120,6 @@ func (svr *PolarisProvider) deregisterService() {
log.Printf("deregister successfully.")
}

func (svr *PolarisProvider) doHeartbeat() {
log.Printf("start to invoke heartbeat operation")
ticker := time.NewTicker(time.Duration(5 * time.Second))
for range ticker.C {
if !svr.isShutdown {
heartbeatRequest := &polaris.InstanceHeartbeatRequest{}
heartbeatRequest.Namespace = namespace
heartbeatRequest.Service = service
heartbeatRequest.Host = svr.host
heartbeatRequest.Port = svr.port
heartbeatRequest.ServiceToken = token
svr.provider.Heartbeat(heartbeatRequest)
}
}
}

func (svr *PolarisProvider) runMainLoop() {
ch := make(chan os.Signal, 1)
signal.Notify(ch, []os.Signal{
Expand Down
2 changes: 1 addition & 1 deletion examples/ratelimit/consumer/polaris.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
global:
serverConnector:
addresses:
- 9.134.5.52:8091
- 127.0.0.1:8091
statReporter:
enable: true
chain:
Expand Down
Binary file modified examples/ratelimit/image/create_service_ratelimit.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion examples/ratelimit/provider/polaris.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
global:
serverConnector:
addresses:
- 9.134.5.52:8091
- 127.0.0.1:8091
statReporter:
enable: true
chain:
Expand Down
22 changes: 11 additions & 11 deletions examples/route/dynamic/README-zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,16 @@ global:

```
# linux/mac运行命令
./provider --metadata="env=dev" > provider-20000.log 2>&1 &
./provider --metadata="env=test" > provider-20001.log 2>&1 &
./provider --metadata="env=pre" > provider-20002.log 2>&1 &
./provider --metadata="env=prod" > provider-20003.log 2>&1 &
./provider --port="20000" --metadata="env=dev" > provider-20000.log 2>&1 &
./provider --port="20001" --metadata="env=test" > provider-20001.log 2>&1 &
./provider --port="20002" --metadata="env=pre" > provider-20002.log 2>&1 &
./provider --port="20003" --metadata="env=prod" > provider-20003.log 2>&1 &
# windows运行命令
./provider.exe --metadata="env=dev" > provider-20000.log
./provider.exe --metadata="env=test" > provider-20001.log
./provider.exe --metadata="env=pre" > provider-20002.log
./provider.exe --metadata="env=prod" > provider-20003.log
./provider.exe --port="20000" --metadata="env=dev" > provider-20000.log
./provider.exe --port="20001" --metadata="env=test" > provider-20001.log
./provider.exe --port="20002" --metadata="env=pre" > provider-20002.log
./provider.exe --port="20003" --metadata="env=prod" > provider-20003.log
```

运行构建出的**consumer**可执行文件
Expand All @@ -76,18 +76,18 @@ global:
```
# linux/mac运行命令
./consumer --selfNamespace={selfName} --selfService=EchoConsumer
./consumer
# windows运行命令
./consumer.exe --selfNamespace={selfName} --selfService=EchoConsumer
./consumer.exe
```

### 验证

通过设置请求头参数***env***的值,实现路由到不同的服务实例

```
curl -H 'env: pre' http://127.0.0.1:18080/echo
curl http://127.0.0.1:18080/echo?env=pre
Hello, I'm RouteEchoServer Provider, My metadata's : env=pre, host : x.x.x.x:x
```
6 changes: 3 additions & 3 deletions examples/route/dynamic/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,18 @@ Run the built **consumer** executable
```
# linux/mac
./consumer --selfNamespace={selfName} --selfService=EchoConsumer
./consumer
# windows
./consumer.exe --selfNamespace={selfName} --selfService=EchoConsumer
./consumer.exe
```

### Verify

Realize the route to different service instances by setting the value of the request header **env**

```
curl -H 'env: pre' http://127.0.0.1:18080/echo
curl http://127.0.0.1:18080/echo?env=pre
Hello, I'm RouteEchoServer Provider, My metadata's : env=pre, host : x.x.x.x:x
```
40 changes: 40 additions & 0 deletions examples/route/dynamic/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"strings"
"time"
Expand All @@ -37,6 +38,7 @@ var (
selfNamespace string
selfService string
port int64
token string
)

func initArgs() {
Expand All @@ -45,18 +47,42 @@ func initArgs() {
flag.StringVar(&selfNamespace, "selfNamespace", "default", "selfNamespace")
flag.StringVar(&selfService, "selfService", "", "selfService")
flag.Int64Var(&port, "port", 18080, "port")
flag.StringVar(&token, "token", "", "token")
}

// PolarisConsumer .
type PolarisConsumer struct {
consumer polaris.ConsumerAPI
router polaris.RouterAPI
provider polaris.ProviderAPI
namespace string
service string
}

// Run .
func (svr *PolarisConsumer) Run() {
if selfService != "" && selfNamespace != "" {
tmpHost, err := getLocalHost(svr.provider.SDKContext().GetConfig().GetGlobal().GetServerConnector().GetAddresses()[0])
if nil != err {
panic(fmt.Errorf("error occur while fetching localhost: %v", err))
}
req := &polaris.InstanceRegisterRequest{}
req.Namespace = selfNamespace
req.Service = selfService
log.Printf("start to invoke register operation")
registerRequest := &polaris.InstanceRegisterRequest{}
registerRequest.Service = service
registerRequest.Namespace = namespace
registerRequest.Host = tmpHost
registerRequest.Port = int(port)
registerRequest.ServiceToken = token
resp, err := svr.provider.RegisterInstance(registerRequest)
if nil != err {
log.Fatalf("fail to register instance, err is %v", err)
}
log.Printf("register response: instanceId %s", resp.InstanceID)
}

svr.runWebServer()
}

Expand Down Expand Up @@ -155,6 +181,7 @@ func main() {
svr := &PolarisConsumer{
consumer: polaris.NewConsumerAPIByContext(sdkCtx),
router: polaris.NewRouterAPIByContext(sdkCtx),
provider: polaris.NewProviderAPIByContext(sdkCtx),
namespace: namespace,
service: service,
}
Expand All @@ -177,3 +204,16 @@ func convertQuery(rawQuery string) map[string]string {
}
return meta
}

func getLocalHost(serverAddr string) (string, error) {
conn, err := net.Dial("tcp", serverAddr)
if nil != err {
return "", err
}
localAddr := conn.LocalAddr().String()
colonIdx := strings.LastIndex(localAddr, ":")
if colonIdx > 0 {
return localAddr[:colonIdx], nil
}
return localAddr, nil
}
32 changes: 16 additions & 16 deletions examples/route/dynamic/provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,22 @@ func (svr *PolarisProvider) Run() {
runMainLoop()
}

func (svr *PolarisProvider) registerService() {
log.Printf("start to invoke register operation")
registerRequest := &polaris.InstanceRegisterRequest{}
registerRequest.Service = service
registerRequest.Namespace = namespace
registerRequest.Host = host
registerRequest.Port = svr.port
registerRequest.ServiceToken = token
registerRequest.Metadata = convertMetadatas()
resp, err := svr.provider.RegisterInstance(registerRequest)
if nil != err {
log.Fatalf("fail to register instance, err is %v", err)
}
log.Printf("register response: instanceId %s", resp.InstanceID)
}

func (svr *PolarisProvider) runWebServer() {
http.HandleFunc("/echo", func(rw http.ResponseWriter, r *http.Request) {
rw.WriteHeader(http.StatusOK)
Expand All @@ -112,22 +128,6 @@ func (svr *PolarisProvider) runWebServer() {

}

func (svr *PolarisProvider) registerService() {
log.Printf("start to invoke register operation")
registerRequest := &polaris.InstanceRegisterRequest{}
registerRequest.Service = service
registerRequest.Namespace = namespace
registerRequest.Host = host
registerRequest.Port = svr.port
registerRequest.ServiceToken = token
registerRequest.Metadata = convertMetadatas()
resp, err := svr.provider.RegisterInstance(registerRequest)
if nil != err {
log.Fatalf("fail to register instance, err is %v", err)
}
log.Printf("register response: instanceId %s", resp.InstanceID)
}

func runMainLoop() {
ch := make(chan os.Signal, 1)
signal.Notify(ch, []os.Signal{
Expand Down
4 changes: 2 additions & 2 deletions examples/route/nearby/consumer/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ go 1.17

require github.com/polarismesh/polaris-go v1.2.0-beta.3

replace github.com/polarismesh/polaris-go => ../../../../

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
Expand Down Expand Up @@ -32,5 +34,3 @@ require (
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

replace github.com/polarismesh/polaris-go => ../../../../
2 changes: 1 addition & 1 deletion pkg/model/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ type CircuitBreakGauge struct {
CBStatus CircuitBreakerStatus
}

// GetCircuitBreakerStatus 获取变化前的熔断状态
// GetCircuitBreakerStatus 获取当前实例熔断状态
func (cbg *CircuitBreakGauge) GetCircuitBreakerStatus() CircuitBreakerStatus {
return cbg.CBStatus
}
Expand Down
25 changes: 25 additions & 0 deletions pkg/model/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,31 @@ const (
RouteStat
)

func DescMetricType(t MetricType) string {
switch t {
case SDKAPIStat:
return "SDKAPIStat"
case ServiceStat:
return "ServiceStat"
case InstanceStat:
return "InstanceStat"
case SDKCfgStat:
return "SDKCfgStat"
case CircuitBreakStat:
return "CircuitBreakStat"
case PluginAPIStat:
return "PluginAPIStat"
case LoadBalanceStat:
return "LoadBalanceStat"
case RateLimitStat:
return "RateLimitStat"
case RouteStat:
return "RouteStat"
default:
return "Unknown"
}
}

var metricTypes = HashSet{}

// ValidMetircType 检测是不是合法的统计类型.
Expand Down
2 changes: 1 addition & 1 deletion plugin/localregistry/inmemory/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func (g *LocalCache) UpdateInstances(svcUpdateReq *localregistry.ServiceUpdateRe
cbStatusUpdated = false
}
err := g.engine.SyncReportStat(model.CircuitBreakStat,
&model.CircuitBreakGauge{ChangeInstance: updateInstance, CBStatus: preCBStatus})
&model.CircuitBreakGauge{ChangeInstance: updateInstance, CBStatus: nextCBStatus})
if err != nil {
log.GetBaseLogger().Errorf("fail to report circuitbreak change, error %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions plugin/statreporter/prometheus/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,14 @@ var (
CircuitBreakerOpen = metricDesc{
Name: MetricsNameCircuitBreakerOpen,
Help: "total of opened circuit breaker",
MetricType: TypeForCounterVec,
MetricType: TypeForGaugeVec,
LabelNames: GetLabels(CircuitBreakerGaugeLabelOrder),
}

CircuitBreakerHalfOpen = metricDesc{
Name: MetricsNameCircuitBreakerHalfOpen,
Help: "total of half-open circuit breaker",
MetricType: TypeForCounterVec,
MetricType: TypeForGaugeVec,
LabelNames: GetLabels(CircuitBreakerGaugeLabelOrder),
}
)
Expand Down
9 changes: 5 additions & 4 deletions plugin/statreporter/prometheus/prometheus_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,21 +223,22 @@ func (p *PrometheusHandler) handleRateLimitGauge(metricsType model.MetricType, v
func (p *PrometheusHandler) handleCircuitBreakGauge(metricsType model.MetricType, val *model.CircuitBreakGauge) {
labels := p.convertCircuitBreakGaugeToLabels(val)

open := p.metricVecCaches[MetricsNameCircuitBreakerOpen].(*prometheus.CounterVec)
open := p.metricVecCaches[MetricsNameCircuitBreakerOpen].(*prometheus.GaugeVec)

// 计算完之后的熔断状态
status := val.GetCircuitBreakerStatus().GetStatus()
if status == model.Open {
open.With(labels).Inc()
} else {
open.With(labels).Add(-1)
open.With(labels).Dec()
}

halfOpen := p.metricVecCaches[MetricsNameCircuitBreakerHalfOpen].(*prometheus.CounterVec)
halfOpen := p.metricVecCaches[MetricsNameCircuitBreakerHalfOpen].(*prometheus.GaugeVec)

if status == model.HalfOpen {
halfOpen.With(labels).Inc()
} else {
halfOpen.With(labels).Add(-1)
halfOpen.With(labels).Dec()
}
}

Expand Down

0 comments on commit 13c16b4

Please sign in to comment.