From 886c7c33b538f2ac744711c699456476708699e3 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 2 Dec 2024 20:19:09 +0530 Subject: [PATCH] test: add test for vindexes in TestVindexes Signed-off-by: Manan Gupta --- .../endtoend/transaction/twopc/main_test.go | 3 + go/test/endtoend/transaction/twopc/schema.sql | 25 +- .../endtoend/transaction/twopc/twopc_test.go | 222 ++++++++++++++++++ .../endtoend/transaction/twopc/vschema.json | 53 +++++ 4 files changed, 302 insertions(+), 1 deletion(-) diff --git a/go/test/endtoend/transaction/twopc/main_test.go b/go/test/endtoend/transaction/twopc/main_test.go index 2f27198fd2e..631b29647c9 100644 --- a/go/test/endtoend/transaction/twopc/main_test.go +++ b/go/test/endtoend/transaction/twopc/main_test.go @@ -143,6 +143,9 @@ func cleanup(t *testing.T) { cluster.PanicHandler(t) twopcutil.ClearOutTable(t, vtParams, "twopc_user") twopcutil.ClearOutTable(t, vtParams, "twopc_t1") + twopcutil.ClearOutTable(t, vtParams, "twopc_lookup") + twopcutil.ClearOutTable(t, vtParams, "lookup_unique") + twopcutil.ClearOutTable(t, vtParams, "lookup") sm.reset() } diff --git a/go/test/endtoend/transaction/twopc/schema.sql b/go/test/endtoend/transaction/twopc/schema.sql index 7c289a03c2a..aff839eabe9 100644 --- a/go/test/endtoend/transaction/twopc/schema.sql +++ b/go/test/endtoend/transaction/twopc/schema.sql @@ -18,4 +18,27 @@ create table twopc_t1 id bigint, col bigint, primary key (id) -) Engine=InnoDB; \ No newline at end of file +) Engine=InnoDB; + +create table twopc_lookup +( + id bigint, + col bigint, + col_unique bigint, + primary key (id) +) Engine=InnoDB; + +create table lookup +( + col varchar(128), + id bigint, + keyspace_id varbinary(100), + primary key (id) +) Engine = InnoDB; + +create table lookup_unique +( + col_unique varchar(128), + keyspace_id varbinary(100), + primary key (col_unique) +) Engine = InnoDB; diff --git a/go/test/endtoend/transaction/twopc/twopc_test.go b/go/test/endtoend/transaction/twopc/twopc_test.go index df064fb16cd..5a97f79a79f 100644 --- a/go/test/endtoend/transaction/twopc/twopc_test.go +++ b/go/test/endtoend/transaction/twopc/twopc_test.go @@ -1449,6 +1449,228 @@ func TestReadTransactionStatus(t *testing.T) { wg.Wait() } +// TestVindexes tests that different vindexes work well with two-phase commit. +func TestVindexes(t *testing.T) { + testcases := []struct { + name string + initQueries []string + testQueries []string + logExpected map[string][]string + }{ + { + name: "Lookup Single Update", + initQueries: []string{ + "insert into twopc_lookup(id, col, col_unique) values(4, 4, 6)", + "insert into twopc_lookup(id, col, col_unique) values(6, 4, 9)", + "insert into twopc_lookup(id, col, col_unique) values(9, 4, 4)", + }, + testQueries: []string{ + "begin", + "update twopc_lookup set col = 9 where col_unique = 9", + "commit", + }, + logExpected: map[string][]string{ + "ks.redo_statement:80-": { + "insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from lookup where col = 4 and id = 6 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "insert:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"insert into lookup(col, id, keyspace_id) values (9, 6, _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from lookup where col = 4 and id = 6 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"insert into lookup(col, id, keyspace_id) values (9, 6, _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]", + }, + "ks.twopc_lookup:40-80": { + "update:[INT64(6) INT64(9) INT64(9)]", + }, + "ks.lookup:80-": { + "delete:[VARCHAR(\"4\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "insert:[VARCHAR(\"9\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + }, + }, + { + name: "Lookup-Unique Single Update", + initQueries: []string{ + "insert into twopc_lookup(id, col, col_unique) values(4, 4, 6)", + "insert into twopc_lookup(id, col, col_unique) values(6, 4, 9)", + "insert into twopc_lookup(id, col, col_unique) values(9, 4, 4)", + }, + testQueries: []string{ + "begin", + "update twopc_lookup set col_unique = 20 where col_unique = 9", + "commit", + }, + logExpected: map[string][]string{ + "ks.redo_statement:80-": { + "insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from lookup_unique where col_unique = 9 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "insert:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"insert into lookup_unique(col_unique, keyspace_id) values (20, _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from lookup_unique where col_unique = 9 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"insert into lookup_unique(col_unique, keyspace_id) values (20, _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]", + }, + "ks.twopc_lookup:40-80": { + "update:[INT64(6) INT64(4) INT64(20)]", + }, + "ks.lookup_unique:80-": { + "delete:[VARCHAR(\"9\") VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "insert:[VARCHAR(\"20\") VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + }, + }, + { + name: "Lookup And Lookup-Unique Single Delete", + initQueries: []string{ + "insert into twopc_lookup(id, col, col_unique) values(4, 4, 6)", + "insert into twopc_lookup(id, col, col_unique) values(6, 4, 9)", + "insert into twopc_lookup(id, col, col_unique) values(9, 4, 4)", + }, + testQueries: []string{ + "begin", + "delete from twopc_lookup where col_unique = 9", + "commit", + }, + logExpected: map[string][]string{ + "ks.redo_statement:80-": { + "insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from lookup where col = 4 and id = 6 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "insert:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"delete from lookup_unique where col_unique = 9 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from lookup where col = 4 and id = 6 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"delete from lookup_unique where col_unique = 9 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + }, + "ks.twopc_lookup:40-80": { + "delete:[INT64(6) INT64(4) INT64(9)]", + }, + "ks.lookup_unique:80-": { + "delete:[VARCHAR(\"9\") VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + "ks.lookup:80-": { + "delete:[VARCHAR(\"4\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + }, + }, + { + name: "Lookup And Lookup-Unique Single Insertion", + initQueries: []string{ + "insert into twopc_lookup(id, col, col_unique) values(4, 4, 6)", + "insert into twopc_lookup(id, col, col_unique) values(6, 4, 9)", + "insert into twopc_lookup(id, col, col_unique) values(9, 4, 4)", + }, + testQueries: []string{ + "begin", + "insert into twopc_lookup(id, col, col_unique) values(20, 4, 22)", + "commit", + }, + logExpected: map[string][]string{ + "ks.redo_statement:80-": { + "insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"insert into lookup(col, id, keyspace_id) values (4, 20, _binary'(\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"insert into lookup(col, id, keyspace_id) values (4, 20, _binary'(\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]", + }, + "ks.lookup:80-": { + "insert:[VARCHAR(\"4\") INT64(20) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + "ks.lookup_unique:-40": { + "insert:[VARCHAR(\"22\") VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + "ks.twopc_lookup:-40": { + "insert:[INT64(20) INT64(4) INT64(22)]", + }, + }, + }, + { + name: "Lookup And Lookup-Unique Mix", + initQueries: []string{ + "insert into twopc_lookup(id, col, col_unique) values(4, 4, 6)", + "insert into twopc_lookup(id, col, col_unique) values(6, 4, 9)", + "insert into twopc_lookup(id, col, col_unique) values(9, 4, 4)", + }, + testQueries: []string{ + "begin", + "insert into twopc_lookup(id, col, col_unique) values(20, 4, 22)", + "update twopc_lookup set col = 9 where col_unique = 9", + "delete from twopc_lookup where id = 9", + "commit", + }, + logExpected: map[string][]string{ + "ks.redo_statement:80-": { + "insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"insert into lookup(col, id, keyspace_id) values (4, 20, _binary'(\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]", + "insert:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"delete from lookup where col = 4 and id = 6 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "insert:[VARCHAR(\"dtid-3\") INT64(3) BLOB(\"insert into lookup(col, id, keyspace_id) values (9, 6, _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]", + "insert:[VARCHAR(\"dtid-3\") INT64(4) BLOB(\"delete from lookup where col = 4 and id = 9 and keyspace_id = _binary'\\x90\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "insert:[VARCHAR(\"dtid-3\") INT64(5) BLOB(\"delete from lookup_unique where col_unique = 4 and keyspace_id = _binary'\\x90\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "insert:[VARCHAR(\"dtid-3\") INT64(6) BLOB(\"delete from twopc_lookup where id = 9 limit 10001 /* INT64 */\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"insert into lookup(col, id, keyspace_id) values (4, 20, _binary'(\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(2) BLOB(\"delete from lookup where col = 4 and id = 6 and keyspace_id = _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(3) BLOB(\"insert into lookup(col, id, keyspace_id) values (9, 6, _binary'`\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0')\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(4) BLOB(\"delete from lookup where col = 4 and id = 9 and keyspace_id = _binary'\\x90\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(5) BLOB(\"delete from lookup_unique where col_unique = 4 and keyspace_id = _binary'\\x90\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0\\\\0' limit 10001\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(6) BLOB(\"delete from twopc_lookup where id = 9 limit 10001 /* INT64 */\")]", + }, + "ks.redo_statement:40-80": { + "insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"update twopc_lookup set col = 9 where col_unique = 9 limit 10001 /* INT64 */\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"update twopc_lookup set col = 9 where col_unique = 9 limit 10001 /* INT64 */\")]", + }, + "ks.twopc_lookup:-40": { + "insert:[INT64(20) INT64(4) INT64(22)]", + }, + "ks.twopc_lookup:40-80": { + "update:[INT64(6) INT64(9) INT64(9)]", + }, + "ks.twopc_lookup:80-": { + "delete:[INT64(9) INT64(4) INT64(4)]", + }, + "ks.lookup_unique:-40": { + "insert:[VARCHAR(\"22\") VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + "ks.lookup_unique:80-": { + "delete:[VARCHAR(\"4\") VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + "ks.lookup:80-": { + "insert:[VARCHAR(\"4\") INT64(20) VARBINARY(\"(\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "delete:[VARCHAR(\"4\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "insert:[VARCHAR(\"9\") INT64(6) VARBINARY(\"`\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + "delete:[VARCHAR(\"4\") INT64(9) VARBINARY(\"\\x90\\x00\\x00\\x00\\x00\\x00\\x00\\x00\")]", + }, + }, + }, + } + + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + defer cleanup(t) + + vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "dt_user", "") + require.NoError(t, err) + defer vtgateConn.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := make(chan *binlogdatapb.VEvent) + runVStream(t, ctx, ch, vtgateConn) + + conn := vtgateConn.Session("", nil) + qCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // initial insert + for _, query := range tt.initQueries { + execute(qCtx, t, conn, query) + } + + // ignore initial change + tableMap := make(map[string][]*querypb.Field) + dtMap := make(map[string]string) + _ = retrieveTransitionsWithTimeout(t, ch, tableMap, dtMap, 2*time.Second) + + // Insert into multiple shards + for _, query := range tt.testQueries { + execute(qCtx, t, conn, query) + } + + // Below check ensures that the transaction is resolved by the resolver on receiving unresolved transaction signal from MM. + logTable := retrieveTransitionsWithTimeout(t, ch, tableMap, dtMap, 2*time.Second) + for key, val := range tt.logExpected { + assert.EqualValues(t, val, logTable[key], key) + } + }) + } +} + func getTablet(tabletGrpcPort int) *tabletpb.Tablet { portMap := make(map[string]int32) portMap["grpc"] = int32(tabletGrpcPort) diff --git a/go/test/endtoend/transaction/twopc/vschema.json b/go/test/endtoend/transaction/twopc/vschema.json index bca58b05c1e..0c22f40d54b 100644 --- a/go/test/endtoend/transaction/twopc/vschema.json +++ b/go/test/endtoend/transaction/twopc/vschema.json @@ -6,6 +6,24 @@ }, "reverse_bits": { "type": "reverse_bits" + }, + "lookup_vdx": { + "type": "lookup", + "params": { + "table": "lookup", + "from": "col,id", + "to": "keyspace_id" + }, + "owner": "twopc_lookup" + }, + "lookup_unique_vdx": { + "type": "lookup_unique", + "params": { + "table": "lookup_unique", + "from": "col_unique", + "to": "keyspace_id" + }, + "owner": "twopc_lookup" } }, "tables": { @@ -32,6 +50,41 @@ "name": "reverse_bits" } ] + }, + "twopc_lookup": { + "column_vindexes": [ + { + "column": "id", + "name": "reverse_bits" + }, + { + "columns": [ + "col", + "id" + ], + "name": "lookup_vdx" + }, + { + "column": "col_unique", + "name": "lookup_unique_vdx" + } + ] + }, + "lookup": { + "column_vindexes": [ + { + "column": "col", + "name": "xxhash" + } + ] + }, + "lookup_unique": { + "column_vindexes": [ + { + "column": "col_unique", + "name": "xxhash" + } + ] } } } \ No newline at end of file