Skip to content

Commit

Permalink
Replace into statement plan with foreign keys : unsharded (#14396)
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
Co-authored-by: Manan Gupta <[email protected]>
  • Loading branch information
harshit-gangal and GuptaManan100 authored Nov 15, 2023
1 parent 4a0aa63 commit 9ddf932
Show file tree
Hide file tree
Showing 34 changed files with 1,375 additions and 677 deletions.
1 change: 1 addition & 0 deletions go/mysql/sqlerror/sql_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ var stateToMysqlCode = map[vterrors.State]mysqlCode{
vterrors.CharacterSetMismatch: {num: ERCharacterSetMismatch, state: SSUnknownSQLState},
vterrors.WrongParametersToNativeFct: {num: ERWrongParametersToNativeFct, state: SSUnknownSQLState},
vterrors.KillDeniedError: {num: ERKillDenied, state: SSUnknownSQLState},
vterrors.BadNullError: {num: ERBadNullError, state: SSConstraintViolation},
}

func getStateToMySQLState(state vterrors.State) mysqlCode {
Expand Down
164 changes: 96 additions & 68 deletions go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ func (fz *fuzzer) generateQuery() []string {
if val < fz.insertShare {
switch fz.queryFormat {
case OlapSQLQueries, SQLQueries:
return []string{fz.generateInsertDMLQuery()}
return []string{fz.generateInsertDMLQuery(getInsertType())}
case PreparedStatmentQueries:
return fz.getPreparedInsertQueries()
return fz.getPreparedInsertQueries(getInsertType())
default:
panic("Unknown query type")
}
Expand All @@ -122,22 +122,26 @@ func (fz *fuzzer) generateQuery() []string {
}
}

func getInsertType() string {
return "insert"
}

// generateInsertDMLQuery generates an INSERT query from the parameters for the fuzzer.
func (fz *fuzzer) generateInsertDMLQuery() string {
func (fz *fuzzer) generateInsertDMLQuery(insertType string) string {
tableId := rand.Intn(len(fkTables))
idValue := 1 + rand.Intn(fz.maxValForId)
tableName := fkTables[tableId]
if tableName == "fk_t20" {
colValue := rand.Intn(1 + fz.maxValForCol)
col2Value := rand.Intn(1 + fz.maxValForCol)
return fmt.Sprintf("insert into %v (id, col, col2) values (%v, %v, %v)", tableName, idValue, convertIntValueToString(colValue), convertIntValueToString(col2Value))
return fmt.Sprintf("%s into %v (id, col, col2) values (%v, %v, %v)", insertType, tableName, idValue, convertIntValueToString(colValue), convertIntValueToString(col2Value))
} else if isMultiColFkTable(tableName) {
colaValue := rand.Intn(1 + fz.maxValForCol)
colbValue := rand.Intn(1 + fz.maxValForCol)
return fmt.Sprintf("insert into %v (id, cola, colb) values (%v, %v, %v)", tableName, idValue, convertIntValueToString(colaValue), convertIntValueToString(colbValue))
return fmt.Sprintf("%s into %v (id, cola, colb) values (%v, %v, %v)", insertType, tableName, idValue, convertIntValueToString(colaValue), convertIntValueToString(colbValue))
} else {
colValue := rand.Intn(1 + fz.maxValForCol)
return fmt.Sprintf("insert into %v (id, col) values (%v, %v)", tableName, idValue, convertIntValueToString(colValue))
return fmt.Sprintf("%s into %v (id, col) values (%v, %v)", insertType, tableName, idValue, convertIntValueToString(colValue))
}
}

Expand Down Expand Up @@ -332,15 +336,15 @@ func (fz *fuzzer) getPreparedDeleteQueries() []string {
}

// getPreparedInsertQueries gets the list of queries to run for executing an INSERT using prepared statements.
func (fz *fuzzer) getPreparedInsertQueries() []string {
func (fz *fuzzer) getPreparedInsertQueries(insertType string) []string {
tableId := rand.Intn(len(fkTables))
idValue := 1 + rand.Intn(fz.maxValForId)
tableName := fkTables[tableId]
if tableName == "fk_t20" {
colValue := rand.Intn(1 + fz.maxValForCol)
col2Value := rand.Intn(1 + fz.maxValForCol)
return []string{
"prepare stmt_insert from 'insert into fk_t20 (id, col, col2) values (?, ?, ?)'",
fmt.Sprintf("prepare stmt_insert from '%s into fk_t20 (id, col, col2) values (?, ?, ?)'", insertType),
fmt.Sprintf("SET @id = %v", idValue),
fmt.Sprintf("SET @col = %v", convertIntValueToString(colValue)),
fmt.Sprintf("SET @col2 = %v", convertIntValueToString(col2Value)),
Expand All @@ -350,7 +354,7 @@ func (fz *fuzzer) getPreparedInsertQueries() []string {
colaValue := rand.Intn(1 + fz.maxValForCol)
colbValue := rand.Intn(1 + fz.maxValForCol)
return []string{
fmt.Sprintf("prepare stmt_insert from 'insert into %v (id, cola, colb) values (?, ?, ?)'", tableName),
fmt.Sprintf("prepare stmt_insert from '%s into %v (id, cola, colb) values (?, ?, ?)'", insertType, tableName),
fmt.Sprintf("SET @id = %v", idValue),
fmt.Sprintf("SET @cola = %v", convertIntValueToString(colaValue)),
fmt.Sprintf("SET @colb = %v", convertIntValueToString(colbValue)),
Expand All @@ -359,7 +363,7 @@ func (fz *fuzzer) getPreparedInsertQueries() []string {
} else {
colValue := rand.Intn(1 + fz.maxValForCol)
return []string{
fmt.Sprintf("prepare stmt_insert from 'insert into %v (id, col) values (?, ?)'", tableName),
fmt.Sprintf("prepare stmt_insert from '%s into %v (id, col) values (?, ?)'", insertType, tableName),
fmt.Sprintf("SET @id = %v", idValue),
fmt.Sprintf("SET @col = %v", convertIntValueToString(colValue)),
"execute stmt_insert using @id, @col",
Expand Down Expand Up @@ -407,7 +411,7 @@ func (fz *fuzzer) getPreparedUpdateQueries() []string {
func (fz *fuzzer) generateParameterizedQuery() (query string, params []any) {
val := rand.Intn(fz.insertShare + fz.updateShare + fz.deleteShare)
if val < fz.insertShare {
return fz.generateParameterizedInsertQuery()
return fz.generateParameterizedInsertQuery(getInsertType())
}
if val < fz.insertShare+fz.updateShare {
return fz.generateParameterizedUpdateQuery()
Expand All @@ -416,21 +420,21 @@ func (fz *fuzzer) generateParameterizedQuery() (query string, params []any) {
}

// generateParameterizedInsertQuery generates a parameterized INSERT query for the query format PreparedStatementPacket.
func (fz *fuzzer) generateParameterizedInsertQuery() (query string, params []any) {
func (fz *fuzzer) generateParameterizedInsertQuery(insertType string) (query string, params []any) {
tableId := rand.Intn(len(fkTables))
idValue := 1 + rand.Intn(fz.maxValForId)
tableName := fkTables[tableId]
if tableName == "fk_t20" {
colValue := rand.Intn(1 + fz.maxValForCol)
col2Value := rand.Intn(1 + fz.maxValForCol)
return fmt.Sprintf("insert into %v (id, col, col2) values (?, ?, ?)", tableName), []any{idValue, convertIntValueToString(colValue), convertIntValueToString(col2Value)}
return fmt.Sprintf("%s into %v (id, col, col2) values (?, ?, ?)", insertType, tableName), []any{idValue, convertIntValueToString(colValue), convertIntValueToString(col2Value)}
} else if isMultiColFkTable(tableName) {
colaValue := rand.Intn(1 + fz.maxValForCol)
colbValue := rand.Intn(1 + fz.maxValForCol)
return fmt.Sprintf("insert into %v (id, cola, colb) values (?, ?, ?)", tableName), []any{idValue, convertIntValueToString(colaValue), convertIntValueToString(colbValue)}
return fmt.Sprintf("%s into %v (id, cola, colb) values (?, ?, ?)", insertType, tableName), []any{idValue, convertIntValueToString(colaValue), convertIntValueToString(colbValue)}
} else {
colValue := rand.Intn(1 + fz.maxValForCol)
return fmt.Sprintf("insert into %v (id, col) values (?, ?)", tableName), []any{idValue, convertIntValueToString(colValue)}
return fmt.Sprintf("%s into %v (id, col) values (?, ?)", insertType, tableName), []any{idValue, convertIntValueToString(colValue)}
}
}

Expand Down Expand Up @@ -555,58 +559,61 @@ func TestFkFuzzTest(t *testing.T) {
insertShare int
deleteShare int
updateShare int
}{
{
name: "Single Thread - Only Inserts",
concurrency: 1,
timeForTesting: 5 * time.Second,
maxValForCol: 5,
maxValForId: 10,
insertShare: 100,
deleteShare: 0,
updateShare: 0,
},
{
name: "Single Thread - Balanced Inserts and Deletes",
concurrency: 1,
timeForTesting: 5 * time.Second,
maxValForCol: 5,
maxValForId: 10,
insertShare: 50,
deleteShare: 50,
updateShare: 0,
},
{
name: "Single Thread - Balanced Inserts and Updates",
concurrency: 1,
timeForTesting: 5 * time.Second,
maxValForCol: 5,
maxValForId: 10,
insertShare: 50,
deleteShare: 0,
updateShare: 50,
},
{
name: "Single Thread - Balanced Inserts, Updates and Deletes",
concurrency: 1,
timeForTesting: 5 * time.Second,
maxValForCol: 5,
maxValForId: 10,
insertShare: 50,
deleteShare: 50,
updateShare: 50,
},
{
name: "Multi Thread - Balanced Inserts, Updates and Deletes",
concurrency: 30,
timeForTesting: 5 * time.Second,
maxValForCol: 5,
maxValForId: 30,
insertShare: 50,
deleteShare: 50,
updateShare: 50,
},
}
}{{
name: "Single Thread - Only Inserts",
concurrency: 1,
timeForTesting: 5 * time.Second,
maxValForCol: 5,
maxValForId: 10,
insertShare: 100,
deleteShare: 0,
updateShare: 0,
}, {
name: "Single Thread - Balanced Inserts and Deletes",
concurrency: 1,
timeForTesting: 5 * time.Second,
maxValForCol: 5,
maxValForId: 10,
insertShare: 50,
deleteShare: 50,
updateShare: 0,
}, {
name: "Single Thread - Balanced Inserts and Updates",
concurrency: 1,
timeForTesting: 5 * time.Second,
maxValForCol: 5,
maxValForId: 10,
insertShare: 50,
deleteShare: 0,
updateShare: 50,
}, {
name: "Single Thread - Balanced Inserts, Updates and Deletes",
concurrency: 1,
timeForTesting: 5 * time.Second,
maxValForCol: 5,
maxValForId: 10,
insertShare: 50,
deleteShare: 50,
updateShare: 50,
}, {
name: "Multi Thread - Only Inserts",
concurrency: 30,
timeForTesting: 5 * time.Second,
maxValForCol: 5,
maxValForId: 30,
insertShare: 100,
deleteShare: 0,
updateShare: 0,
}, {
name: "Multi Thread - Balanced Inserts, Updates and Deletes",
concurrency: 30,
timeForTesting: 5 * time.Second,
maxValForCol: 5,
maxValForId: 30,
insertShare: 50,
deleteShare: 50,
updateShare: 50,
}}

for _, tt := range testcases {
for _, testSharded := range []bool{false, true} {
Expand All @@ -632,7 +639,16 @@ func TestFkFuzzTest(t *testing.T) {
fz.start(t, testSharded)

// Wait for the timeForTesting so that the threads continue to run.
time.Sleep(tt.timeForTesting)
totalTime := time.After(tt.timeForTesting)
done := false
for !done {
select {
case <-totalTime:
done = true
case <-time.After(10 * time.Millisecond):
validateReplication(t)
}
}

fz.stop()

Expand All @@ -654,3 +670,15 @@ func TestFkFuzzTest(t *testing.T) {
}
}
}

func validateReplication(t *testing.T) {
for _, keyspace := range clusterInstance.Keyspaces {
for _, shard := range keyspace.Shards {
for _, vttablet := range shard.Vttablets {
if vttablet.Type != "primary" {
checkReplicationHealthy(t, vttablet)
}
}
}
}
}
Loading

0 comments on commit 9ddf932

Please sign in to comment.