Skip to content

Commit

Permalink
CI: testing self referencing tables in foreign key stress tests (#16216)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Jun 19, 2024
1 parent 4a7ad80 commit 1cc3e14
Showing 1 changed file with 153 additions and 46 deletions.
199 changes: 153 additions & 46 deletions go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ import (
// +- stress_child
// +- stress_grandchild
// +- stress_child2
// stress_self <- (self-referencing)
// stress_nofk (no foreign key; control group)
// - Create these tables. Then, on the MySQL replica, remove the foreign key constraints.
// - Static test:
// - Randomly populate all tables via highly-contentive INSERT/UPDATE/DELETE statements
Expand Down Expand Up @@ -151,8 +153,9 @@ var (
childTableName = "stress_child"
child2TableName = "stress_child2"
grandchildTableName = "stress_grandchild"
selfTableName = "stress_self"
nofkTableName = "stress_nofk"
tableNames = []string{parentTableName, childTableName, child2TableName, grandchildTableName, nofkTableName}
tableNames = []string{parentTableName, childTableName, child2TableName, grandchildTableName, selfTableName, nofkTableName}
reverseTableNames []string

seedOnce sync.Once
Expand Down Expand Up @@ -193,6 +196,21 @@ var (
) ENGINE=InnoDB
`,
`
CREATE TABLE stress_self (
id bigint not null,
parent_id bigint,
rand_val varchar(32) null default '',
hint_col varchar(64) not null default '',
created_timestamp timestamp not null default current_timestamp,
updates int unsigned not null default 0,
PRIMARY KEY (id),
key parent_id_idx(parent_id),
key created_idx(created_timestamp),
key updates_idx(updates),
CONSTRAINT self_fk FOREIGN KEY (parent_id) REFERENCES stress_self (id) ON DELETE %s ON UPDATE %s
) ENGINE=InnoDB
`,
`
CREATE TABLE stress_child (
id bigint not null,
parent_id bigint,
Expand Down Expand Up @@ -248,6 +266,7 @@ var (
`ALTER TABLE stress_child DROP CONSTRAINT child_parent_fk`,
`ALTER TABLE stress_child2 DROP CONSTRAINT child2_parent_fk`,
`ALTER TABLE stress_grandchild DROP CONSTRAINT grandchild_child_fk`,
`ALTER TABLE stress_self DROP CONSTRAINT self_fk`,
}
alterHintStatement = `
ALTER TABLE %s modify hint_col varchar(64) not null default '%s'
Expand Down Expand Up @@ -277,6 +296,9 @@ var (
selectMatchingRowsGrandchild = `
select stress_grandchild.id from stress_grandchild join stress_child on (stress_child.id = stress_grandchild.parent_id)
`
selectMatchingRowsSelf = `
select stress_self_child.id from stress_self as stress_self_child join stress_self as stress_self_parent on (stress_self_parent.id = stress_self_child.parent_id)
`
selectOrphanedRowsChild = `
select stress_child.id from stress_child left join stress_parent on (stress_parent.id = stress_child.parent_id) where stress_parent.id is null
`
Expand All @@ -286,6 +308,10 @@ var (
selectOrphanedRowsGrandchild = `
select stress_grandchild.id from stress_grandchild left join stress_child on (stress_child.id = stress_grandchild.parent_id) where stress_child.id is null
`
selectOrphanedRowsSelf = `
select stress_self_child.id from stress_self as stress_self_child left join stress_self as stress_self_parent on (stress_self_parent.id = stress_self_child.parent_id) where stress_self_parent.id is null
`

selectOrphanedRowsNoFK = `
select stress_nofk.id from stress_nofk left join stress_parent on (stress_parent.id = stress_nofk.parent_id) where stress_parent.id is null
`
Expand Down Expand Up @@ -571,7 +597,7 @@ func ExecuteFKTest(t *testing.T, tcase *testCase) {
wg.Add(1)
go func() {
defer wg.Done()
runSingleConnection(ctx, t, tableName, sleepInterval)
runSingleConnection(ctx, t, tableName, tcase, sleepInterval)
}()
}

Expand Down Expand Up @@ -733,7 +759,7 @@ func TestStressFK(t *testing.T) {

func validateTableDefinitions(t *testing.T, afterOnlineDDL bool) {
t.Run("validate definitions", func(t *testing.T) {
for _, tableName := range []string{childTableName, child2TableName, grandchildTableName} {
for _, tableName := range []string{childTableName, child2TableName, grandchildTableName, selfTableName} {
t.Run(tableName, func(t *testing.T) {
childFKFollowedParentRenameMsg := "found traces of internal vitess table name, suggesting Online DDL on parent table caused this child table to follow the renames parent. 'rename_table_preserve_foreign_key' should have prevented this"
var primaryStmt string
Expand Down Expand Up @@ -787,7 +813,9 @@ func createInitialSchema(t *testing.T, tcase *testCase) {
})
t.Run("waiting for vschema deletions to apply", func(t *testing.T) {
for _, tableName := range tableNames {
utils.WaitForTableDeletions(t, clusterInstance.VtgateProcess, keyspaceName, tableName)
t.Run(tableName, func(t *testing.T) {
utils.WaitForTableDeletions(t, clusterInstance.VtgateProcess, keyspaceName, tableName)
})
}
})
t.Run("creating tables", func(t *testing.T) {
Expand All @@ -797,17 +825,35 @@ func createInitialSchema(t *testing.T, tcase *testCase) {
switch i {
case 0:
// parent table, no foreign keys
require.Contains(t, sql, "CREATE TABLE stress_parent")
b.WriteString(sql)
case 1:
// stress_nofk, no foreign keys
require.Contains(t, sql, "CREATE TABLE stress_nofk")
b.WriteString(sql)
case 2:
// stress_self, self-referencing table
require.Contains(t, sql, "CREATE TABLE stress_self")
onDeleteAction := tcase.onDeleteAction
if onDeleteAction == sqlparser.Cascade {
// We don't test self-referencing tables with ON DELETE CASCADE as some queries/scenarios
// are unsopported and can be rejected.
onDeleteAction = sqlparser.NoAction
}
onUpdateAction := tcase.onUpdateAction
if onUpdateAction == sqlparser.Cascade {
// We don't test self-referencing tables with ON UPDATE CASCADE as some queries/scenarios
// are unsopported and can be rejected.
onUpdateAction = sqlparser.NoAction
}
b.WriteString(fmt.Sprintf(sql, referenceActionMap[onDeleteAction], referenceActionMap[onUpdateAction]))
default:
b.WriteString(fmt.Sprintf(sql, referenceActionMap[tcase.onDeleteAction], referenceActionMap[tcase.onUpdateAction]))
}
b.WriteString(";")
}
err := clusterInstance.VtctldClientProcess.ApplySchema(keyspaceName, b.String())
require.NoError(t, err)
require.NoError(t, err, b.String())
})
if tcase.preStatement != "" {
t.Run("pre-statement", func(t *testing.T) {
Expand All @@ -824,6 +870,7 @@ func createInitialSchema(t *testing.T, tcase *testCase) {
checkTable(t, childTableName, "hint_col")
checkTable(t, child2TableName, "hint_col")
checkTable(t, grandchildTableName, "hint_col")
checkTable(t, selfTableName, "hint_col")
checkTable(t, nofkTableName, "hint_col")
})
t.Run("validating tables: vtgate", func(t *testing.T) {
Expand All @@ -832,12 +879,15 @@ func createInitialSchema(t *testing.T, tcase *testCase) {
waitForTable(t, childTableName, conn)
waitForTable(t, child2TableName, conn)
waitForTable(t, grandchildTableName, conn)
waitForTable(t, selfTableName, conn)
waitForTable(t, nofkTableName, conn)
})
t.Run("waiting for vschema definition to apply", func(t *testing.T) {
for _, tableName := range tableNames {
err := utils.WaitForColumn(t, clusterInstance.VtgateProcess, keyspaceName, tableName, "id")
require.NoError(t, err)
t.Run(tableName, func(t *testing.T) {
err := utils.WaitForColumn(t, clusterInstance.VtgateProcess, keyspaceName, tableName, "id")
require.NoError(t, err)
})
}
})

Expand Down Expand Up @@ -990,6 +1040,8 @@ func isFKError(err error) bool {
return false // bummer, but deadlocks can happen, it's a legit error.
case sqlerror.ERLockNowait:
return false // For some queries we use NOWAIT. Bummer, but this can happen, it's a legit error.
case sqlerror.ERQueryTimeout:
return false // query timed out, not a FK error
case sqlerror.ERNoReferencedRow,
sqlerror.ERRowIsReferenced,
sqlerror.ERRowIsReferenced2,
Expand All @@ -1007,6 +1059,19 @@ func isFKError(err error) bool {
func generateInsert(t *testing.T, tableName string, conn *mysql.Conn) error {
id := rand.Int32N(int32(maxTableRows))
parentId := rand.Int32N(int32(maxTableRows))
if tableName == selfTableName {
// for the self referencing table we really need to help it out in initial population
if rand.IntN(2) == 0 {
parentId = id
}
// Also, we want to avoid loops. We ensure parentId <= id
if parentId > id {
parentId = id - 1
}
if parentId < 0 {
parentId = 0
}
}
query := fmt.Sprintf(insertRowStatement, tableName, id, parentId)
qr, err := conn.ExecuteFetch(query, 1000, true)

Expand Down Expand Up @@ -1097,7 +1162,7 @@ func generateDelete(t *testing.T, tableName string, conn *mysql.Conn) error {
return err
}

func runSingleConnection(ctx context.Context, t *testing.T, tableName string, sleepInterval time.Duration) {
func runSingleConnection(ctx context.Context, t *testing.T, tableName string, tcase *testCase, sleepInterval time.Duration) {
log.Infof("Running single connection on %s", tableName)
conn, err := mysql.Connect(ctx, &vtParams)
require.Nil(t, err)
Expand All @@ -1108,6 +1173,8 @@ func runSingleConnection(ctx context.Context, t *testing.T, tableName string, sl
_, err = conn.ExecuteFetch("set transaction isolation level read committed", 1000, true)
require.Nil(t, err)

ticker := time.NewTicker(sleepInterval)
defer ticker.Stop()
for {
switch rand.Int32N(3) {
case 0:
Expand All @@ -1121,15 +1188,11 @@ func runSingleConnection(ctx context.Context, t *testing.T, tableName string, sl
case <-ctx.Done():
log.Infof("Terminating single connection")
return
case <-time.After(sleepInterval):
case <-ticker.C:
}
}
}

func wrapWithNoFKChecks(sql string) string {
return fmt.Sprintf("set foreign_key_checks=0; %s; set foreign_key_checks=1;", sql)
}

// populateTables randomly populates all test tables. This is done sequentially.
func populateTables(t *testing.T, tcase *testCase) {
log.Infof("initTable begin")
Expand All @@ -1140,13 +1203,26 @@ func populateTables(t *testing.T, tcase *testCase) {
require.Nil(t, err)
defer conn.Close()

t.Logf("===== clearing tables")
for _, tableName := range reverseTableNames {
writeMetrics[tableName].Clear()
deleteQuery := fmt.Sprintf(deleteAllStatement, tableName)
_, err = conn.ExecuteFetch(deleteQuery, 1000, true)
require.Nil(t, err)
}
t.Run("clearing", func(t *testing.T) {
for _, tableName := range reverseTableNames {
t.Run(tableName, func(t *testing.T) {
writeMetrics[tableName].Clear()
t.Run("deleting", func(t *testing.T) {
deleteQuery := fmt.Sprintf(deleteAllStatement, tableName)
_, err = conn.ExecuteFetch(deleteQuery, 1000, true)
require.Nil(t, err)
})
t.Run("counting after delete", func(t *testing.T) {
rs, err := conn.ExecuteFetch(fmt.Sprintf(selectCountRowsStatement, tableName), 1000, true)
require.Nil(t, err)
row := rs.Named().Row()
require.NotNil(t, row)
numRows := row.AsInt64("num_rows", -1)
require.Zero(t, numRows)
})
})
}
})
// In an ideal world we would randomly re-seed the tables in each and every instance of the test.
// In reality, that takes a lot of time, and while the seeding is important, it's not the heart of
// the test. To that effect, the seeding works as follows:
Expand Down Expand Up @@ -1198,36 +1274,59 @@ func populateTables(t *testing.T, tcase *testCase) {
if !tablesSeeded {
t.Run("reseeding", func(t *testing.T) {
for _, tableName := range tableNames {
ignoreModifier := ""
if tcase.reseedInsertIgnore {
ignoreModifier = "ignore"
}
seedQuery := fmt.Sprintf("insert %s into %s select * from %s_seed", ignoreModifier, tableName, tableName)
_, err := conn.ExecuteFetch(seedQuery, 1000, true)
require.NoError(t, err)
t.Run(tableName, func(t *testing.T) {
ignoreModifier := ""
if tcase.reseedInsertIgnore {
ignoreModifier = "ignore"
}
if tableName == selfTableName {
ignoreModifier = "ignore"
}

for {
// In MySQL, for self-referencing tables, you can't just INSERT INTO ... SELECT ; some rows will fail.
// So we use "IGNORE" and repeatetly try until all rows are inserted.
// At least one row is expected to succeed at each attempt
seedQuery := fmt.Sprintf("insert %s into %s select * from %s_seed", ignoreModifier, tableName, tableName)
rs, err := conn.ExecuteFetch(seedQuery, 1000, true)
if tableName != selfTableName {
require.NoError(t, err)
return
}
if err == nil {
// All done
return
}
require.ErrorContains(t, err, "foreign key constraint fails")
require.NotNil(t, rs)
require.NotZero(t, rs.RowsAffected) // This ensures we make a progress of at least one row at each iteration
}
})
}
})
}

t.Run("validating table rows", func(t *testing.T) {
for _, tableName := range tableNames {
validationQuery := fmt.Sprintf(selectCountRowsStatement, tableName)
rs, err := conn.ExecuteFetch(validationQuery, 1000, true)
require.NoError(t, err)
row := rs.Named().Row()
require.NotNil(t, row)
numRows := row.AsInt64("num_rows", 0)
sumUpdates := row.AsInt64("sum_updates", 0)
require.NotZero(t, numRows)
if !tablesSeeded {
// We cloned the data from *_seed tables. This means we didn't populate writeMetrics. Now,
// this function only takes care of the base seed. We will later on run a stress workload on
// these tables, at the end of which we will examine the writeMetrics. We thus have to have those
// metrics consistent with the cloned data. It's a bit ugly, but we inject fake writeMetrics.
writeMetrics[tableName].deletes = 1
writeMetrics[tableName].inserts = numRows + writeMetrics[tableName].deletes
writeMetrics[tableName].updates = sumUpdates + writeMetrics[tableName].deletes
}
t.Run(tableName, func(t *testing.T) {
validationQuery := fmt.Sprintf(selectCountRowsStatement, tableName)
rs, err := conn.ExecuteFetch(validationQuery, 1000, true)
require.NoError(t, err)
row := rs.Named().Row()
require.NotNil(t, row)
numRows := row.AsInt64("num_rows", 0)
sumUpdates := row.AsInt64("sum_updates", 0)
require.NotZero(t, numRows)
if !tablesSeeded {
// We cloned the data from *_seed tables. This means we didn't populate writeMetrics. Now,
// this function only takes care of the base seed. We will later on run a stress workload on
// these tables, at the end of which we will examine the writeMetrics. We thus have to have those
// metrics consistent with the cloned data. It's a bit ugly, but we inject fake writeMetrics.
writeMetrics[tableName].deletes = 1
writeMetrics[tableName].inserts = numRows + writeMetrics[tableName].deletes
writeMetrics[tableName].updates = sumUpdates + writeMetrics[tableName].deletes
}
})
}
})
}
Expand Down Expand Up @@ -1283,10 +1382,10 @@ func testSelectTableFKErrors(
writeMetrics[tableName].mu.Lock()
defer writeMetrics[tableName].mu.Unlock()

if tcase.onDeleteAction == sqlparser.Cascade {
if tcase.onDeleteAction == sqlparser.Cascade && tableName != selfTableName {
assert.Zerof(t, writeMetrics[tableName].deletesFKErrors, "unexpected foreign key errors for DELETEs in ON DELETE CASCADE. Sample error: %v", writeMetrics[tableName].sampleDeleteFKError)
}
if tcase.onUpdateAction == sqlparser.Cascade {
if tcase.onUpdateAction == sqlparser.Cascade && tableName != selfTableName {
assert.Zerof(t, writeMetrics[tableName].updatesFKErrors, "unexpected foreign key errors for UPDATEs in ON UPDATE CASCADE. Sample error: %v", writeMetrics[tableName].sampleUpdateFKError)
}
}
Expand Down Expand Up @@ -1319,6 +1418,10 @@ func testFKIntegrity(
rs := queryTablet(t, tablet, selectMatchingRowsGrandchild, "")
assert.NotZero(t, len(rs.Rows))
})
t.Run("matching self rows", func(t *testing.T) {
rs := queryTablet(t, tablet, selectMatchingRowsSelf, "")
assert.NotZero(t, len(rs.Rows))
})
if tcase.onDeleteAction != sqlparser.SetNull && tcase.onUpdateAction != sqlparser.SetNull {
// Because with SET NULL there _are_ orphaned rows
t.Run("parent-child orphaned rows", func(t *testing.T) {
Expand All @@ -1333,6 +1436,10 @@ func testFKIntegrity(
rs := queryTablet(t, tablet, selectOrphanedRowsGrandchild, "")
assert.Zero(t, len(rs.Rows))
})
t.Run("self orphaned rows", func(t *testing.T) {
rs := queryTablet(t, tablet, selectOrphanedRowsSelf, "")
assert.Zero(t, len(rs.Rows))
})
if !tcase.skipNofkOrphanedRows {
t.Run("parent-nofk orphaned rows", func(t *testing.T) {
rs := queryTablet(t, tablet, selectOrphanedRowsNoFK, "")
Expand Down

0 comments on commit 1cc3e14

Please sign in to comment.