Skip to content

Commit

Permalink
go/vt/wrangler: reduce VReplicationExec calls when getting copy state (
Browse files Browse the repository at this point in the history
…#14375)

Signed-off-by: Max Englander <[email protected]>
Signed-off-by: Max Englander <[email protected]>
Co-authored-by: Matt Lord <[email protected]>
  • Loading branch information
maxenglander and mattlord authored Dec 28, 2023
1 parent e5b7def commit 2783e32
Show file tree
Hide file tree
Showing 14 changed files with 2,170 additions and 1,930 deletions.
3,660 changes: 1,835 additions & 1,825 deletions go/vt/proto/vtctldata/vtctldata.pb.go

Large diffs are not rendered by default.

32 changes: 30 additions & 2 deletions go/vt/proto/vtctldata/vtctldata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 47 additions & 32 deletions go/vt/vtctl/vtctl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ func TestMoveTables(t *testing.T) {
shard := "0"
sourceKs := "sourceks"
targetKs := "targetks"
table := "customer"
table1 := "customer"
table2 := "customer_order"
wf := "testwf"
ksWf := fmt.Sprintf("%s.%s", targetKs, wf)
minTableSize := 16384 // a single 16KiB InnoDB page
Expand All @@ -159,16 +160,22 @@ func TestMoveTables(t *testing.T) {
defer env.close()
source := env.addTablet(100, sourceKs, shard, &topodatapb.KeyRange{}, topodatapb.TabletType_PRIMARY)
target := env.addTablet(200, targetKs, shard, &topodatapb.KeyRange{}, topodatapb.TabletType_PRIMARY)
sourceCol := fmt.Sprintf(`keyspace:"%s" shard:"%s" filter:{rules:{match:"%s" filter:"select * from %s"}}`,
sourceKs, shard, table, table)
sourceCol := fmt.Sprintf(`keyspace:"%s" shard:"%s" filter:{rules:{match:"%s" filter:"select * from %s"} rules:{match:"%s" filter:"select * from %s"}}`,
sourceKs, shard, table1, table1, table2, table2)
bls := &binlogdatapb.BinlogSource{
Keyspace: sourceKs,
Shard: shard,
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: table,
Filter: fmt.Sprintf("select * from %s", table),
}},
Rules: []*binlogdatapb.Rule{
{
Match: table1,
Filter: fmt.Sprintf("select * from %s", table1),
},
{
Match: table2,
Filter: fmt.Sprintf("select * from %s", table2),
},
},
},
}
now := time.Now().UTC().Unix()
Expand Down Expand Up @@ -200,12 +207,13 @@ func TestMoveTables(t *testing.T) {
expectResults: func() {
env.tmc.setVRResults(
target.tablet,
fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id = %d and id in (select max(id) from _vt.copy_state where vrepl_id = %d group by vrepl_id, table_name)",
fmt.Sprintf("select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (%d) and id in (select max(id) from _vt.copy_state where vrepl_id in (%d) group by vrepl_id, table_name)",
vrID, vrID),
sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"table_name|lastpk",
"varchar|varbinary"),
fmt.Sprintf("%s|", table),
"vrepl_id|table_name|lastpk",
"int64|varchar|varbinary"),
fmt.Sprintf("%d|%s|", vrID, table1),
fmt.Sprintf("%d|%s|", vrID, table2),
),
)
env.tmc.setDBAResults(
Expand All @@ -215,7 +223,8 @@ func TestMoveTables(t *testing.T) {
sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"table_name",
"varchar"),
table,
table1,
table2,
),
)
env.tmc.setVRResults(
Expand All @@ -231,26 +240,28 @@ func TestMoveTables(t *testing.T) {
)
env.tmc.setDBAResults(
target.tablet,
fmt.Sprintf("select table_name, table_rows, data_length from information_schema.tables where table_schema = 'vt_%s' and table_name in ('%s')",
targetKs, table),
fmt.Sprintf("select table_name, table_rows, data_length from information_schema.tables where table_schema = 'vt_%s' and table_name in ('%s','%s')",
targetKs, table1, table2),
sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"table_name|table_rows|data_length",
"varchar|int64|int64"),
fmt.Sprintf("%s|0|%d", table, minTableSize),
fmt.Sprintf("%s|0|%d", table1, minTableSize),
fmt.Sprintf("%s|0|%d", table2, minTableSize),
),
)
env.tmc.setDBAResults(
source.tablet,
fmt.Sprintf("select table_name, table_rows, data_length from information_schema.tables where table_schema = 'vt_%s' and table_name in ('%s')",
sourceKs, table),
fmt.Sprintf("select table_name, table_rows, data_length from information_schema.tables where table_schema = 'vt_%s' and table_name in ('%s','%s')",
sourceKs, table1, table2),
sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"table_name|table_rows|data_length",
"varchar|int64|int64"),
fmt.Sprintf("%s|10|%d", table, minTableSize),
fmt.Sprintf("%s|10|%d", table1, minTableSize),
fmt.Sprintf("%s|10|%d", table2, minTableSize),
),
)
},
want: fmt.Sprintf("\nCopy Progress (approx):\n\n\ncustomer: rows copied 0/10 (0%%), size copied 16384/16384 (100%%)\n\n\n\nThe following vreplication streams exist for workflow %s:\n\nid=%d on %s/%s-0000000%d: Status: Copying. VStream has not started.\n\n\n",
want: fmt.Sprintf("\nCopy Progress (approx):\n\n\ncustomer: rows copied 0/10 (0%%), size copied 16384/16384 (100%%)\ncustomer_order: rows copied 0/10 (0%%), size copied 16384/16384 (100%%)\n\n\n\nThe following vreplication streams exist for workflow %s:\n\nid=%d on %s/%s-0000000%d: Status: Copying. VStream has not started.\n\n\n",
ksWf, vrID, shard, env.cell, target.tablet.Alias.Uid),
},
{
Expand All @@ -260,12 +271,13 @@ func TestMoveTables(t *testing.T) {
expectResults: func() {
env.tmc.setVRResults(
target.tablet,
fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id = %d and id in (select max(id) from _vt.copy_state where vrepl_id = %d group by vrepl_id, table_name)",
fmt.Sprintf("select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (%d) and id in (select max(id) from _vt.copy_state where vrepl_id in (%d) group by vrepl_id, table_name)",
vrID, vrID),
sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"table_name|lastpk",
"varchar|varbinary"),
fmt.Sprintf("%s|", table),
"vrepl_id|table_name|lastpk",
"int64|varchar|varbinary"),
fmt.Sprintf("%d|%s|", vrID, table1),
fmt.Sprintf("%d|%s|", vrID, table2),
),
)
env.tmc.setDBAResults(
Expand All @@ -275,7 +287,8 @@ func TestMoveTables(t *testing.T) {
sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"table_name",
"varchar"),
table,
table1,
table2,
),
)
env.tmc.setVRResults(
Expand All @@ -291,26 +304,28 @@ func TestMoveTables(t *testing.T) {
)
env.tmc.setDBAResults(
target.tablet,
fmt.Sprintf("select table_name, table_rows, data_length from information_schema.tables where table_schema = 'vt_%s' and table_name in ('%s')",
targetKs, table),
fmt.Sprintf("select table_name, table_rows, data_length from information_schema.tables where table_schema = 'vt_%s' and table_name in ('%s','%s')",
targetKs, table1, table2),
sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"table_name|table_rows|data_length",
"varchar|int64|int64"),
fmt.Sprintf("%s|5|%d", table, minTableSize),
fmt.Sprintf("%s|5|%d", table1, minTableSize),
fmt.Sprintf("%s|5|%d", table2, minTableSize),
),
)
env.tmc.setDBAResults(
source.tablet,
fmt.Sprintf("select table_name, table_rows, data_length from information_schema.tables where table_schema = 'vt_%s' and table_name in ('%s')",
sourceKs, table),
fmt.Sprintf("select table_name, table_rows, data_length from information_schema.tables where table_schema = 'vt_%s' and table_name in ('%s','%s')",
sourceKs, table1, table2),
sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"table_name|table_rows|data_length",
"varchar|int64|int64"),
fmt.Sprintf("%s|10|%d", table, minTableSize),
fmt.Sprintf("%s|10|%d", table1, minTableSize),
fmt.Sprintf("%s|10|%d", table2, minTableSize),
),
)
},
want: fmt.Sprintf("\nCopy Progress (approx):\n\n\ncustomer: rows copied 5/10 (50%%), size copied 16384/16384 (100%%)\n\n\n\nThe following vreplication streams exist for workflow %s:\n\nid=%d on %s/%s-0000000%d: Status: Error: Duplicate entry '6' for key 'customer.PRIMARY' (errno 1062) (sqlstate 23000) during query: insert into customer(customer_id,email) values (6,'[email protected]').\n\n\n",
want: fmt.Sprintf("\nCopy Progress (approx):\n\n\ncustomer: rows copied 5/10 (50%%), size copied 16384/16384 (100%%)\ncustomer_order: rows copied 5/10 (50%%), size copied 16384/16384 (100%%)\n\n\n\nThe following vreplication streams exist for workflow %s:\n\nid=%d on %s/%s-0000000%d: Status: Error: Duplicate entry '6' for key 'customer.PRIMARY' (errno 1062) (sqlstate 23000) during query: insert into customer(customer_id,email) values (6,'[email protected]').\n\n\n",
ksWf, vrID, shard, env.cell, target.tablet.Alias.Uid),
},
{
Expand All @@ -320,7 +335,7 @@ func TestMoveTables(t *testing.T) {
expectResults: func() {
env.tmc.setVRResults(
target.tablet,
fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id = %d and id in (select max(id) from _vt.copy_state where vrepl_id = %d group by vrepl_id, table_name)",
fmt.Sprintf("select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (%d) and id in (select max(id) from _vt.copy_state where vrepl_id in (%d) group by vrepl_id, table_name)",
vrID, vrID),
&sqltypes.Result{},
)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const mzSelectFrozenQuery = "select 1 from _vt.vreplication where db_name='vt_ta
const mzCheckJournal = "/select val from _vt.resharding_journal where id="
const mzGetWorkflowStatusQuery = "select id, workflow, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message, tags, workflow_type, workflow_sub_type, time_heartbeat, defer_secondary_keys, component_throttled, time_throttled, rows_copied from _vt.vreplication where workflow = 'workflow' and db_name = 'vt_targetks'"
const mzGetCopyState = "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1"
const mzGetLatestCopyState = "select table_name, lastpk from _vt.copy_state where vrepl_id = 1 and id in (select max(id) from _vt.copy_state where vrepl_id = 1 group by vrepl_id, table_name)"
const mzGetLatestCopyState = "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)"
const insertPrefix = `/insert into _vt.vreplication\(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, defer_secondary_keys\) values `
const eol = "$"

Expand Down
98 changes: 83 additions & 15 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -419,7 +420,65 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
return nil, err
}

m := sync.Mutex{} // guards access to the following maps during concurrent calls to scanWorkflow
m := sync.Mutex{} // guards access to the following maps during concurrent calls to fetchCopyStates and scanWorkflow

copyStatesByShardStreamId := make(map[string][]*vtctldatapb.Workflow_Stream_CopyState, len(results))

fetchCopyStates := func(ctx context.Context, tablet *topo.TabletInfo, streamIds []int64) error {
span, ctx := trace.NewSpan(ctx, "workflow.Server.fetchCopyStates")
defer span.Finish()

span.Annotate("keyspace", req.Keyspace)
span.Annotate("shard", tablet.Shard)
span.Annotate("tablet_alias", tablet.AliasString())

copyStates, err := s.getWorkflowCopyStates(ctx, tablet, streamIds)
if err != nil {
return err
}

m.Lock()
defer m.Unlock()

for _, copyState := range copyStates {
shardStreamId := fmt.Sprintf("%s/%d", tablet.Shard, copyState.StreamId)
copyStatesByShardStreamId[shardStreamId] = append(
copyStatesByShardStreamId[shardStreamId],
copyState,
)
}

return nil
}

fetchCopyStatesEg, fetchCopyStatesCtx := errgroup.WithContext(ctx)

for tablet, result := range results {
qr := sqltypes.Proto3ToResult(result)
tablet := tablet // loop closure

streamIds := make([]int64, 0, len(qr.Rows))
for _, row := range qr.Named().Rows {
streamId, err := row.ToInt64("id")
if err != nil {
return nil, err
}
streamIds = append(streamIds, streamId)
}

if len(streamIds) == 0 {
continue
}

fetchCopyStatesEg.Go(func() error {
return fetchCopyStates(fetchCopyStatesCtx, tablet, streamIds)
})
}

if err := fetchCopyStatesEg.Wait(); err != nil {
return nil, err
}

workflowsMap := make(map[string]*vtctldatapb.Workflow, len(results))
sourceKeyspaceByWorkflow := make(map[string]string, len(results))
sourceShardsByWorkflow := make(map[string]sets.Set[string], len(results))
Expand Down Expand Up @@ -547,19 +606,15 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
},
}

stream.CopyStates, err = s.getWorkflowCopyStates(ctx, tablet, id)
if err != nil {
return err
// Merge in copy states, which we've already fetched.
shardStreamId := fmt.Sprintf("%s/%d", tablet.Shard, id)
if copyState, ok := copyStatesByShardStreamId[shardStreamId]; ok {
stream.CopyStates = copyState
}

span.Annotate("num_copy_states", len(stream.CopyStates))

// At this point, we're going to start modifying the maps defined
// outside this function, as well as fields on the passed-in Workflow
// pointer. Since we're running concurrently, take the lock.
//
// We've already made the remote call to getCopyStates, so synchronizing
// here shouldn't hurt too badly, performance-wise.
m.Lock()
defer m.Unlock()

Expand Down Expand Up @@ -1053,16 +1108,24 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN
return ts, state, nil
}

func (s *Server) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletInfo, id int64) ([]*vtctldatapb.Workflow_Stream_CopyState, error) {
func (s *Server) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletInfo, streamIds []int64) ([]*vtctldatapb.Workflow_Stream_CopyState, error) {
span, ctx := trace.NewSpan(ctx, "workflow.Server.getWorkflowCopyStates")
defer span.Finish()

span.Annotate("keyspace", tablet.Keyspace)
span.Annotate("shard", tablet.Shard)
span.Annotate("tablet_alias", tablet.AliasString())
span.Annotate("vrepl_id", id)
span.Annotate("stream_ids", fmt.Sprintf("%#v", streamIds))

query := fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id = %d and id in (select max(id) from _vt.copy_state where vrepl_id = %d group by vrepl_id, table_name)", id, id)
idsBV, err := sqltypes.BuildBindVariable(streamIds)
if err != nil {
return nil, err
}
query, err := sqlparser.ParseAndBind("select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in %a and id in (select max(id) from _vt.copy_state where vrepl_id in %a group by vrepl_id, table_name)",
idsBV, idsBV)
if err != nil {
return nil, err
}
qr, err := s.tmc.VReplicationExec(ctx, tablet.Tablet, query)
if err != nil {
return nil, err
Expand All @@ -1075,10 +1138,15 @@ func (s *Server) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletI

copyStates := make([]*vtctldatapb.Workflow_Stream_CopyState, len(result.Rows))
for i, row := range result.Rows {
// These fields are technically varbinary, but this is close enough.
streamId, err := row[0].ToInt64()
if err != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to cast vrepl_id to int64: %v", err)
}
// These string fields are technically varbinary, but this is close enough.
copyStates[i] = &vtctldatapb.Workflow_Stream_CopyState{
Table: row[0].ToString(),
LastPk: row[1].ToString(),
StreamId: streamId,
Table: row[1].ToString(),
LastPk: row[2].ToString(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ const (
getWorkflowState = "select pos, stop_pos, max_tps, max_replication_lag, state, workflow_type, workflow, workflow_sub_type, defer_secondary_keys from _vt.vreplication where id=1"
getCopyState = "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1"
getNumCopyStateTable = "select count(distinct table_name) from _vt.copy_state where vrepl_id=1"
getLatestCopyState = "select table_name, lastpk from _vt.copy_state where vrepl_id = 1 and id in (select max(id) from _vt.copy_state where vrepl_id = 1 group by vrepl_id, table_name)"
getLatestCopyState = "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)"
getAutoIncrementStep = "select @@session.auto_increment_increment"
setSessionTZ = "set @@session.time_zone = '+00:00'"
setNames = "set names 'binary'"
Expand Down
Loading

0 comments on commit 2783e32

Please sign in to comment.