Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add invalidation via stream to redis #14

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions go/cache/redis/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@ func NewCache() *Cache {
}
}

func NewCacheParams(addr, password string, DB int) *Cache {
opts := &gredis.Options{
Addr: addr,
Password: password,
DB: DB,
}

client := gredis.NewClient(opts)

return &Cache{
client: client,
}
}

func (c *Cache) Get(key string) (string, error) {
return c.client.Get(key).Result()
}
Expand Down
10 changes: 9 additions & 1 deletion go/cmd/boost/boost.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@ var (
targetHost,
targetGtid,
targetFilter,
targetTabletType string
targetColumns,
targetTabletType,
redisAddr string

boostFlags = []string{
"host",
"port",
"grpcPort",
"gtid",
"filter",
"columns",
"tabletType",
"redisAddr",
}
)

Expand Down Expand Up @@ -70,11 +74,13 @@ func flagUsage(f *flag.Flag) {
}

func init() {
flag.StringVar(&redisAddr, "redisAddr", "127.0.0.1:6379", "(defaults to 127.0.0.1:6379)")
flag.StringVar(&targetHost, "host", "127.0.0.1", "(defaults to 127.0.0.1)")
flag.IntVar(&targetPort, "port", 15306, "(defaults to 15306)")
flag.IntVar(&targetGrpcPort, "grpcPort", 15991, "(defaults to 15991)")
flag.StringVar(&targetGtid, "gtid", "{}", "(defaults to {})")
flag.StringVar(&targetFilter, "filter", "{}", "(defaults to{})")
flag.StringVar(&targetColumns, "columns", "id,user_id", "(defaults to id)")
flag.StringVar(&targetTabletType, "tabletType", "master", "(defaults to{})")
logger := logutil.NewConsoleLogger()
flag.CommandLine.SetOutput(logutil.NewLoggerWriter(logger))
Expand All @@ -92,7 +98,9 @@ func main() {
targetHost,
targetGtid,
targetFilter,
targetColumns,
targetTabletType,
redisAddr,
)
if err != nil {
os.Exit(1)
Expand Down
42 changes: 26 additions & 16 deletions go/cmd/boost/boost/boost.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"time"

"vitess.io/vitess/go/cache/redis"
"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
Expand All @@ -24,24 +25,26 @@ import (
)

type BoostConfig struct {
Host string
Port int
GrpcPort int
User string
Password string
Gtid *binlogdatapb.VGtid
Filter *binlogdatapb.Filter
TabletType topodatapb.TabletType
Source string
Flags vtgatepb.VStreamFlags
Host string
Port int
GrpcPort int
User string
Password string
Gtid *binlogdatapb.VGtid
Filter *binlogdatapb.Filter
RedisColumns []string
TabletType topodatapb.TabletType
Source string
Flags vtgatepb.VStreamFlags
RedisAddr string
}

type Boost struct {
config *BoostConfig
player *vplayer
}

func NewBoost(port, grpcPort int, host, gtid, filter, tabletType string) (*Boost, error) {
func NewBoost(port, grpcPort int, host, gtid, filter, columns, tabletType, redisAddr string) (*Boost, error) {
targetVgtid := &binlogdatapb.VGtid{}
targetFilter := &binlogdatapb.Filter{}
if err := json2.Unmarshal([]byte(gtid), &targetVgtid); err != nil {
Expand All @@ -54,15 +57,17 @@ func NewBoost(port, grpcPort int, host, gtid, filter, tabletType string) (*Boost
}

cfg := &BoostConfig{
Host: host,
Port: port,
GrpcPort: grpcPort,
Gtid: targetVgtid,
Filter: targetFilter,
Host: host,
Port: port,
GrpcPort: grpcPort,
Gtid: targetVgtid,
Filter: targetFilter,
RedisColumns: strings.Split(columns, ","),
Flags: vtgatepb.VStreamFlags{
//MinimizeSkew: false,
HeartbeatInterval: 60, //seconds
},
RedisAddr: redisAddr,
}
switch tabletType {
case "replica":
Expand Down Expand Up @@ -93,6 +98,8 @@ func (b *Boost) Init() error {
),
)

cache := redis.NewCacheParams(b.config.RedisAddr, "", 0)

// for event aplicator
colInfo, err := buildColInfoMap(context.Background(), dbPool)
if err != nil {
Expand All @@ -102,9 +109,11 @@ func (b *Boost) Init() error {

replicatorPlan, err := buildReplicatorPlan(
b.config.Filter,

colInfo,
nil,
binlogplayer.NewStats(),
b.config.RedisColumns,
)
if err != nil {
log.Error("---- buildReplicatorPlan", err)
Expand All @@ -116,6 +125,7 @@ func (b *Boost) Init() error {
dbPool: dbPool,
replicatorPlan: replicatorPlan,
tablePlans: make(map[string]*TablePlan),
cache: cache,
}
return nil
}
Expand Down
67 changes: 63 additions & 4 deletions go/cmd/boost/boost/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/bytes2"
"vitess.io/vitess/go/cache/redis"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
Expand Down Expand Up @@ -40,6 +41,7 @@ type ReplicatorPlan struct {
TablePlans map[string]*TablePlan
ColInfoMap map[string][]*vreplication.ColumnInfo
stats *binlogplayer.Stats
RedisColumns []string
}

// buildExecution plan uses the field info as input and the partially built
Expand Down Expand Up @@ -83,10 +85,11 @@ func (rp *ReplicatorPlan) buildExecutionPlan(fieldEvent *binlogdatapb.FieldEvent
// requires us to wait for the field info sent by the source.
func (rp *ReplicatorPlan) buildFromFields(tableName string, lastpk *sqltypes.Result, fields []*querypb.Field) (*TablePlan, error) {
tpb := &tablePlanBuilder{
name: sqlparser.NewTableIdent(tableName),
lastpk: lastpk,
colInfos: rp.ColInfoMap[tableName],
stats: rp.stats,
name: sqlparser.NewTableIdent(tableName),
lastpk: lastpk,
colInfos: rp.ColInfoMap[tableName],
stats: rp.stats,
RedisColumns: rp.RedisColumns,
}
for _, field := range fields {
colName := sqlparser.NewColIdent(field.Name)
Expand Down Expand Up @@ -188,6 +191,7 @@ type TablePlan struct {
Stats *binlogplayer.Stats
FieldsToSkip map[string]bool
ConvertCharset map[string](*binlogdatapb.CharsetConversion)
RedisColumns []string
}

// MarshalJSON performs a custom JSON Marshalling.
Expand Down Expand Up @@ -372,6 +376,61 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun
return nil, nil
}

func check(field string, b []string) bool {
for i := range b {
if field == b[i] {
return true
}
}
return false
}

// applyChange generate mysql statement
func (tp *TablePlan) applyRedisChange(rowChange *binlogdatapb.RowChange, executor func(string) (*sqltypes.Result, error)) (*sqltypes.Result, error) {
var before, after bool
// insert uses after -> need after
// delete uses before -> need before
// update uses before/after -> need before
if rowChange.Before != nil {
before = true
}
if rowChange.After != nil {
after = true
}
bindvarsStr := append([]string{tp.TargetName}, tp.RedisColumns...)
if rowChange.Before != nil {
before = true
}
if rowChange.After != nil {
after = true
}
if !before && after {
vals := sqltypes.MakeRowTrusted(tp.Fields, rowChange.After)
for i, field := range tp.Fields {
bindVar, err := tp.bindFieldVal(field, &vals[i])
if err != nil {
return nil, err
}
if check(field.Name, tp.RedisColumns) {
bindvarsStr = append(bindvarsStr, string(bindVar.GetValue()))
}
}
} else {
vals := sqltypes.MakeRowTrusted(tp.Fields, rowChange.Before)
for i, field := range tp.Fields {
bindVar, err := tp.bindFieldVal(field, &vals[i])
if err != nil {
return nil, err
}
if check(field.Name, tp.RedisColumns) {
bindvarsStr = append(bindvarsStr, string(bindVar.GetValue()))
}
}
}
// fmt.Println("bind vars STR %s", bindvarsStr)
return executor(redis.GenerateCacheKey(bindvarsStr...))
}

func execParsedQuery(pq *sqlparser.ParsedQuery, bindvars map[string]*querypb.BindVariable, executor func(string) (*sqltypes.Result, error)) (*sqltypes.Result, error) {
sql, err := pq.GenerateQuery(bindvars, nil)
if err != nil {
Expand Down
33 changes: 19 additions & 14 deletions go/cmd/boost/boost/table_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ type tablePlanBuilder struct {
// selColumns keeps track of the columns we want to pull from source.
// If Lastpk is set, we compare this list against the table's pk and
// add missing references.
selColumns map[string]bool
colExprs []*colExpr
onInsert insertType
pkCols []*colExpr
lastpk *sqltypes.Result
colInfos []*vreplication.ColumnInfo
stats *binlogplayer.Stats
selColumns map[string]bool
colExprs []*colExpr
onInsert insertType
pkCols []*colExpr
lastpk *sqltypes.Result
colInfos []*vreplication.ColumnInfo
stats *binlogplayer.Stats
RedisColumns []string
}

// colExpr describes the processing to be performed to
Expand Down Expand Up @@ -107,13 +108,14 @@ const (
// The TablePlan built is a partial plan. The full plan for a table is built
// when we receive field information from events or rows sent by the source.
// buildExecutionPlan is the function that builds the full plan.
func buildReplicatorPlan(filter *binlogdatapb.Filter, colInfoMap map[string][]*vreplication.ColumnInfo, copyState map[string]*sqltypes.Result, stats *binlogplayer.Stats) (*ReplicatorPlan, error) {
func buildReplicatorPlan(filter *binlogdatapb.Filter, colInfoMap map[string][]*vreplication.ColumnInfo, copyState map[string]*sqltypes.Result, stats *binlogplayer.Stats, redisColumns []string) (*ReplicatorPlan, error) {
plan := &ReplicatorPlan{
VStreamFilter: &binlogdatapb.Filter{FieldEventMode: filter.FieldEventMode},
TargetTables: make(map[string]*TablePlan),
TablePlans: make(map[string]*TablePlan),
ColInfoMap: colInfoMap,
stats: stats,
RedisColumns: redisColumns,
}
for tableName := range colInfoMap {
lastpk, ok := copyState[tableName]
Expand All @@ -128,7 +130,7 @@ func buildReplicatorPlan(filter *binlogdatapb.Filter, colInfoMap map[string][]*v
if rule == nil {
continue
}
tablePlan, err := buildTablePlan(tableName, rule, colInfoMap, lastpk, stats)
tablePlan, err := buildTablePlan(tableName, rule, colInfoMap, lastpk, stats, redisColumns)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -167,7 +169,7 @@ func MatchTable(tableName string, filter *binlogdatapb.Filter) (*binlogdatapb.Ru
return nil, nil
}

func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfoMap map[string][]*vreplication.ColumnInfo, lastpk *sqltypes.Result, stats *binlogplayer.Stats) (*TablePlan, error) {
func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfoMap map[string][]*vreplication.ColumnInfo, lastpk *sqltypes.Result, stats *binlogplayer.Stats, redisColumns []string) (*TablePlan, error) {
filter := rule.Filter
query := filter
// generate equivalent select statement if filter is empty or a keyrange.
Expand Down Expand Up @@ -214,6 +216,7 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfoMap map[st
Stats: stats,
EnumValuesMap: enumValuesMap,
ConvertCharset: rule.ConvertCharset,
RedisColumns: redisColumns,
}

return tablePlan, nil
Expand All @@ -225,10 +228,11 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfoMap map[st
From: sel.From,
Where: sel.Where,
},
selColumns: make(map[string]bool),
lastpk: lastpk,
colInfos: colInfoMap[tableName],
stats: stats,
selColumns: make(map[string]bool),
lastpk: lastpk,
colInfos: colInfoMap[tableName],
stats: stats,
RedisColumns: redisColumns,
}

if err := tpb.analyzeExprs(sel.SelectExprs); err != nil {
Expand Down Expand Up @@ -310,6 +314,7 @@ func (tpb *tablePlanBuilder) generate() *TablePlan {
PKReferences: pkrefs,
Stats: tpb.stats,
FieldsToSkip: fieldsToSkip,
RedisColumns: tpb.RedisColumns,
}
}

Expand Down
12 changes: 9 additions & 3 deletions go/cmd/boost/boost/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"

"vitess.io/vitess/go/cache/redis"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/log"
Expand All @@ -20,6 +21,7 @@ type vplayer struct {
currentVgtid *binlogdatapb.VGtid
// fields []*querypb.Field
dbPool *dbconnpool.ConnectionPool
cache *redis.Cache
}

func (vp *vplayer) applyVEvent(ctx context.Context, event *binlogdatapb.VEvent) error {
Expand Down Expand Up @@ -65,11 +67,15 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row
return fmt.Errorf("unexpected event on table %s", rowEvent.TableName)
}
for _, change := range rowEvent.RowChanges {
_, err := tplan.applyChange(change, func(sql string) (*sqltypes.Result, error) {
_, err := tplan.applyRedisChange(change, func(key string) (*sqltypes.Result, error) {
// this one for Mysql change
// _, err := tplan.applyChange(change, func(sql string) (*sqltypes.Result, error) {
// stats := NewVrLogStats("ROWCHANGE")
// start := time.Now()
fmt.Println(sql)
return nil, nil
log.Infof("Deleting key: %s", key)
err := vp.cache.Delete(key)
// fmt.Println(sql)
return nil, err
// qr, err := vp.dbClient.ExecuteWithRetry(ctx, sql)
// vp.vr.stats.QueryCount.Add(vp.phase, 1)
// vp.vr.stats.QueryTimings.Record(vp.phase, start)
Expand Down
Loading