Skip to content

Commit

Permalink
fix distributed transactions disruptions test with move table (#16765)
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal authored Sep 12, 2024
1 parent d9e59b6 commit da4b81d
Show file tree
Hide file tree
Showing 7 changed files with 290 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package stress
package fuzz

import (
"context"
Expand All @@ -36,6 +36,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/syscallutil"
"vitess.io/vitess/go/test/endtoend/cluster"
twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils"
"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/schema"
Expand Down Expand Up @@ -131,7 +132,7 @@ func TestTwoPCFuzzTest(t *testing.T) {
fz.stop()

// Wait for all transactions to be resolved.
waitForResults(t, fmt.Sprintf(`show unresolved transactions for %v`, keyspaceName), "[]", 30*time.Second)
twopcutil.WaitForResults(t, &vtParams, fmt.Sprintf(`show unresolved transactions for %v`, keyspaceName), "[]", 30*time.Second)
// Verify that all the transactions run were actually atomic and no data issues have occurred.
fz.verifyTransactionsWereAtomic(t)

Expand Down Expand Up @@ -428,12 +429,16 @@ func vttabletRestarts(t *testing.T) {
}
}

var orderedDDLFuzzer = []string{
"alter table twopc_fuzzer_insert add column extra_col1 varchar(20)",
"alter table twopc_fuzzer_insert add column extra_col2 varchar(20)",
"alter table twopc_fuzzer_insert drop column extra_col1",
"alter table twopc_fuzzer_insert drop column extra_col2",
}
var (
count = 0

orderedDDLFuzzer = []string{
"alter table twopc_fuzzer_insert add column extra_col1 varchar(20)",
"alter table twopc_fuzzer_insert add column extra_col2 varchar(20)",
"alter table twopc_fuzzer_insert drop column extra_col1",
"alter table twopc_fuzzer_insert drop column extra_col2",
}
)

// onlineDDLFuzzer runs an online DDL statement while ignoring any errors for the fuzzer.
func onlineDDLFuzzer(t *testing.T) {
Expand All @@ -445,7 +450,8 @@ func onlineDDLFuzzer(t *testing.T) {
return
}
fmt.Println("Running online DDL with uuid: ", output)
waitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
twopcutil.WaitForMigrationStatus(t, &vtParams, keyspaceName, clusterInstance.Keyspaces[0].Shards,
strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
}

var moveTablesCount int
Expand Down
129 changes: 129 additions & 0 deletions go/test/endtoend/transaction/twopc/fuzz/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fuzz

import (
"context"
_ "embed"
"flag"
"fmt"
"os"
"testing"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/transaction/twopc/utils"
)

var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
vtgateGrpcAddress string
keyspaceName = "ks"
unshardedKeyspaceName = "uks"
cell = "zone1"
hostname = "localhost"

//go:embed schema.sql
SchemaSQL string

//go:embed vschema.json
VSchema string
)

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()

exitcode := func() int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()

// Start topo server
if err := clusterInstance.StartTopo(); err != nil {
return 1
}

// Reserve vtGate port in order to pass it to vtTablet
clusterInstance.VtgateGrpcPort = clusterInstance.GetAndReservePort()

// Set extra args for twopc
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs,
"--transaction_mode", "TWOPC",
"--grpc_use_effective_callerid",
)
clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
"--twopc_enable",
"--twopc_abandon_age", "1",
"--migration_check_interval", "2s",
)

// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: SchemaSQL,
VSchema: VSchema,
DurabilityPolicy: "semi_sync",
}
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 2, false); err != nil {
return 1
}

// Start an unsharded keyspace
unshardedKeyspace := &cluster.Keyspace{
Name: unshardedKeyspaceName,
SchemaSQL: "",
VSchema: "{}",
DurabilityPolicy: "semi_sync",
}
if err := clusterInstance.StartUnshardedKeyspace(*unshardedKeyspace, 2, false); err != nil {
return 1
}

// Start Vtgate
if err := clusterInstance.StartVtgate(); err != nil {
return 1
}
vtParams = clusterInstance.GetVTParams("")
vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort)

return m.Run()
}()
os.Exit(exitcode)
}

func start(t *testing.T) (*mysql.Conn, func()) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
cleanup(t)

return conn, func() {
conn.Close()
cleanup(t)
}
}

func cleanup(t *testing.T) {
cluster.PanicHandler(t)

utils.ClearOutTable(t, vtParams, "twopc_fuzzer_insert")
utils.ClearOutTable(t, vtParams, "twopc_fuzzer_update")
utils.ClearOutTable(t, vtParams, "twopc_t1")
}
20 changes: 20 additions & 0 deletions go/test/endtoend/transaction/twopc/fuzz/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
create table twopc_fuzzer_update (
id bigint,
col bigint,
primary key (id)
) Engine=InnoDB;

create table twopc_fuzzer_insert (
id bigint,
updateSet bigint,
threadId bigint,
col bigint auto_increment,
key(col),
primary key (id, col)
) Engine=InnoDB;

create table twopc_t1 (
id bigint,
col bigint,
primary key (id)
) Engine=InnoDB;
34 changes: 34 additions & 0 deletions go/test/endtoend/transaction/twopc/fuzz/vschema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"sharded":true,
"vindexes": {
"reverse_bits": {
"type": "reverse_bits"
}
},
"tables": {
"twopc_fuzzer_update": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
},
"twopc_fuzzer_insert": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
},
"twopc_t1": {
"column_vindexes": [
{
"column": "id",
"name": "reverse_bits"
}
]
}
}
}
82 changes: 3 additions & 79 deletions go/test/endtoend/transaction/twopc/stress/stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"golang.org/x/exp/rand"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/syscallutil"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
Expand Down Expand Up @@ -146,7 +145,7 @@ func TestDisruptions(t *testing.T) {
// But since we are waiting in CommitPrepared, the decision to commit the transaction should have already been taken.
wg.Wait()
// Check the data in the table.
waitForResults(t, "select id, col from twopc_t1 where col = 4 order by id", `[[INT64(4) INT64(4)] [INT64(6) INT64(4)] [INT64(9) INT64(4)]]`, 30*time.Second)
twopcutil.WaitForResults(t, &vtParams, "select id, col from twopc_t1 where col = 4 order by id", `[[INT64(4) INT64(4)] [INT64(6) INT64(4)] [INT64(9) INT64(4)]]`, 30*time.Second)
writeCancel()
writerWg.Wait()

Expand Down Expand Up @@ -184,32 +183,6 @@ func reparentToFirstTablet(t *testing.T) {
}
}

// waitForResults waits for the results of the query to be as expected.
func waitForResults(t *testing.T, query string, resultExpected string, waitTime time.Duration) {
timeout := time.After(waitTime)
var prevRes []sqltypes.Row
for {
select {
case <-timeout:
t.Fatalf("didn't reach expected results for %s. Last results - %v", query, prevRes)
default:
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
if err == nil {
res, _ := utils.ExecAllowError(t, conn, query)
conn.Close()
if res != nil {
prevRes = res.Rows
if fmt.Sprintf("%v", res.Rows) == resultExpected {
return
}
}
}
time.Sleep(100 * time.Millisecond)
}
}
}

/*
Cluster Level Disruptions for the fuzzer
*/
Expand Down Expand Up @@ -328,58 +301,9 @@ func onlineDDL(t *testing.T) error {
require.NoError(t, err)
count++
fmt.Println("uuid: ", output)
status := waitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
status := twopcutil.WaitForMigrationStatus(t, &vtParams, keyspaceName, clusterInstance.Keyspaces[0].Shards,
strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), status)
require.Equal(t, schema.OnlineDDLStatusComplete, status)
return nil
}

func waitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus {
shardNames := map[string]bool{}
for _, shard := range shards {
shardNames[shard.Name] = true
}
query := fmt.Sprintf("show vitess_migrations from %s like '%s'", keyspaceName, uuid)

statusesMap := map[string]bool{}
for _, status := range expectStatuses {
statusesMap[string(status)] = true
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

lastKnownStatus := ""
for {
select {
case <-ctx.Done():
return schema.OnlineDDLStatus(lastKnownStatus)
case <-ticker.C:
}
countMatchedShards := 0
conn, err := mysql.Connect(ctx, vtParams)
if err != nil {
continue
}
r, err := utils.ExecAllowError(t, conn, query)
conn.Close()
if err != nil {
continue
}
for _, row := range r.Named().Rows {
shardName := row["shard"].ToString()
if !shardNames[shardName] {
// irrelevant shard
continue
}
lastKnownStatus = row["migration_status"].ToString()
if row["migration_uuid"].ToString() == uuid && statusesMap[lastKnownStatus] {
countMatchedShards++
}
}
if countMatchedShards == len(shards) {
return schema.OnlineDDLStatus(lastKnownStatus)
}
}
}
Loading

0 comments on commit da4b81d

Please sign in to comment.