Skip to content

Commit

Permalink
Foreign Key Fuzzer Benchmark (#14542)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Nov 27, 2023
1 parent faf9815 commit 36a52e9
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 36 deletions.
2 changes: 1 addition & 1 deletion go/test/endtoend/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func WaitForTableDeletions(ctx context.Context, t *testing.T, vtgateProcess clus
}

// WaitForColumn waits for a table's column to be present
func WaitForColumn(t *testing.T, vtgateProcess cluster.VtgateProcess, ks, tbl, col string) error {
func WaitForColumn(t testing.TB, vtgateProcess cluster.VtgateProcess, ks, tbl, col string) error {
timeout := time.After(60 * time.Second)
for {
select {
Expand Down
41 changes: 31 additions & 10 deletions go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type fuzzer struct {
updateShare int
concurrency int
queryFormat QueryFormat
noFkSetVar bool
fkState *bool

// shouldStop is an internal state variable, that tells the fuzzer
Expand Down Expand Up @@ -83,6 +84,7 @@ func newFuzzer(concurrency int, maxValForId int, maxValForCol int, insertShare i
updateShare: updateShare,
queryFormat: queryFormat,
fkState: fkState,
noFkSetVar: false,
wg: sync.WaitGroup{},
}
// Initially the fuzzer thread is stopped.
Expand Down Expand Up @@ -475,7 +477,7 @@ func (fz *fuzzer) generateParameterizedDeleteQuery() (query string, params []any

// getSetVarFkChecksVal generates an optimizer hint to randomly set the foreign key checks to on or off or leave them unaltered.
func (fz *fuzzer) getSetVarFkChecksVal() string {
if fz.concurrency != 1 {
if fz.concurrency != 1 || fz.noFkSetVar {
return ""
}
val := rand.Intn(3)
Expand Down Expand Up @@ -703,14 +705,33 @@ func TestFkFuzzTest(t *testing.T) {
}
}

func validateReplication(t *testing.T) {
for _, keyspace := range clusterInstance.Keyspaces {
for _, shard := range keyspace.Shards {
for _, vttablet := range shard.Vttablets {
if vttablet.Type != "primary" {
checkReplicationHealthy(t, vttablet)
}
}
}
// BenchmarkFkFuzz benchmarks the performance of Vitess unmanaged, Vitess managed and vanilla MySQL performance on a given set of queries generated by the fuzzer.
func BenchmarkFkFuzz(b *testing.B) {
maxValForCol := 10
maxValForId := 10
insertShare := 50
deleteShare := 50
updateShare := 50
numQueries := 1000
// Wait for schema-tracking to be complete.
waitForSchemaTrackingForFkTables(b)
for i := 0; i < b.N; i++ {
queries, mysqlConn, vtConn, vtUnmanagedConn := setupBenchmark(b, maxValForId, maxValForCol, insertShare, deleteShare, updateShare, numQueries)

// Now we run the benchmarks!
b.Run("MySQL", func(b *testing.B) {
startBenchmark(b)
runQueries(b, mysqlConn, queries)
})

b.Run("Vitess Managed Foreign Keys", func(b *testing.B) {
startBenchmark(b)
runQueries(b, vtConn, queries)
})

b.Run("Vitess Unmanaged Foreign Keys", func(b *testing.B) {
startBenchmark(b)
runQueries(b, vtUnmanagedConn, queries)
})
}
}
83 changes: 60 additions & 23 deletions go/test/endtoend/vtgate/foreignkey/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package foreignkey

import (
"context"
_ "embed"
"flag"
"fmt"
Expand All @@ -31,13 +32,14 @@ import (
)

var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
mysqlParams mysql.ConnParams
vtgateGrpcAddress string
shardedKs = "ks"
unshardedKs = "uks"
Cell = "test"
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
mysqlParams mysql.ConnParams
vtgateGrpcAddress string
shardedKs = "ks"
unshardedKs = "uks"
unshardedUnmanagedKs = "unmanaged_uks"
Cell = "test"

//go:embed schema.sql
schemaSQL string
Expand All @@ -48,6 +50,9 @@ var (
//go:embed unsharded_vschema.json
unshardedVSchema string

//go:embed unsharded_unmanaged_vschema.json
unshardedUnmanagedVSchema string

fkTables = []string{"fk_t1", "fk_t2", "fk_t3", "fk_t4", "fk_t5", "fk_t6", "fk_t7",
"fk_t10", "fk_t11", "fk_t12", "fk_t13", "fk_t15", "fk_t16", "fk_t17", "fk_t18", "fk_t19", "fk_t20",
"fk_multicol_t1", "fk_multicol_t2", "fk_multicol_t3", "fk_multicol_t4", "fk_multicol_t5", "fk_multicol_t6", "fk_multicol_t7",
Expand Down Expand Up @@ -124,6 +129,16 @@ func TestMain(m *testing.M) {
return 1
}

unmanagedKs := &cluster.Keyspace{
Name: unshardedUnmanagedKs,
SchemaSQL: schemaSQL,
VSchema: unshardedUnmanagedVSchema,
}
err = clusterInstance.StartUnshardedKeyspace(*unmanagedKs, 1, false)
if err != nil {
return 1
}

err = clusterInstance.VtctlclientProcess.ExecuteCommand("RebuildVSchemaGraph")
if err != nil {
return 1
Expand Down Expand Up @@ -157,22 +172,7 @@ func start(t *testing.T) (utils.MySQLCompare, func()) {
require.NoError(t, err)

deleteAll := func() {
_ = utils.Exec(t, mcmp.VtConn, "use `ks/-80`")
tables := []string{"t4", "t3", "t2", "t1", "multicol_tbl2", "multicol_tbl1"}
tables = append(tables, fkTables...)
for _, table := range tables {
_, _ = mcmp.ExecAndIgnore("delete /*+ SET_VAR(foreign_key_checks=OFF) */ from " + table)
}
_ = utils.Exec(t, mcmp.VtConn, "use `ks/80-`")
for _, table := range tables {
_, _ = mcmp.ExecAndIgnore("delete /*+ SET_VAR(foreign_key_checks=OFF) */ from " + table)
}
_ = utils.Exec(t, mcmp.VtConn, "use `uks`")
tables = []string{"u_t1", "u_t2", "u_t3"}
tables = append(tables, fkTables...)
for _, table := range tables {
_, _ = mcmp.ExecAndIgnore("delete /*+ SET_VAR(foreign_key_checks=OFF) */ from " + table)
}
clearOutAllData(t, mcmp.VtConn, mcmp.MySQLConn)
_ = utils.Exec(t, mcmp.VtConn, "use `ks`")
}

Expand All @@ -184,3 +184,40 @@ func start(t *testing.T) (utils.MySQLCompare, func()) {
cluster.PanicHandler(t)
}
}

func startBenchmark(b *testing.B) {
ctx := context.Background()
vtConn, err := mysql.Connect(ctx, &vtParams)
require.NoError(b, err)
mysqlConn, err := mysql.Connect(ctx, &mysqlParams)
require.NoError(b, err)

clearOutAllData(b, vtConn, mysqlConn)
}

func clearOutAllData(t testing.TB, vtConn *mysql.Conn, mysqlConn *mysql.Conn) {
_ = utils.Exec(t, vtConn, "use `ks/-80`")
tables := []string{"t4", "t3", "t2", "t1", "multicol_tbl2", "multicol_tbl1"}
tables = append(tables, fkTables...)
for _, table := range tables {
_, _ = utils.ExecAllowError(t, vtConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
_, _ = utils.ExecAllowError(t, mysqlConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
}
_ = utils.Exec(t, vtConn, "use `ks/80-`")
for _, table := range tables {
_, _ = utils.ExecAllowError(t, vtConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
_, _ = utils.ExecAllowError(t, mysqlConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
}
_ = utils.Exec(t, vtConn, "use `uks`")
tables = []string{"u_t1", "u_t2", "u_t3"}
tables = append(tables, fkTables...)
for _, table := range tables {
_, _ = utils.ExecAllowError(t, vtConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
_, _ = utils.ExecAllowError(t, mysqlConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
}
_ = utils.Exec(t, vtConn, "use `unmanaged_uks`")
for _, table := range tables {
_, _ = utils.ExecAllowError(t, vtConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
_, _ = utils.ExecAllowError(t, mysqlConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"sharded": false,
"foreignKeyMode": "unmanaged",
"tables": {
"u_t1": {},
"u_t2": {},
"fk_t1": {},
"fk_t2": {},
"fk_t3": {},
"fk_t4": {},
"fk_t5": {},
"fk_t6": {},
"fk_t7": {},
"fk_t10": {},
"fk_t11": {},
"fk_t12": {},
"fk_t13": {},
"fk_t15": {},
"fk_t16": {},
"fk_t17": {},
"fk_t18": {},
"fk_t19": {},
"fk_t20": {},
"fk_multicol_t1": {},
"fk_multicol_t2": {},
"fk_multicol_t3": {},
"fk_multicol_t4": {},
"fk_multicol_t5": {},
"fk_multicol_t6": {},
"fk_multicol_t7": {},
"fk_multicol_t10": {},
"fk_multicol_t11": {},
"fk_multicol_t12": {},
"fk_multicol_t13": {},
"fk_multicol_t15": {},
"fk_multicol_t16": {},
"fk_multicol_t17": {},
"fk_multicol_t18": {},
"fk_multicol_t19": {}
}
}
75 changes: 73 additions & 2 deletions go/test/endtoend/vtgate/foreignkey/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ limitations under the License.
package foreignkey

import (
"context"
"database/sql"
"fmt"
"math/rand"
"slices"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -75,7 +77,7 @@ func convertIntValueToString(value int) string {

// waitForSchemaTrackingForFkTables waits for schema tracking to have run and seen the tables used
// for foreign key tests.
func waitForSchemaTrackingForFkTables(t *testing.T) {
func waitForSchemaTrackingForFkTables(t testing.TB) {
err := utils.WaitForColumn(t, clusterInstance.VtgateProcess, shardedKs, "fk_t1", "col")
require.NoError(t, err)
err = utils.WaitForColumn(t, clusterInstance.VtgateProcess, shardedKs, "fk_t18", "col")
Expand All @@ -88,6 +90,8 @@ func waitForSchemaTrackingForFkTables(t *testing.T) {
require.NoError(t, err)
err = utils.WaitForColumn(t, clusterInstance.VtgateProcess, unshardedKs, "fk_t11", "col")
require.NoError(t, err)
err = utils.WaitForColumn(t, clusterInstance.VtgateProcess, unshardedUnmanagedKs, "fk_t11", "col")
require.NoError(t, err)
}

// getReplicaTablets gets all the replica tablets.
Expand Down Expand Up @@ -238,7 +242,7 @@ func verifyDataIsCorrect(t *testing.T, mcmp utils.MySQLCompare, concurrency int)
}

// verifyDataMatches verifies that the two list of results are the same.
func verifyDataMatches(t *testing.T, resOne []*sqltypes.Result, resTwo []*sqltypes.Result) {
func verifyDataMatches(t testing.TB, resOne []*sqltypes.Result, resTwo []*sqltypes.Result) {
require.EqualValues(t, len(resTwo), len(resOne), "Res 1 - %v, Res 2 - %v", resOne, resTwo)
for idx, resultOne := range resOne {
resultTwo := resTwo[idx]
Expand All @@ -256,3 +260,70 @@ func collectFkTablesState(conn *mysql.Conn) []*sqltypes.Result {
}
return tablesData
}

func validateReplication(t *testing.T) {
for _, keyspace := range clusterInstance.Keyspaces {
for _, shard := range keyspace.Shards {
for _, vttablet := range shard.Vttablets {
if vttablet.Type != "primary" {
checkReplicationHealthy(t, vttablet)
}
}
}
}
}

// compareResultRows compares the rows of the two results provided.
func compareResultRows(resOne *sqltypes.Result, resTwo *sqltypes.Result) bool {
return slices.EqualFunc(resOne.Rows, resTwo.Rows, func(a, b sqltypes.Row) bool {
return sqltypes.RowEqual(a, b)
})
}

// setupBenchmark sets up the benchmark by creating the set of queries that we want to run. It also ensures that the 3 modes (MySQL, Vitess Managed, Vitess Unmanaged) we verify all return the same results after the queries have been executed.
func setupBenchmark(b *testing.B, maxValForId int, maxValForCol int, insertShare int, deleteShare int, updateShare int, numQueries int) ([]string, *mysql.Conn, *mysql.Conn, *mysql.Conn) {
// Clear out all the data to ensure we start with a clean slate.
startBenchmark(b)
// Create a fuzzer to generate and store a certain set of queries.
fz := newFuzzer(1, maxValForId, maxValForCol, insertShare, deleteShare, updateShare, SQLQueries, nil)
fz.noFkSetVar = true
var queries []string
for j := 0; j < numQueries; j++ {
genQueries := fz.generateQuery()
require.Len(b, genQueries, 1)
queries = append(queries, genQueries[0])
}

// Connect to MySQL and run all the queries
mysqlConn, err := mysql.Connect(context.Background(), &mysqlParams)
require.NoError(b, err)
// Connect to Vitess managed foreign keys keyspace
vtConn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(b, err)
utils.Exec(b, vtConn, fmt.Sprintf("use `%v`", unshardedKs))
// Connect to Vitess unmanaged foreign keys keyspace
vtUnmanagedConn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(b, err)
utils.Exec(b, vtUnmanagedConn, fmt.Sprintf("use `%v`", unshardedUnmanagedKs))

// First we make sure that running all the queries in both the Vitess modes and MySQL gives the same data.
// So we run all the queries and then check that the data in all of them matches.
runQueries(b, mysqlConn, queries)
runQueries(b, vtConn, queries)
runQueries(b, vtUnmanagedConn, queries)
for _, table := range fkTables {
query := fmt.Sprintf("SELECT * FROM %v ORDER BY id", table)
resVitessManaged, _ := vtConn.ExecuteFetch(query, 10000, true)
resMySQL, _ := mysqlConn.ExecuteFetch(query, 10000, true)
resVitessUnmanaged, _ := vtUnmanagedConn.ExecuteFetch(query, 10000, true)
require.True(b, compareResultRows(resVitessManaged, resMySQL), "Results for %v don't match\nVitess Managed\n%v\nMySQL\n%v", table, resVitessManaged, resMySQL)
require.True(b, compareResultRows(resVitessUnmanaged, resMySQL), "Results for %v don't match\nVitess Unmanaged\n%v\nMySQL\n%v", table, resVitessUnmanaged, resMySQL)
}
return queries, mysqlConn, vtConn, vtUnmanagedConn
}

func runQueries(t testing.TB, conn *mysql.Conn, queries []string) {
for _, query := range queries {
_, _ = utils.ExecAllowError(t, conn, query)
}
}

0 comments on commit 36a52e9

Please sign in to comment.