Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor:调整 grpc 部分逻辑实现&补充日志打印 #68

Merged
merged 3 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ polaris/
.vscode/

style_tool/
goimports-reviser
goimports-reviser
*.log
88 changes: 43 additions & 45 deletions balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,21 +163,22 @@ func buildAddressKey(addr resolver.Address) string {
return fmt.Sprintf("%s", addr.Addr)
}

func (p *polarisNamingBalancer) createSubConnection(addr resolver.Address) {
key := buildAddressKey(addr)
func (p *polarisNamingBalancer) createSubConnection(key string, addr resolver.Address) {
p.rwMutex.Lock()
defer p.rwMutex.Unlock()
if _, ok := p.subConns[key]; ok {
return
}
// is a new address (not existing in b.subConns).
sc, err := p.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
sc, err := p.cc.NewSubConn(
[]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
if err != nil {
GetLogger().Warn("[Polaris][Balancer] failed to create new SubConn: %v", err)
GetLogger().Error("[Polaris][Balancer] failed to create new SubConn: %v", err)
return
}
p.subConns[key] = sc
p.scStates[sc] = connectivity.Idle
p.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
sc.Connect()
}

Expand All @@ -199,37 +200,37 @@ func (p *polarisNamingBalancer) UpdateClientConnState(state balancer.ClientConnS
}
GetLogger().Debug("[Polaris][Balancer] got new ClientConn state: ", state)
if len(state.ResolverState.Addresses) == 0 {
GetLogger().Error("[Polaris][Balancer] receive empty addresses, host=%s", p.host)
p.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
if nil == p.consumerAPI {
sdkCtx, err := PolarisContext()
if nil != err {
return err
}
p.consumerAPI = polaris.NewConsumerAPIByContext(sdkCtx)
p.routerAPI = polaris.NewRouterAPIByContext(sdkCtx)
p.consumerAPI = polaris.NewConsumerAPIByContext(p.options.SDKContext)
p.routerAPI = polaris.NewRouterAPIByContext(p.options.SDKContext)
}
// Successful resolution; clear resolver error and ensure we return nil.
p.resolverErr = nil
// addressSet is the set converted from address;
// it's used for a quick lookup of an address.
addressSet := make(map[string]struct{})
for _, a := range state.ResolverState.Addresses {
addressSet[buildAddressKey(a)] = struct{}{}
p.createSubConnection(a)
key := buildAddressKey(a)
addressSet[key] = struct{}{}
p.createSubConnection(key, a)
}
p.rwMutex.Lock()
defer p.rwMutex.Unlock()
for a, sc := range p.subConns {
// a way removed by resolver.
if _, ok := addressSet[a]; !ok {
p.cc.RemoveSubConn(sc)
delete(p.subConns, a)
sc.Shutdown()
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in HandleSubConnStateChange.
}
}
p.regeneratePicker(p.options)
p.cc.UpdateState(balancer.State{ConnectivityState: p.state, Picker: p.v2Picker})
return nil
}

Expand All @@ -255,37 +256,35 @@ func (p *polarisNamingBalancer) ResolverError(err error) {
func (p *polarisNamingBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
GetLogger().Info("[Polaris][Balancer] handle SubConn state change: %p, %v", sc, s)
oldS, quit := func() (connectivity.State, bool) {
p.rwMutex.Lock()
defer p.rwMutex.Unlock()
oldS, ok := p.scStates[sc]
if !ok {
GetLogger().Info("[Polaris][Balancer] got state changes for an unknown SubConn: %p, %v", sc, s)
return connectivity.TransientFailure, true
}
if oldS == connectivity.TransientFailure && s == connectivity.Connecting {
// Once a subconn enters TRANSIENT_FAILURE, ignore subsequent
// CONNECTING transitions to prevent the aggregated state from being
// always CONNECTING when many backends exist but are all down.
return oldS, true
}
p.scStates[sc] = s
switch s {
case connectivity.Idle:
p.rwMutex.Lock()
defer p.rwMutex.Unlock()
oldS, ok := p.scStates[sc]
if !ok {
GetLogger().Info("[Polaris][Balancer] got state changes for an unknown SubConn: %p, %v", sc, s)
return
}
if oldS == connectivity.TransientFailure &&
(s == connectivity.Connecting || s == connectivity.Idle) {
// Once a subconn enters TRANSIENT_FAILURE, ignore subsequent IDLE or
// CONNECTING transitions to prevent the aggregated state from being
// always CONNECTING when many backends exist but are all down.
if s == connectivity.Idle {
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
delete(p.scStates, sc)
case connectivity.TransientFailure:
// Save error to be reported via picker.
p.connErr = state.ConnectionError
}
return oldS, false
}()
if quit {
return
}
p.scStates[sc] = s
switch s {
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
delete(p.scStates, sc)
case connectivity.TransientFailure:
// Save error to be reported via picker.
p.connErr = state.ConnectionError
}
p.state = p.csEvltr.RecordTransition(oldS, s)

// Regenerate picker when one of the following happens:
Expand Down Expand Up @@ -336,7 +335,7 @@ func (p *polarisNamingBalancer) regeneratePicker(options *dialOptions) {
readySCs: readySCs,
options: options,
lbCfg: p.lbCfg,
response: &copyR,
insList: &copyR,
}
p.v2Picker = picker
}
Expand All @@ -360,7 +359,7 @@ type polarisNamingPicker struct {
readySCs map[string]balancer.SubConn
options *dialOptions
lbCfg *LBConfig
response *model.InstancesResponse
insList *model.InstancesResponse
}

func buildSourceInfo(options *dialOptions) *model.ServiceInfo {
Expand Down Expand Up @@ -407,7 +406,7 @@ func (pnp *polarisNamingPicker) Pick(info balancer.PickInfo) (balancer.PickResul

if pnp.options.Route {
request := &polaris.ProcessRoutersRequest{}
request.DstInstances = pnp.response
request.DstInstances = pnp.insList
if sourceService != nil {
// 如果在Conf中配置了SourceService,则优先使用配置
request.SourceService = *sourceService
Expand All @@ -424,7 +423,7 @@ func (pnp *polarisNamingPicker) Pick(info balancer.PickInfo) (balancer.PickResul
return balancer.PickResult{}, err
}
} else {
resp = pnp.response
resp = pnp.insList
}

lbReq := pnp.buildLoadBalanceRequest(info, resp)
Expand Down Expand Up @@ -523,7 +522,6 @@ func (pnp *polarisNamingPicker) addTrafficLabels(info balancer.PickInfo, insReq
}
}
}

return nil
}

Expand Down
9 changes: 5 additions & 4 deletions examples/quickstart/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* specific language governing permissions and limitations under the License.
*/

package main

Check warning on line 18 in examples/quickstart/consumer/main.go

View workflow job for this annotation

GitHub Actions / Run Revive Action (1.17.x)

should have a package comment

import (
"context"
Expand All @@ -29,6 +29,7 @@

polaris "github.com/polarismesh/grpc-go-polaris"
"github.com/polarismesh/grpc-go-polaris/examples/common/pb"
"github.com/polarismesh/polaris-go/api"
)

const (
Expand Down Expand Up @@ -70,16 +71,16 @@
ctx = metadata.AppendToOutgoingContext(ctx, "uid", r.Header.Get("uid"))

// 请求时设置本次请求的负载均衡算法
// ctx = polaris.RequestScopeLbPolicy(ctx, api.LBPolicyRingHash)
// ctx = polaris.RequestScopeLbHashKey(ctx, r.Header.Get("uid"))
ctx = polaris.RequestScopeLbPolicy(ctx, api.LBPolicyRingHash)
ctx = polaris.RequestScopeLbHashKey(ctx, r.Header.Get("uid"))
resp, err := echoClient.Echo(ctx, &pb.EchoRequest{Value: value})
log.Printf("send message, resp (%v), err(%v)", resp, err)
if nil != err {
w.WriteHeader(500)
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
w.WriteHeader(200)
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(resp.GetValue()))
}
http.HandleFunc("/echo", indexHandler)
Expand Down
5 changes: 3 additions & 2 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@
package grpcpolaris

import (
"fmt"
"log"
"sync/atomic"

"github.com/natefinch/lumberjack"
)

type LogLevel int

Check failure on line 28 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.15)

exported type LogLevel should have comment or be unexported

Check failure on line 28 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.18)

exported type LogLevel should have comment or be unexported

Check failure on line 28 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported type LogLevel should have comment or be unexported

Check failure on line 28 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.17)

exported type LogLevel should have comment or be unexported

const (
_ LogLevel = iota
LogDebug

Check failure on line 32 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.15)

exported const LogDebug should have comment (or a comment on this block) or be unexported

Check failure on line 32 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.18)

exported const LogDebug should have comment (or a comment on this block) or be unexported

Check failure on line 32 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported const LogDebug should have comment (or a comment on this block) or be unexported

Check failure on line 32 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.17)

exported const LogDebug should have comment (or a comment on this block) or be unexported
LogInfo
LogWarn
LogError
Expand All @@ -36,15 +37,15 @@

var _log Logger = newDefaultLogger()

func SetLogger(logger Logger) {

Check failure on line 40 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.15)

exported function SetLogger should have comment or be unexported

Check failure on line 40 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.18)

exported function SetLogger should have comment or be unexported

Check failure on line 40 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported function SetLogger should have comment or be unexported

Check failure on line 40 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.17)

exported function SetLogger should have comment or be unexported
_log = logger
}

func GetLogger() Logger {

Check failure on line 44 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.15)

exported function GetLogger should have comment or be unexported

Check failure on line 44 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.18)

exported function GetLogger should have comment or be unexported

Check failure on line 44 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported function GetLogger should have comment or be unexported

Check failure on line 44 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.17)

exported function GetLogger should have comment or be unexported
return _log
}

type Logger interface {

Check failure on line 48 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.15)

exported type Logger should have comment or be unexported

Check failure on line 48 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.18)

exported type Logger should have comment or be unexported

Check failure on line 48 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.16)

exported type Logger should have comment or be unexported

Check failure on line 48 in logger.go

View workflow job for this annotation

GitHub Actions / build (1.17)

exported type Logger should have comment or be unexported
SetLevel(LogLevel)
Debug(format string, args ...interface{})
Info(format string, args ...interface{})
Expand All @@ -70,7 +71,7 @@

levelRef.Store(LogInfo)
return &defaultLogger{
writer: log.New(lumberJackLogger, "", log.Llongfile|log.Ldate|log.Ltime),
writer: log.New(lumberJackLogger, "", log.Lshortfile|log.Ldate|log.Ltime),
levelRef: levelRef,
}
}
Expand Down Expand Up @@ -101,5 +102,5 @@
if curLevel > expectLevel {
return
}
l.writer.Printf(format, args...)
_ = l.writer.Output(3, fmt.Sprintf(format, args...))
}
Loading
Loading