diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index f29885644c4..7498d0c2068 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -120,6 +120,7 @@ Usage of vtgate: --mysql_slow_connect_warn_threshold duration Warn if it takes more than the given threshold for a mysql connection to establish (default 0s) --mysql_tcp_version string Select tcp, tcp4, or tcp6 to control the socket type. (default "tcp") --no_scatter when set to true, the planner will fail instead of producing a plan that includes scatter queries + --no_vstream_copy when set to true, vstream copy will not be allowed - temporary until we can properly support RDONLY for this --normalize_queries Rewrite queries with bind vars. Turn this off if the app itself sends normalized queries with bind vars. (default true) --onclose_timeout duration wait no more than this for OnClose handlers before stopping (default 10s) --onterm_timeout duration wait no more than this for OnTermSync handlers before stopping (default 10s) diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index 4e2ea223628..6899e7c583b 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -177,6 +177,7 @@ func testVStreamWithFailover(t *testing.T, failover bool) { const schemaUnsharded = ` create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence'; +insert into customer_seq(id, next_id, cache) values(0, 1, 3); ` const vschemaUnsharded = ` { @@ -218,14 +219,18 @@ const vschemaSharded = ` func insertRow(keyspace, table string, id int) { vtgateConn.ExecuteFetch(fmt.Sprintf("use %s;", keyspace), 1000, false) vtgateConn.ExecuteFetch("begin", 1000, false) - vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (cid, name) values (%d, '%s%d')", table, id+100, table, id), 1000, false) + _, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into 
%s (name) values ('%s%d')", table, table, id), 1000, false) + if err != nil { + log.Infof("error inserting row %d: %v", id, err) + } vtgateConn.ExecuteFetch("commit", 1000, false) } type numEvents struct { - numRowEvents, numJournalEvents int64 - numLessThan80Events, numGreaterThan80Events int64 - numLessThan40Events, numGreaterThan40Events int64 + numRowEvents, numJournalEvents int64 + numLessThan80Events, numGreaterThan80Events int64 + numLessThan40Events, numGreaterThan40Events int64 + numShard0BeforeReshardEvents, numShard0AfterReshardEvents int64 } // tests the StopOnReshard flag @@ -375,6 +380,150 @@ func testVStreamStopOnReshardFlag(t *testing.T, stopOnReshard bool, baseTabletID return &ne } +// Validate that we can continue streaming from multiple keyspaces after first copying some tables and then resharding one of the keyspaces +// Ensure that there are no missing row events during the resharding process. +func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEvents { + defaultCellName := "zone1" + allCellNames = defaultCellName + allCells := []string{allCellNames} + vc = NewVitessCluster(t, "VStreamCopyMultiKeyspaceReshard", allCells, mainClusterConfig) + + require.NotNil(t, vc) + ogdr := defaultReplicas + defaultReplicas = 0 // because of CI resource constraints we can only run this test with primary tablets + defer func(dr int) { defaultReplicas = dr }(ogdr) + + defer vc.TearDown(t) + + defaultCell = vc.Cells[defaultCellName] + vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil) + vtgate = defaultCell.Vtgates[0] + require.NotNil(t, vtgate) + vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "unsharded", "0"), 1, time.Second*30) + + vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + verifyClusterHealth(t, vc) + + vc.AddKeyspace(t, []*Cell{defaultCell}, 
"sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil) + + ctx := context.Background() + vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort)) + if err != nil { + log.Fatal(err) + } + defer vstreamConn.Close() + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "/.*", + }}} + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + // We want to confirm that the following two tables are streamed. + // 1. the customer_seq in the unsharded keyspace + // 2. the customer table in the sharded keyspace + Match: "/customer.*/", + }}, + } + flags := &vtgatepb.VStreamFlags{} + done := false + + id := 1000 + // First goroutine that keeps inserting rows into the table being streamed until a minute after reshard + // We should keep getting events on the new shards + go func() { + for { + if done { + return + } + id++ + time.Sleep(1 * time.Second) + insertRow("sharded", "customer", id) + } + }() + // stream events from the VStream API + var ne numEvents + reshardDone := false + go func() { + var reader vtgateconn.VStreamReader + reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) + require.NoError(t, err) + for { + evs, err := reader.Recv() + + switch err { + case nil: + for _, ev := range evs { + switch ev.Type { + case binlogdatapb.VEventType_ROW: + shard := ev.RowEvent.Shard + switch shard { + case "0": + if reshardDone { + ne.numShard0AfterReshardEvents++ + } else { + ne.numShard0BeforeReshardEvents++ + } + case "-80": + ne.numLessThan80Events++ + case "80-": + ne.numGreaterThan80Events++ + case "-40": + ne.numLessThan40Events++ + case "40-": + ne.numGreaterThan40Events++ + } + ne.numRowEvents++ + case binlogdatapb.VEventType_JOURNAL: + ne.numJournalEvents++ + } + } + case io.EOF: + log.Infof("Stream Ended") + done = true + default: + log.Errorf("Returned err %v", err) + 
done = true + } + if done { + return + } + } + }() + + ticker := time.NewTicker(1 * time.Second) + tickCount := 0 + for { + <-ticker.C + tickCount++ + switch tickCount { + case 1: + reshard(t, "sharded", "customer", "vstreamCopyMultiKeyspaceReshard", "-80,80-", "-40,40-", baseTabletID+400, nil, nil, nil, defaultCellName) + reshardDone = true + case 60: + done = true + } + if done { + break + } + } + log.Infof("ne=%v", ne) + + // The number of row events streamed by the VStream API should match the number of rows inserted. + // This is important for sharded tables, where we need to ensure that no row events are missed during the resharding process. + // + // On the other hand, we don't verify the exact number of row events for the unsharded keyspace + // because the keyspace remains unsharded and the number of rows in the customer_seq table is always 1. + // We believe that checking the number of row events for the unsharded keyspace, which should always be greater than 0 before and after resharding, + // is sufficient to confirm that the resharding of one keyspace does not affect another keyspace, while keeping the test straightforward. 
+ customerResult := execVtgateQuery(t, vtgateConn, "sharded", "select count(*) from customer") + insertedCustomerRows, err := evalengine.ToInt64(customerResult.Rows[0][0]) + require.NoError(t, err) + require.Equal(t, insertedCustomerRows, ne.numLessThan80Events+ne.numGreaterThan80Events+ne.numLessThan40Events+ne.numGreaterThan40Events) + return ne +} + func TestVStreamFailover(t *testing.T) { testVStreamWithFailover(t, true) } @@ -406,3 +555,15 @@ func TestVStreamWithKeyspacesToWatch(t *testing.T) { testVStreamWithFailover(t, false) } + +func TestVStreamCopyMultiKeyspaceReshard(t *testing.T) { + ne := testVStreamCopyMultiKeyspaceReshard(t, 3000) + require.Equal(t, int64(0), ne.numJournalEvents) + require.NotZero(t, ne.numRowEvents) + require.NotZero(t, ne.numShard0BeforeReshardEvents) + require.NotZero(t, ne.numShard0AfterReshardEvents) + require.NotZero(t, ne.numLessThan80Events) + require.NotZero(t, ne.numGreaterThan80Events) + require.NotZero(t, ne.numLessThan40Events) + require.NotZero(t, ne.numGreaterThan40Events) +} diff --git a/go/vt/proto/binlogdata/binlogdata.pb.go b/go/vt/proto/binlogdata/binlogdata.pb.go index 22d61cfdc5f..075ef39aec2 100644 --- a/go/vt/proto/binlogdata/binlogdata.pb.go +++ b/go/vt/proto/binlogdata/binlogdata.pb.go @@ -234,6 +234,10 @@ const ( VEventType_VERSION VEventType = 17 VEventType_LASTPK VEventType = 18 VEventType_SAVEPOINT VEventType = 19 + // COPY_COMPLETED is sent when VTGate's VStream copy operation is done. + // If a client experiences some disruptions before receiving the event, + // the client should restart the copy operation. + VEventType_COPY_COMPLETED VEventType = 20 ) // Enum value maps for VEventType. 
@@ -259,28 +263,30 @@ var ( 17: "VERSION", 18: "LASTPK", 19: "SAVEPOINT", + 20: "COPY_COMPLETED", } VEventType_value = map[string]int32{ - "UNKNOWN": 0, - "GTID": 1, - "BEGIN": 2, - "COMMIT": 3, - "ROLLBACK": 4, - "DDL": 5, - "INSERT": 6, - "REPLACE": 7, - "UPDATE": 8, - "DELETE": 9, - "SET": 10, - "OTHER": 11, - "ROW": 12, - "FIELD": 13, - "HEARTBEAT": 14, - "VGTID": 15, - "JOURNAL": 16, - "VERSION": 17, - "LASTPK": 18, - "SAVEPOINT": 19, + "UNKNOWN": 0, + "GTID": 1, + "BEGIN": 2, + "COMMIT": 3, + "ROLLBACK": 4, + "DDL": 5, + "INSERT": 6, + "REPLACE": 7, + "UPDATE": 8, + "DELETE": 9, + "SET": 10, + "OTHER": 11, + "ROW": 12, + "FIELD": 13, + "HEARTBEAT": 14, + "VGTID": 15, + "JOURNAL": 16, + "VERSION": 17, + "LASTPK": 18, + "SAVEPOINT": 19, + "COPY_COMPLETED": 20, } ) @@ -3031,7 +3037,7 @@ var file_binlogdata_proto_rawDesc = []byte{ 0x4c, 0x10, 0x05, 0x2a, 0x34, 0x0a, 0x1b, 0x56, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x53, 0x75, 0x62, 0x54, 0x79, 0x70, 0x65, 0x12, 0x08, 0x0a, 0x04, 0x4e, 0x6f, 0x6e, 0x65, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, - 0x50, 0x61, 0x72, 0x74, 0x69, 0x61, 0x6c, 0x10, 0x01, 0x2a, 0xf9, 0x01, 0x0a, 0x0a, 0x56, 0x45, + 0x50, 0x61, 0x72, 0x74, 0x69, 0x61, 0x6c, 0x10, 0x01, 0x2a, 0x8d, 0x02, 0x0a, 0x0a, 0x56, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x47, 0x54, 0x49, 0x44, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x42, 0x45, 0x47, 0x49, 0x4e, 0x10, 0x02, 0x12, 0x0a, 0x0a, 0x06, 0x43, 0x4f, @@ -3047,13 +3053,14 @@ var file_binlogdata_proto_rawDesc = []byte{ 0x12, 0x0b, 0x0a, 0x07, 0x4a, 0x4f, 0x55, 0x52, 0x4e, 0x41, 0x4c, 0x10, 0x10, 0x12, 0x0b, 0x0a, 0x07, 0x56, 0x45, 0x52, 0x53, 0x49, 0x4f, 0x4e, 0x10, 0x11, 0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x41, 0x53, 0x54, 0x50, 0x4b, 0x10, 0x12, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x41, 0x56, 0x45, 0x50, 0x4f, - 0x49, 0x4e, 0x54, 0x10, 
0x13, 0x2a, 0x27, 0x0a, 0x0d, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0a, 0x0a, 0x06, 0x54, 0x41, 0x42, 0x4c, 0x45, 0x53, - 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x53, 0x48, 0x41, 0x52, 0x44, 0x53, 0x10, 0x01, 0x42, 0x29, - 0x5a, 0x27, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, - 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, - 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x49, 0x4e, 0x54, 0x10, 0x13, 0x12, 0x12, 0x0a, 0x0e, 0x43, 0x4f, 0x50, 0x59, 0x5f, 0x43, 0x4f, + 0x4d, 0x50, 0x4c, 0x45, 0x54, 0x45, 0x44, 0x10, 0x14, 0x2a, 0x27, 0x0a, 0x0d, 0x4d, 0x69, 0x67, + 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0a, 0x0a, 0x06, 0x54, 0x41, + 0x42, 0x4c, 0x45, 0x53, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x53, 0x48, 0x41, 0x52, 0x44, 0x53, + 0x10, 0x01, 0x42, 0x29, 0x5a, 0x27, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, + 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2f, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/go/vt/vtexplain/vtexplain_vtgate.go b/go/vt/vtexplain/vtexplain_vtgate.go index 2a53d1f68ae..ff39e79c47e 100644 --- a/go/vt/vtexplain/vtexplain_vtgate.go +++ b/go/vt/vtexplain/vtexplain_vtgate.go @@ -72,7 +72,7 @@ func (vte *VTExplain) initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts streamSize := 10 var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests - vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion) + vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, 
resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion, false) queryLogBufferSize := 10 vtgate.QueryLogger = streamlog.New("VTGate", queryLogBufferSize) diff --git a/go/vt/vtgate/endtoend/main_test.go b/go/vt/vtgate/endtoend/main_test.go index 046af36a3dd..28ea5d0f7ec 100644 --- a/go/vt/vtgate/endtoend/main_test.go +++ b/go/vt/vtgate/endtoend/main_test.go @@ -46,6 +46,24 @@ create table t1( primary key(id1) ) Engine=InnoDB; +create table t1_copy_basic( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; + +create table t1_copy_all( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; + +create table t1_copy_resume( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; + create table t1_id2_idx( id2 bigint, keyspace_id varbinary(10), @@ -134,6 +152,24 @@ create table t1_sharded( Name: "t1_id2_vdx", }}, }, + "t1_copy_basic": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id1", + Name: "hash", + }}, + }, + "t1_copy_all": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id1", + Name: "hash", + }}, + }, + "t1_copy_resume": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id1", + Name: "hash", + }}, + }, "t1_sharded": { ColumnVindexes: []*vschemapb.ColumnVindex{{ Column: "id1", @@ -195,6 +231,31 @@ create table t1_sharded( }, }, } + + schema2 = ` +create table t1_copy_all_ks2( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; +` + + vschema2 = &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1_copy_all_ks2": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id1", + Name: "hash", + }}, + }, + }, + } ) func TestMain(m *testing.M) { @@ -203,14 +264,24 @@ func TestMain(m *testing.M) { exitCode := func() int { var cfg vttest.Config cfg.Topology = &vttestpb.VTTestTopology{ - Keyspaces: []*vttestpb.Keyspace{{ - Name: "ks", - 
Shards: []*vttestpb.Shard{{ - Name: "-80", - }, { - Name: "80-", - }}, - }}, + Keyspaces: []*vttestpb.Keyspace{ + { + Name: "ks", + Shards: []*vttestpb.Shard{{ + Name: "-80", + }, { + Name: "80-", + }}, + }, + { + Name: "ks2", + Shards: []*vttestpb.Shard{{ + Name: "-80", + }, { + Name: "80-", + }}, + }, + }, } if err := cfg.InitSchemas("ks", schema, vschema); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) @@ -218,6 +289,11 @@ func TestMain(m *testing.M) { return 1 } defer os.RemoveAll(cfg.SchemaDir) + if err := cfg.InitSchemas("ks2", schema2, vschema2); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.RemoveAll(cfg.SchemaDir) + return 1 + } cfg.TabletHostName = *tabletHostName diff --git a/go/vt/vtgate/endtoend/misc_test.go b/go/vt/vtgate/endtoend/misc_test.go index 138b68d0aa3..aeeb1c122db 100644 --- a/go/vt/vtgate/endtoend/misc_test.go +++ b/go/vt/vtgate/endtoend/misc_test.go @@ -19,6 +19,7 @@ package endtoend import ( "context" "fmt" + osExec "os/exec" "testing" "github.com/stretchr/testify/assert" @@ -55,6 +56,16 @@ func TestCreateAndDropDatabase(t *testing.T) { require.NoError(t, err) defer conn.Close() + // cleanup the keyspace from the topology. + defer func() { + // the corresponding database needs to be created in advance. + // a subsequent DeleteKeyspace command returns the error of 'node doesn't exist' without it. + _ = exec(t, conn, "create database testitest") + + _, err := osExec.Command("vtctldclient", "--server", grpcAddress, "DeleteKeyspace", "--recursive", "--force", "testitest").CombinedOutput() + require.NoError(t, err) + }() + // run it 3 times. 
for count := 0; count < 3; count++ { t.Run(fmt.Sprintf("exec:%d", count), func(t *testing.T) { diff --git a/go/vt/vtgate/endtoend/row_count_test.go b/go/vt/vtgate/endtoend/row_count_test.go index 9ac200b33fa..5a29f6177a9 100644 --- a/go/vt/vtgate/endtoend/row_count_test.go +++ b/go/vt/vtgate/endtoend/row_count_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/utils" ) func TestRowCount(t *testing.T) { @@ -31,6 +32,7 @@ func TestRowCount(t *testing.T) { conn, err := mysql.Connect(ctx, &vtParams) require.NoError(t, err) defer conn.Close() + utils.Exec(t, conn, "use ks") type tc struct { query string expected int diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index 350cd5d8251..e7ead3dcd22 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "regexp" + "sort" "sync" "testing" @@ -169,7 +170,7 @@ func TestVStreamCopyBasic(t *testing.T) { gconn, conn, mconn, closeConnections := initialize(ctx, t) defer closeConnections() - _, err := conn.ExecuteFetch("insert into t1(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) + _, err := conn.ExecuteFetch("insert into t1_copy_basic(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) if err != nil { t.Fatal(err) } @@ -180,7 +181,7 @@ func TestVStreamCopyBasic(t *testing.T) { } qr := sqltypes.ResultToProto3(&lastPK) tablePKs := []*binlogdatapb.TableLastPK{{ - TableName: "t1", + TableName: "t1_copy_basic", Lastpk: qr, }} var shardGtids []*binlogdatapb.ShardGtid @@ -200,8 +201,8 @@ func TestVStreamCopyBasic(t *testing.T) { vgtid.ShardGtids = shardGtids filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: "select * from t1", + Match: "t1_copy_basic", + Filter: "select * from t1_copy_basic", }}, } flags := &vtgatepb.VStreamFlags{} @@ 
-210,19 +211,285 @@ func TestVStreamCopyBasic(t *testing.T) { if err != nil { t.Fatal(err) } - numExpectedEvents := 2 /* num shards */ * (7 /* begin/field/vgtid:pos/2 rowevents avg/vgitd: lastpk/commit) */ + 3 /* begin/vgtid/commit for completed table */) + numExpectedEvents := 2 /* num shards */ *(7 /* begin/field/vgtid:pos/2 rowevents avg/vgitd: lastpk/commit) */ +3 /* begin/vgtid/commit for completed table */ +1 /* copy operation completed */) + 1 /* fully copy operation completed */ + expectedCompletedEvents := []string{ + `type:COPY_COMPLETED keyspace:"ks" shard:"-80"`, + `type:COPY_COMPLETED keyspace:"ks" shard:"80-"`, + `type:COPY_COMPLETED`, + } require.NotNil(t, reader) var evs []*binlogdatapb.VEvent + var completedEvs []*binlogdatapb.VEvent for { e, err := reader.Recv() switch err { case nil: evs = append(evs, e...) + + for _, ev := range e { + if ev.Type == binlogdatapb.VEventType_COPY_COMPLETED { + completedEvs = append(completedEvs, ev) + } + } + + printEvents(evs) // for debugging ci failures + if len(evs) == numExpectedEvents { + sortCopyCompletedEvents(completedEvs) + for i, ev := range completedEvs { + require.Regexp(t, expectedCompletedEvents[i], ev.String()) + } t.Logf("TestVStreamCopyBasic was successful") return + } else if numExpectedEvents < len(evs) { + t.Fatalf("len(events)=%v are not expected\n", len(evs)) + } + case io.EOF: + log.Infof("stream ended\n") + cancel() + default: + log.Errorf("Returned err %v", err) + t.Fatalf("remote error: %v\n", err) + } + } +} + +// TestVStreamCopyUnspecifiedShardGtid tests the case where the keyspace contains wildcards and/or the shard is not specified in the request. +// Verify that the Vstream API resolves the unspecified ShardGtid input to a list of all the matching keyspaces and all the shards in the topology. +// - If the keyspace contains wildcards and the shard is not specified, the copy operation should be performed on all shards of all matching keyspaces. 
+// - If the keyspace is specified and the shard is not specified, the copy operation should be performed on all shards of the specified keyspace. +func TestVStreamCopyUnspecifiedShardGtid(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + conn, err := mysql.Connect(ctx, &vtParams) + if err != nil { + require.NoError(t, err) + } + defer conn.Close() + + _, err = conn.ExecuteFetch("insert into t1_copy_all(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) + if err != nil { + require.NoError(t, err) + } + + _, err = conn.ExecuteFetch("insert into t1_copy_all_ks2(id1,id2) values(10,10), (20,20)", 1, false) + if err != nil { + require.NoError(t, err) + } + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "/t1_copy_all.*/", + }}, + } + flags := &vtgatepb.VStreamFlags{} + + // We have 2 shards in each keyspace. We assume the rows are + // evenly split across each shard. For each INSERT statement, which + // is a transaction and gets a global transaction identifier or GTID, we + // have 1 each of the following events: + // begin, field, position, lastpk, commit (5) + // For each row created in the INSERT statement -- 8 on ks1 and + // 2 on ks2 -- we have 1 row event between the begin and commit. + // When we have copied all rows for a table in the shard, the shard + // also gets events marking the transition from the copy phase to + // the streaming phase for that table with 1 each of the following: + // begin, vgtid, commit (3) + // As the copy phase completes for all tables on the shard, the shard + // gets 1 copy phase completed event. + // Lastly the stream has 1 final event to mark the final end to all + // copy phase operations in the vstream. 
+ expectedKs1EventNum := 2 /* num shards */ * (9 /* begin/field/vgtid:pos/4 rowevents avg/vgitd: lastpk/commit) */ + 3 /* begin/vgtid/commit for completed table */ + 1 /* copy operation completed */) + expectedKs2EventNum := 2 /* num shards */ * (6 /* begin/field/vgtid:pos/1 rowevents avg/vgitd: lastpk/commit) */ + 3 /* begin/vgtid/commit for completed table */ + 1 /* copy operation completed */) + expectedFullyCopyCompletedNum := 1 + + cases := []struct { + name string + shardGtid *binlogdatapb.ShardGtid + expectedEventNum int + expectedCompletedEvents []string + }{ + { + name: "copy from all keyspaces", + shardGtid: &binlogdatapb.ShardGtid{ + Keyspace: "/.*", + }, + expectedEventNum: expectedKs1EventNum + expectedKs2EventNum + expectedFullyCopyCompletedNum, + expectedCompletedEvents: []string{ + `type:COPY_COMPLETED keyspace:"ks" shard:"-80"`, + `type:COPY_COMPLETED keyspace:"ks" shard:"80-"`, + `type:COPY_COMPLETED keyspace:"ks2" shard:"-80"`, + `type:COPY_COMPLETED keyspace:"ks2" shard:"80-"`, + `type:COPY_COMPLETED`, + }, + }, + { + name: "copy from all shards in one keyspace", + shardGtid: &binlogdatapb.ShardGtid{ + Keyspace: "ks", + }, + expectedEventNum: expectedKs1EventNum + expectedFullyCopyCompletedNum, + expectedCompletedEvents: []string{ + `type:COPY_COMPLETED keyspace:"ks" shard:"-80"`, + `type:COPY_COMPLETED keyspace:"ks" shard:"80-"`, + `type:COPY_COMPLETED`, + }, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + gconn, conn, mconn, closeConnections := initialize(ctx, t) + defer closeConnections() + + var vgtid = &binlogdatapb.VGtid{} + vgtid.ShardGtids = []*binlogdatapb.ShardGtid{c.shardGtid} + reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) + _, _ = conn, mconn + if err != nil { + require.NoError(t, err) + } + require.NotNil(t, reader) + var evs []*binlogdatapb.VEvent + var completedEvs []*binlogdatapb.VEvent + for { + e, err := reader.Recv() + switch err { + case nil: + evs = 
append(evs, e...) + + for _, ev := range e { + if ev.Type == binlogdatapb.VEventType_COPY_COMPLETED { + completedEvs = append(completedEvs, ev) + } + } + + if len(evs) == c.expectedEventNum { + sortCopyCompletedEvents(completedEvs) + for i, ev := range completedEvs { + require.Equal(t, c.expectedCompletedEvents[i], ev.String()) + } + t.Logf("TestVStreamCopyUnspecifiedShardGtid was successful") + return + } else if c.expectedEventNum < len(evs) { + printEvents(evs) // for debugging ci failures + require.FailNow(t, "len(events)=%v are not expected\n", len(evs)) + } + case io.EOF: + log.Infof("stream ended\n") + cancel() + default: + log.Errorf("Returned err %v", err) + require.FailNow(t, "remote error: %v\n", err) + } + } + }) + } +} + +func TestVStreamCopyResume(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + gconn, conn, mconn, closeConnections := initialize(ctx, t) + defer closeConnections() + + _, err := conn.ExecuteFetch("insert into t1_copy_resume(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) + if err != nil { + t.Fatal(err) + } + + // Any subsequent GTIDs will be part of the stream + mpos, err := mconn.PrimaryPosition() + require.NoError(t, err) + + // lastPK is id1=4, meaning we should only copy rows for id1 IN(5,6,7,8,9) + lastPK := sqltypes.Result{ + Fields: []*query.Field{{Name: "id1", Type: query.Type_INT64}}, + Rows: [][]sqltypes.Value{{sqltypes.NewInt64(4)}}, + } + tableLastPK := []*binlogdatapb.TableLastPK{{ + TableName: "t1_copy_resume", + Lastpk: sqltypes.ResultToProto3(&lastPK), + }} + + catchupQueries := []string{ + "insert into t1_copy_resume(id1,id2) values(9,9)", // this row will show up twice: once in catchup and copy + "update t1_copy_resume set id2 = 10 where id1 = 1", + "insert into t1(id1, id2) values(100,100)", + "delete from t1_copy_resume where id1 = 1", + "update t1_copy_resume set id2 = 90 where id1 = 9", + } + for _, query := range catchupQueries { + _, 
err = conn.ExecuteFetch(query, 1, false) + require.NoError(t, err) + } + + var shardGtids []*binlogdatapb.ShardGtid + var vgtid = &binlogdatapb.VGtid{} + shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ + Keyspace: "ks", + Shard: "-80", + Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos), + TablePKs: tableLastPK, + }) + shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ + Keyspace: "ks", + Shard: "80-", + Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos), + TablePKs: tableLastPK, + }) + vgtid.ShardGtids = shardGtids + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1_copy_resume", + Filter: "select * from t1_copy_resume", + }}, + } + flags := &vtgatepb.VStreamFlags{} + reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) + if err != nil { + t.Fatal(err) + } + require.NotNil(t, reader) + + expectedRowCopyEvents := 5 // id1 and id2 IN(5,6,7,8,9) + expectedCatchupEvents := len(catchupQueries) - 1 // insert into t1 should never reach + rowCopyEvents, replCatchupEvents := 0, 0 + expectedEvents := []string{ + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:1 values:"11"} after:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, + `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"55"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, + `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"66"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, + `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"77"}} keyspace:"ks" 
shard:"80-"} keyspace:"ks" shard:"80-"`, + `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"88"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, + `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:2 values:"990"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:1 values:"99"} after:{lengths:1 lengths:2 values:"990"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, + } + var evs []*binlogdatapb.VEvent + for { + e, err := reader.Recv() + switch err { + case nil: + for _, ev := range e { + if ev.Type == binlogdatapb.VEventType_ROW { + evs = append(evs, ev) + if ev.Timestamp == 0 { + rowCopyEvents++ + } else { + replCatchupEvents++ + } + printEvents(evs) // for debugging ci failures + } + } + if expectedCatchupEvents == replCatchupEvents && expectedRowCopyEvents == rowCopyEvents { + sort.Sort(VEventSorter(evs)) + for i, ev := range evs { + require.Regexp(t, expectedEvents[i], ev.String()) + } + t.Logf("TestVStreamCopyResume was successful") + return } - printEvents(evs) // for debugging ci failures case io.EOF: log.Infof("stream ended\n") cancel() @@ -408,3 +675,47 @@ func printEvents(evs []*binlogdatapb.VEvent) { s += "===END===" + "\n" log.Infof("%s", s) } + +// Sort the VEvents by the first row change's after value bytes primarily, with +// secondary ordering by timestamp (ASC). Note that row copy events do not have +// a timestamp and the value will be 0. 
+type VEventSorter []*binlogdatapb.VEvent + +func (v VEventSorter) Len() int { + return len(v) +} +func (v VEventSorter) Swap(i, j int) { + v[i], v[j] = v[j], v[i] +} +func (v VEventSorter) Less(i, j int) bool { + valsI := v[i].GetRowEvent().RowChanges[0].After + if valsI == nil { + valsI = v[i].GetRowEvent().RowChanges[0].Before + } + valsJ := v[j].GetRowEvent().RowChanges[0].After + if valsJ == nil { + valsJ = v[j].GetRowEvent().RowChanges[0].Before + } + valI := string(valsI.Values) + valJ := string(valsJ.Values) + if valI == valJ { + return v[i].Timestamp < v[j].Timestamp + } + return valI < valJ +} + +// The arrival order of COPY_COMPLETED events with keyspace/shard is not constant. +// On the other hand, the last event should always be a fully COPY_COMPLETED event. +// That's why the sort.Slice doesn't have to handle the last element in completedEvs. +func sortCopyCompletedEvents(completedEvs []*binlogdatapb.VEvent) { + sortVEventByKeyspaceAndShard(completedEvs[:len(completedEvs)-1]) +} + +func sortVEventByKeyspaceAndShard(evs []*binlogdatapb.VEvent) { + sort.Slice(evs, func(i, j int) bool { + if evs[i].Keyspace == evs[j].Keyspace { + return evs[i].Shard < evs[j].Shard + } + return evs[i].Keyspace < evs[j].Keyspace + }) +} diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 7c0e4eceed2..3f73f657666 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -115,6 +115,9 @@ type Executor struct { // allowScatter will fail planning if set to false and a plan contains any scatter queries allowScatter bool + // allowVstreamCopy will fail on vstream copy if false and no GTID provided for the stream. + // This is temporary until RDONLYs are properly supported for bootstrapping. 
+ allowVstreamCopy bool } var executorOnce sync.Once @@ -135,20 +138,22 @@ func NewExecutor( schemaTracker SchemaInfo, noScatter bool, pv plancontext.PlannerVersion, + noVstreamCopy bool, ) *Executor { e := &Executor{ - serv: serv, - cell: cell, - resolver: resolver, - scatterConn: resolver.scatterConn, - txConn: resolver.scatterConn.txConn, - plans: cache.NewDefaultCacheImpl(cacheCfg), - normalize: normalize, - warnShardedOnly: warnOnShardedOnly, - streamSize: streamSize, - schemaTracker: schemaTracker, - allowScatter: !noScatter, - pv: pv, + serv: serv, + cell: cell, + resolver: resolver, + scatterConn: resolver.scatterConn, + txConn: resolver.scatterConn.txConn, + plans: cache.NewDefaultCacheImpl(cacheCfg), + normalize: normalize, + warnShardedOnly: warnOnShardedOnly, + streamSize: streamSize, + schemaTracker: schemaTracker, + allowScatter: !noScatter, + allowVstreamCopy: !noVstreamCopy, + pv: pv, } vschemaacl.Init() @@ -1330,7 +1335,7 @@ func (e *Executor) startVStream(ctx context.Context, rss []*srvtopo.ResolvedShar return err } - vsm := newVStreamManager(e.resolver.resolver, e.serv, e.cell) + vsm := newVStreamManager(e.resolver.resolver, e.serv, e.cell, e.allowVstreamCopy) vs := &vstream{ vgtid: vgtid, tabletType: topodatapb.TabletType_PRIMARY, @@ -1343,6 +1348,7 @@ func (e *Executor) startVStream(ctx context.Context, rss []*srvtopo.ResolvedShar vsm: vsm, eventCh: make(chan []*binlogdatapb.VEvent), ts: ts, + copyCompletedShard: make(map[string]struct{}), } _ = vs.stream(ctx) return nil diff --git a/go/vt/vtgate/executor_framework_test.go b/go/vt/vtgate/executor_framework_test.go index 4a2a1e7cfec..5185ece673e 100644 --- a/go/vt/vtgate/executor_framework_test.go +++ b/go/vt/vtgate/executor_framework_test.go @@ -449,7 +449,7 @@ func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn bad.VSchema = badVSchema getSandbox(KsTestUnsharded).VSchema = unshardedVSchema - executor = NewExecutor(context.Background(), serv, cell, resolver, false, 
false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) + executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false) key.AnyShardPicker = DestinationAnyShardPickerFirstShard{} // create a new session each time so that ShardSessions don't get re-used across tests @@ -473,7 +473,7 @@ func createCustomExecutor(vschema string) (executor *Executor, sbc1, sbc2, sbclo sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil) getSandbox(KsTestUnsharded).VSchema = unshardedVSchema - executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) + executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false) // create a new session each time so that ShardSessions don't get re-used across tests primarySession = &vtgatepb.Session{ TargetString: "@primary", @@ -502,7 +502,7 @@ func createCustomExecutorSetValues(vschema string, values []*sqltypes.Result) (e sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil) getSandbox(KsTestUnsharded).VSchema = unshardedVSchema - executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) + executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false) // create a new session each time so that ShardSessions don't get re-used across tests primarySession = &vtgatepb.Session{ TargetString: "@primary", diff --git a/go/vt/vtgate/executor_select_test.go b/go/vt/vtgate/executor_select_test.go index b322205df86..a44d85cc3da 
100644 --- a/go/vt/vtgate/executor_select_test.go +++ b/go/vt/vtgate/executor_select_test.go @@ -1476,7 +1476,7 @@ func TestStreamSelectIN(t *testing.T) { } func createExecutor(serv *sandboxTopo, cell string, resolver *Resolver) *Executor { - return NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) + return NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false) } func TestSelectScatter(t *testing.T) { @@ -3001,7 +3001,7 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) { count++ } - executor := NewExecutor(context.Background(), serv, cell, resolver, true, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) + executor := NewExecutor(context.Background(), serv, cell, resolver, true, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false) before := runtime.NumGoroutine() query := "select id, col from user order by id limit 2" diff --git a/go/vt/vtgate/executor_stream_test.go b/go/vt/vtgate/executor_stream_test.go index 8fea4ed985f..ee3038972c3 100644 --- a/go/vt/vtgate/executor_stream_test.go +++ b/go/vt/vtgate/executor_stream_test.go @@ -61,7 +61,7 @@ func TestStreamSQLSharded(t *testing.T) { for _, shard := range shards { _ = hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_PRIMARY, true, 1, nil) } - executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3) + executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false, querypb.ExecuteOptions_V3, false) sql := "stream * from sharded_user_msgs" result, err := executorStreamMessages(executor, sql) diff --git a/go/vt/vtgate/vstream_manager.go 
b/go/vt/vtgate/vstream_manager.go index beed5584939..f660cae4be2 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "regexp" "strings" "sync" "time" @@ -27,6 +28,7 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/discovery" querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo" @@ -47,6 +49,9 @@ type vstreamManager struct { resolver *srvtopo.Resolver toposerv srvtopo.Server cell string + // allowVstreamCopy will fail on vstream copy if false and no GTID provided for the stream. + // This is temporary until RDONLYs are properly supported for bootstrapping. + allowVstreamCopy bool vstreamsCreated *stats.CountersWithMultiLabels vstreamsLag *stats.GaugesWithMultiLabels @@ -106,6 +111,9 @@ type vstream struct { // the timestamp of the most recent event, keyed by streamId. streamId is of the form . timestamps map[string]int64 + // the shard map tracking the copy completion, keyed by streamId. streamId is of the form keyspace/shard. 
+ copyCompletedShard map[string]struct{} + vsm *vstreamManager eventCh chan []*binlogdatapb.VEvent @@ -119,12 +127,13 @@ type journalEvent struct { done chan struct{} } -func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string) *vstreamManager { +func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell string, allowVstreamCopy bool) *vstreamManager { exporter := servenv.NewExporter(cell, "VStreamManager") return &vstreamManager{ - resolver: resolver, - toposerv: serv, - cell: cell, + resolver: resolver, + toposerv: serv, + cell: cell, + allowVstreamCopy: allowVstreamCopy, vstreamsCreated: exporter.NewCountersWithMultiLabels( "VStreamsCreated", "Number of vstreams created", @@ -166,6 +175,7 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta eventCh: make(chan []*binlogdatapb.VEvent), heartbeatInterval: flags.GetHeartbeatInterval(), ts: ts, + copyCompletedShard: make(map[string]struct{}), } return vs.stream(ctx) } @@ -189,31 +199,51 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vgtid must have at least one value with a starting position") } // To fetch from all keyspaces, the input must contain a single ShardGtid - // that has an empty keyspace, and the Gtid must be "current". In the - // future, we'll allow the Gtid to be empty which will also support - // copying of existing data. 
- if len(vgtid.ShardGtids) == 1 && vgtid.ShardGtids[0].Keyspace == "" { - if vgtid.ShardGtids[0].Gtid != "current" { - return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "for an empty keyspace, the Gtid value must be 'current': %v", vgtid) - } - keyspaces, err := vsm.toposerv.GetSrvKeyspaceNames(ctx, vsm.cell, false) - if err != nil { - return nil, nil, nil, err - } - newvgtid := &binlogdatapb.VGtid{} - for _, keyspace := range keyspaces { - newvgtid.ShardGtids = append(newvgtid.ShardGtids, &binlogdatapb.ShardGtid{ - Keyspace: keyspace, - Gtid: "current", - }) + // that has an empty keyspace, and the Gtid must be "current". + // Or the input must contain a single ShardGtid that has keyspace wildcards. + if len(vgtid.ShardGtids) == 1 { + inputKeyspace := vgtid.ShardGtids[0].Keyspace + isEmpty := inputKeyspace == "" + isRegexp := strings.HasPrefix(inputKeyspace, "/") + if isEmpty || isRegexp { + newvgtid := &binlogdatapb.VGtid{} + keyspaces, err := vsm.toposerv.GetSrvKeyspaceNames(ctx, vsm.cell, false) + if err != nil { + return nil, nil, nil, err + } + + if isEmpty { + if vgtid.ShardGtids[0].Gtid != "current" { + return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "for an empty keyspace, the Gtid value must be 'current': %v", vgtid) + } + for _, keyspace := range keyspaces { + newvgtid.ShardGtids = append(newvgtid.ShardGtids, &binlogdatapb.ShardGtid{ + Keyspace: keyspace, + Gtid: "current", + }) + } + } else { + re, err := regexp.Compile(strings.Trim(inputKeyspace, "/")) + if err != nil { + return nil, nil, nil, err + } + for _, keyspace := range keyspaces { + if re.MatchString(keyspace) { + newvgtid.ShardGtids = append(newvgtid.ShardGtids, &binlogdatapb.ShardGtid{ + Keyspace: keyspace, + Gtid: vgtid.ShardGtids[0].Gtid, + }) + } + } + } + vgtid = newvgtid } - vgtid = newvgtid } newvgtid := &binlogdatapb.VGtid{} for _, sgtid := range vgtid.ShardGtids { if sgtid.Shard == "" { - if sgtid.Gtid != "current" { - return nil, nil, nil, 
vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "if shards are unspecified, the Gtid value must be 'current': %v", vgtid) + if sgtid.Gtid != "current" && sgtid.Gtid != "" { + return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "if shards are unspecified, the Gtid value must be 'current' or empty; got: %v", vgtid) } // TODO(sougou): this should work with the new Migrate workflow _, _, allShards, err := vsm.resolver.GetKeyspaceShards(ctx, sgtid.Keyspace, tabletType) @@ -518,6 +548,12 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha TableLastPKs: sgtid.TablePKs, } var vstreamCreatedOnce sync.Once + + if !vs.vsm.allowVstreamCopy && (sgtid.Gtid == "" || len(sgtid.TablePKs) > 0) { + // We are attempting a vstream copy, but are not allowed (temporary until we can properly support RDONLYs for bootstrapping) + return vterrors.NewErrorf(vtrpc.Code_UNIMPLEMENTED, vterrors.NotSupportedYet, "vstream copy is not currently supported") + } + err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error { // We received a valid event. Reset error count. errCount = 0 @@ -565,6 +601,22 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha return err } + if err := vs.sendAll(ctx, sgtid, eventss); err != nil { + return err + } + eventss = nil + sendevents = nil + case binlogdatapb.VEventType_COPY_COMPLETED: + sendevents = append(sendevents, event) + if fullyCopied, doneEvent := vs.isCopyFullyCompleted(ctx, sgtid, event); fullyCopied { + sendevents = append(sendevents, doneEvent) + } + eventss = append(eventss, sendevents) + + if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil { + return err + } + if err := vs.sendAll(ctx, sgtid, eventss); err != nil { return err } @@ -700,6 +752,25 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e return nil } +// isCopyFullyCompleted returns true if all streams have received a copy_completed event. 
+// If true, it will also return a new copy_completed event that needs to be sent. +// This new event represents the completion of all the copy operations. +func (vs *vstream) isCopyFullyCompleted(ctx context.Context, sgtid *binlogdatapb.ShardGtid, event *binlogdatapb.VEvent) (bool, *binlogdatapb.VEvent) { + vs.mu.Lock() + defer vs.mu.Unlock() + + vs.copyCompletedShard[fmt.Sprintf("%s/%s", event.Keyspace, event.Shard)] = struct{}{} + + for _, shard := range vs.vgtid.ShardGtids { + if _, ok := vs.copyCompletedShard[fmt.Sprintf("%s/%s", shard.Keyspace, shard.Shard)]; !ok { + return false, nil + } + } + return true, &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_COPY_COMPLETED, + } +} + func (vs *vstream) getError() error { vs.errMu.Lock() defer vs.errMu.Unlock() diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index e017ac37932..a9dbc05f203 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -89,7 +89,7 @@ func TestVStreamSkew(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell, true) vgtid := &binlogdatapb.VGtid{ShardGtids: []*binlogdatapb.ShardGtid{}} want := int64(0) var sbc0, sbc1 *sandboxconn.SandboxConn @@ -135,7 +135,7 @@ func TestVStreamEvents(t *testing.T) { hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell, true) sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) @@ -212,7 +212,7 @@ func TestVStreamChunks(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) - vsm := 
newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell, true) sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -282,7 +282,7 @@ func TestVStreamMulti(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(hc, st, "aa", true) sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -344,7 +344,7 @@ func TestVStreamsCreatedAndLagMetrics(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell, true) vsm.vstreamsCreated.ResetAll() vsm.vstreamsLag.ResetAll() sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -399,7 +399,7 @@ func TestVStreamRetry(t *testing.T) { hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(hc, st, "aa", true) sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) commit := []*binlogdatapb.VEvent{ @@ -440,7 +440,7 @@ func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := 
newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell, true) sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) @@ -490,7 +490,7 @@ func TestVStreamJournalOneToMany(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(hc, st, "aa", true) sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -603,7 +603,7 @@ func TestVStreamJournalManyToOne(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", "10-20"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell, true) sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) sbc1 := hc.AddTestTablet(cell, "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -720,7 +720,7 @@ func TestVStreamJournalNoMatch(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(hc, st, "aa", true) sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) @@ -849,7 +849,7 @@ func TestVStreamJournalPartialMatch(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20", "-10", 
"10-20"}) - vsm := newTestVStreamManager(hc, st, "aa") + vsm := newTestVStreamManager(hc, st, "aa", true) sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "-10", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-10", sbc1.Tablet()) sbc2 := hc.AddTestTablet("aa", "1.1.1.1", 1003, ks, "10-20", topodatapb.TabletType_PRIMARY, true, 1, nil) @@ -929,7 +929,7 @@ func TestResolveVStreamParams(t *testing.T) { name := "TestVStream" _ = createSandbox(name) hc := discovery.NewFakeHealthCheck(nil) - vsm := newTestVStreamManager(hc, newSandboxForCells([]string{"aa"}), "aa") + vsm := newTestVStreamManager(hc, newSandboxForCells([]string{"aa"}), "aa", true) testcases := []struct { input *binlogdatapb.VGtid output *binlogdatapb.VGtid @@ -946,9 +946,44 @@ func TestResolveVStreamParams(t *testing.T) { input: &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ Keyspace: "TestVStream", + Gtid: "other", + }}, + }, + err: "if shards are unspecified, the Gtid value must be 'current' or empty", + }, { + // Verify that the function maps the input missing the shard to a list of all shards in the topology. 
+ input: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "TestVStream", + }}, + }, + output: &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "TestVStream", + Shard: "-20", + }, { + Keyspace: "TestVStream", + Shard: "20-40", + }, { + Keyspace: "TestVStream", + Shard: "40-60", + }, { + Keyspace: "TestVStream", + Shard: "60-80", + }, { + Keyspace: "TestVStream", + Shard: "80-a0", + }, { + Keyspace: "TestVStream", + Shard: "a0-c0", + }, { + Keyspace: "TestVStream", + Shard: "c0-e0", + }, { + Keyspace: "TestVStream", + Shard: "e0-", }}, }, - err: "if shards are unspecified, the Gtid value must be 'current'", }, { input: &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ @@ -1040,17 +1075,49 @@ func TestResolveVStreamParams(t *testing.T) { assert.Equal(t, wantFilter, filter, tcase.input) require.False(t, flags.MinimizeSkew) } - // Special-case: empty keyspace because output is too big. - input := &binlogdatapb.VGtid{ - ShardGtids: []*binlogdatapb.ShardGtid{{ - Gtid: "current", - }}, + + // Special-case: empty keyspace or keyspace containing wildcards because output is too big. + // Verify that the function resolves input for multiple keyspaces into a list of all corresponding shards. + // Ensure that the number of shards returned is greater than the number of shards in a single keyspace named 'TestVStream.' 
+ specialCases := []struct { + input *binlogdatapb.ShardGtid + }{ + { + input: &binlogdatapb.ShardGtid{ + Gtid: "current", + }, + }, + { + input: &binlogdatapb.ShardGtid{ + Keyspace: "/.*", + }, + }, + { + input: &binlogdatapb.ShardGtid{ + Keyspace: "/.*", + Gtid: "current", + }, + }, + { + input: &binlogdatapb.ShardGtid{ + Keyspace: "/Test.*", + }, + }, } - vgtid, _, _, err := vsm.resolveParams(context.Background(), topodatapb.TabletType_REPLICA, input, nil, nil) - require.NoError(t, err, input) - if got, want := len(vgtid.ShardGtids), 8; want >= got { - t.Errorf("len(vgtid.ShardGtids): %v, must be >%d", got, want) + for _, tcase := range specialCases { + input := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{tcase.input}, + } + vgtid, _, _, err := vsm.resolveParams(context.Background(), topodatapb.TabletType_REPLICA, input, nil, nil) + require.NoError(t, err, tcase.input) + if got, expectTestVStreamShardNumber := len(vgtid.ShardGtids), 8; expectTestVStreamShardNumber >= got { + t.Errorf("len(vgtid.ShardGtids): %v, must be >%d", got, expectTestVStreamShardNumber) + } + for _, s := range vgtid.ShardGtids { + require.Equal(t, tcase.input.Gtid, s.Gtid) + } } + for _, minimizeSkew := range []bool{true, false} { t.Run(fmt.Sprintf("resolveParams MinimizeSkew %t", minimizeSkew), func(t *testing.T) { flags := &vtgatepb.VStreamFlags{MinimizeSkew: minimizeSkew} @@ -1075,7 +1142,7 @@ func TestVStreamIdleHeartbeat(t *testing.T) { _ = createSandbox(ks) hc := discovery.NewFakeHealthCheck(nil) st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) - vsm := newTestVStreamManager(hc, st, cell) + vsm := newTestVStreamManager(hc, st, cell, true) sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) vgtid := &binlogdatapb.VGtid{ @@ -1124,10 +1191,60 @@ func TestVStreamIdleHeartbeat(t *testing.T) { } } -func newTestVStreamManager(hc discovery.HealthCheck, serv 
srvtopo.Server, cell string) *vstreamManager { +func TestVstreamCopy(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cell := "aa" + ks := "TestVStreamCopy" + _ = createSandbox(ks) + hc := discovery.NewFakeHealthCheck(nil) + + st := getSandboxTopo(ctx, cell, ks, []string{"-20"}) + sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) + addTabletToSandboxTopo(t, st, ks, "-20", sbc0.Tablet()) + commit := []*binlogdatapb.VEvent{ + {Type: binlogdatapb.VEventType_COMMIT}, + } + sbc0.AddVStreamEvents(commit, nil) + sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "aa")) + sbc0.AddVStreamEvents(commit, nil) + sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "bb")) + sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "cc")) + sbc0.AddVStreamEvents(nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "final error")) + var count sync2.AtomicInt32 + count.Set(0) + // empty gtid id means no start position = bootstrapping/vstream copy + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: ks, + Shard: "-20", + Gtid: "", + }}, + } + + // allowVstreamCopy = false + vsm := newTestVStreamManager(hc, st, "aa", false) + err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { + count.Add(1) + return nil + }) + require.Error(t, err) + require.Equal(t, "vstream copy is not currently supported", err.Error()) + + // allowVstreamCopy = true + vsm2 := newTestVStreamManager(hc, st, "aa", true) + err = vsm2.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { + count.Add(1) + return nil + }) + require.Equal(t, "target: TestVStreamCopy.-20.primary: final error", err.Error()) +} + +func newTestVStreamManager(hc discovery.HealthCheck, 
serv srvtopo.Server, cell string, allowVstreamCopy bool) *vstreamManager { gw := NewTabletGateway(context.Background(), hc, serv, cell) srvResolver := srvtopo.NewResolver(serv, gw, cell) - return newVStreamManager(srvResolver, serv, cell) + return newVStreamManager(srvResolver, serv, cell, allowVstreamCopy) } func startVStream(ctx context.Context, t *testing.T, vsm *vstreamManager, vgtid *binlogdatapb.VGtid, flags *vtgatepb.VStreamFlags) <-chan *binlogdatapb.VStreamResponse { diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index de3fba2fbe0..d7efe707f2f 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -74,6 +74,7 @@ var ( warnPayloadSize int noScatter bool + noVstreamCopy bool enableShardRouting bool // TODO(deepthi): change these two vars to unexported and move to healthcheck.go when LegacyHealthcheck is removed @@ -116,6 +117,7 @@ func registerFlags(fs *pflag.FlagSet) { fs.StringVar(&defaultDDLStrategy, "ddl_strategy", defaultDDLStrategy, "Set default strategy for DDL statements. Override with @@ddl_strategy session variable") fs.StringVar(&dbDDLPlugin, "dbddl_plugin", dbDDLPlugin, "controls how to handle CREATE/DROP DATABASE. use it if you are using your own database provisioning service") fs.BoolVar(&noScatter, "no_scatter", noScatter, "when set to true, the planner will fail instead of producing a plan that includes scatter queries") + fs.BoolVar(&noVstreamCopy, "no_vstream_copy", noVstreamCopy, "when set to true, vstream copy will not be allowed - temporary until we can properly support RDONLY for this") fs.BoolVar(&enableShardRouting, "enable-partial-keyspace-migration", enableShardRouting, "(Experimental) Follow shard routing rules: enable only while migrating a keyspace shard by shard. See documentation on Partial MoveTables for more. 
(default false)") fs.DurationVar(&healthCheckRetryDelay, "healthcheck_retry_delay", healthCheckRetryDelay, "health check retry delay") fs.DurationVar(&healthCheckTimeout, "healthcheck_timeout", healthCheckTimeout, "the health check timeout period") @@ -246,7 +248,7 @@ func Init( sc := NewScatterConn("VttabletCall", tc, gw) srvResolver := srvtopo.NewResolver(serv, gw, cell) resolver := NewResolver(srvResolver, serv, cell, sc) - vsm := newVStreamManager(srvResolver, serv, cell) + vsm := newVStreamManager(srvResolver, serv, cell, !noVstreamCopy) var si SchemaInfo // default nil var st *vtschema.Tracker @@ -274,6 +276,7 @@ func Init( si, noScatter, pv, + noVstreamCopy, ) // connect the schema tracker with the vschema manager diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 40bf27dd0cf..2584a166226 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -218,7 +218,8 @@ func getQuery(tableName string, filter string) string { query = buf.String() case key.IsKeyRange(filter): buf := sqlparser.NewTrackedBuffer(nil) - buf.Myprintf("select * from %v where in_keyrange(%v)", sqlparser.NewIdentifierCS(tableName), sqlparser.NewStrLiteral(filter)) + buf.Myprintf("select * from %v where in_keyrange(%v)", + sqlparser.NewIdentifierCS(tableName), sqlparser.NewStrLiteral(filter)) query = buf.String() } return query @@ -229,7 +230,40 @@ func (uvs *uvstreamer) Cancel() { uvs.cancel() } -// during copy phase only send streaming events (during catchup/fastforward) for pks already seen +// We have not yet implemented the logic to check if an event is for a row that is already copied, +// so we always return true so that we send all events for this table and so we don't miss events. 
+func (uvs *uvstreamer) isRowCopied(tableName string, ev *binlogdatapb.VEvent) bool { + return true +} + +// Only send catchup/fastforward events for tables whose copy phase is complete or in progress. +// This ensures we fulfill the at-least-once delivery semantics for events. +// TODO: filter out events for rows not yet copied. Note that we can only do this as a best-effort +// for comparable PKs. +func (uvs *uvstreamer) shouldSendEventForTable(tableName string, ev *binlogdatapb.VEvent) bool { + table, ok := uvs.plans[tableName] + // Event is for a table which is not in its copy phase. + if !ok { + return true + } + + // if table copy was not started and no tablePK was specified we can ignore catchup/fastforward events for it + if table.tablePK == nil || table.tablePK.Lastpk == nil { + return false + } + + // Table is currently in its copy phase. We have not yet implemented the logic to + // check if an event is for a row that is already copied, so we always return true + // there so that we don't miss events. + // We may send duplicate insert events or update/delete events for rows not yet seen + // to the client for the table being copied. This is ok as the client is expected to be + // idempotent: we only promise at-least-once semantics for VStream API (not exactly-once). + // Aside: vreplication workflows handle at-least-once by adding where clauses that render + // DML queries, related to events for rows not yet copied, as no-ops. + return uvs.isRowCopied(tableName, ev) +} + +// Do not send internal heartbeat events. Filter out events for tables whose copy has not been started. func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb.VEvent { if len(uvs.plans) == 0 { return evs @@ -239,25 +273,21 @@ func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb. 
var shouldSend bool for _, ev := range evs { - shouldSend = false - tableName = "" switch ev.Type { case binlogdatapb.VEventType_ROW: tableName = ev.RowEvent.TableName case binlogdatapb.VEventType_FIELD: tableName = ev.FieldEvent.TableName + default: + tableName = "" + } + switch ev.Type { case binlogdatapb.VEventType_HEARTBEAT: shouldSend = false default: - shouldSend = true - } - if !shouldSend && tableName != "" { - shouldSend = true - _, ok := uvs.plans[tableName] - if ok { - shouldSend = false - } + shouldSend = uvs.shouldSendEventForTable(tableName, ev) } + if shouldSend { evs2 = append(evs2, ev) } @@ -331,7 +361,9 @@ func (uvs *uvstreamer) setStreamStartPosition() error { } if !curPos.AtLeast(pos) { uvs.vse.errorCounts.Add("GTIDSet Mismatch", 1) - return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "GTIDSet Mismatch: requested source position:%v, current target vrep position: %v", mysql.EncodePosition(pos), mysql.EncodePosition(curPos)) + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, + "GTIDSet Mismatch: requested source position:%v, current target vrep position: %v", + mysql.EncodePosition(pos), mysql.EncodePosition(curPos)) } uvs.pos = pos return nil @@ -346,17 +378,22 @@ func (uvs *uvstreamer) currentPosition() (mysql.Position, error) { return conn.PrimaryPosition() } +// Possible states: +// 1. TablePKs nil, startPos set to gtid or "current" => start replicating from pos +// 2. TablePKs nil, startPos empty => full table copy of tables matching filter +// 3. TablePKs not nil, startPos empty => table copy (for pks > lastPK) +// 4. 
TablePKs not nil, startPos set => run catchup from startPos, then table copy (for pks > lastPK) func (uvs *uvstreamer) init() error { - if uvs.startPos != "" { - if err := uvs.setStreamStartPosition(); err != nil { + if uvs.startPos == "" /* full copy */ || len(uvs.inTablePKs) > 0 /* resume copy */ { + if err := uvs.buildTablePlan(); err != nil { return err } - } else if uvs.startPos == "" || len(uvs.inTablePKs) > 0 { - if err := uvs.buildTablePlan(); err != nil { + } + if uvs.startPos != "" { + if err := uvs.setStreamStartPosition(); err != nil { return err } } - if uvs.pos.IsZero() && (len(uvs.plans) == 0) { return fmt.Errorf("stream needs a position or a table to copy") } @@ -376,9 +413,12 @@ func (uvs *uvstreamer) Stream() error { uvs.vse.errorCounts.Add("Copy", 1) return err } - uvs.sendTestEvent("Copy Done") + if err := uvs.allCopyComplete(); err != nil { + return err + } } - vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos), uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse) + vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos), + uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse) uvs.setVs(vs) return vs.Stream() @@ -418,6 +458,17 @@ func (uvs *uvstreamer) setCopyState(tableName string, qr *querypb.QueryResult) { uvs.plans[tableName].tablePK.Lastpk = qr } +func (uvs *uvstreamer) allCopyComplete() error { + ev := &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_COPY_COMPLETED, + } + + if err := uvs.send([]*binlogdatapb.VEvent{ev}); err != nil { + return err + } + return nil +} + // dummy event sent only in test mode func (uvs *uvstreamer) sendTestEvent(msg string) { if !uvstreamerTestMode { diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index fdd60b8207f..610b9012f7f 100644 --- 
a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -182,6 +182,7 @@ func TestVStreamCopyCompleteFlow(t *testing.T) { uvstreamerTestMode = true defer func() { uvstreamerTestMode = false }() initialize(t) + if err := engine.se.Reload(context.Background()); err != nil { t.Fatal("Error reloading schema") } @@ -190,6 +191,12 @@ func TestVStreamCopyCompleteFlow(t *testing.T) { var tablePKs []*binlogdatapb.TableLastPK for i, table := range testState.tables { rules = append(rules, getRule(table)) + + // for table t2, let tablepk be nil, so that we don't send events for the insert in initTables() + if table == "t2" { + continue + } + tablePKs = append(tablePKs, getTablePK(table, i+1)) } filter := &binlogdatapb.Filter{ @@ -226,7 +233,7 @@ func TestVStreamCopyCompleteFlow(t *testing.T) { } - callbacks["OTHER.*Copy Done"] = func() { + callbacks["COPY_COMPLETED"] = func() { log.Info("Copy done, inserting events to stream") insertRow(t, "t1", 1, numInitialRows+4) insertRow(t, "t2", 2, numInitialRows+3) @@ -245,8 +252,8 @@ commit;" } numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/) - numCopyEvents += 2 /* GTID + Test event after all copy is done */ - numCatchupEvents := 3 * 5 /*2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT*/ + numCopyEvents += 2 /* GTID + Event after all copy is done */ + numCatchupEvents := 3 * 5 /* 2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT */ numFastForwardEvents := 5 /*t1:FIELD+ROW*/ numMisc := 1 /* t2 insert during t1 catchup that comes in t2 copy */ numReplicateEvents := 2*5 /* insert into t1/t2 */ + 6 /* begin/field/2 inserts/gtid/commit */ @@ -532,7 +539,7 @@ var expectedEvents = []string{ "type:BEGIN", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t3\"} completed:true}", "type:COMMIT", - "type:OTHER gtid:\"Copy Done\"", + "type:COPY_COMPLETED", 
"type:BEGIN", "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"}}", "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:2 lengths:3 values:\"14140\"}}}", diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go index cc47daf8b21..84d7d0de138 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go @@ -447,7 +447,7 @@ func TestVStreamCopySimpleFlow(t *testing.T) { testcases := []testcase{ { input: []string{}, - output: [][]string{t1FieldEvent, {"gtid"}, t1Events, {"begin", "lastpk", "commit"}, t2FieldEvent, t2Events, {"begin", "lastpk", "commit"}}, + output: [][]string{t1FieldEvent, {"gtid"}, t1Events, {"begin", "lastpk", "commit"}, t2FieldEvent, t2Events, {"begin", "lastpk", "commit"}, {"copy_completed"}}, }, { @@ -2181,6 +2181,10 @@ func expectLog(ctx context.Context, t *testing.T, input any, ch <-chan []*binlog if evs[i].Type != binlogdatapb.VEventType_DDL { t.Fatalf("%v (%d): event: %v, want ddl", input, i, evs[i]) } + case "copy_completed": + if evs[i].Type != binlogdatapb.VEventType_COPY_COMPLETED { + t.Fatalf("%v (%d): event: %v, want copy_completed", input, i, evs[i]) + } default: evs[i].Timestamp = 0 if evs[i].Type == binlogdatapb.VEventType_FIELD { diff --git a/go/vt/vttest/local_cluster.go b/go/vt/vttest/local_cluster.go index 40ba6937e27..ba8218963ba 100644 --- a/go/vt/vttest/local_cluster.go +++ b/go/vt/vttest/local_cluster.go @@ -151,20 +151,20 @@ type Config struct { // It then sets the right value for cfg.SchemaDir. 
// At the end of the test, the caller should os.RemoveAll(cfg.SchemaDir). func (cfg *Config) InitSchemas(keyspace, schema string, vschema *vschemapb.Keyspace) error { - if cfg.SchemaDir != "" { - return fmt.Errorf("SchemaDir is already set to %v", cfg.SchemaDir) - } - - // Create a base temporary directory. - tempSchemaDir, err := os.MkdirTemp("", "vttest") - if err != nil { - return err + schemaDir := cfg.SchemaDir + if schemaDir == "" { + // Create a base temporary directory. + tempSchemaDir, err := os.MkdirTemp("", "vttest") + if err != nil { + return err + } + schemaDir = tempSchemaDir } // Write the schema if set. if schema != "" { - ksDir := path.Join(tempSchemaDir, keyspace) - err = os.Mkdir(ksDir, os.ModeDir|0775) + ksDir := path.Join(schemaDir, keyspace) + err := os.Mkdir(ksDir, os.ModeDir|0775) if err != nil { return err } @@ -177,7 +177,7 @@ func (cfg *Config) InitSchemas(keyspace, schema string, vschema *vschemapb.Keysp // Write in the vschema if set. if vschema != nil { - vschemaFilePath := path.Join(tempSchemaDir, keyspace, "vschema.json") + vschemaFilePath := path.Join(schemaDir, keyspace, "vschema.json") vschemaJSON, err := json.Marshal(vschema) if err != nil { return err @@ -186,7 +186,7 @@ func (cfg *Config) InitSchemas(keyspace, schema string, vschema *vschemapb.Keysp return err } } - cfg.SchemaDir = tempSchemaDir + cfg.SchemaDir = schemaDir return nil } diff --git a/proto/binlogdata.proto b/proto/binlogdata.proto index e23a642d35e..d27adb83331 100644 --- a/proto/binlogdata.proto +++ b/proto/binlogdata.proto @@ -295,6 +295,10 @@ enum VEventType { VERSION = 17; LASTPK = 18; SAVEPOINT = 19; + // COPY_COMPLETED is sent when VTGate's VStream copy operation is done. + // If a client experiences some disruptions before receiving the event, + // the client should restart the copy operation. + COPY_COMPLETED = 20; } // RowChange represents one row change. 
diff --git a/web/vtadmin/src/proto/vtadmin.d.ts b/web/vtadmin/src/proto/vtadmin.d.ts index 3f1eb62f669..51a4d5f7301 100644 --- a/web/vtadmin/src/proto/vtadmin.d.ts +++ b/web/vtadmin/src/proto/vtadmin.d.ts @@ -52015,7 +52015,8 @@ export namespace binlogdata { JOURNAL = 16, VERSION = 17, LASTPK = 18, - SAVEPOINT = 19 + SAVEPOINT = 19, + COPY_COMPLETED = 20 } /** Properties of a RowChange. */ diff --git a/web/vtadmin/src/proto/vtadmin.js b/web/vtadmin/src/proto/vtadmin.js index eac24db610c..87ba1d2efd8 100644 --- a/web/vtadmin/src/proto/vtadmin.js +++ b/web/vtadmin/src/proto/vtadmin.js @@ -122972,6 +122972,7 @@ $root.binlogdata = (function() { * @property {number} VERSION=17 VERSION value * @property {number} LASTPK=18 LASTPK value * @property {number} SAVEPOINT=19 SAVEPOINT value + * @property {number} COPY_COMPLETED=20 COPY_COMPLETED value */ binlogdata.VEventType = (function() { var valuesById = {}, values = Object.create(valuesById); @@ -122995,6 +122996,7 @@ $root.binlogdata = (function() { values[valuesById[17] = "VERSION"] = 17; values[valuesById[18] = "LASTPK"] = 18; values[valuesById[19] = "SAVEPOINT"] = 19; + values[valuesById[20] = "COPY_COMPLETED"] = 20; return values; })(); @@ -125237,6 +125239,7 @@ $root.binlogdata = (function() { case 17: case 18: case 19: + case 20: break; } if (message.timestamp != null && message.hasOwnProperty("timestamp")) @@ -125384,6 +125387,10 @@ $root.binlogdata = (function() { case 19: message.type = 19; break; + case "COPY_COMPLETED": + case 20: + message.type = 20; + break; } if (object.timestamp != null) if ($util.Long)