Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Undo vschema changes on LookupVindex workflow creation fail #16810

Merged
merged 11 commits into from
Sep 26, 2024
28 changes: 23 additions & 5 deletions go/vt/vtctl/workflow/framework_test.go
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in this file are not relevant here, but I wanted to add the same new capabilities to the test framework for future use (soon).

Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ type testTMClient struct {

mu sync.Mutex
vrQueries map[int][]*queryResult
createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest
createVReplicationWorkflowRequests map[uint32]*createVReplicationWorkflowRequestResponse
readVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest
primaryPositions map[uint32]string
vdiffRequests map[uint32]*vdiffRequestResponse
Expand All @@ -289,7 +289,7 @@ func newTestTMClient(env *testEnv) *testTMClient {
return &testTMClient{
schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition),
vrQueries: make(map[int][]*queryResult),
createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest),
createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse),
readVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest),
readVReplicationWorkflowsResponses: make(map[string][]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse),
primaryPositions: make(map[uint32]string),
Expand All @@ -304,9 +304,12 @@ func (tmc *testTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet
defer tmc.mu.Unlock()

if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil {
if !proto.Equal(expect, req) {
if expect.req != nil && !proto.Equal(expect.req, req) {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", req, expect)
}
if expect.res != nil {
return expect.res, expect.err
}
}
res := sqltypes.MakeTestResult(sqltypes.MakeTestFields("rowsaffected", "int64"), "1")
return &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil
Expand Down Expand Up @@ -418,20 +421,29 @@ func (tmc *testTMClient) expectVRQueryResultOnKeyspaceTablets(keyspace string, q
}
}

func (tmc *testTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) {
func (tmc *testTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *createVReplicationWorkflowRequestResponse) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

tmc.createVReplicationWorkflowRequests[tabletID] = req
}

func (tmc *testTMClient) expectCreateVReplicationWorkflowRequestOnTargetTablets(req *createVReplicationWorkflowRequestResponse) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

for _, tablet := range tmc.env.tablets[tmc.env.targetKeyspace.KeyspaceName] {
tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid] = req
}
}

func (tmc *testTMClient) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

qrs := tmc.vrQueries[int(tablet.Alias.Uid)]
if len(qrs) == 0 {
return nil, fmt.Errorf("tablet %v does not expect any more queries: %s", tablet, query)
return nil, fmt.Errorf("tablet %v does not expect any more queries: %q", tablet, query)
}
matched := false
if qrs[0].query[0] == '/' {
Expand Down Expand Up @@ -479,6 +491,12 @@ type vdiffRequestResponse struct {
err error
}

type createVReplicationWorkflowRequestResponse struct {
req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest
res *tabletmanagerdatapb.CreateVReplicationWorkflowResponse
err error
}

func (tmc *testTMClient) expectVDiffRequest(tablet *topodatapb.Tablet, vrr *vdiffRequestResponse) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
Expand Down
25 changes: 17 additions & 8 deletions go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

const (
Expand Down Expand Up @@ -113,13 +114,11 @@ func (mz *materializer) createWorkflowStreams(req *tabletmanagerdatapb.CreateVRe
if err := validateNewWorkflow(mz.ctx, mz.ts, mz.tmc, mz.ms.TargetKeyspace, mz.ms.Workflow); err != nil {
return err
}

err := mz.buildMaterializer()
if err != nil {
return err
}
if err := mz.deploySchema(); err != nil {
return err
}

var workflowSubType binlogdatapb.VReplicationWorkflowSubType
workflowSubType, err = mz.getWorkflowSubType()
Expand All @@ -133,6 +132,10 @@ func (mz *materializer) createWorkflowStreams(req *tabletmanagerdatapb.CreateVRe
}
req.Options = optionsJSON

if err := mz.deploySchema(); err != nil {
return err
}

Comment on lines +134 to +137
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return mz.forAllTargets(func(target *topo.ShardInfo) error {
targetPrimary, err := mz.ts.GetTablet(mz.ctx, target.PrimaryAlias)
if err != nil {
Expand Down Expand Up @@ -268,15 +271,15 @@ func (mz *materializer) deploySchema() error {
return forAllShards(mz.targetShards, func(target *topo.ShardInfo) error {
allTables := []string{"/.*/"}

hasTargetTable := map[string]bool{}
req := &tabletmanagerdatapb.GetSchemaRequest{Tables: allTables}
targetSchema, err := schematools.GetSchema(mz.ctx, mz.ts, mz.tmc, target.PrimaryAlias, req)
if err != nil {
return err
}

hasTargetTable := make(map[string]*tabletmanagerdatapb.TableDefinition, len(targetSchema.TableDefinitions))
for _, td := range targetSchema.TableDefinitions {
hasTargetTable[td.Name] = true
hasTargetTable[td.Name] = td
}

targetTablet, err := mz.ts.GetTablet(mz.ctx, target.PrimaryAlias)
Expand All @@ -286,12 +289,18 @@ func (mz *materializer) deploySchema() error {

var applyDDLs []string
for _, ts := range mz.ms.TableSettings {
if hasTargetTable[ts.TargetTable] {
// Table already exists.
if td := hasTargetTable[ts.TargetTable]; td != nil {
// Table already exists. Let's be sure that it doesn't already have data.
// We exclude multi-tenant migrations from this check as the target tables
// are expected to frequently have data from previously migrated tenants.
if !mz.IsMultiTenantMigration() && td.RowCount > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RowCount is from information_schema.tables which may not always be updated. We have anecdotally found those stats not getting updated for significant periods of time: showing 0 even after several copy phases have run. On this branch I started a second workflow sharing tables with the first one and it passed this validation, failing with aDuplicate Key error.

Which was why we were going for the approach in this PR: https://github.com/vitessio/vitess/pull/16826/files#diff-d3a320c7b03791f5d24189e3ae6d7fcac814f4fa1b3d7c02d496b3d8f0adf588R1040-R1043.

Copy link
Contributor Author

@mattlord mattlord Sep 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the row count is an estimate. It can remain 0 for some time if you have not inserted/updated enough rows to go beyond the initial 16KiB page in the tablespace. I would guess that in your testing you only had a few very small rows in the table (e.g. corder). Doing multiple copy phases would not help or matter in this case either. I don't think that this is a very likely scenario in production — since InnoDB updates the stats in information_schema when 1/16th of the table has changed, so now we're talking about 1KiB, and that normally happens pretty quickly on a brand new table that starts out as 16KiB in size — but it's certainly possible (then you'd have to wait for one of the other things that trigger an update).

I can remove the "table has existing data" code here in this PR then if you prefer the other approach. That was not the primary focus in this PR anyway. Sound good?

Thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My tests were for a trivial number of rows, yes. But this was also reported by others recently for larger tables where the copy phase itself took minutes. They were using Progress and not seeing any changes there without analyze table ... Not sure why the stats update didn't happen in that case.

In any case, yes, let's handle the check for data in that other PR. Rest looks good.

Copy link
Contributor Author

@mattlord mattlord Sep 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I did that here: 345115e

return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
"target table %s exists in the %s keyspace and is not empty", td.Name, target.Keyspace())
}
continue
}
if ts.CreateDdl == "" {
return fmt.Errorf("target table %v does not exist and there is no create ddl defined", ts.TargetTable)
return fmt.Errorf("target table %s does not exist and there is no create ddl defined", ts.TargetTable)
}

var err error
Expand Down
56 changes: 47 additions & 9 deletions go/vt/vtctl/workflow/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/topotools"
Expand Down Expand Up @@ -120,16 +121,39 @@ func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.M
if err == nil {
tableName = table.Name.String()
}
var (
cols []string
fields []*querypb.Field
)
if ts.CreateDdl != "" {
stmt, err := env.venv.Parser().ParseStrictDDL(ts.CreateDdl)
require.NoError(t, err)
ddl, ok := stmt.(*sqlparser.CreateTable)
require.True(t, ok)
cols = make([]string, len(ddl.TableSpec.Columns))
fields = make([]*querypb.Field, len(ddl.TableSpec.Columns))
for i, col := range ddl.TableSpec.Columns {
cols[i] = col.Name.String()
fields[i] = &querypb.Field{
Name: col.Name.String(),
Type: col.Type.SQLType(),
}
}
}
env.tmc.schema[ms.SourceKeyspace+"."+tableName] = &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Name: tableName,
Schema: fmt.Sprintf("%s_schema", tableName),
Name: tableName,
Schema: ts.CreateDdl,
Columns: cols,
Fields: fields,
}},
}
env.tmc.schema[ms.TargetKeyspace+"."+ts.TargetTable] = &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Name: ts.TargetTable,
Schema: fmt.Sprintf("%s_schema", ts.TargetTable),
Name: ts.TargetTable,
Schema: ts.CreateDdl,
Columns: cols,
Fields: fields,
}},
}
}
Expand Down Expand Up @@ -199,7 +223,7 @@ type testMaterializerTMClient struct {

mu sync.Mutex
vrQueries map[int][]*queryResult
createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest
createVReplicationWorkflowRequests map[uint32]*createVReplicationWorkflowRequestResponse

// Used to confirm the number of times WorkflowDelete was called.
workflowDeleteCalls int
Expand All @@ -215,21 +239,29 @@ func newTestMaterializerTMClient(keyspace string, sourceShards []string, tableSe
sourceShards: sourceShards,
tableSettings: tableSettings,
vrQueries: make(map[int][]*queryResult),
createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest),
createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse),
}
}

func (tmc *testMaterializerTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil {
if !proto.Equal(expect, request) {
if expect.req != nil && !proto.Equal(expect.req, request) {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", request, expect)
}
if expect.res != nil {
return expect.res, expect.err
}
}
res := sqltypes.MakeTestResult(sqltypes.MakeTestFields("rowsaffected", "int64"), "1")
return &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil
}

func (tmc *testMaterializerTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

if tmc.readVReplicationWorkflow != nil {
return tmc.readVReplicationWorkflow(ctx, tablet, request)
}
Expand Down Expand Up @@ -283,6 +315,9 @@ func (tmc *testMaterializerTMClient) DeleteVReplicationWorkflow(ctx context.Cont
}

func (tmc *testMaterializerTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.GetSchemaRequest) (*tabletmanagerdatapb.SchemaDefinition, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

schemaDefn := &tabletmanagerdatapb.SchemaDefinition{}
for _, table := range request.Tables {
if table == "/.*/" {
Expand Down Expand Up @@ -315,7 +350,7 @@ func (tmc *testMaterializerTMClient) expectVRQuery(tabletID int, query string, r
})
}

func (tmc *testMaterializerTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) {
func (tmc *testMaterializerTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *createVReplicationWorkflowRequestResponse) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

Expand Down Expand Up @@ -344,7 +379,7 @@ func (tmc *testMaterializerTMClient) VReplicationExec(ctx context.Context, table

qrs := tmc.vrQueries[int(tablet.Alias.Uid)]
if len(qrs) == 0 {
return nil, fmt.Errorf("tablet %v does not expect any more queries: %s", tablet, query)
return nil, fmt.Errorf("tablet %v does not expect any more queries: %q", tablet, query)
}
matched := false
if qrs[0].query[0] == '/' {
Expand Down Expand Up @@ -403,6 +438,9 @@ func (tmc *testMaterializerTMClient) HasVReplicationWorkflows(ctx context.Contex
}

func (tmc *testMaterializerTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowsRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

workflowType := binlogdatapb.VReplicationWorkflowType_MoveTables
if len(req.IncludeWorkflows) > 0 {
for _, wf := range req.IncludeWorkflows {
Expand Down
Loading
Loading