Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

boost: generate cache key and hit redis in vtgate #13

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts *Options) error {

streamSize := 10
var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests
vtgateExecutor = vtgate.NewExecutor(context.Background(), explainTopo, vtexplainCell, resolver, opts.Normalize, false /*do not warn for sharded only*/, streamSize, cache.DefaultConfig, schemaTracker, nil)
vtgateExecutor = vtgate.NewExecutor(context.Background(), explainTopo, vtexplainCell, resolver, opts.Normalize, false /*do not warn for sharded only*/, streamSize, cache.DefaultConfig, schemaTracker, nil, nil)

queryLogBufferSize := 10
vtgate.QueryLogger = streamlog.New("VTGate", queryLogBufferSize)
Expand Down
9 changes: 5 additions & 4 deletions go/vt/vtgate/boost/boost.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package boost
type Columns map[string]string

type PlanConfig struct {
IsBoosted bool
BoostColumns Columns
IsBoosted bool
Columns Columns
Table string
}

func NonBoostedPlanConfig() *PlanConfig {
return &PlanConfig{
IsBoosted: false,
BoostColumns: Columns{},
IsBoosted: false,
Columns: Columns{},
}
}

Expand Down
9 changes: 7 additions & 2 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strings"
"sync"
"time"
"vitess.io/vitess/go/cache/redis"
"vitess.io/vitess/go/vt/vtgate/boost"

"vitess.io/vitess/go/acl"
Expand Down Expand Up @@ -103,6 +104,7 @@ type Executor struct {
schemaTracker SchemaInfo

queryFilterConfigs *boost.QueryFilterConfigs
boostCache *redis.Cache
}

var executorOnce sync.Once
Expand All @@ -122,6 +124,7 @@ func NewExecutor(
cacheCfg *cache.Config,
schemaTracker SchemaInfo,
queryFilterConfigs *boost.QueryFilterConfigs,
boostCache *redis.Cache,
) *Executor {
e := &Executor{
serv: serv,
Expand All @@ -135,6 +138,7 @@ func NewExecutor(
streamSize: streamSize,
schemaTracker: schemaTracker,
queryFilterConfigs: queryFilterConfigs,
boostCache: boostCache,
}

vschemaacl.Init()
Expand Down Expand Up @@ -1257,8 +1261,9 @@ func configForBoost(configs *boost.QueryFilterConfigs, columns boost.Columns, in

if keysMatch(columns, filterConfig.Columns) {
return &boost.PlanConfig{
IsBoosted: true,
BoostColumns: columns,
IsBoosted: true,
Columns: columns,
Table: tableName,
}
}
}
Expand Down
37 changes: 36 additions & 1 deletion go/vt/vtgate/plan_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ package vtgate

import (
"context"
"fmt"
"time"
"vitess.io/vitess/go/cache/redis"
"vitess.io/vitess/go/vt/vtgate/boost"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
Expand Down Expand Up @@ -121,7 +124,17 @@ func (e *Executor) newExecute(ctx context.Context, safeSession *SafeSession, sql
}

// Check if boosted and hit Redis
// plan.BoostPlanConfig.IsBoosted == true

if plan.BoostPlanConfig != nil && plan.BoostPlanConfig.IsBoosted {
cacheKey := cacheKey(plan.BoostPlanConfig, bindVars)

fmt.Println("Cache Key: ", cacheKey)

redisResults, err := e.boostCache.Get(cacheKey)

fmt.Println("Redis Results: ", redisResults)
fmt.Println("Error: ", err)
}

statementTypeResult, sqlResult, err := e.executePlan(ctx, plan, vcursor, bindVars, execStart)(logStats, safeSession)

Expand All @@ -130,6 +143,28 @@ func (e *Executor) newExecute(ctx context.Context, safeSession *SafeSession, sql
return statementTypeResult, sqlResult, err
}

func cacheKey(config *boost.PlanConfig, vars map[string]*querypb.BindVariable) string {
return redis.GenerateCacheKey(cacheKeyParams(config, vars)...)
}

func cacheKeyParams(boostConfig *boost.PlanConfig, vars map[string]*querypb.BindVariable) []string {
var allColumns []string
var allValues []string

for key, vtgValueKey := range boostConfig.Columns {
allColumns = append(allColumns, key)

var byteArray = vars[vtgValueKey].Value
var stringValue = string(byteArray)

allValues = append(allValues, stringValue)
}

tail := append(allColumns, allValues...)

return append([]string{boostConfig.Table}, tail...)
}

func (e *Executor) startTxIfNecessary(ctx context.Context, safeSession *SafeSession) error {
if !safeSession.Autocommit && !safeSession.InTransaction() {
if err := e.txConn.Begin(ctx, safeSession); err != nil {
Expand Down
9 changes: 6 additions & 3 deletions go/vt/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os"
"strings"
"time"
"vitess.io/vitess/go/cache/redis"
"vitess.io/vitess/go/vt/vtgate/boost"

"vitess.io/vitess/go/vt/key"
Expand Down Expand Up @@ -216,8 +217,9 @@ func Init(ctx context.Context, serv srvtopo.Server, cell string, tabletTypesToWa
}

queryFilterConfigs, _ := boost.Load(boostQueryFilterConfigPath)
boostCache := redis.NewCache()

executor := NewExecutor(ctx, serv, cell, resolver, *normalizeQueries, *warnShardedOnly, *streamBufferSize, cacheCfg, si, queryFilterConfigs)
executor := NewExecutor(ctx, serv, cell, resolver, *normalizeQueries, *warnShardedOnly, *streamBufferSize, cacheCfg, si, queryFilterConfigs, boostCache)

// connect the schema tracker with the vschema manager
if *enableSchemaChangeSignal {
Expand Down Expand Up @@ -619,11 +621,12 @@ func LegacyInit(ctx context.Context, hc discovery.LegacyHealthCheck, serv srvtop
MaxMemoryUsage: *queryPlanCacheMemory,
LFU: *queryPlanCacheLFU,
}

queryFilterConfigs := &boost.QueryFilterConfigs{}
boostCache := redis.NewCache()

rpcVTGate = &VTGate{
executor: NewExecutor(ctx, serv, cell, resolver, *normalizeQueries, *warnShardedOnly, *streamBufferSize, cacheCfg, nil, queryFilterConfigs),
executor: NewExecutor(ctx, serv, cell, resolver, *normalizeQueries, *warnShardedOnly, *streamBufferSize, cacheCfg, nil, queryFilterConfigs, boostCache),
resolver: resolver,
vsm: vsm,
txConn: tc,
Expand Down
Loading