Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] [chutil] add safe proxy #1136

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ const (
maxFunctions = 10

cacheInvalidateCheckInterval = 1 * time.Second
cacheInvalidateCheckTimeout = 5 * time.Second
cacheInvalidateCheckTimeout = 8 * time.Second // should be greater than dial timeout see chDialTimeout
cacheInvalidateMaxRows = 100_000
cacheDefaultDropEvery = 90 * time.Second

Expand Down
95 changes: 51 additions & 44 deletions internal/util/chutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,45 +25,47 @@ import (
"github.com/vkcom/statshouse/internal/util/queue"
)

type connPool struct {
rnd *rand.Rand
servers []*chpool.Pool
sem *queue.Queue

userActive map[string]int
mx sync.Mutex
userWait map[string]int
waitMx sync.Mutex
}
type (
connPool struct {
rnd *rand.Rand
servers []*severConnPool
sem *queue.Queue

userActive map[string]int
mx sync.Mutex
userWait map[string]int
waitMx sync.Mutex
}

type ClickHouse struct {
pools [4]*connPool
}
ClickHouse struct {
pools [4]*connPool
}

type QueryMetaInto struct {
IsFast bool
IsLight bool
User string
Metric int32
Table string
Kind string
}
QueryMetaInto struct {
IsFast bool
IsLight bool
User string
Metric int32
Table string
Kind string
}

type QueryHandleInfo struct {
Duration time.Duration
Profile proto.Profile
}
QueryHandleInfo struct {
Duration time.Duration
Profile proto.Profile
}

type ChConnOptions struct {
Addrs []string
User string
Password string
DialTimeout time.Duration
FastLightMaxConns int
FastHeavyMaxConns int
SlowLightMaxConns int
SlowHeavyMaxConns int
}
ChConnOptions struct {
Addrs []string
User string
Password string
DialTimeout time.Duration
FastLightMaxConns int
FastHeavyMaxConns int
SlowLightMaxConns int
SlowHeavyMaxConns int
}
)

const (
fastLight = 0
Expand All @@ -78,10 +80,10 @@ func OpenClickHouse(opt ChConnOptions) (*ClickHouse, error) {
}

result := &ClickHouse{[4]*connPool{
{rand.New(), make([]*chpool.Pool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.FastLightMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // fastLight
{rand.New(), make([]*chpool.Pool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.FastHeavyMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // fastHeavy
{rand.New(), make([]*chpool.Pool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.SlowLightMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // slowLight
{rand.New(), make([]*chpool.Pool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.SlowHeavyMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // slowHeavy
{rand.New(), make([]*severConnPool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.FastLightMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // fastLight
{rand.New(), make([]*severConnPool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.FastHeavyMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // fastHeavy
{rand.New(), make([]*severConnPool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.SlowLightMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // slowLight
{rand.New(), make([]*severConnPool, 0, len(opt.Addrs)), queue.NewQueue(int64(opt.SlowHeavyMaxConns)), map[string]int{}, sync.Mutex{}, map[string]int{}, sync.Mutex{}}, // slowHeavy
}}
for _, addr := range opt.Addrs {
for _, pool := range result.pools {
Expand All @@ -99,7 +101,7 @@ func OpenClickHouse(opt ChConnOptions) (*ClickHouse, error) {
result.Close()
return nil, err
}
pool.servers = append(pool.servers, server)
pool.servers = append(pool.servers, newSeverConnPool(server))
}
}

Expand All @@ -117,7 +119,7 @@ func (c *connPool) countOfReqLocked(m map[string]int) int {
func (ch *ClickHouse) Close() {
for _, a := range ch.pools {
for _, b := range a.servers {
b.Close()
b.ch.Close()
}
}
}
Expand Down Expand Up @@ -162,7 +164,12 @@ func (ch *ClickHouse) Select(ctx context.Context, meta QueryMetaInto, query ch.Q
}
kind := QueryKind(meta.IsFast, meta.IsLight)
pool := ch.pools[kind]
servers := append(make([]*chpool.Pool, 0, len(pool.servers)), pool.servers...)
servers := make([]*severConnPool, 0, len(pool.servers))
for _, p := range pool.servers {
if p.IsReady() {
servers = append(servers, p)
}
}
for safetyCounter := 0; safetyCounter < len(pool.servers); safetyCounter++ {
var i int
i, err = pickRandomServer(servers, pool.rnd)
Expand Down Expand Up @@ -220,7 +227,7 @@ func (ch *ClickHouse) Select(ctx context.Context, meta QueryMetaInto, query ch.Q
return info, err
}

func pickRandomServer(s []*chpool.Pool, r *rand.Rand) (int, error) {
func pickRandomServer(s []*severConnPool, r *rand.Rand) (int, error) {
if len(s) == 0 {
return 0, fmt.Errorf("all ClickHouse servers are dead")
}
Expand All @@ -232,7 +239,7 @@ func pickRandomServer(s []*chpool.Pool, r *rand.Rand) (int, error) {
if i2 >= i1 {
i2++
}
if s[i1].Stat().AcquiredResources() < s[i2].Stat().AcquiredResources() {
if s[i1].ch.Stat().AcquiredResources() < s[i2].ch.Stat().AcquiredResources() {
return i1, nil
} else {
return i2, nil
Expand Down
100 changes: 100 additions & 0 deletions internal/util/conn_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package util

import (
"context"
"errors"
"net"
"sync"
"time"

"github.com/ClickHouse/ch-go"
"github.com/ClickHouse/ch-go/chpool"
"pgregory.net/rand"
)

const (
maxSucc = -5
maxFailErrAlive = 10
maxFailErrHalfOpen = 5
deadInterval = time.Second * 30

alive = 0
dead = 1
halfOpen = 2

halfOpenSendReq = 10 // when conn has halfOpen state send only 1/halfOpenSendReq requests
)

type severConnPool struct {
ch *chpool.Pool

mx sync.Mutex
errCount int
state int
stateStart time.Time
}

func newSeverConnPool(ch *chpool.Pool) *severConnPool {
return &severConnPool{
ch: ch,
errCount: 0,
state: alive,
stateStart: time.Now(),
}
}

func (p *severConnPool) Do(ctx context.Context, q ch.Query) (err error) {
err = p.ch.Do(ctx, q)
p.passResult(ctx, err)
return err
}

func (p *severConnPool) passResult(ctx context.Context, err error) {
p.mx.Lock()
defer p.mx.Unlock()
if p.state == dead {
return
}
var netErr = &net.OpError{}
if err != nil {
if errors.As(err, &netErr) {
p.errCount++
} else if ctx.Err() == nil { // some query can timeout and this is ok (in this case err != nil and ctx.Err() != nil)
p.errCount++
}
} else if p.errCount > maxSucc {
p.errCount--
}
if p.state == alive && p.errCount >= maxFailErrAlive {
p.errCount = 0
p.state = halfOpen
p.stateStart = time.Now()
}
if p.state == halfOpen && p.errCount >= maxFailErrHalfOpen {
p.errCount = 0
p.state = dead
p.stateStart = time.Now()
}
if p.errCount <= maxSucc {
p.errCount = 0
p.state = alive
p.stateStart = time.Now()
}

}

func (p *severConnPool) IsReady() bool {
p.mx.Lock()
defer p.mx.Unlock()
if p.state == dead && time.Since(p.stateStart) < deadInterval {
return false
}
if p.state == dead {
p.errCount = 0
p.state = halfOpen
}
if p.state == halfOpen {
return rand.Intn(halfOpenSendReq) < 1
}
return true
}
32 changes: 32 additions & 0 deletions internal/util/conn_proxy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package util

import (
"context"
"fmt"
"net"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func Test_severConnPool_Do(t *testing.T) {
p := &severConnPool{
ch: nil,
errCount: 0,
state: alive,
stateStart: time.Now(),
}
ctx, cancel := context.WithCancel(context.Background())
cancel()
for i := 0; i < maxFailErrAlive; i++ {
p.passResult(ctx, &net.OpError{})
}
require.Equal(t, p.errCount, 0)
require.Equal(t, halfOpen, p.state)
for i := 0; i < maxFailErrHalfOpen; i++ {
p.passResult(context.Background(), fmt.Errorf("FAIL"))
}
require.Equal(t, p.errCount, 0)
require.Equal(t, dead, p.state)
}
Loading