Skip to content

Commit

Permalink
add invalidation via stream to redis
Browse files Browse the repository at this point in the history
Signed-off-by: Vilius Okockis <[email protected]>
  • Loading branch information
DeathBorn committed Mar 27, 2024
1 parent 3f636e6 commit 1358459
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 38 deletions.
14 changes: 14 additions & 0 deletions go/cache/redis/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@ func NewCache() *Cache {
}
}

func NewCacheParams(addr, password string, DB int) *Cache {
opts := &gredis.Options{
Addr: addr,
Password: password,
DB: DB,
}

client := gredis.NewClient(opts)

return &Cache{
client: client,
}
}

func (c *Cache) Get(key string) (string, error) {
return c.client.Get(key).Result()
}
Expand Down
10 changes: 9 additions & 1 deletion go/cmd/boost/boost.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@ var (
targetHost,
targetGtid,
targetFilter,
targetTabletType string
targetColumns,
targetTabletType,
redisAddr string

boostFlags = []string{
"host",
"port",
"grpcPort",
"gtid",
"filter",
"columns",
"tabletType",
"redisAddr",
}
)

Expand Down Expand Up @@ -70,11 +74,13 @@ func flagUsage(f *flag.Flag) {
}

func init() {
flag.StringVar(&redisAddr, "redisAddr", "127.0.0.1:6379", "(defaults to 127.0.0.1:6379)")
flag.StringVar(&targetHost, "host", "127.0.0.1", "(defaults to 127.0.0.1)")
flag.IntVar(&targetPort, "port", 15306, "(defaults to 15306)")
flag.IntVar(&targetGrpcPort, "grpcPort", 15991, "(defaults to 15991)")
flag.StringVar(&targetGtid, "gtid", "{}", "(defaults to {})")
flag.StringVar(&targetFilter, "filter", "{}", "(defaults to{})")
flag.StringVar(&targetColumns, "columns", "id,user_id", "(defaults to id)")
flag.StringVar(&targetTabletType, "tabletType", "master", "(defaults to{})")
logger := logutil.NewConsoleLogger()
flag.CommandLine.SetOutput(logutil.NewLoggerWriter(logger))
Expand All @@ -92,7 +98,9 @@ func main() {
targetHost,
targetGtid,
targetFilter,
targetColumns,
targetTabletType,
redisAddr,
)
if err != nil {
os.Exit(1)
Expand Down
42 changes: 26 additions & 16 deletions go/cmd/boost/boost/boost.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"time"

"vitess.io/vitess/go/cache/redis"
"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
Expand All @@ -24,24 +25,26 @@ import (
)

type BoostConfig struct {
Host string
Port int
GrpcPort int
User string
Password string
Gtid *binlogdatapb.VGtid
Filter *binlogdatapb.Filter
TabletType topodatapb.TabletType
Source string
Flags vtgatepb.VStreamFlags
Host string
Port int
GrpcPort int
User string
Password string
Gtid *binlogdatapb.VGtid
Filter *binlogdatapb.Filter
RedisColumns []string
TabletType topodatapb.TabletType
Source string
Flags vtgatepb.VStreamFlags
RedisAddr string
}

type Boost struct {
config *BoostConfig
player *vplayer
}

func NewBoost(port, grpcPort int, host, gtid, filter, tabletType string) (*Boost, error) {
func NewBoost(port, grpcPort int, host, gtid, filter, columns, tabletType, redisAddr string) (*Boost, error) {
targetVgtid := &binlogdatapb.VGtid{}
targetFilter := &binlogdatapb.Filter{}
if err := json2.Unmarshal([]byte(gtid), &targetVgtid); err != nil {
Expand All @@ -54,15 +57,17 @@ func NewBoost(port, grpcPort int, host, gtid, filter, tabletType string) (*Boost
}

cfg := &BoostConfig{
Host: host,
Port: port,
GrpcPort: grpcPort,
Gtid: targetVgtid,
Filter: targetFilter,
Host: host,
Port: port,
GrpcPort: grpcPort,
Gtid: targetVgtid,
Filter: targetFilter,
RedisColumns: strings.Split(columns, ","),
Flags: vtgatepb.VStreamFlags{
//MinimizeSkew: false,
HeartbeatInterval: 60, //seconds
},
RedisAddr: redisAddr,
}
switch tabletType {
case "replica":
Expand Down Expand Up @@ -93,6 +98,8 @@ func (b *Boost) Init() error {
),
)

cache := redis.NewCacheParams(b.config.RedisAddr, "", 0)

// for event aplicator
colInfo, err := buildColInfoMap(context.Background(), dbPool)
if err != nil {
Expand All @@ -102,9 +109,11 @@ func (b *Boost) Init() error {

replicatorPlan, err := buildReplicatorPlan(
b.config.Filter,

colInfo,
nil,
binlogplayer.NewStats(),
b.config.RedisColumns,
)
if err != nil {
log.Error("---- buildReplicatorPlan", err)
Expand All @@ -116,6 +125,7 @@ func (b *Boost) Init() error {
dbPool: dbPool,
replicatorPlan: replicatorPlan,
tablePlans: make(map[string]*TablePlan),
cache: cache,
}
return nil
}
Expand Down
67 changes: 63 additions & 4 deletions go/cmd/boost/boost/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/bytes2"
"vitess.io/vitess/go/cache/redis"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
Expand Down Expand Up @@ -40,6 +41,7 @@ type ReplicatorPlan struct {
TablePlans map[string]*TablePlan
ColInfoMap map[string][]*vreplication.ColumnInfo
stats *binlogplayer.Stats
RedisColumns []string
}

// buildExecution plan uses the field info as input and the partially built
Expand Down Expand Up @@ -83,10 +85,11 @@ func (rp *ReplicatorPlan) buildExecutionPlan(fieldEvent *binlogdatapb.FieldEvent
// requires us to wait for the field info sent by the source.
func (rp *ReplicatorPlan) buildFromFields(tableName string, lastpk *sqltypes.Result, fields []*querypb.Field) (*TablePlan, error) {
tpb := &tablePlanBuilder{
name: sqlparser.NewTableIdent(tableName),
lastpk: lastpk,
colInfos: rp.ColInfoMap[tableName],
stats: rp.stats,
name: sqlparser.NewTableIdent(tableName),
lastpk: lastpk,
colInfos: rp.ColInfoMap[tableName],
stats: rp.stats,
RedisColumns: rp.RedisColumns,
}
for _, field := range fields {
colName := sqlparser.NewColIdent(field.Name)
Expand Down Expand Up @@ -188,6 +191,7 @@ type TablePlan struct {
Stats *binlogplayer.Stats
FieldsToSkip map[string]bool
ConvertCharset map[string](*binlogdatapb.CharsetConversion)
RedisColumns []string
}

// MarshalJSON performs a custom JSON Marshalling.
Expand Down Expand Up @@ -372,6 +376,61 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun
return nil, nil
}

func check(field string, b []string) bool {
for i := range b {
if field == b[i] {
return true
}
}
return false
}

// applyChange generate mysql statement
func (tp *TablePlan) applyRedisChange(rowChange *binlogdatapb.RowChange, executor func(string) (*sqltypes.Result, error)) (*sqltypes.Result, error) {
var before, after bool
// insert uses after -> need after
// delete uses before -> need before
// update uses before/after -> need before
if rowChange.Before != nil {
before = true
}
if rowChange.After != nil {
after = true
}
bindvarsStr := append([]string{tp.TargetName}, tp.RedisColumns...)
if rowChange.Before != nil {
before = true
}
if rowChange.After != nil {
after = true
}
if !before && after {
vals := sqltypes.MakeRowTrusted(tp.Fields, rowChange.After)
for i, field := range tp.Fields {
bindVar, err := tp.bindFieldVal(field, &vals[i])
if err != nil {
return nil, err
}
if check(field.Name, tp.RedisColumns) {
bindvarsStr = append(bindvarsStr, string(bindVar.GetValue()))
}
}
} else {
vals := sqltypes.MakeRowTrusted(tp.Fields, rowChange.Before)
for i, field := range tp.Fields {
bindVar, err := tp.bindFieldVal(field, &vals[i])
if err != nil {
return nil, err
}
if check(field.Name, tp.RedisColumns) {
bindvarsStr = append(bindvarsStr, string(bindVar.GetValue()))
}
}
}
// fmt.Println("bind vars STR %s", bindvarsStr)
return executor(redis.GenerateCacheKey(bindvarsStr...))
}

func execParsedQuery(pq *sqlparser.ParsedQuery, bindvars map[string]*querypb.BindVariable, executor func(string) (*sqltypes.Result, error)) (*sqltypes.Result, error) {
sql, err := pq.GenerateQuery(bindvars, nil)
if err != nil {
Expand Down
33 changes: 19 additions & 14 deletions go/cmd/boost/boost/table_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ type tablePlanBuilder struct {
// selColumns keeps track of the columns we want to pull from source.
// If Lastpk is set, we compare this list against the table's pk and
// add missing references.
selColumns map[string]bool
colExprs []*colExpr
onInsert insertType
pkCols []*colExpr
lastpk *sqltypes.Result
colInfos []*vreplication.ColumnInfo
stats *binlogplayer.Stats
selColumns map[string]bool
colExprs []*colExpr
onInsert insertType
pkCols []*colExpr
lastpk *sqltypes.Result
colInfos []*vreplication.ColumnInfo
stats *binlogplayer.Stats
RedisColumns []string
}

// colExpr describes the processing to be performed to
Expand Down Expand Up @@ -107,13 +108,14 @@ const (
// The TablePlan built is a partial plan. The full plan for a table is built
// when we receive field information from events or rows sent by the source.
// buildExecutionPlan is the function that builds the full plan.
func buildReplicatorPlan(filter *binlogdatapb.Filter, colInfoMap map[string][]*vreplication.ColumnInfo, copyState map[string]*sqltypes.Result, stats *binlogplayer.Stats) (*ReplicatorPlan, error) {
func buildReplicatorPlan(filter *binlogdatapb.Filter, colInfoMap map[string][]*vreplication.ColumnInfo, copyState map[string]*sqltypes.Result, stats *binlogplayer.Stats, redisColumns []string) (*ReplicatorPlan, error) {
plan := &ReplicatorPlan{
VStreamFilter: &binlogdatapb.Filter{FieldEventMode: filter.FieldEventMode},
TargetTables: make(map[string]*TablePlan),
TablePlans: make(map[string]*TablePlan),
ColInfoMap: colInfoMap,
stats: stats,
RedisColumns: redisColumns,
}
for tableName := range colInfoMap {
lastpk, ok := copyState[tableName]
Expand All @@ -128,7 +130,7 @@ func buildReplicatorPlan(filter *binlogdatapb.Filter, colInfoMap map[string][]*v
if rule == nil {
continue
}
tablePlan, err := buildTablePlan(tableName, rule, colInfoMap, lastpk, stats)
tablePlan, err := buildTablePlan(tableName, rule, colInfoMap, lastpk, stats, redisColumns)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -167,7 +169,7 @@ func MatchTable(tableName string, filter *binlogdatapb.Filter) (*binlogdatapb.Ru
return nil, nil
}

func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfoMap map[string][]*vreplication.ColumnInfo, lastpk *sqltypes.Result, stats *binlogplayer.Stats) (*TablePlan, error) {
func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfoMap map[string][]*vreplication.ColumnInfo, lastpk *sqltypes.Result, stats *binlogplayer.Stats, redisColumns []string) (*TablePlan, error) {
filter := rule.Filter
query := filter
// generate equivalent select statement if filter is empty or a keyrange.
Expand Down Expand Up @@ -214,6 +216,7 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfoMap map[st
Stats: stats,
EnumValuesMap: enumValuesMap,
ConvertCharset: rule.ConvertCharset,
RedisColumns: redisColumns,
}

return tablePlan, nil
Expand All @@ -225,10 +228,11 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfoMap map[st
From: sel.From,
Where: sel.Where,
},
selColumns: make(map[string]bool),
lastpk: lastpk,
colInfos: colInfoMap[tableName],
stats: stats,
selColumns: make(map[string]bool),
lastpk: lastpk,
colInfos: colInfoMap[tableName],
stats: stats,
RedisColumns: redisColumns,
}

if err := tpb.analyzeExprs(sel.SelectExprs); err != nil {
Expand Down Expand Up @@ -310,6 +314,7 @@ func (tpb *tablePlanBuilder) generate() *TablePlan {
PKReferences: pkrefs,
Stats: tpb.stats,
FieldsToSkip: fieldsToSkip,
RedisColumns: tpb.RedisColumns,
}
}

Expand Down
12 changes: 9 additions & 3 deletions go/cmd/boost/boost/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"

"vitess.io/vitess/go/cache/redis"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/log"
Expand All @@ -20,6 +21,7 @@ type vplayer struct {
currentVgtid *binlogdatapb.VGtid
// fields []*querypb.Field
dbPool *dbconnpool.ConnectionPool
cache *redis.Cache
}

func (vp *vplayer) applyVEvent(ctx context.Context, event *binlogdatapb.VEvent) error {
Expand Down Expand Up @@ -65,11 +67,15 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row
return fmt.Errorf("unexpected event on table %s", rowEvent.TableName)
}
for _, change := range rowEvent.RowChanges {
_, err := tplan.applyChange(change, func(sql string) (*sqltypes.Result, error) {
_, err := tplan.applyRedisChange(change, func(key string) (*sqltypes.Result, error) {
// this one for Mysql change
// _, err := tplan.applyChange(change, func(sql string) (*sqltypes.Result, error) {
// stats := NewVrLogStats("ROWCHANGE")
// start := time.Now()
fmt.Println(sql)
return nil, nil
log.Infof("Deleting key: %s", key)
err := vp.cache.Delete(key)
// fmt.Println(sql)
return nil, err
// qr, err := vp.dbClient.ExecuteWithRetry(ctx, sql)
// vp.vr.stats.QueryCount.Add(vp.phase, 1)
// vp.vr.stats.QueryTimings.Record(vp.phase, start)
Expand Down

0 comments on commit 1358459

Please sign in to comment.