diff --git a/go/cache/redis/cache.go b/go/cache/redis/cache.go index b82b7f689aa..cd4684df0f4 100644 --- a/go/cache/redis/cache.go +++ b/go/cache/redis/cache.go @@ -26,6 +26,20 @@ func NewCache() *Cache { } } +func NewCacheParams(addr, password string, DB int) *Cache { + opts := &gredis.Options{ + Addr: addr, + Password: password, + DB: DB, + } + + client := gredis.NewClient(opts) + + return &Cache{ + client: client, + } +} + func (c *Cache) Get(key string) (string, error) { return c.client.Get(key).Result() } diff --git a/go/cmd/boost/boost.go b/go/cmd/boost/boost.go index 20127ddcc5e..eac49365923 100644 --- a/go/cmd/boost/boost.go +++ b/go/cmd/boost/boost.go @@ -20,7 +20,9 @@ var ( targetHost, targetGtid, targetFilter, - targetTabletType string + targetColumns, + targetTabletType, + redisAddr string boostFlags = []string{ "host", @@ -28,7 +30,9 @@ var ( "grpcPort", "gtid", "filter", + "columns", "tabletType", + "redisAddr", } ) @@ -70,11 +74,13 @@ func flagUsage(f *flag.Flag) { } func init() { + flag.StringVar(&redisAddr, "redisAddr", "127.0.0.1:6379", "(defaults to 127.0.0.1:6379)") flag.StringVar(&targetHost, "host", "127.0.0.1", "(defaults to 127.0.0.1)") flag.IntVar(&targetPort, "port", 15306, "(defaults to 15306)") flag.IntVar(&targetGrpcPort, "grpcPort", 15991, "(defaults to 15991)") flag.StringVar(&targetGtid, "gtid", "{}", "(defaults to {})") flag.StringVar(&targetFilter, "filter", "{}", "(defaults to{})") + flag.StringVar(&targetColumns, "columns", "id,user_id", "(defaults to id)") flag.StringVar(&targetTabletType, "tabletType", "master", "(defaults to{})") logger := logutil.NewConsoleLogger() flag.CommandLine.SetOutput(logutil.NewLoggerWriter(logger)) @@ -92,7 +98,9 @@ func main() { targetHost, targetGtid, targetFilter, + targetColumns, targetTabletType, + redisAddr, ) if err != nil { os.Exit(1) diff --git a/go/cmd/boost/boost/boost.go b/go/cmd/boost/boost/boost.go index 772d0c7648d..0903d711581 100644 --- a/go/cmd/boost/boost/boost.go +++ b/go/cmd/boost/boost/boost.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "vitess.io/vitess/go/cache/redis" "vitess.io/vitess/go/json2" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/binlog/binlogplayer" @@ -24,16 +25,18 @@ import ( ) type BoostConfig struct { - Host string - Port int - GrpcPort int - User string - Password string - Gtid *binlogdatapb.VGtid - Filter *binlogdatapb.Filter - TabletType topodatapb.TabletType - Source string - Flags vtgatepb.VStreamFlags + Host string + Port int + GrpcPort int + User string + Password string + Gtid *binlogdatapb.VGtid + Filter *binlogdatapb.Filter + RedisColumns []string + TabletType topodatapb.TabletType + Source string + Flags vtgatepb.VStreamFlags + RedisAddr string } type Boost struct { @@ -41,7 +44,7 @@ type Boost struct { player *vplayer } -func NewBoost(port, grpcPort int, host, gtid, filter, tabletType string) (*Boost, error) { +func NewBoost(port, grpcPort int, host, gtid, filter, columns, tabletType, redisAddr string) (*Boost, error) { targetVgtid := &binlogdatapb.VGtid{} targetFilter := &binlogdatapb.Filter{} if err := json2.Unmarshal([]byte(gtid), &targetVgtid); err != nil { @@ -54,15 +57,17 @@ func NewBoost(port, grpcPort int, host, gtid, filter, tabletType string) (*Boost } cfg := &BoostConfig{ - Host: host, - Port: port, - GrpcPort: grpcPort, - Gtid: targetVgtid, - Filter: targetFilter, + Host: host, + Port: port, + GrpcPort: grpcPort, + Gtid: targetVgtid, + Filter: targetFilter, + RedisColumns: strings.Split(columns, ","), Flags: vtgatepb.VStreamFlags{ //MinimizeSkew: false, HeartbeatInterval: 60, //seconds }, + RedisAddr: redisAddr, } switch tabletType { case "replica": @@ -93,6 +98,8 @@ func (b *Boost) Init() error { ), ) + cache := redis.NewCacheParams(b.config.RedisAddr, "", 0) + // for event aplicator colInfo, err := buildColInfoMap(context.Background(), dbPool) if err != nil { @@ -102,9 +109,11 @@ func (b *Boost) Init() error { replicatorPlan, err := buildReplicatorPlan( b.config.Filter, + colInfo, nil, binlogplayer.NewStats(), + b.config.RedisColumns, ) if err != nil { log.Error("---- buildReplicatorPlan", err) @@ -116,6 +125,7 @@ func (b *Boost) Init() error { dbPool: dbPool, replicatorPlan: replicatorPlan, tablePlans: make(map[string]*TablePlan), + cache: cache, } return nil } diff --git a/go/cmd/boost/boost/replicator_plan.go b/go/cmd/boost/boost/replicator_plan.go index a1b3df19bf2..bfdce4cd071 100644 --- a/go/cmd/boost/boost/replicator_plan.go +++ b/go/cmd/boost/boost/replicator_plan.go @@ -9,6 +9,7 @@ import ( "google.golang.org/protobuf/proto" "vitess.io/vitess/go/bytes2" + "vitess.io/vitess/go/cache/redis" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" @@ -40,6 +41,7 @@ type ReplicatorPlan struct { TablePlans map[string]*TablePlan ColInfoMap map[string][]*vreplication.ColumnInfo stats *binlogplayer.Stats + RedisColumns []string } // buildExecution plan uses the field info as input and the partially built @@ -83,10 +85,11 @@ func (rp *ReplicatorPlan) buildExecutionPlan(fieldEvent *binlogdatapb.FieldEvent // requires us to wait for the field info sent by the source. func (rp *ReplicatorPlan) buildFromFields(tableName string, lastpk *sqltypes.Result, fields []*querypb.Field) (*TablePlan, error) { tpb := &tablePlanBuilder{ - name: sqlparser.NewTableIdent(tableName), - lastpk: lastpk, - colInfos: rp.ColInfoMap[tableName], - stats: rp.stats, + name: sqlparser.NewTableIdent(tableName), + lastpk: lastpk, + colInfos: rp.ColInfoMap[tableName], + stats: rp.stats, + RedisColumns: rp.RedisColumns, } for _, field := range fields { colName := sqlparser.NewColIdent(field.Name) @@ -188,6 +191,7 @@ type TablePlan struct { Stats *binlogplayer.Stats FieldsToSkip map[string]bool ConvertCharset map[string](*binlogdatapb.CharsetConversion) + RedisColumns []string } // MarshalJSON performs a custom JSON Marshalling. @@ -372,6 +376,61 @@ func (tp *TablePlan) applyChange(rowChange *binlogdatapb.RowChange, executor fun return nil, nil } +func check(field string, b []string) bool { + for i := range b { + if field == b[i] { + return true + } + } + return false +} + +// applyChange generate mysql statement +func (tp *TablePlan) applyRedisChange(rowChange *binlogdatapb.RowChange, executor func(string) (*sqltypes.Result, error)) (*sqltypes.Result, error) { + var before, after bool + // insert uses after -> need after + // delete uses before -> need before + // update uses before/after -> need before + if rowChange.Before != nil { + before = true + } + if rowChange.After != nil { + after = true + } + bindvarsStr := append([]string{tp.TargetName}, tp.RedisColumns...) + if rowChange.Before != nil { + before = true + } + if rowChange.After != nil { + after = true + } + if !before && after { + vals := sqltypes.MakeRowTrusted(tp.Fields, rowChange.After) + for i, field := range tp.Fields { + bindVar, err := tp.bindFieldVal(field, &vals[i]) + if err != nil { + return nil, err + } + if check(field.Name, tp.RedisColumns) { + bindvarsStr = append(bindvarsStr, string(bindVar.GetValue())) + } + } + } else { + vals := sqltypes.MakeRowTrusted(tp.Fields, rowChange.Before) + for i, field := range tp.Fields { + bindVar, err := tp.bindFieldVal(field, &vals[i]) + if err != nil { + return nil, err + } + if check(field.Name, tp.RedisColumns) { + bindvarsStr = append(bindvarsStr, string(bindVar.GetValue())) + } + } + } + // fmt.Println("bind vars STR %s", bindvarsStr) + return executor(redis.GenerateCacheKey(bindvarsStr...)) +} + func execParsedQuery(pq *sqlparser.ParsedQuery, bindvars map[string]*querypb.BindVariable, executor func(string) (*sqltypes.Result, error)) (*sqltypes.Result, error) { sql, err := pq.GenerateQuery(bindvars, nil) if err != nil { diff --git a/go/cmd/boost/boost/table_plan_builder.go b/go/cmd/boost/boost/table_plan_builder.go index e130ff0d3f9..e6286d5a8e3 100644 --- a/go/cmd/boost/boost/table_plan_builder.go +++ b/go/cmd/boost/boost/table_plan_builder.go @@ -31,13 +31,14 @@ type tablePlanBuilder struct { // selColumns keeps track of the columns we want to pull from source. // If Lastpk is set, we compare this list against the table's pk and // add missing references. - selColumns map[string]bool - colExprs []*colExpr - onInsert insertType - pkCols []*colExpr - lastpk *sqltypes.Result - colInfos []*vreplication.ColumnInfo - stats *binlogplayer.Stats + selColumns map[string]bool + colExprs []*colExpr + onInsert insertType + pkCols []*colExpr + lastpk *sqltypes.Result + colInfos []*vreplication.ColumnInfo + stats *binlogplayer.Stats + RedisColumns []string } // colExpr describes the processing to be performed to @@ -107,13 +108,14 @@ const ( // The TablePlan built is a partial plan. The full plan for a table is built // when we receive field information from events or rows sent by the source. // buildExecutionPlan is the function that builds the full plan. -func buildReplicatorPlan(filter *binlogdatapb.Filter, colInfoMap map[string][]*vreplication.ColumnInfo, copyState map[string]*sqltypes.Result, stats *binlogplayer.Stats) (*ReplicatorPlan, error) { +func buildReplicatorPlan(filter *binlogdatapb.Filter, colInfoMap map[string][]*vreplication.ColumnInfo, copyState map[string]*sqltypes.Result, stats *binlogplayer.Stats, redisColumns []string) (*ReplicatorPlan, error) { plan := &ReplicatorPlan{ VStreamFilter: &binlogdatapb.Filter{FieldEventMode: filter.FieldEventMode}, TargetTables: make(map[string]*TablePlan), TablePlans: make(map[string]*TablePlan), ColInfoMap: colInfoMap, stats: stats, + RedisColumns: redisColumns, } for tableName := range colInfoMap { lastpk, ok := copyState[tableName] @@ -128,7 +130,7 @@ func buildReplicatorPlan(filter *binlogdatapb.Filter, colInfoMap map[string][]*v if rule == nil { continue } - tablePlan, err := buildTablePlan(tableName, rule, colInfoMap, lastpk, stats) + tablePlan, err := buildTablePlan(tableName, rule, colInfoMap, lastpk, stats, redisColumns) if err != nil { return nil, err } @@ -167,7 +169,7 @@ func MatchTable(tableName string, filter *binlogdatapb.Filter) (*binlogdatapb.Ru return nil, nil } -func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfoMap map[string][]*vreplication.ColumnInfo, lastpk *sqltypes.Result, stats *binlogplayer.Stats) (*TablePlan, error) { +func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfoMap map[string][]*vreplication.ColumnInfo, lastpk *sqltypes.Result, stats *binlogplayer.Stats, redisColumns []string) (*TablePlan, error) { filter := rule.Filter query := filter // generate equivalent select statement if filter is empty or a keyrange. @@ -214,6 +216,7 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfoMap map[st Stats: stats, EnumValuesMap: enumValuesMap, ConvertCharset: rule.ConvertCharset, + RedisColumns: redisColumns, } return tablePlan, nil @@ -225,10 +228,11 @@ func buildTablePlan(tableName string, rule *binlogdatapb.Rule, colInfoMap map[st From: sel.From, Where: sel.Where, }, - selColumns: make(map[string]bool), - lastpk: lastpk, - colInfos: colInfoMap[tableName], - stats: stats, + selColumns: make(map[string]bool), + lastpk: lastpk, + colInfos: colInfoMap[tableName], + stats: stats, + RedisColumns: redisColumns, } if err := tpb.analyzeExprs(sel.SelectExprs); err != nil { @@ -310,6 +314,7 @@ func (tpb *tablePlanBuilder) generate() *TablePlan { PKReferences: pkrefs, Stats: tpb.stats, FieldsToSkip: fieldsToSkip, + RedisColumns: tpb.RedisColumns, } } diff --git a/go/cmd/boost/boost/vplayer.go b/go/cmd/boost/boost/vplayer.go index d44f9973b2a..cc44a35003c 100644 --- a/go/cmd/boost/boost/vplayer.go +++ b/go/cmd/boost/boost/vplayer.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" + "vitess.io/vitess/go/cache/redis" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/dbconnpool" "vitess.io/vitess/go/vt/log" @@ -20,6 +21,7 @@ type vplayer struct { currentVgtid *binlogdatapb.VGtid // fields []*querypb.Field dbPool *dbconnpool.ConnectionPool + cache *redis.Cache } func (vp *vplayer) applyVEvent(ctx context.Context, event *binlogdatapb.VEvent) error { @@ -65,11 +67,15 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row return fmt.Errorf("unexpected event on table %s", rowEvent.TableName) } for _, change := range rowEvent.RowChanges { - _, err := tplan.applyChange(change, func(sql string) (*sqltypes.Result, error) { + _, err := tplan.applyRedisChange(change, func(key string) (*sqltypes.Result, error) { + // this one for Mysql change + // _, err := tplan.applyChange(change, func(sql string) (*sqltypes.Result, error) { // stats := NewVrLogStats("ROWCHANGE") // start := time.Now() - fmt.Println(sql) - return nil, nil + log.Infof("Deleting key: %s", key) + err := vp.cache.Delete(key) + // fmt.Println(sql) + return nil, err // qr, err := vp.dbClient.ExecuteWithRetry(ctx, sql) // vp.vr.stats.QueryCount.Add(vp.phase, 1) // vp.vr.stats.QueryTimings.Record(vp.phase, start)