diff --git a/go/test/endtoend/vtgate/transaction/twopc/main_test.go b/go/test/endtoend/vtgate/transaction/twopc/main_test.go new file mode 100644 index 00000000000..102235d672a --- /dev/null +++ b/go/test/endtoend/vtgate/transaction/twopc/main_test.go @@ -0,0 +1,271 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package transaction + +import ( + "context" + _ "embed" + "encoding/json" + "flag" + "fmt" + "io" + "os" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/utils" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/vtgate/vtgateconn" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + vtgateGrpcAddress string + keyspaceName = "ks" + cell = "zone1" + hostname = "localhost" + sidecarDBName = "vt_ks" + + //go:embed schema.sql + SchemaSQL string + + //go:embed vschema.json + VSchema string +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitcode := func() int { + clusterInstance = cluster.NewCluster(cell, hostname) + defer clusterInstance.Teardown() + + // Start topo server + if err := clusterInstance.StartTopo(); err != nil { + return 1 + } + + // Reserve vtGate port in order to pass it to vtTablet + clusterInstance.VtgateGrpcPort = clusterInstance.GetAndReservePort() + + // Set extra args for twopc + clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, + "--transaction_mode", "TWOPC", + ) + clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, + "--twopc_enable", + "--twopc_coordinator_address", fmt.Sprintf("localhost:%d", clusterInstance.VtgateGrpcPort), + "--twopc_abandon_age", "3600", + "--queryserver-config-transaction-cap", "3", + ) + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: SchemaSQL, + VSchema: VSchema, + SidecarDBName: sidecarDBName, + } + if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false); err != nil { + return 1 + } + + // Start Vtgate + if err := clusterInstance.StartVtgate(); err != nil { + return 1 + } + vtParams = clusterInstance.GetVTParams(keyspaceName) + vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort) + + return m.Run() + }() + os.Exit(exitcode) +} + +func start(t *testing.T) (*mysql.Conn, func()) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + + deleteAll := func() { + tables := []string{"twopc_user"} + for _, table := range tables { + _, _ = utils.ExecAllowError(t, conn, "delete from "+table) + } + } + + deleteAll() + + return conn, func() { + deleteAll() + conn.Close() + cluster.PanicHandler(t) + } +} + +type extractInterestingValues func(dtidMap map[string]string, vals []sqltypes.Value) []sqltypes.Value + +var tables = map[string]extractInterestingValues{ + "ks.dt_state": func(dtidMap map[string]string, vals []sqltypes.Value) (out []sqltypes.Value) { + dtid := getDTID(dtidMap, vals[0].ToString()) + dtState := getDTState(vals[1]) + out = append(out, sqltypes.NewVarChar(dtid), sqltypes.NewVarChar(dtState.String())) + return + }, + "ks.dt_participant": func(dtidMap map[string]string, vals []sqltypes.Value) (out []sqltypes.Value) { + dtid := getDTID(dtidMap, vals[0].ToString()) + out = append([]sqltypes.Value{sqltypes.NewVarChar(dtid)}, vals[1:]...) + return + }, + "ks.redo_state": func(dtidMap map[string]string, vals []sqltypes.Value) (out []sqltypes.Value) { + dtid := getDTID(dtidMap, vals[0].ToString()) + dtState := getDTState(vals[1]) + out = append(out, sqltypes.NewVarChar(dtid), sqltypes.NewVarChar(dtState.String())) + return + }, + "ks.redo_statement": func(dtidMap map[string]string, vals []sqltypes.Value) (out []sqltypes.Value) { + dtid := getDTID(dtidMap, vals[0].ToString()) + out = append([]sqltypes.Value{sqltypes.NewVarChar(dtid)}, vals[1:]...) + return + }, + "ks.twopc_user": func(_ map[string]string, vals []sqltypes.Value) []sqltypes.Value { return vals }, +} + +func getDTState(val sqltypes.Value) querypb.TransactionState { + s, _ := val.ToInt() + return querypb.TransactionState(s) +} + +func getDTID(dtidMap map[string]string, dtKey string) string { + dtid, exists := dtidMap[dtKey] + if !exists { + dtid = fmt.Sprintf("dtid-%d", len(dtidMap)+1) + dtidMap[dtKey] = dtid + } + return dtid +} + +func runVStream(t *testing.T, ctx context.Context, ch chan *binlogdatapb.VEvent, vtgateConn *vtgateconn.VTGateConn) { + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{ + {Keyspace: keyspaceName, Shard: "-80", Gtid: "current"}, + {Keyspace: keyspaceName, Shard: "80-", Gtid: "current"}, + }} + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "/.*/", + }}, + } + vReader, err := vtgateConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, nil) + require.NoError(t, err) + + // Use a channel to signal that the first VGTID event has been processed + firstEventProcessed := make(chan struct{}) + var once sync.Once + + go func() { + for { + evs, err := vReader.Recv() + if err == io.EOF || ctx.Err() != nil { + return + } + require.NoError(t, err) + + for _, ev := range evs { + // Signal the first event has been processed using sync.Once + if ev.Type == binlogdatapb.VEventType_VGTID { + once.Do(func() { close(firstEventProcessed) }) + } + if ev.Type == binlogdatapb.VEventType_ROW || ev.Type == binlogdatapb.VEventType_FIELD { + ch <- ev + } + } + } + }() + + // Wait for the first event to be processed + <-firstEventProcessed +} + +func retrieveTransitions(t *testing.T, ch chan *binlogdatapb.VEvent, tableMap map[string][]*querypb.Field, dtMap map[string]string) map[string][]string { + logTable := make(map[string][]string) + + keepWaiting := true + for keepWaiting { + select { + case re := <-ch: + if re.RowEvent != nil { + shard := re.RowEvent.Shard + tableName := re.RowEvent.TableName + fields, ok := tableMap[tableName] + require.Truef(t, ok, "table %s not found in fields map", tableName) + for _, rc := range re.RowEvent.RowChanges { + logEvent(logTable, dtMap, shard, tableName, fields, rc) + } + } + if re.FieldEvent != nil { + tableMap[re.FieldEvent.TableName] = re.FieldEvent.Fields + } + case <-time.After(1 * time.Second): + keepWaiting = false + } + } + return logTable +} + +func logEvent(logTable map[string][]string, dtMap map[string]string, shard string, tbl string, fields []*querypb.Field, rc *binlogdatapb.RowChange) { + key := fmt.Sprintf("%s:%s", tbl, shard) + + var eventType string + var vals []sqltypes.Value + switch { + case rc.Before == nil && rc.After == nil: + panic("do not expect row event with both before and after nil") + case rc.Before == nil: + eventType = "insert" + vals = sqltypes.MakeRowTrusted(fields, rc.After) + case rc.After == nil: + eventType = "delete" + vals = sqltypes.MakeRowTrusted(fields, rc.Before) + default: + eventType = "update" + vals = sqltypes.MakeRowTrusted(fields, rc.After) + } + execFunc, exists := tables[tbl] + if exists { + vals = execFunc(dtMap, vals) + } + logTable[key] = append(logTable[key], fmt.Sprintf("%s:%v", eventType, vals)) +} + +func prettyPrint(v interface{}) string { + b, err := json.MarshalIndent(v, "", " ") + if err != nil { + return fmt.Sprintf("got error marshalling: %v", err) + } + return string(b) +} diff --git a/go/test/endtoend/vtgate/transaction/twopc/schema.sql b/go/test/endtoend/vtgate/transaction/twopc/schema.sql new file mode 100644 index 00000000000..60a7c19837c --- /dev/null +++ b/go/test/endtoend/vtgate/transaction/twopc/schema.sql @@ -0,0 +1,12 @@ +create table twopc_user ( + id bigint, + name varchar(64), + primary key (id) +) Engine=InnoDB; + +create table twopc_music ( + id varchar(64), + user_id bigint, + title varchar(64), + primary key (id) +) Engine=InnoDB; \ No newline at end of file diff --git a/go/test/endtoend/vtgate/transaction/twopc/twopc_test.go b/go/test/endtoend/vtgate/transaction/twopc/twopc_test.go new file mode 100644 index 00000000000..f18073c5827 --- /dev/null +++ b/go/test/endtoend/vtgate/transaction/twopc/twopc_test.go @@ -0,0 +1,523 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package transaction + +import ( + "context" + _ "embed" + "reflect" + "sort" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/utils" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" +) + +// TestDTCommit tests distributed transaction commit for insert, update and delete operations +// It verifies the binlog events for the same with transaction state changes and redo statements. +func TestDTCommit(t *testing.T) { + conn, closer := start(t) + defer closer() + + vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "fk_user", "") + require.NoError(t, err) + defer vtgateConn.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := make(chan *binlogdatapb.VEvent) + runVStream(t, ctx, ch, vtgateConn) + + // Insert into multiple shards + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "insert into twopc_user(id, name) values(7,'foo')") + utils.Exec(t, conn, "insert into twopc_user(id, name) values(8,'bar')") + utils.Exec(t, conn, "insert into twopc_user(id, name) values(9,'baz')") + utils.Exec(t, conn, "insert into twopc_user(id, name) values(10,'apa')") + utils.Exec(t, conn, "commit") + + tableMap := make(map[string][]*querypb.Field) + dtMap := make(map[string]string) + logTable := retrieveTransitions(t, ch, tableMap, dtMap) + expectations := map[string][]string{ + "ks.dt_state:80-": { + "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + "update:[VARCHAR(\"dtid-1\") VARCHAR(\"COMMIT\")]", + "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"COMMIT\")]", + }, + "ks.dt_participant:80-": { + "insert:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + }, + "ks.redo_state:-80": { + "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + }, + "ks.redo_statement:-80": { + "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]", + "insert:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(2) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]", + }, + "ks.twopc_user:-80": { + `insert:[INT64(8) VARCHAR("bar")]`, + `insert:[INT64(10) VARCHAR("apa")]`, + }, + "ks.twopc_user:80-": { + `insert:[INT64(7) VARCHAR("foo")]`, + `insert:[INT64(9) VARCHAR("baz")]`, + }, + } + assert.Equal(t, expectations, logTable, + "mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations)) + + // Update from multiple shard + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "update twopc_user set name='newfoo' where id = 7") + utils.Exec(t, conn, "update twopc_user set name='newfoo' where id = 8") + utils.Exec(t, conn, "commit") + + logTable = retrieveTransitions(t, ch, tableMap, dtMap) + expectations = map[string][]string{ + "ks.dt_state:80-": { + "insert:[VARCHAR(\"dtid-2\") VARCHAR(\"PREPARE\")]", + "update:[VARCHAR(\"dtid-2\") VARCHAR(\"COMMIT\")]", + "delete:[VARCHAR(\"dtid-2\") VARCHAR(\"COMMIT\")]", + }, + "ks.dt_participant:80-": { + "insert:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + "delete:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + }, + "ks.redo_state:-80": { + "insert:[VARCHAR(\"dtid-2\") VARCHAR(\"PREPARE\")]", + "delete:[VARCHAR(\"dtid-2\") VARCHAR(\"PREPARE\")]", + }, + "ks.redo_statement:-80": { + "insert:[VARCHAR(\"dtid-2\") INT64(1) BLOB(\"update twopc_user set `name` = 'newfoo' where id = 8 limit 10001 /* INT64 */\")]", + "delete:[VARCHAR(\"dtid-2\") INT64(1) BLOB(\"update twopc_user set `name` = 'newfoo' where id = 8 limit 10001 /* INT64 */\")]", + }, + "ks.twopc_user:-80": {"update:[INT64(8) VARCHAR(\"newfoo\")]"}, + "ks.twopc_user:80-": {"update:[INT64(7) VARCHAR(\"newfoo\")]"}, + } + assert.Equal(t, expectations, logTable, + "mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations)) + + // DELETE from multiple shard + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "delete from twopc_user where id = 9") + utils.Exec(t, conn, "delete from twopc_user where id = 10") + utils.Exec(t, conn, "commit") + + logTable = retrieveTransitions(t, ch, tableMap, dtMap) + expectations = map[string][]string{ + "ks.dt_state:80-": { + "insert:[VARCHAR(\"dtid-3\") VARCHAR(\"PREPARE\")]", + "update:[VARCHAR(\"dtid-3\") VARCHAR(\"COMMIT\")]", + "delete:[VARCHAR(\"dtid-3\") VARCHAR(\"COMMIT\")]", + }, + "ks.dt_participant:80-": { + "insert:[VARCHAR(\"dtid-3\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + }, + "ks.redo_state:-80": { + "insert:[VARCHAR(\"dtid-3\") VARCHAR(\"PREPARE\")]", + "delete:[VARCHAR(\"dtid-3\") VARCHAR(\"PREPARE\")]", + }, + "ks.redo_statement:-80": { + "insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from twopc_user where id = 10 limit 10001 /* INT64 */\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from twopc_user where id = 10 limit 10001 /* INT64 */\")]", + }, + "ks.twopc_user:-80": {"delete:[INT64(10) VARCHAR(\"apa\")]"}, + "ks.twopc_user:80-": {"delete:[INT64(9) VARCHAR(\"baz\")]"}, + } + assert.Equal(t, expectations, logTable, + "mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations)) +} + +// TestDTRollback tests distributed transaction rollback for insert, update and delete operations +// There would not be any binlog events for rollback +func TestDTRollback(t *testing.T) { + conn, closer := start(t) + defer closer() + + // Insert initial Data + utils.Exec(t, conn, "insert into twopc_user(id, name) values(7,'foo'), (8,'bar')") + + // run vstream to stream binlogs + vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "fk_user", "") + require.NoError(t, err) + defer vtgateConn.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := make(chan *binlogdatapb.VEvent) + runVStream(t, ctx, ch, vtgateConn) + + // Insert into multiple shards + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "insert into twopc_user(id, name) values(9,'baz')") + utils.Exec(t, conn, "insert into twopc_user(id, name) values(10,'apa')") + utils.Exec(t, conn, "rollback") + + tableMap := make(map[string][]*querypb.Field) + logTable := retrieveTransitions(t, ch, tableMap, nil) + assert.Zero(t, len(logTable), + "no change in binlog expected: got: %s", prettyPrint(logTable)) + + // Update from multiple shard + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "update twopc_user set name='newfoo' where id = 7") + utils.Exec(t, conn, "update twopc_user set name='newfoo' where id = 8") + utils.Exec(t, conn, "rollback") + + logTable = retrieveTransitions(t, ch, tableMap, nil) + assert.Zero(t, len(logTable), + "no change in binlog expected: got: %s", prettyPrint(logTable)) + + // DELETE from multiple shard + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "delete from twopc_user where id = 7") + utils.Exec(t, conn, "delete from twopc_user where id = 8") + utils.Exec(t, conn, "rollback") + + logTable = retrieveTransitions(t, ch, tableMap, nil) + assert.Zero(t, len(logTable), + "no change in binlog expected: got: %s", prettyPrint(logTable)) +} + +// TestDTCommitMultiShardTxSingleShardDML tests distributed transaction commit for insert, update and delete operations +// There is DML operation only on single shard but transaction open on multiple shards. +// Metdata Manager is the one which executed the DML operation on the shard. +func TestDTCommitDMLOnlyOnMM(t *testing.T) { + conn, closer := start(t) + defer closer() + + vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "fk_user", "") + require.NoError(t, err) + defer vtgateConn.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := make(chan *binlogdatapb.VEvent) + runVStream(t, ctx, ch, vtgateConn) + + // Insert into multiple shards + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "insert into twopc_user(id, name) values(7,'foo')") + utils.Exec(t, conn, "select * from twopc_user") + utils.Exec(t, conn, "commit") + + tableMap := make(map[string][]*querypb.Field) + dtMap := make(map[string]string) + logTable := retrieveTransitions(t, ch, tableMap, dtMap) + expectations := map[string][]string{ + "ks.dt_state:80-": { + "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + "update:[VARCHAR(\"dtid-1\") VARCHAR(\"COMMIT\")]", + "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"COMMIT\")]", + }, + "ks.dt_participant:80-": { + "insert:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + }, + "ks.twopc_user:80-": {"insert:[INT64(7) VARCHAR(\"foo\")]"}, + } + assert.Equal(t, expectations, logTable, + "mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations)) + + // Update from multiple shard + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "update twopc_user set name='newfoo' where id = 7") + utils.Exec(t, conn, "select * from twopc_user") + utils.Exec(t, conn, "commit") + + logTable = retrieveTransitions(t, ch, tableMap, dtMap) + expectations = map[string][]string{ + "ks.dt_state:80-": { + "insert:[VARCHAR(\"dtid-2\") VARCHAR(\"PREPARE\")]", + "update:[VARCHAR(\"dtid-2\") VARCHAR(\"COMMIT\")]", + "delete:[VARCHAR(\"dtid-2\") VARCHAR(\"COMMIT\")]", + }, + "ks.dt_participant:80-": { + "insert:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + "delete:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + }, + "ks.twopc_user:80-": {"update:[INT64(7) VARCHAR(\"newfoo\")]"}, + } + assert.Equal(t, expectations, logTable, + "mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations)) + + // DELETE from multiple shard + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "delete from twopc_user where id = 7") + utils.Exec(t, conn, "select * from twopc_user") + utils.Exec(t, conn, "commit") + + logTable = retrieveTransitions(t, ch, tableMap, dtMap) + expectations = map[string][]string{ + "ks.dt_state:80-": { + "insert:[VARCHAR(\"dtid-3\") VARCHAR(\"PREPARE\")]", + "update:[VARCHAR(\"dtid-3\") VARCHAR(\"COMMIT\")]", + "delete:[VARCHAR(\"dtid-3\") VARCHAR(\"COMMIT\")]", + }, + "ks.dt_participant:80-": { + "insert:[VARCHAR(\"dtid-3\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + }, + "ks.twopc_user:80-": {"delete:[INT64(7) VARCHAR(\"newfoo\")]"}, + } + assert.Equal(t, expectations, logTable, + "mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations)) +} + +// TestDTCommitMultiShardTxSingleShardDML tests distributed transaction commit for insert, update and delete operations +// There is DML operation only on single shard but transaction open on multiple shards. +// Resource Manager is the one which executed the DML operation on the shard. +func TestDTCommitDMLOnlyOnRM(t *testing.T) { + conn, closer := start(t) + defer closer() + + vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "fk_user", "") + require.NoError(t, err) + defer vtgateConn.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := make(chan *binlogdatapb.VEvent) + runVStream(t, ctx, ch, vtgateConn) + + // Insert into multiple shards + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "select * from twopc_user where id = 8") + utils.Exec(t, conn, "insert into twopc_user(id, name) values(7,'foo')") + utils.Exec(t, conn, "commit") + + tableMap := make(map[string][]*querypb.Field) + dtMap := make(map[string]string) + logTable := retrieveTransitions(t, ch, tableMap, dtMap) + expectations := map[string][]string{ + "ks.dt_state:-80": { + "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + "update:[VARCHAR(\"dtid-1\") VARCHAR(\"COMMIT\")]", + "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"COMMIT\")]", + }, + "ks.dt_participant:-80": { + "insert:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"80-\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"80-\")]", + }, + "ks.redo_state:80-": { + "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + }, + "ks.redo_statement:80-": { + "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (7, 'foo')\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (7, 'foo')\")]", + }, + "ks.twopc_user:80-": {"insert:[INT64(7) VARCHAR(\"foo\")]"}, + } + assert.Equal(t, expectations, logTable, + "mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations)) + + // Update from multiple shard + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "select * from twopc_user where id = 8") + utils.Exec(t, conn, "update twopc_user set name='newfoo' where id = 7") + utils.Exec(t, conn, "commit") + + logTable = retrieveTransitions(t, ch, tableMap, dtMap) + expectations = map[string][]string{ + "ks.dt_state:-80": { + "insert:[VARCHAR(\"dtid-2\") VARCHAR(\"PREPARE\")]", + "update:[VARCHAR(\"dtid-2\") VARCHAR(\"COMMIT\")]", + "delete:[VARCHAR(\"dtid-2\") VARCHAR(\"COMMIT\")]", + }, + "ks.dt_participant:-80": { + "insert:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"80-\")]", + "delete:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"80-\")]", + }, + "ks.redo_state:80-": { + "insert:[VARCHAR(\"dtid-2\") VARCHAR(\"PREPARE\")]", + "delete:[VARCHAR(\"dtid-2\") VARCHAR(\"PREPARE\")]", + }, + "ks.redo_statement:80-": { + "insert:[VARCHAR(\"dtid-2\") INT64(1) BLOB(\"update twopc_user set `name` = 'newfoo' where id = 7 limit 10001 /* INT64 */\")]", + "delete:[VARCHAR(\"dtid-2\") INT64(1) BLOB(\"update twopc_user set `name` = 'newfoo' where id = 7 limit 10001 /* INT64 */\")]", + }, + "ks.twopc_user:80-": {"update:[INT64(7) VARCHAR(\"newfoo\")]"}, + } + assert.Equal(t, expectations, logTable, + "mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations)) + + // DELETE from multiple shard + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "select * from twopc_user where id = 8") + utils.Exec(t, conn, "delete from twopc_user where id = 7") + utils.Exec(t, conn, "commit") + + logTable = retrieveTransitions(t, ch, tableMap, dtMap) + expectations = map[string][]string{ + "ks.dt_state:-80": { + "insert:[VARCHAR(\"dtid-3\") VARCHAR(\"PREPARE\")]", + "update:[VARCHAR(\"dtid-3\") VARCHAR(\"COMMIT\")]", + "delete:[VARCHAR(\"dtid-3\") VARCHAR(\"COMMIT\")]", + }, + "ks.dt_participant:-80": { + "insert:[VARCHAR(\"dtid-3\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"80-\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"80-\")]", + }, + "ks.redo_state:80-": { + "insert:[VARCHAR(\"dtid-3\") VARCHAR(\"PREPARE\")]", + "delete:[VARCHAR(\"dtid-3\") VARCHAR(\"PREPARE\")]", + }, + "ks.redo_statement:80-": { + "insert:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from twopc_user where id = 7 limit 10001 /* INT64 */\")]", + "delete:[VARCHAR(\"dtid-3\") INT64(1) BLOB(\"delete from twopc_user where id = 7 limit 10001 /* INT64 */\")]", + }, + "ks.twopc_user:80-": {"delete:[INT64(7) VARCHAR(\"newfoo\")]"}, + } + assert.Equal(t, expectations, logTable, + "mismatch expected: \n got: %s, want: %s", prettyPrint(logTable), prettyPrint(expectations)) +} + +// TestDTPrepareFailOnRM tests distributed transaction prepare failure on resource manager +func TestDTPrepareFailOnRM(t *testing.T) { + conn, closer := start(t) + defer closer() + + vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "fk_user", "") + require.NoError(t, err) + defer vtgateConn.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := make(chan *binlogdatapb.VEvent) + runVStream(t, ctx, ch, vtgateConn) + + // Insert into multiple shards + utils.Exec(t, conn, "begin") + utils.Exec(t, conn, "insert into twopc_user(id, name) values(7,'foo')") + utils.Exec(t, conn, "insert into twopc_user(id, name) values(8,'bar')") + + ctx2 := context.Background() + conn2, err := mysql.Connect(ctx2, &vtParams) + require.NoError(t, err) + + utils.Exec(t, conn2, "begin") + utils.Exec(t, conn2, "insert into twopc_user(id, name) values(9,'baz')") + utils.Exec(t, conn2, "insert into twopc_user(id, name) values(10,'apa')") + + var wg sync.WaitGroup + wg.Add(2) + var commitErr error + go func() { + _, err := utils.ExecAllowError(t, conn, "commit") + if err != nil { + commitErr = err + } + wg.Done() + }() + go func() { + _, err := utils.ExecAllowError(t, conn2, "commit") + wg.Done() + if err != nil { + commitErr = err + } + }() + wg.Wait() + require.ErrorContains(t, commitErr, "ResourceExhausted desc = prepare failed") + + tableMap := make(map[string][]*querypb.Field) + dtMap := make(map[string]string) + logTable := retrieveTransitions(t, ch, tableMap, dtMap) + expectations := map[string][]string{ + "ks.dt_state:80-": { + "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + "insert:[VARCHAR(\"dtid-2\") VARCHAR(\"PREPARE\")]", + "update:[VARCHAR(\"dtid-1\") VARCHAR(\"COMMIT\")]", + "update:[VARCHAR(\"dtid-2\") VARCHAR(\"ROLLBACK\")]", + "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"COMMIT\")]", + "delete:[VARCHAR(\"dtid-2\") VARCHAR(\"ROLLBACK\")]", + }, + "ks.dt_participant:80-": { + "insert:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + "insert:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + "delete:[VARCHAR(\"dtid-2\") INT64(1) VARCHAR(\"ks\") VARCHAR(\"-80\")]", + }, + "ks.redo_state:-80": { + "insert:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + "delete:[VARCHAR(\"dtid-1\") VARCHAR(\"PREPARE\")]", + }, + "ks.redo_statement:-80": { /* flexi Expectation */ }, + "ks.twopc_user:-80": { /* flexi Expectation */ }, + "ks.twopc_user:80-": { /* flexi Expectation */ }, + } + flexiExpectations := map[string][2][]string{ + "ks.redo_statement:-80": {{ + "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (8, 'bar')\")]", + }, { + "insert:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]", + "delete:[VARCHAR(\"dtid-1\") INT64(1) BLOB(\"insert into twopc_user(id, `name`) values (10, 'apa')\")]", + }}, + "ks.twopc_user:-80": {{ + "insert:[INT64(8) VARCHAR(\"bar\")]", + }, { + "insert:[INT64(10) VARCHAR(\"apa\")]", + }}, + "ks.twopc_user:80-": {{ + "insert:[INT64(7) VARCHAR(\"foo\")]", + }, { + "insert:[INT64(9) VARCHAR(\"baz\")]", + }}, + } + + compareMaps(t, expectations, logTable, flexiExpectations) +} + +func compareMaps(t *testing.T, expected, actual map[string][]string, flexibleExp map[string][2][]string) { + assert.Equal(t, len(expected), len(actual), "mismatch in number of keys: expected: %d, got: %d", len(expected), len(actual)) + + for key, expectedValue := range expected { + actualValue, ok := actual[key] + require.Truef(t, ok, "key %s not found in actual map", key) + + if validValues, isFlexi := flexibleExp[key]; isFlexi { + // For the flexible key, check if the actual value matches one of the valid values + if !reflect.DeepEqual(actualValue, validValues[0]) && !reflect.DeepEqual(actualValue, validValues[1]) { + t.Fatalf("mismatch in values for key '%s': expected one of: %v, got: %v", key, validValues, actualValue) + } + } else { + // Sort the slices before comparison + sort.Strings(expectedValue) + sort.Strings(actualValue) + assert.Equal(t, expectedValue, actualValue, "mismatch in values for key %s: expected: %v, got: %v", key, expectedValue, actualValue) + } + } +} diff --git a/go/test/endtoend/vtgate/transaction/twopc/vschema.json b/go/test/endtoend/vtgate/transaction/twopc/vschema.json new file mode 100644 index 00000000000..4ff62df6808 --- /dev/null +++ b/go/test/endtoend/vtgate/transaction/twopc/vschema.json @@ -0,0 +1,26 @@ +{ + "sharded":true, + "vindexes": { + "xxhash": { + "type": "xxhash" + } + }, + "tables": { + "twopc_user":{ + "column_vindexes": [ + { + "column": "id", + "name": "xxhash" + } + ] + }, + "twopc_music": { + "column_vindexes": [ + { + "column": "user_id", + "name": "xxhash" + } + ] + } + } +} \ No newline at end of file diff --git a/go/vt/vttablet/tabletserver/twopc.go b/go/vt/vttablet/tabletserver/twopc.go index bbc54b8ea57..e53dd40ca8a 100644 --- a/go/vt/vttablet/tabletserver/twopc.go +++ b/go/vt/vttablet/tabletserver/twopc.go @@ -86,6 +86,10 @@ type TwoPC struct { // NewTwoPC creates a TwoPC variable. func NewTwoPC(readPool *connpool.Pool) *TwoPC { tpc := &TwoPC{readPool: readPool} + return tpc +} + +func (tpc *TwoPC) initializeQueries() { dbname := sidecar.GetIdentifier() tpc.insertRedoTx = sqlparser.BuildParsedQuery( "insert into %s.redo_state(dtid, state, time_created) values (%a, %a, %a)", @@ -132,7 +136,6 @@ func NewTwoPC(readPool *connpool.Pool) *TwoPC { "select dtid, time_created from %s.dt_state where time_created < %a", dbname, ":time_created") tpc.readAllTransactions = fmt.Sprintf(sqlReadAllTransactions, dbname, dbname) - return tpc } // Open starts the TwoPC service. @@ -143,6 +146,7 @@ func (tpc *TwoPC) Open(dbconfigs *dbconfigs.DBConfigs) error { } defer conn.Close() tpc.readPool.Open(dbconfigs.AppWithDB(), dbconfigs.DbaWithDB(), dbconfigs.DbaWithDB()) + tpc.initializeQueries() log.Infof("TwoPC: Engine open succeeded") return nil } diff --git a/test/config.json b/test/config.json index 9f753f37ba8..713faf97024 100644 --- a/test/config.json +++ b/test/config.json @@ -824,6 +824,15 @@ "RetryMax": 1, "Tags": [] }, + "vtgate_transaction_twopc": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/transaction/twopc"], + "Command": [], + "Manual": false, + "Shard": "vtgate_transaction", + "RetryMax": 1, + "Tags": [] + }, "vtgate_transaction_partial_exec": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vtgate/partialfailure"],