diff --git a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go index 1723b182d7c..34114152e4e 100644 --- a/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go +++ b/go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go @@ -65,6 +65,8 @@ import ( // +- stress_child // +- stress_grandchild // +- stress_child2 +// stress_self <- (self-referencing) +// stress_nofk (no foreign key; control group) // - Create these tables. Then, on the MySQL replica, remove the foreign key constraints. // - Static test: // - Randomly populate all tables via highly-contentive INSERT/UPDATE/DELETE statements @@ -151,8 +153,9 @@ var ( childTableName = "stress_child" child2TableName = "stress_child2" grandchildTableName = "stress_grandchild" + selfTableName = "stress_self" nofkTableName = "stress_nofk" - tableNames = []string{parentTableName, childTableName, child2TableName, grandchildTableName, nofkTableName} + tableNames = []string{parentTableName, childTableName, child2TableName, grandchildTableName, selfTableName, nofkTableName} reverseTableNames []string seedOnce sync.Once @@ -193,6 +196,21 @@ var ( ) ENGINE=InnoDB `, ` + CREATE TABLE stress_self ( + id bigint not null, + parent_id bigint, + rand_val varchar(32) null default '', + hint_col varchar(64) not null default '', + created_timestamp timestamp not null default current_timestamp, + updates int unsigned not null default 0, + PRIMARY KEY (id), + key parent_id_idx(parent_id), + key created_idx(created_timestamp), + key updates_idx(updates), + CONSTRAINT self_fk FOREIGN KEY (parent_id) REFERENCES stress_self (id) ON DELETE %s ON UPDATE %s + ) ENGINE=InnoDB + `, + ` CREATE TABLE stress_child ( id bigint not null, parent_id bigint, @@ -248,6 +266,7 @@ var ( `ALTER TABLE stress_child DROP CONSTRAINT child_parent_fk`, `ALTER TABLE stress_child2 DROP CONSTRAINT child2_parent_fk`, `ALTER TABLE stress_grandchild DROP CONSTRAINT grandchild_child_fk`, + `ALTER TABLE stress_self DROP CONSTRAINT self_fk`, } alterHintStatement = ` ALTER TABLE %s modify hint_col varchar(64) not null default '%s' @@ -277,6 +296,9 @@ var ( selectMatchingRowsGrandchild = ` select stress_grandchild.id from stress_grandchild join stress_child on (stress_child.id = stress_grandchild.parent_id) ` + selectMatchingRowsSelf = ` + select stress_self_child.id from stress_self as stress_self_child join stress_self as stress_self_parent on (stress_self_parent.id = stress_self_child.parent_id) + ` selectOrphanedRowsChild = ` select stress_child.id from stress_child left join stress_parent on (stress_parent.id = stress_child.parent_id) where stress_parent.id is null ` @@ -286,6 +308,10 @@ var ( selectOrphanedRowsGrandchild = ` select stress_grandchild.id from stress_grandchild left join stress_child on (stress_child.id = stress_grandchild.parent_id) where stress_child.id is null ` + selectOrphanedRowsSelf = ` + select stress_self_child.id from stress_self as stress_self_child left join stress_self as stress_self_parent on (stress_self_parent.id = stress_self_child.parent_id) where stress_self_parent.id is null + ` + selectOrphanedRowsNoFK = ` select stress_nofk.id from stress_nofk left join stress_parent on (stress_parent.id = stress_nofk.parent_id) where stress_parent.id is null ` @@ -571,7 +597,7 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) { wg.Add(1) go func() { defer wg.Done() - runSingleConnection(ctx, t, tableName, sleepInterval) + runSingleConnection(ctx, t, tableName, tcase, sleepInterval) }() } @@ -733,7 +759,7 @@ func TestStressFK(t *testing.T) { func validateTableDefinitions(t *testing.T, afterOnlineDDL bool) { t.Run("validate definitions", func(t *testing.T) { - for _, tableName := range []string{childTableName, child2TableName, grandchildTableName} { + for _, tableName := range []string{childTableName, child2TableName, grandchildTableName, selfTableName} { t.Run(tableName, func(t *testing.T) { childFKFollowedParentRenameMsg := "found traces of internal vitess table name, suggesting Online DDL on parent table caused this child table to follow the renames parent. 'rename_table_preserve_foreign_key' should have prevented this" var primaryStmt string @@ -787,7 +813,9 @@ func createInitialSchema(t *testing.T, tcase *testCase) { }) t.Run("waiting for vschema deletions to apply", func(t *testing.T) { for _, tableName := range tableNames { - utils.WaitForTableDeletions(t, clusterInstance.VtgateProcess, keyspaceName, tableName) + t.Run(tableName, func(t *testing.T) { + utils.WaitForTableDeletions(t, clusterInstance.VtgateProcess, keyspaceName, tableName) + }) } }) t.Run("creating tables", func(t *testing.T) { @@ -797,17 +825,35 @@ func createInitialSchema(t *testing.T, tcase *testCase) { switch i { case 0: // parent table, no foreign keys + require.Contains(t, sql, "CREATE TABLE stress_parent") b.WriteString(sql) case 1: // stress_nofk, no foreign keys + require.Contains(t, sql, "CREATE TABLE stress_nofk") b.WriteString(sql) + case 2: + // stress_self, self-referencing table + require.Contains(t, sql, "CREATE TABLE stress_self") + onDeleteAction := tcase.onDeleteAction + if onDeleteAction == sqlparser.Cascade { + // We don't test self-referencing tables with ON DELETE CASCADE as some queries/scenarios + // are unsopported and can be rejected. + onDeleteAction = sqlparser.NoAction + } + onUpdateAction := tcase.onUpdateAction + if onUpdateAction == sqlparser.Cascade { + // We don't test self-referencing tables with ON UPDATE CASCADE as some queries/scenarios + // are unsopported and can be rejected. + onUpdateAction = sqlparser.NoAction + } + b.WriteString(fmt.Sprintf(sql, referenceActionMap[onDeleteAction], referenceActionMap[onUpdateAction])) default: b.WriteString(fmt.Sprintf(sql, referenceActionMap[tcase.onDeleteAction], referenceActionMap[tcase.onUpdateAction])) } b.WriteString(";") } err := clusterInstance.VtctldClientProcess.ApplySchema(keyspaceName, b.String()) - require.NoError(t, err) + require.NoError(t, err, b.String()) }) if tcase.preStatement != "" { t.Run("pre-statement", func(t *testing.T) { @@ -824,6 +870,7 @@ func createInitialSchema(t *testing.T, tcase *testCase) { checkTable(t, childTableName, "hint_col") checkTable(t, child2TableName, "hint_col") checkTable(t, grandchildTableName, "hint_col") + checkTable(t, selfTableName, "hint_col") checkTable(t, nofkTableName, "hint_col") }) t.Run("validating tables: vtgate", func(t *testing.T) { @@ -832,12 +879,15 @@ func createInitialSchema(t *testing.T, tcase *testCase) { waitForTable(t, childTableName, conn) waitForTable(t, child2TableName, conn) waitForTable(t, grandchildTableName, conn) + waitForTable(t, selfTableName, conn) waitForTable(t, nofkTableName, conn) }) t.Run("waiting for vschema definition to apply", func(t *testing.T) { for _, tableName := range tableNames { - err := utils.WaitForColumn(t, clusterInstance.VtgateProcess, keyspaceName, tableName, "id") - require.NoError(t, err) + t.Run(tableName, func(t *testing.T) { + err := utils.WaitForColumn(t, clusterInstance.VtgateProcess, keyspaceName, tableName, "id") + require.NoError(t, err) + }) } }) @@ -990,6 +1040,8 @@ func isFKError(err error) bool { return false // bummer, but deadlocks can happen, it's a legit error. case sqlerror.ERLockNowait: return false // For some queries we use NOWAIT. Bummer, but this can happen, it's a legit error. + case sqlerror.ERQueryTimeout: + return false // query timed out, not a FK error case sqlerror.ERNoReferencedRow, sqlerror.ERRowIsReferenced, sqlerror.ERRowIsReferenced2, @@ -1007,6 +1059,19 @@ func isFKError(err error) bool { func generateInsert(t *testing.T, tableName string, conn *mysql.Conn) error { id := rand.Int32N(int32(maxTableRows)) parentId := rand.Int32N(int32(maxTableRows)) + if tableName == selfTableName { + // for the self referencing table we really need to help it out in initial population + if rand.IntN(2) == 0 { + parentId = id + } + // Also, we want to avoid loops. We ensure parentId <= id + if parentId > id { + parentId = id - 1 + } + if parentId < 0 { + parentId = 0 + } + } query := fmt.Sprintf(insertRowStatement, tableName, id, parentId) qr, err := conn.ExecuteFetch(query, 1000, true) @@ -1097,7 +1162,7 @@ func generateDelete(t *testing.T, tableName string, conn *mysql.Conn) error { return err } -func runSingleConnection(ctx context.Context, t *testing.T, tableName string, sleepInterval time.Duration) { +func runSingleConnection(ctx context.Context, t *testing.T, tableName string, tcase *testCase, sleepInterval time.Duration) { log.Infof("Running single connection on %s", tableName) conn, err := mysql.Connect(ctx, &vtParams) require.Nil(t, err) @@ -1108,6 +1173,8 @@ func runSingleConnection(ctx context.Context, t *testing.T, tableName string, sl _, err = conn.ExecuteFetch("set transaction isolation level read committed", 1000, true) require.Nil(t, err) + ticker := time.NewTicker(sleepInterval) + defer ticker.Stop() for { switch rand.Int32N(3) { case 0: @@ -1121,15 +1188,11 @@ func runSingleConnection(ctx context.Context, t *testing.T, tableName string, sl case <-ctx.Done(): log.Infof("Terminating single connection") return - case <-time.After(sleepInterval): + case <-ticker.C: } } } -func wrapWithNoFKChecks(sql string) string { - return fmt.Sprintf("set foreign_key_checks=0; %s; set foreign_key_checks=1;", sql) -} - // populateTables randomly populates all test tables. This is done sequentially. func populateTables(t *testing.T, tcase *testCase) { log.Infof("initTable begin") @@ -1140,13 +1203,26 @@ func populateTables(t *testing.T, tcase *testCase) { require.Nil(t, err) defer conn.Close() - t.Logf("===== clearing tables") - for _, tableName := range reverseTableNames { - writeMetrics[tableName].Clear() - deleteQuery := fmt.Sprintf(deleteAllStatement, tableName) - _, err = conn.ExecuteFetch(deleteQuery, 1000, true) - require.Nil(t, err) - } + t.Run("clearing", func(t *testing.T) { + for _, tableName := range reverseTableNames { + t.Run(tableName, func(t *testing.T) { + writeMetrics[tableName].Clear() + t.Run("deleting", func(t *testing.T) { + deleteQuery := fmt.Sprintf(deleteAllStatement, tableName) + _, err = conn.ExecuteFetch(deleteQuery, 1000, true) + require.Nil(t, err) + }) + t.Run("counting after delete", func(t *testing.T) { + rs, err := conn.ExecuteFetch(fmt.Sprintf(selectCountRowsStatement, tableName), 1000, true) + require.Nil(t, err) + row := rs.Named().Row() + require.NotNil(t, row) + numRows := row.AsInt64("num_rows", -1) + require.Zero(t, numRows) + }) + }) + } + }) // In an ideal world we would randomly re-seed the tables in each and every instance of the test. // In reality, that takes a lot of time, and while the seeding is important, it's not the heart of // the test. To that effect, the seeding works as follows: @@ -1198,36 +1274,59 @@ func populateTables(t *testing.T, tcase *testCase) { if !tablesSeeded { t.Run("reseeding", func(t *testing.T) { for _, tableName := range tableNames { - ignoreModifier := "" - if tcase.reseedInsertIgnore { - ignoreModifier = "ignore" - } - seedQuery := fmt.Sprintf("insert %s into %s select * from %s_seed", ignoreModifier, tableName, tableName) - _, err := conn.ExecuteFetch(seedQuery, 1000, true) - require.NoError(t, err) + t.Run(tableName, func(t *testing.T) { + ignoreModifier := "" + if tcase.reseedInsertIgnore { + ignoreModifier = "ignore" + } + if tableName == selfTableName { + ignoreModifier = "ignore" + } + + for { + // In MySQL, for self-referencing tables, you can't just INSERT INTO ... SELECT ; some rows will fail. + // So we use "IGNORE" and repeatetly try until all rows are inserted. + // At least one row is expected to succeed at each attempt + seedQuery := fmt.Sprintf("insert %s into %s select * from %s_seed", ignoreModifier, tableName, tableName) + rs, err := conn.ExecuteFetch(seedQuery, 1000, true) + if tableName != selfTableName { + require.NoError(t, err) + return + } + if err == nil { + // All done + return + } + require.ErrorContains(t, err, "foreign key constraint fails") + require.NotNil(t, rs) + require.NotZero(t, rs.RowsAffected) // This ensures we make a progress of at least one row at each iteration + } + }) } }) } t.Run("validating table rows", func(t *testing.T) { for _, tableName := range tableNames { - validationQuery := fmt.Sprintf(selectCountRowsStatement, tableName) - rs, err := conn.ExecuteFetch(validationQuery, 1000, true) - require.NoError(t, err) - row := rs.Named().Row() - require.NotNil(t, row) - numRows := row.AsInt64("num_rows", 0) - sumUpdates := row.AsInt64("sum_updates", 0) - require.NotZero(t, numRows) - if !tablesSeeded { - // We cloned the data from *_seed tables. This means we didn't populate writeMetrics. Now, - // this function only takes care of the base seed. We will later on run a stress workload on - // these tables, at the end of which we will examine the writeMetrics. We thus have to have those - // metrics consistent with the cloned data. It's a bit ugly, but we inject fake writeMetrics. - writeMetrics[tableName].deletes = 1 - writeMetrics[tableName].inserts = numRows + writeMetrics[tableName].deletes - writeMetrics[tableName].updates = sumUpdates + writeMetrics[tableName].deletes - } + t.Run(tableName, func(t *testing.T) { + validationQuery := fmt.Sprintf(selectCountRowsStatement, tableName) + rs, err := conn.ExecuteFetch(validationQuery, 1000, true) + require.NoError(t, err) + row := rs.Named().Row() + require.NotNil(t, row) + numRows := row.AsInt64("num_rows", 0) + sumUpdates := row.AsInt64("sum_updates", 0) + require.NotZero(t, numRows) + if !tablesSeeded { + // We cloned the data from *_seed tables. This means we didn't populate writeMetrics. Now, + // this function only takes care of the base seed. We will later on run a stress workload on + // these tables, at the end of which we will examine the writeMetrics. We thus have to have those + // metrics consistent with the cloned data. It's a bit ugly, but we inject fake writeMetrics. + writeMetrics[tableName].deletes = 1 + writeMetrics[tableName].inserts = numRows + writeMetrics[tableName].deletes + writeMetrics[tableName].updates = sumUpdates + writeMetrics[tableName].deletes + } + }) } }) } @@ -1283,10 +1382,10 @@ func testSelectTableFKErrors( writeMetrics[tableName].mu.Lock() defer writeMetrics[tableName].mu.Unlock() - if tcase.onDeleteAction == sqlparser.Cascade { + if tcase.onDeleteAction == sqlparser.Cascade && tableName != selfTableName { assert.Zerof(t, writeMetrics[tableName].deletesFKErrors, "unexpected foreign key errors for DELETEs in ON DELETE CASCADE. Sample error: %v", writeMetrics[tableName].sampleDeleteFKError) } - if tcase.onUpdateAction == sqlparser.Cascade { + if tcase.onUpdateAction == sqlparser.Cascade && tableName != selfTableName { assert.Zerof(t, writeMetrics[tableName].updatesFKErrors, "unexpected foreign key errors for UPDATEs in ON UPDATE CASCADE. Sample error: %v", writeMetrics[tableName].sampleUpdateFKError) } } @@ -1319,6 +1418,10 @@ func testFKIntegrity( rs := queryTablet(t, tablet, selectMatchingRowsGrandchild, "") assert.NotZero(t, len(rs.Rows)) }) + t.Run("matching self rows", func(t *testing.T) { + rs := queryTablet(t, tablet, selectMatchingRowsSelf, "") + assert.NotZero(t, len(rs.Rows)) + }) if tcase.onDeleteAction != sqlparser.SetNull && tcase.onUpdateAction != sqlparser.SetNull { // Because with SET NULL there _are_ orphaned rows t.Run("parent-child orphaned rows", func(t *testing.T) { @@ -1333,6 +1436,10 @@ func testFKIntegrity( rs := queryTablet(t, tablet, selectOrphanedRowsGrandchild, "") assert.Zero(t, len(rs.Rows)) }) + t.Run("self orphaned rows", func(t *testing.T) { + rs := queryTablet(t, tablet, selectOrphanedRowsSelf, "") + assert.Zero(t, len(rs.Rows)) + }) if !tcase.skipNofkOrphanedRows { t.Run("parent-nofk orphaned rows", func(t *testing.T) { rs := queryTablet(t, tablet, selectOrphanedRowsNoFK, "")