Skip to content

Commit

Permalink
vtctldclient implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Oct 29, 2023
1 parent 6528a3a commit 8d19288
Show file tree
Hide file tree
Showing 3 changed files with 293 additions and 14 deletions.
32 changes: 26 additions & 6 deletions go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,17 @@ func (mz *materializer) createMoveTablesStreams(req *vtctldatapb.MoveTablesCreat
}

sourceShards := mz.filterSourceShards(target)
blses, err := mz.generateBinlogSources(mz.ctx, target, sourceShards)
// streamKeyRangesEqual allows us to optimize the stream for the cases
// where while the target keyspace may be sharded, the target shard has
// a single source shard to stream data from and the target and source
// shard have equal key ranges. This can be done, for example, when doing
// shard by shard migrations -- migrating a single shard at a time between
// sharded source and sharded target keyspaces.
streamKeyRangesEqual := false
if len(sourceShards) == 1 && key.KeyRangeEqual(sourceShards[0].KeyRange, target.KeyRange) {
streamKeyRangesEqual = true
}
blses, err := mz.generateBinlogSources(mz.ctx, target, sourceShards, streamKeyRangesEqual)
if err != nil {
return err
}
Expand Down Expand Up @@ -139,7 +149,17 @@ func (mz *materializer) createMaterializerStreams() error {
insertMap := make(map[string]string, len(mz.targetShards))
for _, targetShard := range mz.targetShards {
sourceShards := mz.filterSourceShards(targetShard)
inserts, err := mz.generateInserts(mz.ctx, sourceShards)
// streamKeyRangesEqual allows us to optimize the stream for the cases
// where while the target keyspace may be sharded, the target shard has
// a single source shard to stream data from and the target and source
// shard have equal key ranges. This can be done, for example, when doing
// shard by shard migrations -- migrating a single shard at a time between
// sharded source and sharded target keyspaces.
streamKeyRangesEqual := false
if len(sourceShards) == 1 && key.KeyRangeEqual(sourceShards[0].KeyRange, targetShard.KeyRange) {
streamKeyRangesEqual = true
}
inserts, err := mz.generateInserts(mz.ctx, sourceShards, streamKeyRangesEqual)
if err != nil {
return err
}
Expand All @@ -151,7 +171,7 @@ func (mz *materializer) createMaterializerStreams() error {
return nil
}

func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*topo.ShardInfo) (string, error) {
func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*topo.ShardInfo, keyRangesEqual bool) (string, error) {
ig := vreplication.NewInsertGenerator(binlogdatapb.VReplicationWorkflowState_Stopped, "{{.dbname}}")

for _, sourceShard := range sourceShards {
Expand Down Expand Up @@ -185,7 +205,7 @@ func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*top
return "", fmt.Errorf("unrecognized statement: %s", ts.SourceExpression)
}
filter := ts.SourceExpression
if mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference {
if !keyRangesEqual && mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference {
cv, err := vindexes.FindBestColVindex(mz.targetVSchema.Tables[ts.TargetTable])
if err != nil {
return "", err
Expand Down Expand Up @@ -251,7 +271,7 @@ func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*top
return ig.String(), nil
}

func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard *topo.ShardInfo, sourceShards []*topo.ShardInfo) ([]*binlogdatapb.BinlogSource, error) {
func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard *topo.ShardInfo, sourceShards []*topo.ShardInfo, keyRangesEqual bool) ([]*binlogdatapb.BinlogSource, error) {
blses := make([]*binlogdatapb.BinlogSource, 0, len(mz.sourceShards))
for _, sourceShard := range sourceShards {
bls := &binlogdatapb.BinlogSource{
Expand Down Expand Up @@ -284,7 +304,7 @@ func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard *
return nil, fmt.Errorf("unrecognized statement: %s", ts.SourceExpression)
}
filter := ts.SourceExpression
if mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference {
if !keyRangesEqual && mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference {
cv, err := vindexes.FindBestColVindex(mz.targetVSchema.Tables[ts.TargetTable])
if err != nil {
return nil, err
Expand Down
35 changes: 27 additions & 8 deletions go/vt/vtctl/workflow/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,23 @@ import (
"sync"
"testing"

_flag "vitess.io/vitess/go/internal/flag"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tmclient"

_flag "vitess.io/vitess/go/internal/flag"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

type queryResult struct {
Expand Down Expand Up @@ -154,6 +158,7 @@ func (env *testMaterializerEnv) addTablet(id int, keyspace, shard string, tablet
if tabletType == topodatapb.TabletType_PRIMARY {
_, err := env.ws.ts.UpdateShardFields(context.Background(), keyspace, shard, func(si *topo.ShardInfo) error {
si.PrimaryAlias = tablet.Alias
si.IsPrimaryServing = true
return nil
})
if err != nil {
Expand All @@ -175,20 +180,22 @@ type testMaterializerTMClient struct {
tmclient.TabletManagerClient
schema map[string]*tabletmanagerdatapb.SchemaDefinition

mu sync.Mutex
vrQueries map[int][]*queryResult
getSchemaCounts map[string]int
muSchemaCount sync.Mutex
mu sync.Mutex
vrQueries map[int][]*queryResult
createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest
getSchemaCounts map[string]int
muSchemaCount sync.Mutex

// Used to confirm the number of times WorkflowDelete was called.
workflowDeleteCalls int
}

func newTestMaterializerTMClient() *testMaterializerTMClient {
return &testMaterializerTMClient{
schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition),
vrQueries: make(map[int][]*queryResult),
getSchemaCounts: make(map[string]int),
schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition),
vrQueries: make(map[int][]*queryResult),
createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest),
getSchemaCounts: make(map[string]int),
}
}

Expand All @@ -205,6 +212,11 @@ func (tmc *testMaterializerTMClient) schemaRequested(uid uint32) {
}

func (tmc *testMaterializerTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) {
if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil {
if !proto.Equal(expect, request) {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Unexpected CreateVReplicationWorkflow request: got %+v, want %+v", request, expect)
}
}
res := sqltypes.MakeTestResult(sqltypes.MakeTestFields("rowsaffected", "int64"), "1")
return &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil
}
Expand Down Expand Up @@ -288,6 +300,13 @@ func (tmc *testMaterializerTMClient) expectVRQuery(tabletID int, query string, r
})
}

func (tmc *testMaterializerTMClient) expectVReplicationWorkflowRequests(tabletID uint32, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

tmc.createVReplicationWorkflowRequests[tabletID] = req
}

func (tmc *testMaterializerTMClient) verifyQueries(t *testing.T) {
t.Helper()
tmc.mu.Lock()
Expand Down
Loading

0 comments on commit 8d19288

Please sign in to comment.