From e922f5faca7fb4871b65df3fbed889727a992b97 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 25 Jul 2024 16:55:30 +0530 Subject: [PATCH 01/11] test: add the basic framework for 2pc fuzzer testing Signed-off-by: Manan Gupta --- .../endtoend/transaction/twopc/fuzzer_test.go | 186 ++++++++++++++++++ .../endtoend/transaction/twopc/main_test.go | 3 + go/test/endtoend/transaction/twopc/schema.sql | 15 +- .../endtoend/transaction/twopc/vschema.json | 19 ++ 4 files changed, 222 insertions(+), 1 deletion(-) create mode 100644 go/test/endtoend/transaction/twopc/fuzzer_test.go diff --git a/go/test/endtoend/transaction/twopc/fuzzer_test.go b/go/test/endtoend/transaction/twopc/fuzzer_test.go new file mode 100644 index 00000000000..7b3e59936b1 --- /dev/null +++ b/go/test/endtoend/transaction/twopc/fuzzer_test.go @@ -0,0 +1,186 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package transaction + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" +) + +var ( + // updateRowsVals are the rows that we use to ensure 1 update on each shard with the same increment. + updateRowsVals = [][]int{ + { + 4, // 4 maps to 0x20 and ends up in the first shard (-40) + 6, // 6 maps to 0x60 and ends up in the second shard (40-80) + 9, // 9 maps to 0x90 and ends up in the third shard (80-) + // We can increment all of these values by multiples of 16 and they'll always be in the same shard. + }, + } + + insertIntoFuzzUpdate = "INSERT INTO twopc_fuzzer_update (id, col) VALUES (%d, %d)" +) + +// TestTwoPCFuzzTest tests 2PC transactions in a fuzzer environment. +// The testing strategy involves running many transactions and checking that they all must be atomic. +// To this end, we have a very unique strategy. We have two sharded tables `twopc_fuzzer_update`, and `twopc_fuzzer_insert` with the following columns. +// - id: This is the sharding column. We use reverse_bits as the sharding vindex because it is easy to reason about where a row will end up. +// - col in `twopc_fuzzer_insert`: An auto-increment column. +// - col in `twopc_fuzzer_update`: This is a bigint value that we will use to increment on updates. +// - updateSet: This column will store which update set the inserts where done for. +// +// The testing strategy is as follows - +// Every transaction will do 2 things - +// - One, it will increment the `col` on 1 row in each of the shards of the `twopc_fuzzer_update` table. +// To do this, we have sets of rows that each map to one shard. We prepopulate this before the test starts. +// These sets are stored in updateRowsVals. +// - Two, it will insert one row in each of the shards of the `twopc_fuzzer_insert` table and it will also store the update set that it updated the rows off. +// +// We can check that a transaction was atomic by basically checking that the `col` value for all the rows that were updated together should match. +// If any transaction was partially successful, then it would have missed an increment on one of the rows. +// Moreover, the order of rows for a given update set in the 3 shards should be the same to ensure that conflicting transactions got committed in the same exact order. +func TestTwoPCFuzzTest(t *testing.T) { + testcases := []struct { + name string + threads int + updateSets int + timeForTesting time.Duration + }{ + { + name: "Single Thread - Single Set", + threads: 1, + updateSets: 1, + timeForTesting: 5 * time.Second, + }, + } + + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + conn, closer := start(t) + defer closer() + fz := newFuzzer(tt.threads, tt.updateSets) + + fz.initialize(t, conn) + // Start the fuzzer. + fz.start(t) + + // Wait for the timeForTesting so that the threads continue to run. + time.Sleep(tt.timeForTesting) + + fz.stop() + }) + } +} + +// fuzzer runs threads that runs queries against the databases. +// It has parameters that define the way the queries are constructed. +type fuzzer struct { + threads int + updateSets int + + // shouldStop is an internal state variable, that tells the fuzzer + // whether it should stop or not. + shouldStop atomic.Bool + // wg is an internal state variable, that used to know whether the fuzzer threads are running or not. + wg sync.WaitGroup +} + +// newFuzzer creates a new fuzzer struct. +func newFuzzer(threads int, updateSets int) *fuzzer { + fz := &fuzzer{ + threads: threads, + updateSets: updateSets, + wg: sync.WaitGroup{}, + } + // Initially the fuzzer thread is stopped. + fz.shouldStop.Store(true) + return fz +} + +// stop stops the fuzzer and waits for it to finish execution. +func (fz *fuzzer) stop() { + // Mark the thread to be stopped. + fz.shouldStop.Store(true) + // Wait for the fuzzer thread to stop. + fz.wg.Wait() +} + +// start starts running the fuzzer. +func (fz *fuzzer) start(t *testing.T) { + // We mark the fuzzer thread to be running now. + fz.shouldStop.Store(false) + fz.wg.Add(fz.threads) + for i := 0; i < fz.threads; i++ { + go func() { + fz.runFuzzerThread(t) + }() + } +} + +// runFuzzerThread is used to run a thread of the fuzzer. +func (fz *fuzzer) runFuzzerThread(t *testing.T) { + // Whenever we finish running this thread, we should mark the thread has stopped. + defer func() { + fz.wg.Done() + }() + + // Create a connection to the vtgate to run transactions. + conn, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) + defer conn.Close() + + for { + // If fuzzer thread is marked to be stopped, then we should exit this go routine. + if fz.shouldStop.Load() == true { + return + } + // Run an atomic transaction + fz.generateAndExecuteTransaction(conn) + } + +} + +// initialize initializes all the variables that will be needed for running the fuzzer. +// It also creates the rows for the `twopc_fuzzer_update` table. +func (fz *fuzzer) initialize(t *testing.T, conn *mysql.Conn) { + for i := 1; i < fz.updateSets; i++ { + updateRowsVals = append(updateRowsVals, []int{ + updateRowsVals[0][0] + i*16, + updateRowsVals[0][1] + i*16, + updateRowsVals[0][2] + i*16, + }) + } + + for _, updateSet := range updateRowsVals { + for _, id := range updateSet { + _, err := conn.ExecuteFetch(fmt.Sprintf(insertIntoFuzzUpdate, id, 0), 0, false) + require.NoError(t, err) + } + } +} + +func (fz *fuzzer) generateAndExecuteTransaction(conn *mysql.Conn) { + +} diff --git a/go/test/endtoend/transaction/twopc/main_test.go b/go/test/endtoend/transaction/twopc/main_test.go index 8ac7cfc1f21..68e5632e79f 100644 --- a/go/test/endtoend/transaction/twopc/main_test.go +++ b/go/test/endtoend/transaction/twopc/main_test.go @@ -110,6 +110,7 @@ func start(t *testing.T) (*mysql.Conn, func()) { ctx := context.Background() conn, err := mysql.Connect(ctx, &vtParams) require.NoError(t, err) + cleanup(t) return conn, func() { conn.Close() @@ -126,6 +127,8 @@ func cleanup(t *testing.T) { defer conn.Close() _, _ = utils.ExecAllowError(t, conn, "delete from twopc_user") + _, _ = utils.ExecAllowError(t, conn, "delete from twopc_fuzzer_insert") + _, _ = utils.ExecAllowError(t, conn, "delete from twopc_fuzzer_update") } type extractInterestingValues func(dtidMap map[string]string, vals []sqltypes.Value) []sqltypes.Value diff --git a/go/test/endtoend/transaction/twopc/schema.sql b/go/test/endtoend/transaction/twopc/schema.sql index 60a7c19837c..f7f1dd0f472 100644 --- a/go/test/endtoend/transaction/twopc/schema.sql +++ b/go/test/endtoend/transaction/twopc/schema.sql @@ -9,4 +9,17 @@ create table twopc_music ( user_id bigint, title varchar(64), primary key (id) -) Engine=InnoDB; \ No newline at end of file +) Engine=InnoDB; + +create table twopc_fuzzer_update ( + id bigint, + col bigint, + primary key (id) +) Engine=InnoDB; + +create table twopc_fuzzer_insert ( + id bigint, + updateSet bigint, + col bigint auto_increment, + primary key (id, col) +) Engine=InnoDB; diff --git a/go/test/endtoend/transaction/twopc/vschema.json b/go/test/endtoend/transaction/twopc/vschema.json index 4ff62df6808..48522aa8171 100644 --- a/go/test/endtoend/transaction/twopc/vschema.json +++ b/go/test/endtoend/transaction/twopc/vschema.json @@ -3,6 +3,9 @@ "vindexes": { "xxhash": { "type": "xxhash" + }, + "reverse_bits": { + "type": "reverse_bits" } }, "tables": { @@ -21,6 +24,22 @@ "name": "xxhash" } ] + }, + "twopc_fuzzer_update": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + } + ] + }, + "twopc_fuzzer_insert": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + } + ] } } } \ No newline at end of file From 68dab94e1c4a34b2855aa77bb0411f96320544f4 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 25 Jul 2024 17:20:53 +0530 Subject: [PATCH 02/11] test: generate the queries for the transactions Signed-off-by: Manan Gupta --- .../endtoend/transaction/twopc/fuzzer_test.go | 53 +++++++++++++++++-- 1 file changed, 49 insertions(+), 4 deletions(-) diff --git a/go/test/endtoend/transaction/twopc/fuzzer_test.go b/go/test/endtoend/transaction/twopc/fuzzer_test.go index 7b3e59936b1..635d4c69134 100644 --- a/go/test/endtoend/transaction/twopc/fuzzer_test.go +++ b/go/test/endtoend/transaction/twopc/fuzzer_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/stretchr/testify/require" + "golang.org/x/exp/rand" "vitess.io/vitess/go/mysql" ) @@ -41,6 +42,8 @@ var ( } insertIntoFuzzUpdate = "INSERT INTO twopc_fuzzer_update (id, col) VALUES (%d, %d)" + updateFuzzUpdate = "UPDATE twopc_fuzzer_update SET col = col + %d WHERE id = %d" + insertIntoFuzzInsert = "INSERT INTO twopc_fuzzer_insert (id, updateSet) VALUES (%d, %d)" ) // TestTwoPCFuzzTest tests 2PC transactions in a fuzzer environment. @@ -134,13 +137,13 @@ func (fz *fuzzer) start(t *testing.T) { fz.wg.Add(fz.threads) for i := 0; i < fz.threads; i++ { go func() { - fz.runFuzzerThread(t) + fz.runFuzzerThread(t, i) }() } } // runFuzzerThread is used to run a thread of the fuzzer. -func (fz *fuzzer) runFuzzerThread(t *testing.T) { +func (fz *fuzzer) runFuzzerThread(t *testing.T, threadId int) { // Whenever we finish running this thread, we should mark the thread has stopped. defer func() { fz.wg.Done() @@ -157,7 +160,7 @@ func (fz *fuzzer) runFuzzerThread(t *testing.T) { return } // Run an atomic transaction - fz.generateAndExecuteTransaction(conn) + fz.generateAndExecuteTransaction(t, conn, threadId) } } @@ -181,6 +184,48 @@ func (fz *fuzzer) initialize(t *testing.T, conn *mysql.Conn) { } } -func (fz *fuzzer) generateAndExecuteTransaction(conn *mysql.Conn) { +// generateAndExecuteTransaction generates the queries of the transaction and then executes them. +func (fz *fuzzer) generateAndExecuteTransaction(t *testing.T, conn *mysql.Conn, threadId int) { + // randomly generate an update set to use and the value to increment it by. + updateSetVal := rand.Intn(fz.updateSets) + incrementVal := rand.Int31() + // We have to generate the update queries first. We can run the inserts only after the update queries. + // Otherwise, our check to see that the ids in the twopc_fuzzer_insert table in all the shards are the exact same + // for each update set ordered by the auto increment column will not be true. + // That assertion depends on all the transactions running updates first to ensure that for any given update set, + // no two transactions are running the insert queries. + queries := fz.generateUpdateQueries(updateSetVal, incrementVal) + queries = append(queries, fz.generateInsertQueries(updateSetVal, threadId)...) + _, err := conn.ExecuteFetch("begin", 0, false) + require.NoError(t, err) + for _, query := range queries { + _, _ = conn.ExecuteFetch(query, 0, false) + } + _, _ = conn.ExecuteFetch("commit", 0, false) +} +// generateUpdateQueries generates the queries to run updates on the twopc_fuzzer_update table. +// It takes the update set index and the value to increment the set by. +func (fz *fuzzer) generateUpdateQueries(updateSet int, incrementVal int32) []string { + var queries []string + for _, id := range updateRowsVals[updateSet] { + queries = append(queries, fmt.Sprintf(updateFuzzUpdate, incrementVal, id)) + } + rand.Shuffle(len(queries), func(i, j int) { + queries[i], queries[j] = queries[j], queries[i] + }) + return queries +} + +// generateInsertQueries generates the queries to run inserts on the twopc_fuzzer_insert table. +// It takes the update set index and the thread id that is generating these inserts. +func (fz *fuzzer) generateInsertQueries(updateSet int, threadId int) []string { + var queries []string + for _, id := range updateRowsVals[0] { + queries = append(queries, fmt.Sprintf(insertIntoFuzzInsert, updateSet, threadId*16+id)) + } + rand.Shuffle(len(queries), func(i, j int) { + queries[i], queries[j] = queries[j], queries[i] + }) + return queries } From baa764f6f571ab275ebb2877f5153275a78b37a8 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 25 Jul 2024 18:02:47 +0530 Subject: [PATCH 03/11] test: add verification logic to see if all the transactions were indeed atomic Signed-off-by: Manan Gupta --- .../endtoend/transaction/twopc/fuzzer_test.go | 70 +++++++++++++++++-- go/test/endtoend/transaction/twopc/schema.sql | 3 +- 2 files changed, 67 insertions(+), 6 deletions(-) diff --git a/go/test/endtoend/transaction/twopc/fuzzer_test.go b/go/test/endtoend/transaction/twopc/fuzzer_test.go index 635d4c69134..77175276682 100644 --- a/go/test/endtoend/transaction/twopc/fuzzer_test.go +++ b/go/test/endtoend/transaction/twopc/fuzzer_test.go @@ -41,15 +41,18 @@ var ( }, } - insertIntoFuzzUpdate = "INSERT INTO twopc_fuzzer_update (id, col) VALUES (%d, %d)" - updateFuzzUpdate = "UPDATE twopc_fuzzer_update SET col = col + %d WHERE id = %d" - insertIntoFuzzInsert = "INSERT INTO twopc_fuzzer_insert (id, updateSet) VALUES (%d, %d)" + insertIntoFuzzUpdate = "INSERT INTO twopc_fuzzer_update (id, col) VALUES (%d, %d)" + updateFuzzUpdate = "UPDATE twopc_fuzzer_update SET col = col + %d WHERE id = %d" + insertIntoFuzzInsert = "INSERT INTO twopc_fuzzer_insert (id, updateSet) VALUES (%d, '%s')" + selectFromFuzzUpdate = "SELECT col FROM twopc_fuzzer_update WHERE id = %d" + selectIdFromFuzzInsert = "SELECT id FROM twopc_fuzzer_insert WHERE updateSet = '%s' ORDER BY col" ) // TestTwoPCFuzzTest tests 2PC transactions in a fuzzer environment. // The testing strategy involves running many transactions and checking that they all must be atomic. // To this end, we have a very unique strategy. We have two sharded tables `twopc_fuzzer_update`, and `twopc_fuzzer_insert` with the following columns. // - id: This is the sharding column. We use reverse_bits as the sharding vindex because it is easy to reason about where a row will end up. +// For the `twopc_fuzzer_insert` column, it is calculated from the thread id of the fuzzer thread that inserted it. // - col in `twopc_fuzzer_insert`: An auto-increment column. // - col in `twopc_fuzzer_update`: This is a bigint value that we will use to increment on updates. // - updateSet: This column will store which update set the inserts where done for. @@ -92,11 +95,61 @@ func TestTwoPCFuzzTest(t *testing.T) { // Wait for the timeForTesting so that the threads continue to run. time.Sleep(tt.timeForTesting) + // Signal the fuzzer to stop. fz.stop() + + // Verify that all the transactions run were actually atomic and no data issues have occurred. + verifyTransactionsWereAtomic(t, conn) }) } } +// verifyTransactionsWereAtomic verifies that the invariants of test are held. +// It checks the heuristics to ensure that the transactions run were atomic. +func verifyTransactionsWereAtomic(t *testing.T, conn *mysql.Conn) { + for updateSetIdx, updateSet := range updateRowsVals { + // All the three values of the update set must be equal. + shard1Val := getColValueForIdFromFuzzUpdate(t, conn, updateSet[0]) + shard2Val := getColValueForIdFromFuzzUpdate(t, conn, updateSet[1]) + shard3Val := getColValueForIdFromFuzzUpdate(t, conn, updateSet[2]) + require.EqualValues(t, shard1Val, shard2Val) + require.EqualValues(t, shard3Val, shard2Val) + + // Next we get the IDs from all the three shards for the given update set index. + shard1IDs := getThreadIDsForUpdateSetFromFuzzInsert(t, conn, updateSetIdx, 1) + shard2IDs := getThreadIDsForUpdateSetFromFuzzInsert(t, conn, updateSetIdx, 2) + shard3IDs := getThreadIDsForUpdateSetFromFuzzInsert(t, conn, updateSetIdx, 3) + require.EqualValues(t, shard1IDs, shard2IDs) + require.EqualValues(t, shard3IDs, shard2IDs) + } +} + +// getColValueForIdFromFuzzUpdate gets the col column value for the given id in the twopc_fuzzer_update table. +func getColValueForIdFromFuzzUpdate(t *testing.T, conn *mysql.Conn, id int) uint64 { + res, err := conn.ExecuteFetch(fmt.Sprintf(selectFromFuzzUpdate, id), 1, false) + require.NoError(t, err) + require.Len(t, res.Rows, 1) + require.Len(t, res.Rows[0], 1) + val, err := res.Rows[0][0].ToUint64() + require.NoError(t, err) + return val +} + +// getThreadIDsForUpdateSetFromFuzzInsert gets the thread IDs for the given update set ordered by the col column from the twopc_fuzzer_insert table. +func getThreadIDsForUpdateSetFromFuzzInsert(t *testing.T, conn *mysql.Conn, updateSet int, shard int) []int { + res, err := conn.ExecuteFetch(fmt.Sprintf(selectIdFromFuzzInsert, updateSetValueForShard(updateSet, shard)), 10000, false) + require.NoError(t, err) + var ids []int + for _, row := range res.Rows { + require.Len(t, row, 1) + val, err := row[0].ToInt() + require.NoError(t, err) + // We reverse map the value to the thread id that had inserted it so that we can use it to compare the lists. + ids = append(ids, (val-updateRowsVals[0][shard-1])/16) + } + return ids +} + // fuzzer runs threads that runs queries against the databases. // It has parameters that define the way the queries are constructed. type fuzzer struct { @@ -168,6 +221,7 @@ func (fz *fuzzer) runFuzzerThread(t *testing.T, threadId int) { // initialize initializes all the variables that will be needed for running the fuzzer. // It also creates the rows for the `twopc_fuzzer_update` table. func (fz *fuzzer) initialize(t *testing.T, conn *mysql.Conn) { + updateRowsVals = updateRowsVals[0:1] for i := 1; i < fz.updateSets; i++ { updateRowsVals = append(updateRowsVals, []int{ updateRowsVals[0][0] + i*16, @@ -201,6 +255,7 @@ func (fz *fuzzer) generateAndExecuteTransaction(t *testing.T, conn *mysql.Conn, for _, query := range queries { _, _ = conn.ExecuteFetch(query, 0, false) } + // TODO: Check if we want to randomize commit and rollback decisions. _, _ = conn.ExecuteFetch("commit", 0, false) } @@ -221,11 +276,16 @@ func (fz *fuzzer) generateUpdateQueries(updateSet int, incrementVal int32) []str // It takes the update set index and the thread id that is generating these inserts. func (fz *fuzzer) generateInsertQueries(updateSet int, threadId int) []string { var queries []string - for _, id := range updateRowsVals[0] { - queries = append(queries, fmt.Sprintf(insertIntoFuzzInsert, updateSet, threadId*16+id)) + for idx, id := range updateRowsVals[0] { + queries = append(queries, fmt.Sprintf(insertIntoFuzzInsert, threadId*16+id, updateSetValueForShard(updateSet, idx+1))) } rand.Shuffle(len(queries), func(i, j int) { queries[i], queries[j] = queries[j], queries[i] }) return queries } + +// updateSetValueForShard gets a string representation to store the update set value for each shard. +func updateSetValueForShard(updateSet int, shard int) string { + return fmt.Sprintf("Shard-%d:%d", shard, updateSet) +} diff --git a/go/test/endtoend/transaction/twopc/schema.sql b/go/test/endtoend/transaction/twopc/schema.sql index f7f1dd0f472..1d3bb8decb6 100644 --- a/go/test/endtoend/transaction/twopc/schema.sql +++ b/go/test/endtoend/transaction/twopc/schema.sql @@ -19,7 +19,8 @@ create table twopc_fuzzer_update ( create table twopc_fuzzer_insert ( id bigint, - updateSet bigint, + updateSet varchar(50), col bigint auto_increment, + key(col), primary key (id, col) ) Engine=InnoDB; From e0efd76b037381a96011087b1034e74c60e3c87a Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 25 Jul 2024 18:13:47 +0530 Subject: [PATCH 04/11] test: add multiple threads tests Signed-off-by: Manan Gupta --- go/test/endtoend/transaction/twopc/fuzzer_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/go/test/endtoend/transaction/twopc/fuzzer_test.go b/go/test/endtoend/transaction/twopc/fuzzer_test.go index 77175276682..1f4e56772ed 100644 --- a/go/test/endtoend/transaction/twopc/fuzzer_test.go +++ b/go/test/endtoend/transaction/twopc/fuzzer_test.go @@ -80,6 +80,18 @@ func TestTwoPCFuzzTest(t *testing.T) { updateSets: 1, timeForTesting: 5 * time.Second, }, + { + name: "Multiple Threads - Single Set", + threads: 5, + updateSets: 1, + timeForTesting: 5 * time.Second, + }, + { + name: "Multiple Threads - Multiple Set", + threads: 15, + updateSets: 15, + timeForTesting: 5 * time.Second, + }, } for _, tt := range testcases { From 8d9f8c76256582497adaba490d725f92e3ac2b88 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 25 Jul 2024 18:18:37 +0530 Subject: [PATCH 05/11] test: rollback transactions if any query fails Signed-off-by: Manan Gupta --- go/test/endtoend/transaction/twopc/fuzzer_test.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/go/test/endtoend/transaction/twopc/fuzzer_test.go b/go/test/endtoend/transaction/twopc/fuzzer_test.go index 1f4e56772ed..9bef31c25cd 100644 --- a/go/test/endtoend/transaction/twopc/fuzzer_test.go +++ b/go/test/endtoend/transaction/twopc/fuzzer_test.go @@ -82,7 +82,7 @@ func TestTwoPCFuzzTest(t *testing.T) { }, { name: "Multiple Threads - Single Set", - threads: 5, + threads: 2, updateSets: 1, timeForTesting: 5 * time.Second, }, @@ -264,11 +264,17 @@ func (fz *fuzzer) generateAndExecuteTransaction(t *testing.T, conn *mysql.Conn, queries = append(queries, fz.generateInsertQueries(updateSetVal, threadId)...) _, err := conn.ExecuteFetch("begin", 0, false) require.NoError(t, err) + finalCommand := "commit" for _, query := range queries { - _, _ = conn.ExecuteFetch(query, 0, false) + _, err = conn.ExecuteFetch(query, 0, false) + // If any command fails because of deadlocks or timeout or whatever, then we need to rollback the transaction. + if err != nil { + finalCommand = "rollback" + break + } } // TODO: Check if we want to randomize commit and rollback decisions. - _, _ = conn.ExecuteFetch("commit", 0, false) + _, _ = conn.ExecuteFetch(finalCommand, 0, false) } // generateUpdateQueries generates the queries to run updates on the twopc_fuzzer_update table. From 2cfa73329eeaaf57257b9864c93ed71ba6245bdf Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 26 Jul 2024 11:29:14 +0530 Subject: [PATCH 06/11] feat: fix deletions from table Signed-off-by: Manan Gupta --- .../endtoend/transaction/twopc/main_test.go | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/transaction/twopc/main_test.go b/go/test/endtoend/transaction/twopc/main_test.go index 68e5632e79f..0d94a6b4c8a 100644 --- a/go/test/endtoend/transaction/twopc/main_test.go +++ b/go/test/endtoend/transaction/twopc/main_test.go @@ -127,8 +127,26 @@ func cleanup(t *testing.T) { defer conn.Close() _, _ = utils.ExecAllowError(t, conn, "delete from twopc_user") - _, _ = utils.ExecAllowError(t, conn, "delete from twopc_fuzzer_insert") - _, _ = utils.ExecAllowError(t, conn, "delete from twopc_fuzzer_update") + clearOutTable(t, conn, "twopc_fuzzer_insert") + clearOutTable(t, conn, "twopc_fuzzer_update") +} + +// clearOutTable deletes everything from a table. Sometimes the table might have more rows than allowed in a single delete query, +// so we have to do the deletions iteratively. +func clearOutTable(t *testing.T, conn *mysql.Conn, tableName string) { + for { + res, err := conn.ExecuteFetch(fmt.Sprintf("SELECT count(*) FROM %v", tableName), 1, false) + require.NoError(t, err) + require.Len(t, res.Rows, 1) + require.Len(t, res.Rows[0], 1) + rowCount, err := res.Rows[0][0].ToInt() + require.NoError(t, err) + if rowCount == 0 { + return + } + _, err = conn.ExecuteFetch(fmt.Sprintf("DELETE FROM %v LIMIT 10000", tableName), 10000, false) + require.NoError(t, err) + } } type extractInterestingValues func(dtidMap map[string]string, vals []sqltypes.Value) []sqltypes.Value From 8c0bb8a7aa517f885cf896fefa83f9098775590b Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 26 Jul 2024 14:52:07 +0530 Subject: [PATCH 07/11] test: make update set values a member of the fuzzer struct Signed-off-by: Manan Gupta --- .../endtoend/transaction/twopc/fuzzer_test.go | 41 +++++++++---------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/go/test/endtoend/transaction/twopc/fuzzer_test.go b/go/test/endtoend/transaction/twopc/fuzzer_test.go index 9bef31c25cd..da502aad1de 100644 --- a/go/test/endtoend/transaction/twopc/fuzzer_test.go +++ b/go/test/endtoend/transaction/twopc/fuzzer_test.go @@ -31,14 +31,12 @@ import ( ) var ( - // updateRowsVals are the rows that we use to ensure 1 update on each shard with the same increment. - updateRowsVals = [][]int{ - { - 4, // 4 maps to 0x20 and ends up in the first shard (-40) - 6, // 6 maps to 0x60 and ends up in the second shard (40-80) - 9, // 9 maps to 0x90 and ends up in the third shard (80-) - // We can increment all of these values by multiples of 16 and they'll always be in the same shard. - }, + // updateRowBaseVals is the base row values that we use to ensure 1 update on each shard with the same increment. + updateRowBaseVals = [3]int{ + 4, // 4 maps to 0x20 and ends up in the first shard (-40) + 6, // 6 maps to 0x60 and ends up in the second shard (40-80) + 9, // 9 maps to 0x90 and ends up in the third shard (80-) + // We can increment all of these values by multiples of 16 and they'll always be in the same shard. } insertIntoFuzzUpdate = "INSERT INTO twopc_fuzzer_update (id, col) VALUES (%d, %d)" @@ -111,15 +109,15 @@ func TestTwoPCFuzzTest(t *testing.T) { fz.stop() // Verify that all the transactions run were actually atomic and no data issues have occurred. - verifyTransactionsWereAtomic(t, conn) + fz.verifyTransactionsWereAtomic(t, conn) }) } } // verifyTransactionsWereAtomic verifies that the invariants of test are held. // It checks the heuristics to ensure that the transactions run were atomic. -func verifyTransactionsWereAtomic(t *testing.T, conn *mysql.Conn) { - for updateSetIdx, updateSet := range updateRowsVals { +func (fz *fuzzer) verifyTransactionsWereAtomic(t *testing.T, conn *mysql.Conn) { + for updateSetIdx, updateSet := range fz.updateRowsVals { // All the three values of the update set must be equal. shard1Val := getColValueForIdFromFuzzUpdate(t, conn, updateSet[0]) shard2Val := getColValueForIdFromFuzzUpdate(t, conn, updateSet[1]) @@ -157,7 +155,7 @@ func getThreadIDsForUpdateSetFromFuzzInsert(t *testing.T, conn *mysql.Conn, upda val, err := row[0].ToInt() require.NoError(t, err) // We reverse map the value to the thread id that had inserted it so that we can use it to compare the lists. - ids = append(ids, (val-updateRowsVals[0][shard-1])/16) + ids = append(ids, (val-updateRowBaseVals[shard-1])/16) } return ids } @@ -173,6 +171,8 @@ type fuzzer struct { shouldStop atomic.Bool // wg is an internal state variable, that used to know whether the fuzzer threads are running or not. wg sync.WaitGroup + // updateRowVals are the rows that we use to ensure 1 update on each shard with the same increment. + updateRowsVals [][]int } // newFuzzer creates a new fuzzer struct. @@ -233,16 +233,15 @@ func (fz *fuzzer) runFuzzerThread(t *testing.T, threadId int) { // initialize initializes all the variables that will be needed for running the fuzzer. // It also creates the rows for the `twopc_fuzzer_update` table. func (fz *fuzzer) initialize(t *testing.T, conn *mysql.Conn) { - updateRowsVals = updateRowsVals[0:1] - for i := 1; i < fz.updateSets; i++ { - updateRowsVals = append(updateRowsVals, []int{ - updateRowsVals[0][0] + i*16, - updateRowsVals[0][1] + i*16, - updateRowsVals[0][2] + i*16, + for i := 0; i < fz.updateSets; i++ { + fz.updateRowsVals = append(fz.updateRowsVals, []int{ + updateRowBaseVals[0] + i*16, + updateRowBaseVals[1] + i*16, + updateRowBaseVals[2] + i*16, }) } - for _, updateSet := range updateRowsVals { + for _, updateSet := range fz.updateRowsVals { for _, id := range updateSet { _, err := conn.ExecuteFetch(fmt.Sprintf(insertIntoFuzzUpdate, id, 0), 0, false) require.NoError(t, err) @@ -281,7 +280,7 @@ func (fz *fuzzer) generateAndExecuteTransaction(t *testing.T, conn *mysql.Conn, // It takes the update set index and the value to increment the set by. func (fz *fuzzer) generateUpdateQueries(updateSet int, incrementVal int32) []string { var queries []string - for _, id := range updateRowsVals[updateSet] { + for _, id := range fz.updateRowsVals[updateSet] { queries = append(queries, fmt.Sprintf(updateFuzzUpdate, incrementVal, id)) } rand.Shuffle(len(queries), func(i, j int) { @@ -294,7 +293,7 @@ func (fz *fuzzer) generateUpdateQueries(updateSet int, incrementVal int32) []str // It takes the update set index and the thread id that is generating these inserts. func (fz *fuzzer) generateInsertQueries(updateSet int, threadId int) []string { var queries []string - for idx, id := range updateRowsVals[0] { + for idx, id := range updateRowBaseVals { queries = append(queries, fmt.Sprintf(insertIntoFuzzInsert, threadId*16+id, updateSetValueForShard(updateSet, idx+1))) } rand.Shuffle(len(queries), func(i, j int) { From 7fd96a5301796a987d19dd24dc87157aa16b297d Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 26 Jul 2024 15:12:32 +0530 Subject: [PATCH 08/11] test: split the information into more columns instead of overloading the same column with extra information Signed-off-by: Manan Gupta --- .../endtoend/transaction/twopc/fuzzer_test.go | 35 +++++++++++-------- go/test/endtoend/transaction/twopc/schema.sql | 3 +- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/go/test/endtoend/transaction/twopc/fuzzer_test.go b/go/test/endtoend/transaction/twopc/fuzzer_test.go index da502aad1de..efcc38fb1b0 100644 --- a/go/test/endtoend/transaction/twopc/fuzzer_test.go +++ b/go/test/endtoend/transaction/twopc/fuzzer_test.go @@ -41,30 +41,30 @@ var ( insertIntoFuzzUpdate = "INSERT INTO twopc_fuzzer_update (id, col) VALUES (%d, %d)" updateFuzzUpdate = "UPDATE twopc_fuzzer_update SET col = col + %d WHERE id = %d" - insertIntoFuzzInsert = "INSERT INTO twopc_fuzzer_insert (id, updateSet) VALUES (%d, '%s')" + insertIntoFuzzInsert = "INSERT INTO twopc_fuzzer_insert (id, updateSet, threadId) VALUES (%d, %d, %d)" selectFromFuzzUpdate = "SELECT col FROM twopc_fuzzer_update WHERE id = %d" - selectIdFromFuzzInsert = "SELECT id FROM twopc_fuzzer_insert WHERE updateSet = '%s' ORDER BY col" + selectIdFromFuzzInsert = "SELECT threadId FROM twopc_fuzzer_insert WHERE updateSet = %d AND id = %d ORDER BY col" ) // TestTwoPCFuzzTest tests 2PC transactions in a fuzzer environment. // The testing strategy involves running many transactions and checking that they all must be atomic. // To this end, we have a very unique strategy. We have two sharded tables `twopc_fuzzer_update`, and `twopc_fuzzer_insert` with the following columns. // - id: This is the sharding column. We use reverse_bits as the sharding vindex because it is easy to reason about where a row will end up. -// For the `twopc_fuzzer_insert` column, it is calculated from the thread id of the fuzzer thread that inserted it. // - col in `twopc_fuzzer_insert`: An auto-increment column. // - col in `twopc_fuzzer_update`: This is a bigint value that we will use to increment on updates. // - updateSet: This column will store which update set the inserts where done for. +// - threadId: It stores the thread id of the fuzzer thread that inserted the row. // // The testing strategy is as follows - // Every transaction will do 2 things - // - One, it will increment the `col` on 1 row in each of the shards of the `twopc_fuzzer_update` table. // To do this, we have sets of rows that each map to one shard. We prepopulate this before the test starts. -// These sets are stored in updateRowsVals. +// These sets are stored in the fuzzer in updateRowsVals. // - Two, it will insert one row in each of the shards of the `twopc_fuzzer_insert` table and it will also store the update set that it updated the rows off. // // We can check that a transaction was atomic by basically checking that the `col` value for all the rows that were updated together should match. // If any transaction was partially successful, then it would have missed an increment on one of the rows. -// Moreover, the order of rows for a given update set in the 3 shards should be the same to ensure that conflicting transactions got committed in the same exact order. +// Moreover, the threadIDs of rows for a given update set in the 3 shards should be the same to ensure that conflicting transactions got committed in the same exact order. func TestTwoPCFuzzTest(t *testing.T) { testcases := []struct { name string @@ -147,15 +147,16 @@ func getColValueForIdFromFuzzUpdate(t *testing.T, conn *mysql.Conn, id int) uint // getThreadIDsForUpdateSetFromFuzzInsert gets the thread IDs for the given update set ordered by the col column from the twopc_fuzzer_insert table. func getThreadIDsForUpdateSetFromFuzzInsert(t *testing.T, conn *mysql.Conn, updateSet int, shard int) []int { - res, err := conn.ExecuteFetch(fmt.Sprintf(selectIdFromFuzzInsert, updateSetValueForShard(updateSet, shard)), 10000, false) + // We will select all the rows for the given update set for the given shard. To get all the rows for the given shard, + // we can use the id column for filtering, since we know that the first shard will have all the values of id as 4, second shard as 6 and the last one as 9. + res, err := conn.ExecuteFetch(fmt.Sprintf(selectIdFromFuzzInsert, updateSet, updateRowBaseVals[shard-1]), 10000, false) require.NoError(t, err) var ids []int for _, row := range res.Rows { require.Len(t, row, 1) - val, err := row[0].ToInt() + threadId, err := row[0].ToInt() require.NoError(t, err) - // We reverse map the value to the thread id that had inserted it so that we can use it to compare the lists. - ids = append(ids, (val-updateRowBaseVals[shard-1])/16) + ids = append(ids, threadId) } return ids } @@ -259,13 +260,12 @@ func (fz *fuzzer) generateAndExecuteTransaction(t *testing.T, conn *mysql.Conn, // for each update set ordered by the auto increment column will not be true. // That assertion depends on all the transactions running updates first to ensure that for any given update set, // no two transactions are running the insert queries. - queries := fz.generateUpdateQueries(updateSetVal, incrementVal) + queries := []string{"begin"} + queries = append(queries, fz.generateUpdateQueries(updateSetVal, incrementVal)...) queries = append(queries, fz.generateInsertQueries(updateSetVal, threadId)...) - _, err := conn.ExecuteFetch("begin", 0, false) - require.NoError(t, err) finalCommand := "commit" for _, query := range queries { - _, err = conn.ExecuteFetch(query, 0, false) + _, err := conn.ExecuteFetch(query, 0, false) // If any command fails because of deadlocks or timeout or whatever, then we need to rollback the transaction. if err != nil { finalCommand = "rollback" @@ -293,8 +293,13 @@ func (fz *fuzzer) generateUpdateQueries(updateSet int, incrementVal int32) []str // It takes the update set index and the thread id that is generating these inserts. func (fz *fuzzer) generateInsertQueries(updateSet int, threadId int) []string { var queries []string - for idx, id := range updateRowBaseVals { - queries = append(queries, fmt.Sprintf(insertIntoFuzzInsert, threadId*16+id, updateSetValueForShard(updateSet, idx+1))) + for _, baseVal := range updateRowBaseVals { + // In the twopc_fuzzer_insert table we are going to be inserting the following values - + // - id: We use the updateRowBaseVals to ensure that the 3 insertions happen on 3 different shards. + // This also allows us to read rows any of the shards without shard targeting by just filtering by this column. + // - updateSet: The update set index that these insertions correspond too. + // - threadId: The thread ID of the fuzzer thread that is running the transaction. + queries = append(queries, fmt.Sprintf(insertIntoFuzzInsert, baseVal, updateSet, threadId)) } rand.Shuffle(len(queries), func(i, j int) { queries[i], queries[j] = queries[j], queries[i] diff --git a/go/test/endtoend/transaction/twopc/schema.sql b/go/test/endtoend/transaction/twopc/schema.sql index 1d3bb8decb6..f7133b590af 100644 --- a/go/test/endtoend/transaction/twopc/schema.sql +++ b/go/test/endtoend/transaction/twopc/schema.sql @@ -19,7 +19,8 @@ create table twopc_fuzzer_update ( create table twopc_fuzzer_insert ( id bigint, - updateSet varchar(50), + updateSet bigint, + threadId bigint, col bigint auto_increment, key(col), primary key (id, col) From a382456b887b0606886b32d0eeca3c275da26024 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 26 Jul 2024 15:13:39 +0530 Subject: [PATCH 09/11] feat: use a different connection for verification Signed-off-by: Manan Gupta --- go/test/endtoend/transaction/twopc/fuzzer_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/transaction/twopc/fuzzer_test.go b/go/test/endtoend/transaction/twopc/fuzzer_test.go index efcc38fb1b0..29e6f01032a 100644 --- a/go/test/endtoend/transaction/twopc/fuzzer_test.go +++ b/go/test/endtoend/transaction/twopc/fuzzer_test.go @@ -109,14 +109,16 @@ func TestTwoPCFuzzTest(t *testing.T) { fz.stop() // Verify that all the transactions run were actually atomic and no data issues have occurred. - fz.verifyTransactionsWereAtomic(t, conn) + fz.verifyTransactionsWereAtomic(t) }) } } // verifyTransactionsWereAtomic verifies that the invariants of test are held. // It checks the heuristics to ensure that the transactions run were atomic. -func (fz *fuzzer) verifyTransactionsWereAtomic(t *testing.T, conn *mysql.Conn) { +func (fz *fuzzer) verifyTransactionsWereAtomic(t *testing.T) { + conn, err := mysql.Connect(context.Background(), &vtParams) + require.NoError(t, err) for updateSetIdx, updateSet := range fz.updateRowsVals { // All the three values of the update set must be equal. shard1Val := getColValueForIdFromFuzzUpdate(t, conn, updateSet[0]) From 813d5ddc04e6333e8e4818073ce2199b17f97191 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 29 Jul 2024 10:54:57 +0530 Subject: [PATCH 10/11] feat: address review comments Signed-off-by: Manan Gupta --- .../twopc/{ => fuzzer}/fuzzer_test.go | 23 ++- .../transaction/twopc/fuzzer/main_test.go | 138 ++++++++++++++++++ .../transaction/twopc/fuzzer/schema.sql | 14 ++ .../transaction/twopc/fuzzer/vschema.json | 26 ++++ .../endtoend/transaction/twopc/main_test.go | 20 --- go/test/endtoend/transaction/twopc/schema.sql | 15 -- .../endtoend/transaction/twopc/vschema.json | 16 -- test/config.json | 9 ++ 8 files changed, 196 insertions(+), 65 deletions(-) rename go/test/endtoend/transaction/twopc/{ => fuzzer}/fuzzer_test.go (96%) create mode 100644 go/test/endtoend/transaction/twopc/fuzzer/main_test.go create mode 100644 go/test/endtoend/transaction/twopc/fuzzer/schema.sql create mode 100644 go/test/endtoend/transaction/twopc/fuzzer/vschema.json diff --git a/go/test/endtoend/transaction/twopc/fuzzer_test.go b/go/test/endtoend/transaction/twopc/fuzzer/fuzzer_test.go similarity index 96% rename from go/test/endtoend/transaction/twopc/fuzzer_test.go rename to go/test/endtoend/transaction/twopc/fuzzer/fuzzer_test.go index 29e6f01032a..ff440164042 100644 --- a/go/test/endtoend/transaction/twopc/fuzzer_test.go +++ b/go/test/endtoend/transaction/twopc/fuzzer/fuzzer_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package transaction +package fuzzer import ( "context" @@ -217,18 +217,13 @@ func (fz *fuzzer) runFuzzerThread(t *testing.T, threadId int) { fz.wg.Done() }() - // Create a connection to the vtgate to run transactions. - conn, err := mysql.Connect(context.Background(), &vtParams) - require.NoError(t, err) - defer conn.Close() - for { // If fuzzer thread is marked to be stopped, then we should exit this go routine. if fz.shouldStop.Load() == true { return } // Run an atomic transaction - fz.generateAndExecuteTransaction(t, conn, threadId) + fz.generateAndExecuteTransaction(threadId) } } @@ -253,7 +248,13 @@ func (fz *fuzzer) initialize(t *testing.T, conn *mysql.Conn) { } // generateAndExecuteTransaction generates the queries of the transaction and then executes them. -func (fz *fuzzer) generateAndExecuteTransaction(t *testing.T, conn *mysql.Conn, threadId int) { +func (fz *fuzzer) generateAndExecuteTransaction(threadId int) { + // Create a connection to the vtgate to run transactions. + conn, err := mysql.Connect(context.Background(), &vtParams) + if err != nil { + return + } + defer conn.Close() // randomly generate an update set to use and the value to increment it by. updateSetVal := rand.Intn(fz.updateSets) incrementVal := rand.Int31() @@ -274,7 +275,6 @@ func (fz *fuzzer) generateAndExecuteTransaction(t *testing.T, conn *mysql.Conn, break } } - // TODO: Check if we want to randomize commit and rollback decisions. _, _ = conn.ExecuteFetch(finalCommand, 0, false) } @@ -308,8 +308,3 @@ func (fz *fuzzer) generateInsertQueries(updateSet int, threadId int) []string { }) return queries } - -// updateSetValueForShard gets a string representation to store the update set value for each shard. -func updateSetValueForShard(updateSet int, shard int) string { - return fmt.Sprintf("Shard-%d:%d", shard, updateSet) -} diff --git a/go/test/endtoend/transaction/twopc/fuzzer/main_test.go b/go/test/endtoend/transaction/twopc/fuzzer/main_test.go new file mode 100644 index 00000000000..5e1d14d77e4 --- /dev/null +++ b/go/test/endtoend/transaction/twopc/fuzzer/main_test.go @@ -0,0 +1,138 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fuzzer + +import ( + "context" + _ "embed" + "flag" + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + vtgateGrpcAddress string + keyspaceName = "ks" + cell = "zone1" + hostname = "localhost" + sidecarDBName = "vt_ks" + + //go:embed schema.sql + SchemaSQL string + + //go:embed vschema.json + VSchema string +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitcode := func() int { + clusterInstance = cluster.NewCluster(cell, hostname) + defer clusterInstance.Teardown() + + // Start topo server + if err := clusterInstance.StartTopo(); err != nil { + return 1 + } + + // Reserve vtGate port in order to pass it to vtTablet + clusterInstance.VtgateGrpcPort = clusterInstance.GetAndReservePort() + + // Set extra args for twopc + clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, + "--transaction_mode", "TWOPC", + "--grpc_use_effective_callerid", + ) + clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, + "--twopc_enable", + "--twopc_abandon_age", "1", + ) + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: SchemaSQL, + VSchema: VSchema, + SidecarDBName: sidecarDBName, + } + if err := clusterInstance.StartKeyspace(*keyspace, []string{"-40", "40-80", "80-"}, 0, false); err != nil { + return 1 + } + + // Start Vtgate + if err := clusterInstance.StartVtgate(); err != nil { + return 1 + } + vtParams = clusterInstance.GetVTParams(keyspaceName) + vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort) + + return m.Run() + }() + os.Exit(exitcode) +} + +func start(t *testing.T) (*mysql.Conn, func()) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + cleanup(t) + + return conn, func() { + conn.Close() + cleanup(t) + } +} + +func cleanup(t *testing.T) { + cluster.PanicHandler(t) + + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + clearOutTable(t, conn, "twopc_fuzzer_insert") + clearOutTable(t, conn, "twopc_fuzzer_update") +} + +// clearOutTable deletes everything from a table. Sometimes the table might have more rows than allowed in a single delete query, +// so we have to do the deletions iteratively. +func clearOutTable(t *testing.T, conn *mysql.Conn, tableName string) { + for { + res, err := conn.ExecuteFetch(fmt.Sprintf("SELECT count(*) FROM %v", tableName), 1, false) + require.NoError(t, err) + require.Len(t, res.Rows, 1) + require.Len(t, res.Rows[0], 1) + rowCount, err := res.Rows[0][0].ToInt() + require.NoError(t, err) + if rowCount == 0 { + return + } + _, err = conn.ExecuteFetch(fmt.Sprintf("DELETE FROM %v LIMIT 10000", tableName), 10000, false) + require.NoError(t, err) + } +} diff --git a/go/test/endtoend/transaction/twopc/fuzzer/schema.sql b/go/test/endtoend/transaction/twopc/fuzzer/schema.sql new file mode 100644 index 00000000000..290da808991 --- /dev/null +++ b/go/test/endtoend/transaction/twopc/fuzzer/schema.sql @@ -0,0 +1,14 @@ +create table twopc_fuzzer_update ( + id bigint, + col bigint, + primary key (id) +) Engine=InnoDB; + +create table twopc_fuzzer_insert ( + id bigint, + updateSet bigint, + threadId bigint, + col bigint auto_increment, + key(col), + primary key (id, col) +) Engine=InnoDB; diff --git a/go/test/endtoend/transaction/twopc/fuzzer/vschema.json b/go/test/endtoend/transaction/twopc/fuzzer/vschema.json new file mode 100644 index 00000000000..e3854f8f101 --- /dev/null +++ b/go/test/endtoend/transaction/twopc/fuzzer/vschema.json @@ -0,0 +1,26 @@ +{ + "sharded":true, + "vindexes": { + "reverse_bits": { + "type": "reverse_bits" + } + }, + "tables": { + "twopc_fuzzer_update": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + } + ] + }, + "twopc_fuzzer_insert": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + } + ] + } + } +} \ No newline at end of file diff --git a/go/test/endtoend/transaction/twopc/main_test.go b/go/test/endtoend/transaction/twopc/main_test.go index 0d94a6b4c8a..b1332177c83 100644 --- a/go/test/endtoend/transaction/twopc/main_test.go +++ b/go/test/endtoend/transaction/twopc/main_test.go @@ -127,26 +127,6 @@ func cleanup(t *testing.T) { defer conn.Close() _, _ = utils.ExecAllowError(t, conn, "delete from twopc_user") - clearOutTable(t, conn, "twopc_fuzzer_insert") - clearOutTable(t, conn, "twopc_fuzzer_update") -} - -// clearOutTable deletes everything from a table. Sometimes the table might have more rows than allowed in a single delete query, -// so we have to do the deletions iteratively. -func clearOutTable(t *testing.T, conn *mysql.Conn, tableName string) { - for { - res, err := conn.ExecuteFetch(fmt.Sprintf("SELECT count(*) FROM %v", tableName), 1, false) - require.NoError(t, err) - require.Len(t, res.Rows, 1) - require.Len(t, res.Rows[0], 1) - rowCount, err := res.Rows[0][0].ToInt() - require.NoError(t, err) - if rowCount == 0 { - return - } - _, err = conn.ExecuteFetch(fmt.Sprintf("DELETE FROM %v LIMIT 10000", tableName), 10000, false) - require.NoError(t, err) - } } type extractInterestingValues func(dtidMap map[string]string, vals []sqltypes.Value) []sqltypes.Value diff --git a/go/test/endtoend/transaction/twopc/schema.sql b/go/test/endtoend/transaction/twopc/schema.sql index f7133b590af..2336c553502 100644 --- a/go/test/endtoend/transaction/twopc/schema.sql +++ b/go/test/endtoend/transaction/twopc/schema.sql @@ -10,18 +10,3 @@ create table twopc_music ( title varchar(64), primary key (id) ) Engine=InnoDB; - -create table twopc_fuzzer_update ( - id bigint, - col bigint, - primary key (id) -) Engine=InnoDB; - -create table twopc_fuzzer_insert ( - id bigint, - updateSet bigint, - threadId bigint, - col bigint auto_increment, - key(col), - primary key (id, col) -) Engine=InnoDB; diff --git a/go/test/endtoend/transaction/twopc/vschema.json b/go/test/endtoend/transaction/twopc/vschema.json index 48522aa8171..cdd17ebe1f0 100644 --- a/go/test/endtoend/transaction/twopc/vschema.json +++ b/go/test/endtoend/transaction/twopc/vschema.json @@ -24,22 +24,6 @@ "name": "xxhash" } ] - }, - "twopc_fuzzer_update": { - "column_vindexes": [ - { - "column": "id", - "name": "reverse_bits" - } - ] - }, - "twopc_fuzzer_insert": { - "column_vindexes": [ - { - "column": "id", - "name": "reverse_bits" - } - ] } } } \ No newline at end of file diff --git a/test/config.json b/test/config.json index 1cdf92127ef..49f77e1b7fb 100644 --- a/test/config.json +++ b/test/config.json @@ -842,6 +842,15 @@ "RetryMax": 1, "Tags": [] }, + "vtgate_transaction_twopc_fuzzer": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/transaction/twopc/fuzzer"], + "Command": [], + "Manual": false, + "Shard": "vtgate_transaction", + "RetryMax": 1, + "Tags": [] + }, "vtgate_transaction_partial_exec": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/partialfailure"], From 745a4919f4805167e6f90fdb80db72fed516a2ae Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 29 Jul 2024 10:57:38 +0530 Subject: [PATCH 11/11] cleanup remove unrequired changes Signed-off-by: Manan Gupta --- go/test/endtoend/transaction/twopc/main_test.go | 1 - go/test/endtoend/transaction/twopc/schema.sql | 2 +- go/test/endtoend/transaction/twopc/vschema.json | 3 --- 3 files changed, 1 insertion(+), 5 deletions(-) diff --git a/go/test/endtoend/transaction/twopc/main_test.go b/go/test/endtoend/transaction/twopc/main_test.go index b1332177c83..8ac7cfc1f21 100644 --- a/go/test/endtoend/transaction/twopc/main_test.go +++ b/go/test/endtoend/transaction/twopc/main_test.go @@ -110,7 +110,6 @@ func start(t *testing.T) (*mysql.Conn, func()) { ctx := context.Background() conn, err := mysql.Connect(ctx, &vtParams) require.NoError(t, err) - cleanup(t) return conn, func() { conn.Close() diff --git a/go/test/endtoend/transaction/twopc/schema.sql b/go/test/endtoend/transaction/twopc/schema.sql index 2336c553502..60a7c19837c 100644 --- a/go/test/endtoend/transaction/twopc/schema.sql +++ b/go/test/endtoend/transaction/twopc/schema.sql @@ -9,4 +9,4 @@ create table twopc_music ( user_id bigint, title varchar(64), primary key (id) -) Engine=InnoDB; +) Engine=InnoDB; \ No newline at end of file diff --git a/go/test/endtoend/transaction/twopc/vschema.json b/go/test/endtoend/transaction/twopc/vschema.json index cdd17ebe1f0..4ff62df6808 100644 --- a/go/test/endtoend/transaction/twopc/vschema.json +++ b/go/test/endtoend/transaction/twopc/vschema.json @@ -3,9 +3,6 @@ "vindexes": { "xxhash": { "type": "xxhash" - }, - "reverse_bits": { - "type": "reverse_bits" } }, "tables": {