Skip to content

Commit

Permalink
VDiff: "show all" should only report vdiffs for the specified keyspac…
Browse files Browse the repository at this point in the history
…e and workflow (#14442)

Signed-off-by: Rohit Nayak <[email protected]>
Signed-off-by: Matt Lord <[email protected]>
Signed-off-by: deepthi <[email protected]>
Co-authored-by: Matt Lord <[email protected]>
Co-authored-by: deepthi <[email protected]>
  • Loading branch information
3 people authored Nov 4, 2023
1 parent 8072a38 commit 6912eb7
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 9 deletions.
135 changes: 135 additions & 0 deletions go/test/endtoend/vreplication/vdiff_multiple_movetables_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
Copyright 2023 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vreplication

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/tidwall/gjson"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

func TestMultipleConcurrentVDiffs(t *testing.T) {
cellName := "zone"
cells := []string{cellName}
vc = NewVitessCluster(t, t.Name(), cells, mainClusterConfig)

require.NotNil(t, vc)
allCellNames = cellName
defaultCellName := cellName
defaultCell = vc.Cells[defaultCellName]
sourceKeyspace := "product"
shardName := "0"

defer vc.TearDown(t)

cell := vc.Cells[cellName]
vc.AddKeyspace(t, []*Cell{cell}, sourceKeyspace, shardName, initialProductVSchema, initialProductSchema, 0, 0, 100, sourceKsOpts)

vtgate = cell.Vtgates[0]
require.NotNil(t, vtgate)
err := cluster.WaitForHealthyShard(vc.VtctldClient, sourceKeyspace, shardName)
require.NoError(t, err)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", sourceKeyspace, shardName), 1, 30*time.Second)

vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
verifyClusterHealth(t, vc)

insertInitialData(t)
targetTabletId := 200
targetKeyspace := "customer"
vc.AddKeyspace(t, []*Cell{cell}, targetKeyspace, shardName, initialProductVSchema, initialProductSchema, 0, 0, targetTabletId, sourceKsOpts)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", targetKeyspace, shardName), 1, 30*time.Second)

index := 1000
var loadCtx context.Context
var loadCancel context.CancelFunc
loadCtx, loadCancel = context.WithCancel(context.Background())
load := func(tableName string) {
query := "insert into %s(cid, name) values(%d, 'customer-%d')"
for {
select {
case <-loadCtx.Done():
log.Infof("load cancelled")
return
default:
index += 1
vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
q := fmt.Sprintf(query, tableName, index, index)
vtgateConn.ExecuteFetch(q, 1000, false)
vtgateConn.Close()
}
time.Sleep(10 * time.Millisecond)
}
}
targetKs := vc.Cells[cellName].Keyspaces[targetKeyspace]
targetTab := targetKs.Shards["0"].Tablets[fmt.Sprintf("%s-%d", cellName, targetTabletId)].Vttablet
require.NotNil(t, targetTab)

time.Sleep(15 * time.Second) // wait for some rows to be inserted.

createWorkflow := func(workflowName, tables string) {
mt := newMoveTables(vc, &moveTables{
workflowName: workflowName,
targetKeyspace: targetKeyspace,
sourceKeyspace: sourceKeyspace,
tables: tables,
}, moveTablesFlavorVtctld)
mt.Create()
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", targetKeyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String())
catchup(t, targetTab, workflowName, "MoveTables")
}

createWorkflow("wf1", "customer")
createWorkflow("wf2", "customer2")

go load("customer")
go load("customer2")

var wg sync.WaitGroup
wg.Add(2)

doVdiff := func(workflowName, table string) {
defer wg.Done()
vdiff(t, targetKeyspace, workflowName, cellName, true, false, nil)
}
go doVdiff("wf1", "customer")
go doVdiff("wf2", "customer2")
wg.Wait()
loadCancel()

// confirm that show all shows the correct workflow and only that workflow.
output, err := vc.VtctldClient.ExecuteCommandWithOutput("VDiff", "--format", "json", "--workflow", "wf1", "--target-keyspace", "customer", "show", "all")
require.NoError(t, err)
log.Infof("VDiff output: %s", output)
count := gjson.Get(output, "..#").Int()
wf := gjson.Get(output, "0.Workflow").String()
ksName := gjson.Get(output, "0.Keyspace").String()
require.Equal(t, int64(1), count)
require.Equal(t, "wf1", wf)
require.Equal(t, "customer", ksName)
}
1 change: 0 additions & 1 deletion go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1781,7 +1781,6 @@ func (s *Server) VDiffShow(ctx context.Context, req *vtctldatapb.VDiffShowReques
log.Errorf("Error executing vdiff show action: %v", output.err)
return nil, output.err
}

return &vtctldatapb.VDiffShowResponse{
TabletResponses: output.responses,
}, nil
Expand Down
16 changes: 13 additions & 3 deletions go/vt/vttablet/tabletmanager/vdiff/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (
DeleteAction VDiffAction = "delete"
AllActionArg = "all"
LastActionArg = "last"

maxVDiffsToReport = 100
)

var (
Expand Down Expand Up @@ -267,13 +269,13 @@ func (vde *Engine) handleCreateResumeAction(ctx context.Context, dbClient binlog

func (vde *Engine) handleShowAction(ctx context.Context, dbClient binlogplayer.DBClient, action VDiffAction, req *tabletmanagerdatapb.VDiffRequest, resp *tabletmanagerdatapb.VDiffResponse) error {
var qr *sqltypes.Result
var err error
vdiffUUID := ""

if req.ActionArg == LastActionArg {
query, err := sqlparser.ParseAndBind(sqlGetMostRecentVDiff,
query, err := sqlparser.ParseAndBind(sqlGetMostRecentVDiffByKeyspaceWorkflow,
sqltypes.StringBindVariable(req.Keyspace),
sqltypes.StringBindVariable(req.Workflow),
sqltypes.Int64BindVariable(1),
)
if err != nil {
return err
Expand Down Expand Up @@ -322,7 +324,15 @@ func (vde *Engine) handleShowAction(ctx context.Context, dbClient binlogplayer.D
}
switch req.ActionArg {
case AllActionArg:
if qr, err = dbClient.ExecuteFetch(sqlGetAllVDiffs, -1); err != nil {
query, err := sqlparser.ParseAndBind(sqlGetMostRecentVDiffByKeyspaceWorkflow,
sqltypes.StringBindVariable(req.Keyspace),
sqltypes.StringBindVariable(req.Workflow),
sqltypes.Int64BindVariable(maxVDiffsToReport),
)
if err != nil {
return err
}
if qr, err = dbClient.ExecuteFetch(query, -1); err != nil {
return err
}
resp.Output = sqltypes.ResultToProto3(qr)
Expand Down
32 changes: 32 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,38 @@ func TestPerformVDiffAction(t *testing.T) {
},
},
},
{
name: "show last",
req: &tabletmanagerdatapb.VDiffRequest{
Action: string(ShowAction),
ActionArg: "last",
Keyspace: keyspace,
Workflow: workflow,
},
expectQueries: []queryAndResult{
{
query: fmt.Sprintf("select * from _vt.vdiff where keyspace = %s and workflow = %s order by id desc limit %d",
encodeString(keyspace), encodeString(workflow), 1),
result: noResults,
},
},
},
{
name: "show all",
req: &tabletmanagerdatapb.VDiffRequest{
Action: string(ShowAction),
ActionArg: "all",
Keyspace: keyspace,
Workflow: workflow,
},
expectQueries: []queryAndResult{
{
query: fmt.Sprintf("select * from _vt.vdiff where keyspace = %s and workflow = %s order by id desc limit %d",
encodeString(keyspace), encodeString(workflow), maxVDiffsToReport),
result: noResults,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
9 changes: 4 additions & 5 deletions go/vt/vttablet/tabletmanager/vdiff/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ const (
and vdt.state in ('completed', 'stopped')`
sqlRetryVDiff = `update _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) set vd.state = 'pending',
vd.last_error = '', vdt.state = 'pending' where vd.id = %a and (vd.state = 'error' or vdt.state = 'error')`
sqlGetVDiffByKeyspaceWorkflowUUID = "select * from _vt.vdiff where keyspace = %a and workflow = %a and vdiff_uuid = %a"
sqlGetMostRecentVDiff = "select * from _vt.vdiff where keyspace = %a and workflow = %a order by id desc limit 1"
sqlGetVDiffByID = "select * from _vt.vdiff where id = %a"
sqlDeleteVDiffs = `delete from vd, vdt, vdl using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
sqlGetVDiffByKeyspaceWorkflowUUID = "select * from _vt.vdiff where keyspace = %a and workflow = %a and vdiff_uuid = %a"
sqlGetMostRecentVDiffByKeyspaceWorkflow = "select * from _vt.vdiff where keyspace = %a and workflow = %a order by id desc limit %a"
sqlGetVDiffByID = "select * from _vt.vdiff where id = %a"
sqlDeleteVDiffs = `delete from vd, vdt, vdl using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
left join _vt.vdiff_log as vdl on (vd.id = vdl.vdiff_id)
where vd.keyspace = %a and vd.workflow = %a`
sqlDeleteVDiffByUUID = `delete from vd, vdt using _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
Expand All @@ -48,7 +48,6 @@ const (
sqlGetVDiffsToRetry = "select * from _vt.vdiff where state = 'error' and json_unquote(json_extract(options, '$.core_options.auto_retry')) = 'true'"
sqlGetVDiffID = "select id as id from _vt.vdiff where vdiff_uuid = %a"
sqlGetVDiffIDsByKeyspaceWorkflow = "select id as id from _vt.vdiff where keyspace = %a and workflow = %a"
sqlGetAllVDiffs = "select * from _vt.vdiff order by id desc"
sqlGetTableRows = "select table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %a and table_name = %a"
sqlGetAllTableRows = "select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %s and table_name in (%s)"

Expand Down
9 changes: 9 additions & 0 deletions test/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,15 @@
"RetryMax": 0,
"Tags": []
},
"vdiff_multiple_movetables_test.go": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMultipleConcurrentVDiffs"],
"Command": [],
"Manual": false,
"Shard": "vreplication_partial_movetables_basic",
"RetryMax": 0,
"Tags": []
},
"vreplication_movetables_buffering": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMoveTablesBuffering"],
Expand Down

0 comments on commit 6912eb7

Please sign in to comment.