Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Foreign key on update action with non literal values #14278

Merged
merged 47 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
5002c3a
store child foreign key to update exprs mapping
harshit-gangal Oct 14, 2023
929b7b7
refactor: add only unique columns to select expressions
harshit-gangal Oct 14, 2023
3bcf8ae
Merge remote-tracking branch 'upstream/main' into fk-updstmt
GuptaManan100 Oct 18, 2023
fab5e3f
test: improve the fuzzer test to generate expressions containing colu…
GuptaManan100 Oct 18, 2023
f7734cd
feat: add basic planning and execution support for update non-literal…
GuptaManan100 Oct 19, 2023
c0c9ae6
test: update tests and only print values if they aren't empty
GuptaManan100 Oct 19, 2023
ca13490
feat: handle nils in comperator expression
GuptaManan100 Oct 19, 2023
a68c8d6
test: add a failing test
GuptaManan100 Oct 19, 2023
27a78be
feat: copy the order by from the update query
GuptaManan100 Oct 19, 2023
9b191a5
feat: turn off foreign key checks on updates that have foreign key co…
GuptaManan100 Oct 19, 2023
17c8d90
Merge remote-tracking branch 'upstream/main' into fk-updstmt
GuptaManan100 Oct 20, 2023
8470476
test: refactor tests and permanently add cases for updates with non-l…
GuptaManan100 Oct 20, 2023
b1432c4
test: udpate tests output
GuptaManan100 Oct 20, 2023
fd79f83
feat: add AND rewriter to drop some unnecessary predicates
GuptaManan100 Oct 20, 2023
465df5d
feat: add non-literal update parent foreign key verification
GuptaManan100 Oct 20, 2023
a1b04e4
feat: augment verification queries to work with non-literal values in…
GuptaManan100 Oct 20, 2023
4401200
some refactor to club update related offsets
harshit-gangal Oct 26, 2023
adfd62f
reject queries with fk column update dependent column also getting up…
harshit-gangal Oct 26, 2023
2a04ecf
fuzz: fix fuzzer to only produce queries that work with Vitess and My…
GuptaManan100 Oct 27, 2023
474a337
feat: fix update planning to produce correct not in tuple comparison
GuptaManan100 Oct 27, 2023
1ec7b00
feat: fix fk_cascade engine to handle null comparisons correctly
GuptaManan100 Oct 27, 2023
5f9ad38
feat: refactor and fix update expressions dependency logic
GuptaManan100 Oct 30, 2023
64bbabd
feat: fix literal updates clubbed with non-literal updates
GuptaManan100 Oct 30, 2023
c2a3ff2
feat: refactor code so that checking for non-literal updates is now a…
GuptaManan100 Oct 30, 2023
f3c4d7f
feat: remove unused code
GuptaManan100 Oct 30, 2023
b14dc7b
test: fix the timeout for waiting for replication
GuptaManan100 Oct 31, 2023
139f4f5
Merge remote-tracking branch 'upstream/main' into fk-updstmt
GuptaManan100 Oct 31, 2023
b777870
test: add fuzzer test for olap queries too
GuptaManan100 Oct 31, 2023
14caa24
feat: add support for non-literal update in streaming
GuptaManan100 Oct 31, 2023
47ee14d
feat: fix fkVerify to rollback transaction when select returns rows
GuptaManan100 Oct 31, 2023
c40e9c4
test: update test output post merge
GuptaManan100 Oct 31, 2023
6e152df
feat: fix serializing of foreign keys
GuptaManan100 Oct 31, 2023
16ff17c
refactor: make childFkToUpdExprs map unexported
GuptaManan100 Oct 31, 2023
252f859
refactor: minor refactors and adding comments
GuptaManan100 Oct 31, 2023
e1abd98
test: add tests for fk_cascade engine changes
GuptaManan100 Nov 1, 2023
a758a78
test: add tests for the function checking for foreign key columns bei…
GuptaManan100 Nov 1, 2023
fcf6eba
test: add tests for the function checking for non-literal fk updates
GuptaManan100 Nov 1, 2023
fa8370c
test: add tests for check fk updated expresssions map
GuptaManan100 Nov 1, 2023
8dc17e4
comments: fix typing errors
GuptaManan100 Nov 1, 2023
81c395c
Merge remote-tracking branch 'upstream/main' into fk-updstmt
GuptaManan100 Nov 2, 2023
a74c0aa
feat: change non-literal comparison added in updates to use null safe…
GuptaManan100 Nov 8, 2023
b3e1f6e
feat: change the error to also contain the column names that are caus…
GuptaManan100 Nov 8, 2023
2ffd0dd
feat: refactor code to keep all the non-literal update information to…
GuptaManan100 Nov 8, 2023
0736eff
feat: remove stream execute from fk_cascade and fk_verify. Instead ju…
GuptaManan100 Nov 10, 2023
f5ae666
refactor: simplify code
GuptaManan100 Nov 10, 2023
4969e50
refactor: fix comment and refactor code
GuptaManan100 Nov 10, 2023
d319cd6
feat: refactor code and tests to address review comments
GuptaManan100 Nov 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 26 additions & 118 deletions go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ import (
_ "github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/log"
)
Expand Down Expand Up @@ -131,41 +129,33 @@ func (fz *fuzzer) generateInsertDMLQuery() string {
if tableName == "fk_t20" {
colValue := rand.Intn(1 + fz.maxValForCol)
col2Value := rand.Intn(1 + fz.maxValForCol)
return fmt.Sprintf("insert into %v (id, col, col2) values (%v, %v, %v)", tableName, idValue, convertColValueToString(colValue), convertColValueToString(col2Value))
return fmt.Sprintf("insert into %v (id, col, col2) values (%v, %v, %v)", tableName, idValue, convertIntValueToString(colValue), convertIntValueToString(col2Value))
} else if isMultiColFkTable(tableName) {
colaValue := rand.Intn(1 + fz.maxValForCol)
colbValue := rand.Intn(1 + fz.maxValForCol)
return fmt.Sprintf("insert into %v (id, cola, colb) values (%v, %v, %v)", tableName, idValue, convertColValueToString(colaValue), convertColValueToString(colbValue))
return fmt.Sprintf("insert into %v (id, cola, colb) values (%v, %v, %v)", tableName, idValue, convertIntValueToString(colaValue), convertIntValueToString(colbValue))
} else {
colValue := rand.Intn(1 + fz.maxValForCol)
return fmt.Sprintf("insert into %v (id, col) values (%v, %v)", tableName, idValue, convertColValueToString(colValue))
return fmt.Sprintf("insert into %v (id, col) values (%v, %v)", tableName, idValue, convertIntValueToString(colValue))
}
}

// convertColValueToString converts the given value to a string
func convertColValueToString(value int) string {
if value == 0 {
return "NULL"
}
return fmt.Sprintf("%d", value)
}

// generateUpdateDMLQuery generates an UPDATE query from the parameters for the fuzzer.
func (fz *fuzzer) generateUpdateDMLQuery() string {
tableId := rand.Intn(len(fkTables))
idValue := 1 + rand.Intn(fz.maxValForId)
tableName := fkTables[tableId]
if tableName == "fk_t20" {
colValue := rand.Intn(1 + fz.maxValForCol)
col2Value := rand.Intn(1 + fz.maxValForCol)
return fmt.Sprintf("update %v set col = %v, col2 = %v where id = %v", tableName, convertColValueToString(colValue), convertColValueToString(col2Value), idValue)
colValue := fz.generateExpression(rand.Intn(4)+1, "col", "col2")
col2Value := fz.generateExpression(rand.Intn(4)+1, "col", "col2")
return fmt.Sprintf("update %v set col = %v, col2 = %v where id = %v", tableName, colValue, col2Value, idValue)
} else if isMultiColFkTable(tableName) {
colaValue := rand.Intn(1 + fz.maxValForCol)
colbValue := rand.Intn(1 + fz.maxValForCol)
return fmt.Sprintf("update %v set cola = %v, colb = %v where id = %v", tableName, convertColValueToString(colaValue), convertColValueToString(colbValue), idValue)
colaValue := fz.generateExpression(rand.Intn(4)+1, "cola", "colb")
colbValue := fz.generateExpression(rand.Intn(4)+1, "cola", "colb")
return fmt.Sprintf("update %v set cola = %v, colb = %v where id = %v", tableName, colaValue, colbValue, idValue)
} else {
colValue := rand.Intn(1 + fz.maxValForCol)
return fmt.Sprintf("update %v set col = %v where id = %v", tableName, convertColValueToString(colValue), idValue)
return fmt.Sprintf("update %v set col = %v where id = %v", tableName, convertIntValueToString(colValue), idValue)
}
}

Expand Down Expand Up @@ -338,8 +328,8 @@ func (fz *fuzzer) getPreparedInsertQueries() []string {
return []string{
"prepare stmt_insert from 'insert into fk_t20 (id, col, col2) values (?, ?, ?)'",
fmt.Sprintf("SET @id = %v", idValue),
fmt.Sprintf("SET @col = %v", convertColValueToString(colValue)),
fmt.Sprintf("SET @col2 = %v", convertColValueToString(col2Value)),
fmt.Sprintf("SET @col = %v", convertIntValueToString(colValue)),
fmt.Sprintf("SET @col2 = %v", convertIntValueToString(col2Value)),
"execute stmt_insert using @id, @col, @col2",
}
} else if isMultiColFkTable(tableName) {
Expand All @@ -348,16 +338,16 @@ func (fz *fuzzer) getPreparedInsertQueries() []string {
return []string{
fmt.Sprintf("prepare stmt_insert from 'insert into %v (id, cola, colb) values (?, ?, ?)'", tableName),
fmt.Sprintf("SET @id = %v", idValue),
fmt.Sprintf("SET @cola = %v", convertColValueToString(colaValue)),
fmt.Sprintf("SET @colb = %v", convertColValueToString(colbValue)),
fmt.Sprintf("SET @cola = %v", convertIntValueToString(colaValue)),
fmt.Sprintf("SET @colb = %v", convertIntValueToString(colbValue)),
"execute stmt_insert using @id, @cola, @colb",
}
} else {
colValue := rand.Intn(1 + fz.maxValForCol)
return []string{
fmt.Sprintf("prepare stmt_insert from 'insert into %v (id, col) values (?, ?)'", tableName),
fmt.Sprintf("SET @id = %v", idValue),
fmt.Sprintf("SET @col = %v", convertColValueToString(colValue)),
fmt.Sprintf("SET @col = %v", convertIntValueToString(colValue)),
"execute stmt_insert using @id, @col",
}
}
Expand All @@ -374,8 +364,8 @@ func (fz *fuzzer) getPreparedUpdateQueries() []string {
return []string{
"prepare stmt_update from 'update fk_t20 set col = ?, col2 = ? where id = ?'",
fmt.Sprintf("SET @id = %v", idValue),
fmt.Sprintf("SET @col = %v", convertColValueToString(colValue)),
fmt.Sprintf("SET @col2 = %v", convertColValueToString(col2Value)),
fmt.Sprintf("SET @col = %v", convertIntValueToString(colValue)),
fmt.Sprintf("SET @col2 = %v", convertIntValueToString(col2Value)),
"execute stmt_update using @col, @col2, @id",
}
} else if isMultiColFkTable(tableName) {
Expand All @@ -384,16 +374,16 @@ func (fz *fuzzer) getPreparedUpdateQueries() []string {
return []string{
fmt.Sprintf("prepare stmt_update from 'update %v set cola = ?, colb = ? where id = ?'", tableName),
fmt.Sprintf("SET @id = %v", idValue),
fmt.Sprintf("SET @cola = %v", convertColValueToString(colaValue)),
fmt.Sprintf("SET @colb = %v", convertColValueToString(colbValue)),
fmt.Sprintf("SET @cola = %v", convertIntValueToString(colaValue)),
fmt.Sprintf("SET @colb = %v", convertIntValueToString(colbValue)),
"execute stmt_update using @cola, @colb, @id",
}
} else {
colValue := rand.Intn(1 + fz.maxValForCol)
return []string{
fmt.Sprintf("prepare stmt_update from 'update %v set col = ? where id = ?'", tableName),
fmt.Sprintf("SET @id = %v", idValue),
fmt.Sprintf("SET @col = %v", convertColValueToString(colValue)),
fmt.Sprintf("SET @col = %v", convertIntValueToString(colValue)),
"execute stmt_update using @col, @id",
}
}
Expand All @@ -419,14 +409,14 @@ func (fz *fuzzer) generateParameterizedInsertQuery() (query string, params []any
if tableName == "fk_t20" {
colValue := rand.Intn(1 + fz.maxValForCol)
col2Value := rand.Intn(1 + fz.maxValForCol)
return fmt.Sprintf("insert into %v (id, col, col2) values (?, ?, ?)", tableName), []any{idValue, convertColValueToString(colValue), convertColValueToString(col2Value)}
return fmt.Sprintf("insert into %v (id, col, col2) values (?, ?, ?)", tableName), []any{idValue, convertIntValueToString(colValue), convertIntValueToString(col2Value)}
} else if isMultiColFkTable(tableName) {
colaValue := rand.Intn(1 + fz.maxValForCol)
colbValue := rand.Intn(1 + fz.maxValForCol)
return fmt.Sprintf("insert into %v (id, cola, colb) values (?, ?, ?)", tableName), []any{idValue, convertColValueToString(colaValue), convertColValueToString(colbValue)}
return fmt.Sprintf("insert into %v (id, cola, colb) values (?, ?, ?)", tableName), []any{idValue, convertIntValueToString(colaValue), convertIntValueToString(colbValue)}
} else {
colValue := rand.Intn(1 + fz.maxValForCol)
return fmt.Sprintf("insert into %v (id, col) values (?, ?)", tableName), []any{idValue, convertColValueToString(colValue)}
return fmt.Sprintf("insert into %v (id, col) values (?, ?)", tableName), []any{idValue, convertIntValueToString(colValue)}
}
}

Expand All @@ -438,14 +428,14 @@ func (fz *fuzzer) generateParameterizedUpdateQuery() (query string, params []any
if tableName == "fk_t20" {
colValue := rand.Intn(1 + fz.maxValForCol)
col2Value := rand.Intn(1 + fz.maxValForCol)
return fmt.Sprintf("update %v set col = ?, col2 = ? where id = ?", tableName), []any{convertColValueToString(colValue), convertColValueToString(col2Value), idValue}
return fmt.Sprintf("update %v set col = ?, col2 = ? where id = ?", tableName), []any{convertIntValueToString(colValue), convertIntValueToString(col2Value), idValue}
} else if isMultiColFkTable(tableName) {
colaValue := rand.Intn(1 + fz.maxValForCol)
colbValue := rand.Intn(1 + fz.maxValForCol)
return fmt.Sprintf("update %v set cola = ?, colb = ? where id = ?", tableName), []any{convertColValueToString(colaValue), convertColValueToString(colbValue), idValue}
return fmt.Sprintf("update %v set cola = ?, colb = ? where id = ?", tableName), []any{convertIntValueToString(colaValue), convertIntValueToString(colbValue), idValue}
} else {
colValue := rand.Intn(1 + fz.maxValForCol)
return fmt.Sprintf("update %v set col = ? where id = ?", tableName), []any{convertColValueToString(colValue), idValue}
return fmt.Sprintf("update %v set col = ? where id = ?", tableName), []any{convertIntValueToString(colValue), idValue}
}
}

Expand Down Expand Up @@ -650,85 +640,3 @@ func TestFkFuzzTest(t *testing.T) {
}
}
}

// ensureDatabaseState ensures that the database is either empty or not.
func ensureDatabaseState(t *testing.T, vtconn *mysql.Conn, empty bool) {
results := collectFkTablesState(vtconn)
isEmpty := true
for _, res := range results {
if len(res.Rows) > 0 {
isEmpty = false
}
}
require.Equal(t, isEmpty, empty)
}

// verifyDataIsCorrect verifies that the data in MySQL database matches the data in the Vitess database.
func verifyDataIsCorrect(t *testing.T, mcmp utils.MySQLCompare, concurrency int) {
// For single concurrent thread, we run all the queries on both MySQL and Vitess, so we can verify correctness
// by just checking if the data in MySQL and Vitess match.
if concurrency == 1 {
for _, table := range fkTables {
query := fmt.Sprintf("SELECT * FROM %v ORDER BY id", table)
mcmp.Exec(query)
}
} else {
// For higher concurrency, we don't have MySQL data to verify everything is fine,
// so we'll have to do something different.
// We run LEFT JOIN queries on all the parent and child tables linked by foreign keys
// to make sure that nothing is broken in the database.
for _, reference := range fkReferences {
query := fmt.Sprintf("select %v.id from %v left join %v on (%v.col = %v.col) where %v.col is null and %v.col is not null", reference.childTable, reference.childTable, reference.parentTable, reference.parentTable, reference.childTable, reference.parentTable, reference.childTable)
if isMultiColFkTable(reference.childTable) {
query = fmt.Sprintf("select %v.id from %v left join %v on (%v.cola = %v.cola and %v.colb = %v.colb) where %v.cola is null and %v.cola is not null and %v.colb is not null", reference.childTable, reference.childTable, reference.parentTable, reference.parentTable, reference.childTable, reference.parentTable, reference.childTable, reference.parentTable, reference.childTable, reference.childTable)
}
res, err := mcmp.VtConn.ExecuteFetch(query, 1000, false)
require.NoError(t, err)
require.Zerof(t, len(res.Rows), "Query %v gave non-empty results", query)
}
}
// We also verify that the results in Primary and Replica table match as is.
for _, keyspace := range clusterInstance.Keyspaces {
for _, shard := range keyspace.Shards {
var primaryTab, replicaTab *cluster.Vttablet
for _, vttablet := range shard.Vttablets {
if vttablet.Type == "primary" {
primaryTab = vttablet
} else {
replicaTab = vttablet
}
}
require.NotNil(t, primaryTab)
require.NotNil(t, replicaTab)
checkReplicationHealthy(t, replicaTab)
cluster.WaitForReplicationPos(t, primaryTab, replicaTab, true, 60.0)
primaryConn, err := utils.GetMySQLConn(primaryTab, fmt.Sprintf("vt_%v", keyspace.Name))
require.NoError(t, err)
replicaConn, err := utils.GetMySQLConn(replicaTab, fmt.Sprintf("vt_%v", keyspace.Name))
require.NoError(t, err)
primaryRes := collectFkTablesState(primaryConn)
replicaRes := collectFkTablesState(replicaConn)
verifyDataMatches(t, primaryRes, replicaRes)
}
}
}

// verifyDataMatches verifies that the two list of results are the same.
func verifyDataMatches(t *testing.T, resOne []*sqltypes.Result, resTwo []*sqltypes.Result) {
require.EqualValues(t, len(resTwo), len(resOne), "Res 1 - %v, Res 2 - %v", resOne, resTwo)
for idx, resultOne := range resOne {
resultTwo := resTwo[idx]
require.True(t, resultOne.Equal(resultTwo), "Data for %v doesn't match\nRows 1\n%v\nRows 2\n%v", fkTables[idx], resultOne.Rows, resultTwo.Rows)
}
}

// collectFkTablesState collects the data stored in the foreign key tables for the given connection.
func collectFkTablesState(conn *mysql.Conn) []*sqltypes.Result {
var tablesData []*sqltypes.Result
for _, table := range fkTables {
query := fmt.Sprintf("SELECT * FROM %v ORDER BY id", table)
res, _ := conn.ExecuteFetch(query, 10000, true)
tablesData = append(tablesData, res)
}
return tablesData
}
131 changes: 131 additions & 0 deletions go/test/endtoend/vtgate/foreignkey/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
Expand Down Expand Up @@ -774,3 +775,133 @@ func TestFkScenarios(t *testing.T) {
})
}
}

// TestFkQueries is for testing a specific set of queries one after the other.
func TestFkQueries(t *testing.T) {
// Wait for schema-tracking to be complete.
waitForSchemaTrackingForFkTables(t)
// Remove all the foreign key constraints for all the replicas.
// We can then verify that the replica, and the primary have the same data, to ensure
// that none of the queries ever lead to cascades/updates on MySQL level.
for _, ks := range []string{shardedKs, unshardedKs} {
replicas := getReplicaTablets(ks)
for _, replica := range replicas {
removeAllForeignKeyConstraints(t, replica, ks)
}
}

testcases := []struct {
name string
queries []string
}{
{
name: "Non-literal update",
queries: []string{
"insert into fk_t10 (id, col) values (1,1),(2,2),(3,3),(4,4),(5,5)",
"insert into fk_t11 (id, col) values (1,1),(2,2),(3,3),(4,4),(5,5)",
"update fk_t10 set col = id + 3",
},
}, {
name: "Non-literal update with order by",
queries: []string{
"insert into fk_t10 (id, col) values (1,1),(2,2),(3,3),(4,4),(5,5)",
"insert into fk_t11 (id, col) values (1,1),(2,2),(3,3),(4,4),(5,5)",
"update fk_t10 set col = id + 3 order by id desc",
},
}, {
name: "Non-literal update with order by that require parent and child foreign keys verification - success",
queries: []string{
"insert into fk_t10 (id, col) values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8)",
"insert into fk_t11 (id, col) values (1,1),(2,2),(3,3),(4,4),(5,5)",
"insert into fk_t12 (id, col) values (1,1),(2,2),(3,3),(4,4),(5,5)",
"insert into fk_t13 (id, col) values (1,1),(2,2)",
"update fk_t11 set col = id + 3 where id >= 3",
},
}, {
name: "Non-literal update with order by that require parent and child foreign keys verification - parent fails",
queries: []string{
"insert into fk_t10 (id, col) values (1,1),(2,2),(3,3),(4,4),(5,5)",
"insert into fk_t11 (id, col) values (1,1),(2,2),(3,3),(4,4),(5,5)",
"insert into fk_t12 (id, col) values (1,1),(2,2),(3,3),(4,4),(5,5)",
"update fk_t11 set col = id + 3",
},
}, {
name: "Non-literal update with order by that require parent and child foreign keys verification - child fails",
queries: []string{
"insert into fk_t10 (id, col) values (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8)",
"insert into fk_t11 (id, col) values (1,1),(2,2),(3,3),(4,4),(5,5)",
"insert into fk_t12 (id, col) values (1,1),(2,2),(3,3),(4,4),(5,5)",
"insert into fk_t13 (id, col) values (1,1),(2,2)",
"update fk_t11 set col = id + 3",
},
},
}

for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
mcmp, closer := start(t)
defer closer()
_ = utils.Exec(t, mcmp.VtConn, "use `uks`")

// Ensure that the Vitess database is originally empty
ensureDatabaseState(t, mcmp.VtConn, true)
ensureDatabaseState(t, mcmp.MySQLConn, true)

for _, query := range testcase.queries {
_, _ = mcmp.ExecAllowAndCompareError(query)
if t.Failed() {
break
}
}

// ensure Vitess database has some data. This ensures not all the commands failed.
ensureDatabaseState(t, mcmp.VtConn, false)
// Verify the consistency of the data.
verifyDataIsCorrect(t, mcmp, 1)
})
}
}

// TestFkOneCase is for testing a specific set of queries. On the CI this test won't run since we'll keep the queries empty.
func TestFkOneCase(t *testing.T) {
queries := []string{}
if len(queries) == 0 {
t.Skip("No queries to test")
}
// Wait for schema-tracking to be complete.
waitForSchemaTrackingForFkTables(t)
// Remove all the foreign key constraints for all the replicas.
// We can then verify that the replica, and the primary have the same data, to ensure
// that none of the queries ever lead to cascades/updates on MySQL level.
for _, ks := range []string{shardedKs, unshardedKs} {
replicas := getReplicaTablets(ks)
for _, replica := range replicas {
removeAllForeignKeyConstraints(t, replica, ks)
}
}

mcmp, closer := start(t)
defer closer()
_ = utils.Exec(t, mcmp.VtConn, "use `uks`")

// Ensure that the Vitess database is originally empty
ensureDatabaseState(t, mcmp.VtConn, true)
ensureDatabaseState(t, mcmp.MySQLConn, true)

for _, query := range queries {
_, _ = mcmp.ExecAllowAndCompareError(query)
if t.Failed() {
log.Errorf("Query failed - %v", query)
break
}
}
vitessData := collectFkTablesState(mcmp.VtConn)
for idx, table := range fkTables {
log.Errorf("Vitess data for %v -\n%v", table, vitessData[idx].Rows)
}

// ensure Vitess database has some data. This ensures not all the commands failed.
ensureDatabaseState(t, mcmp.VtConn, false)
// Verify the consistency of the data.
verifyDataIsCorrect(t, mcmp, 1)
}
Loading
Loading