Skip to content

Commit

Permalink
VReplication: Migrate intra-keyspace materialize workflows when Resha…
Browse files Browse the repository at this point in the history
…rding the keyspace (#15536)

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Apr 16, 2024
1 parent 791ca02 commit d9cd21b
Show file tree
Hide file tree
Showing 10 changed files with 336 additions and 68 deletions.
3 changes: 2 additions & 1 deletion go/test/endtoend/throttler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

type Config struct {
Expand Down
42 changes: 42 additions & 0 deletions go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,22 @@ create table nopk (name varchar(128), age int unsigned);
"sequence": "customer_seq"
}
},
"customer_name": {
"column_vindexes": [
{
"column": "cid",
"name": "xxhash"
}
]
},
"customer_type": {
"column_vindexes": [
{
"column": "cid",
"name": "xxhash"
}
]
},
"customer2": {
"column_vindexes": [
{
Expand Down Expand Up @@ -401,6 +417,32 @@ create table nopk (name varchar(128), age int unsigned);
"create_ddl": "create table cproduct(pid bigint, description varchar(128), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key(pid)) CHARSET=utf8mb4"
}]
}
`

materializeCustomerNameSpec = `
{
"workflow": "customer_name",
"source_keyspace": "customer",
"target_keyspace": "customer",
"table_settings": [{
"target_table": "customer_name",
"source_expression": "select cid, name from customer",
"create_ddl": "create table if not exists customer_name (cid bigint not null, name varchar(128), primary key(cid), key(name))"
}]
}
`

materializeCustomerTypeSpec = `
{
"workflow": "customer_type",
"source_keyspace": "customer",
"target_keyspace": "customer",
"table_settings": [{
"target_table": "customer_type",
"source_expression": "select cid, typ from customer",
"create_ddl": "create table if not exists customer_type (cid bigint not null, typ enum('individual','soho','enterprise'), primary key(cid), key(typ))"
}]
}
`

merchantOrdersVSchema = `
Expand Down
127 changes: 101 additions & 26 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ limitations under the License.
package vreplication

import (
"context"
"encoding/json"
"fmt"
"net"
"strconv"
"strings"
"sync"
"testing"
"time"

Expand All @@ -30,6 +32,7 @@ import (
"google.golang.org/protobuf/encoding/protojson"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/wrangler"

Expand Down Expand Up @@ -335,8 +338,8 @@ func validateReadsRoute(t *testing.T, tabletTypes string, tablet *cluster.Vttabl
for _, tt := range []string{"replica", "rdonly"} {
destination := fmt.Sprintf("%s:%s@%s", tablet.Keyspace, tablet.Shard, tt)
if strings.Contains(tabletTypes, tt) {
readQuery := "select * from customer"
assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, readQuery)
readQuery := "select cid from customer limit 10"
assertQueryExecutesOnTablet(t, vtgateConn, tablet, destination, readQuery, "select cid from customer limit :vtg1")
}
}
}
Expand All @@ -355,7 +358,7 @@ func validateWritesRouteToSource(t *testing.T) {
insertQuery := "insert into customer(name, cid) values('tempCustomer2', 200)"
matchInsertQuery := "insert into customer(`name`, cid) values"
assertQueryExecutesOnTablet(t, vtgateConn, sourceTab, "customer", insertQuery, matchInsertQuery)
execVtgateQuery(t, vtgateConn, "customer", "delete from customer where cid > 100")
execVtgateQuery(t, vtgateConn, "customer", "delete from customer where cid = 200")
}

func validateWritesRouteToTarget(t *testing.T) {
Expand All @@ -366,7 +369,7 @@ func validateWritesRouteToTarget(t *testing.T) {
assertQueryExecutesOnTablet(t, vtgateConn, targetTab2, "customer", insertQuery, matchInsertQuery)
insertQuery = "insert into customer(name, cid) values('tempCustomer3', 102)"
assertQueryExecutesOnTablet(t, vtgateConn, targetTab1, "customer", insertQuery, matchInsertQuery)
execVtgateQuery(t, vtgateConn, "customer", "delete from customer where cid > 100")
execVtgateQuery(t, vtgateConn, "customer", "delete from customer where cid in (101, 102)")
}

func revert(t *testing.T, workflowType string) {
Expand Down Expand Up @@ -534,6 +537,31 @@ func testReshardV2Workflow(t *testing.T) {
defer closeConn()
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard

// Generate customer records in the background for the rest of the test
// in order to confirm that no writes are lost in either the customer
// table or the customer_name and customer_type materializations
// against it during the Reshard and all of the traffic switches.
dataGenCtx, dataGenCancel := context.WithCancel(context.Background())
defer dataGenCancel()
dataGenConn, dataGenCloseConn := getVTGateConn()
defer dataGenCloseConn()
dataGenWg := sync.WaitGroup{}
dataGenWg.Add(1)
go func() {
defer dataGenWg.Done()
id := 1000
for {
select {
case <-dataGenCtx.Done():
return
default:
_ = execVtgateQuery(t, dataGenConn, "customer", fmt.Sprintf("insert into customer (cid, name) values (%d, 'tempCustomer%d')", id, id))
}
time.Sleep(1 * time.Millisecond)
id++
}
}()

// create internal tables on the original customer shards that should be
// ignored and not show up on the new shards
execMultipleQueries(t, vtgateConn, targetKs+"/-80", internalSchema)
Expand All @@ -553,32 +581,45 @@ func testReshardV2Workflow(t *testing.T) {
testWorkflowUpdate(t)

testRestOfWorkflow(t)

// Confirm that we lost no customer related writes during the Reshard.
dataGenCancel()
dataGenWg.Wait()
cres := execVtgateQuery(t, dataGenConn, "customer", "select count(*) from customer")
require.Len(t, cres.Rows, 1)
waitForNoWorkflowLag(t, vc, "customer", "customer_name")
cnres := execVtgateQuery(t, dataGenConn, "customer", "select count(*) from customer_name")
require.Len(t, cnres.Rows, 1)
require.EqualValues(t, cres.Rows, cnres.Rows)
waitForNoWorkflowLag(t, vc, "customer", "customer_type")
ctres := execVtgateQuery(t, dataGenConn, "customer", "select count(*) from customer_type")
require.Len(t, ctres.Rows, 1)
require.EqualValues(t, cres.Rows, ctres.Rows)
if debugMode {
t.Logf("Done inserting customer data. Record counts in customer: %s, customer_name: %s, customer_type: %s",
cres.Rows[0][0].ToString(), cnres.Rows[0][0].ToString(), ctres.Rows[0][0].ToString())
}
// We also do a vdiff on the materialize workflows for good measure.
doVtctldclientVDiff(t, "customer", "customer_name", "", nil)
doVtctldclientVDiff(t, "customer", "customer_type", "", nil)
}

func testMoveTablesV2Workflow(t *testing.T) {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables

// test basic forward and reverse flows
setupCustomerKeyspace(t)
// The purge table should get skipped/ignored
// If it's not then we'll get an error as the table doesn't exist in the vschema
createMoveTablesWorkflow(t, "customer,loadtest,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431")
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
validateReadsRouteToSource(t, "replica")
validateWritesRouteToSource(t)

// Verify that we've properly ignored any internal operational tables
// and that they were not copied to the new target keyspace
verifyNoInternalTables(t, vtgateConn, targetKs)

testReplicatingWithPKEnumCols(t)

// Confirm that updating MoveTable workflows works.
testWorkflowUpdate(t)
materializeShow := func() {
if !debugMode {
return
}
output, err := vc.VtctldClient.ExecuteCommandWithOutput("materialize", "--target-keyspace=customer", "show", "--workflow=customer_name", "--compact", "--include-logs=false")
require.NoError(t, err)
t.Logf("Materialize show output: %s", output)
}

testRestOfWorkflow(t)
// Test basic forward and reverse flows.
setupCustomerKeyspace(t)

listOutputContainsWorkflow := func(output string, workflow string) bool {
workflows := []string{}
Expand All @@ -597,12 +638,39 @@ func testMoveTablesV2Workflow(t *testing.T) {
require.NoError(t, err)
return len(workflows) == 0
}

listAllArgs := []string{"workflow", "--keyspace", "customer", "list"}

output, err := vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...)
require.NoError(t, err)
require.True(t, listOutputIsEmpty(output))

// The purge table should get skipped/ignored
// If it's not then we'll get an error as the table doesn't exist in the vschema
createMoveTablesWorkflow(t, "customer,loadtest,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431")
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
validateReadsRouteToSource(t, "replica")
validateWritesRouteToSource(t)

// Verify that we've properly ignored any internal operational tables
// and that they were not copied to the new target keyspace
verifyNoInternalTables(t, vtgateConn, targetKs)

testReplicatingWithPKEnumCols(t)

// Confirm that updating MoveTable workflows works.
testWorkflowUpdate(t)

testRestOfWorkflow(t)
// Create our primary intra-keyspace materialization.
materialize(t, materializeCustomerNameSpec, false)
// Create a second one to confirm that multiple ones get migrated correctly.
materialize(t, materializeCustomerTypeSpec, false)
materializeShow()

output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...)
require.NoError(t, err)
require.True(t, listOutputContainsWorkflow(output, "customer_name") && listOutputContainsWorkflow(output, "customer_type") && !listOutputContainsWorkflow(output, "wf1"))

testVSchemaForSequenceAfterMoveTables(t)

// Confirm that the auto_increment clause on customer.cid was removed.
Expand All @@ -616,14 +684,14 @@ func testMoveTablesV2Workflow(t *testing.T) {
createMoveTablesWorkflow(t, "Lead,Lead-1")
output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...)
require.NoError(t, err)
require.True(t, listOutputContainsWorkflow(output, "wf1"))
require.True(t, listOutputContainsWorkflow(output, "wf1") && listOutputContainsWorkflow(output, "customer_name") && listOutputContainsWorkflow(output, "customer_type"))

err = tstWorkflowCancel(t)
require.NoError(t, err)

output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...)
require.NoError(t, err)
require.True(t, listOutputIsEmpty(output))
require.True(t, listOutputContainsWorkflow(output, "customer_name") && listOutputContainsWorkflow(output, "customer_type") && !listOutputContainsWorkflow(output, "wf1"))
}

func testPartialSwitches(t *testing.T) {
Expand Down Expand Up @@ -671,6 +739,11 @@ func testPartialSwitches(t *testing.T) {
}

func testRestOfWorkflow(t *testing.T) {
// Relax the throttler so that it does not cause switches to fail because it can block
// the catchup for the intra-keyspace materialization.
res, err := throttler.UpdateThrottlerTopoConfigRaw(vc.VtctldClient, "customer", true, false, throttlerConfig.Threshold*5, throttlerConfig.Query, nil)
require.NoError(t, err, res)

testPartialSwitches(t)

// test basic forward and reverse flows
Expand Down Expand Up @@ -732,12 +805,14 @@ func testRestOfWorkflow(t *testing.T) {
validateWritesRouteToSource(t)

// trying to complete an unswitched workflow should error
err := tstWorkflowComplete(t)
err = tstWorkflowComplete(t)
require.Error(t, err)
require.Contains(t, err.Error(), wrangler.ErrWorkflowNotFullySwitched)

// fully switch and complete
waitForLowLag(t, "customer", "wf1")
waitForLowLag(t, "customer", "customer_name")
waitForLowLag(t, "customer", "customer_type")
tstWorkflowSwitchReadsAndWrites(t)
validateReadsRoute(t, "rdonly", targetRdonlyTab1)
validateReadsRouteToTarget(t, "replica")
Expand Down
29 changes: 21 additions & 8 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3302,22 +3302,35 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
return 0, sw.logs(), nil
}

// We stop writes on the source before stopping the source streams so that the catchup time
// is lessened and other workflows that we have to migrate such as intra-keyspace materialize
// workflows also have a chance to catch up as well because those are internally generated
// GTIDs within the shards we're switching traffic away from.
// For intra-keyspace materialization streams that we migrate where the source and target are
// the keyspace being resharded, we wait for those to catchup in the stopStreams path before
// we actually stop them.
ts.Logger().Infof("Stopping source writes")
if err := sw.stopSourceWrites(ctx); err != nil {
sw.cancelMigration(ctx, sm)
return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err)
}

ts.Logger().Infof("Stopping streams")
sourceWorkflows, err = sw.stopStreams(ctx, sm)
// Use a shorter context for this since since when doing a Reshard, if there are intra-keyspace
// materializations then we have to wait for them to catchup before switching traffic for the
// Reshard workflow. We use the the same timeout value here that is used for VReplication catchup
// with the inter-keyspace workflows.
stopCtx, stopCancel := context.WithTimeout(ctx, timeout)
defer stopCancel()
sourceWorkflows, err = sw.stopStreams(stopCtx, sm)
if err != nil {
for key, streams := range sm.Streams() {
for _, stream := range streams {
ts.Logger().Errorf("stream in stopStreams: key %s shard %s stream %+v", key, stream.BinlogSource.Shard, stream.BinlogSource)
}
}
sw.cancelMigration(ctx, sm)
return handleError("failed to stop the workflow streams", err)
}

ts.Logger().Infof("Stopping source writes")
if err := sw.stopSourceWrites(ctx); err != nil {
sw.cancelMigration(ctx, sm)
return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err)
return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err)
}

if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
Expand Down
Loading

0 comments on commit d9cd21b

Please sign in to comment.