Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Foreign Key Fuzzer Benchmark #14542

Merged
merged 8 commits into from
Nov 27, 2023
2 changes: 1 addition & 1 deletion go/test/endtoend/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func WaitForTableDeletions(ctx context.Context, t *testing.T, vtgateProcess clus
}

// WaitForColumn waits for a table's column to be present
func WaitForColumn(t *testing.T, vtgateProcess cluster.VtgateProcess, ks, tbl, col string) error {
func WaitForColumn(t testing.TB, vtgateProcess cluster.VtgateProcess, ks, tbl, col string) error {
timeout := time.After(60 * time.Second)
for {
select {
Expand Down
83 changes: 74 additions & 9 deletions go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package foreignkey

import (
"context"
"database/sql"
"fmt"
"math/rand"
Expand All @@ -28,6 +29,7 @@ import (
_ "github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/log"
Expand All @@ -53,6 +55,7 @@ type fuzzer struct {
updateShare int
concurrency int
queryFormat QueryFormat
noFkSetVar bool
fkState *bool

// shouldStop is an internal state variable, that tells the fuzzer
Expand Down Expand Up @@ -83,6 +86,7 @@ func newFuzzer(concurrency int, maxValForId int, maxValForCol int, insertShare i
updateShare: updateShare,
queryFormat: queryFormat,
fkState: fkState,
noFkSetVar: false,
wg: sync.WaitGroup{},
}
// Initially the fuzzer thread is stopped.
Expand Down Expand Up @@ -475,7 +479,7 @@ func (fz *fuzzer) generateParameterizedDeleteQuery() (query string, params []any

// getSetVarFkChecksVal generates an optimizer hint to randomly set the foreign key checks to on or off or leave them unaltered.
func (fz *fuzzer) getSetVarFkChecksVal() string {
if fz.concurrency != 1 {
if fz.concurrency != 1 || fz.noFkSetVar {
return ""
}
val := rand.Intn(3)
Expand Down Expand Up @@ -703,14 +707,75 @@ func TestFkFuzzTest(t *testing.T) {
}
}

func validateReplication(t *testing.T) {
for _, keyspace := range clusterInstance.Keyspaces {
for _, shard := range keyspace.Shards {
for _, vttablet := range shard.Vttablets {
if vttablet.Type != "primary" {
checkReplicationHealthy(t, vttablet)
}
}
// BenchmarkFkFuzz benchmarks the performance of Vitess unmanaged, Vitess managed and vanilla MySQL performance on a given set of queries generated by the fuzzer.
func BenchmarkFkFuzz(b *testing.B) {
maxValForCol := 10
maxValForId := 10
insertShare := 50
deleteShare := 50
updateShare := 50
numQueries := 1000
// Wait for schema-tracking to be complete.
waitForSchemaTrackingForFkTables(b)
for i := 0; i < b.N; i++ {
// Clear out all the data to ensure we start with a clean slate.
startBenchmark(b)
// Create a fuzzer to generate and store a certain set of queries.
fz := newFuzzer(1, maxValForId, maxValForCol, insertShare, deleteShare, updateShare, SQLQueries, nil)
fz.noFkSetVar = true
var queries []string
for j := 0; j < numQueries; j++ {
genQueries := fz.generateQuery()
require.Len(b, genQueries, 1)
queries = append(queries, genQueries[0])
}

// Connect to MySQL and run all the queries
mysqlConn, err := mysql.Connect(context.Background(), &mysqlParams)
require.NoError(b, err)
// Connect to Vitess managed foreign keys keyspace
vtConn, err := mysql.Connect(context.Background(), &vtParams)
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(b, err)
utils.Exec(b, vtConn, fmt.Sprintf("use `%v`", unshardedKs))
// Connect to Vitess unmanaged foreign keys keyspace
vtUnmanagedConn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(b, err)
utils.Exec(b, vtUnmanagedConn, fmt.Sprintf("use `%v`", unshardedUnmanagedKs))

// First we make sure that running all the queries in both the Vitess modes and MySQL gives the same data.
// So we run all the queries and then check that the data in all of them matches.
runQueries(b, mysqlConn, queries)
runQueries(b, vtConn, queries)
runQueries(b, vtUnmanagedConn, queries)
for _, table := range fkTables {
query := fmt.Sprintf("SELECT * FROM %v ORDER BY id", table)
resVitessManaged, _ := vtConn.ExecuteFetch(query, 10000, true)
resMySQL, _ := mysqlConn.ExecuteFetch(query, 10000, true)
resVitessUnmanaged, _ := vtUnmanagedConn.ExecuteFetch(query, 10000, true)
require.True(b, compareResultRows(resVitessManaged, resMySQL), "Results for %v don't match\nVitess Managed\n%v\nMySQL\n%v", table, resVitessManaged, resMySQL)
require.True(b, compareResultRows(resVitessUnmanaged, resMySQL), "Results for %v don't match\nVitess Unmanaged\n%v\nMySQL\n%v", table, resVitessUnmanaged, resMySQL)
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
}

// Now we run the benchmarks!
b.Run("MySQL", func(b *testing.B) {
startBenchmark(b)
runQueries(b, mysqlConn, queries)
})

b.Run("Vitess Managed Foreign Keys", func(b *testing.B) {
startBenchmark(b)
runQueries(b, vtConn, queries)
})

b.Run("Vitess Unmanaged Foreign Keys", func(b *testing.B) {
startBenchmark(b)
runQueries(b, vtUnmanagedConn, queries)
})
}
}

func runQueries(t testing.TB, conn *mysql.Conn, queries []string) {
for _, query := range queries {
_, _ = utils.ExecAllowError(t, conn, query)
}
}
83 changes: 60 additions & 23 deletions go/test/endtoend/vtgate/foreignkey/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package foreignkey

import (
"context"
_ "embed"
"flag"
"fmt"
Expand All @@ -31,13 +32,14 @@ import (
)

var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
mysqlParams mysql.ConnParams
vtgateGrpcAddress string
shardedKs = "ks"
unshardedKs = "uks"
Cell = "test"
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
mysqlParams mysql.ConnParams
vtgateGrpcAddress string
shardedKs = "ks"
unshardedKs = "uks"
unshardedUnmanagedKs = "unmanaged_uks"
Cell = "test"

//go:embed schema.sql
schemaSQL string
Expand All @@ -48,6 +50,9 @@ var (
//go:embed unsharded_vschema.json
unshardedVSchema string

//go:embed unsharded_unmanaged_vschema.json
unshardedUnmanagedVSchema string

fkTables = []string{"fk_t1", "fk_t2", "fk_t3", "fk_t4", "fk_t5", "fk_t6", "fk_t7",
"fk_t10", "fk_t11", "fk_t12", "fk_t13", "fk_t15", "fk_t16", "fk_t17", "fk_t18", "fk_t19", "fk_t20",
"fk_multicol_t1", "fk_multicol_t2", "fk_multicol_t3", "fk_multicol_t4", "fk_multicol_t5", "fk_multicol_t6", "fk_multicol_t7",
Expand Down Expand Up @@ -124,6 +129,16 @@ func TestMain(m *testing.M) {
return 1
}

unmanagedKs := &cluster.Keyspace{
Name: unshardedUnmanagedKs,
SchemaSQL: schemaSQL,
VSchema: unshardedUnmanagedVSchema,
}
err = clusterInstance.StartUnshardedKeyspace(*unmanagedKs, 1, false)
if err != nil {
return 1
}

err = clusterInstance.VtctlclientProcess.ExecuteCommand("RebuildVSchemaGraph")
if err != nil {
return 1
Expand Down Expand Up @@ -157,22 +172,7 @@ func start(t *testing.T) (utils.MySQLCompare, func()) {
require.NoError(t, err)

deleteAll := func() {
_ = utils.Exec(t, mcmp.VtConn, "use `ks/-80`")
tables := []string{"t4", "t3", "t2", "t1", "multicol_tbl2", "multicol_tbl1"}
tables = append(tables, fkTables...)
for _, table := range tables {
_, _ = mcmp.ExecAndIgnore("delete /*+ SET_VAR(foreign_key_checks=OFF) */ from " + table)
}
_ = utils.Exec(t, mcmp.VtConn, "use `ks/80-`")
for _, table := range tables {
_, _ = mcmp.ExecAndIgnore("delete /*+ SET_VAR(foreign_key_checks=OFF) */ from " + table)
}
_ = utils.Exec(t, mcmp.VtConn, "use `uks`")
tables = []string{"u_t1", "u_t2", "u_t3"}
tables = append(tables, fkTables...)
for _, table := range tables {
_, _ = mcmp.ExecAndIgnore("delete /*+ SET_VAR(foreign_key_checks=OFF) */ from " + table)
}
clearOutAllData(t, mcmp.VtConn, mcmp.MySQLConn)
_ = utils.Exec(t, mcmp.VtConn, "use `ks`")
}

Expand All @@ -184,3 +184,40 @@ func start(t *testing.T) (utils.MySQLCompare, func()) {
cluster.PanicHandler(t)
}
}

func startBenchmark(b *testing.B) {
ctx := context.Background()
vtConn, err := mysql.Connect(ctx, &vtParams)
require.NoError(b, err)
mysqlConn, err := mysql.Connect(ctx, &mysqlParams)
require.NoError(b, err)

clearOutAllData(b, vtConn, mysqlConn)
}

func clearOutAllData(t testing.TB, vtConn *mysql.Conn, mysqlConn *mysql.Conn) {
_ = utils.Exec(t, vtConn, "use `ks/-80`")
tables := []string{"t4", "t3", "t2", "t1", "multicol_tbl2", "multicol_tbl1"}
tables = append(tables, fkTables...)
for _, table := range tables {
_, _ = utils.ExecAllowError(t, vtConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
_, _ = utils.ExecAllowError(t, mysqlConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
}
_ = utils.Exec(t, vtConn, "use `ks/80-`")
for _, table := range tables {
_, _ = utils.ExecAllowError(t, vtConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
_, _ = utils.ExecAllowError(t, mysqlConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
}
_ = utils.Exec(t, vtConn, "use `uks`")
tables = []string{"u_t1", "u_t2", "u_t3"}
tables = append(tables, fkTables...)
for _, table := range tables {
_, _ = utils.ExecAllowError(t, vtConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
_, _ = utils.ExecAllowError(t, mysqlConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
}
_ = utils.Exec(t, vtConn, "use `unmanaged_uks`")
for _, table := range tables {
_, _ = utils.ExecAllowError(t, vtConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
_, _ = utils.ExecAllowError(t, mysqlConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"sharded": false,
"foreignKeyMode": "unmanaged",
"tables": {
"u_t1": {},
"u_t2": {},
"fk_t1": {},
"fk_t2": {},
"fk_t3": {},
"fk_t4": {},
"fk_t5": {},
"fk_t6": {},
"fk_t7": {},
"fk_t10": {},
"fk_t11": {},
"fk_t12": {},
"fk_t13": {},
"fk_t15": {},
"fk_t16": {},
"fk_t17": {},
"fk_t18": {},
"fk_t19": {},
"fk_t20": {},
"fk_multicol_t1": {},
"fk_multicol_t2": {},
"fk_multicol_t3": {},
"fk_multicol_t4": {},
"fk_multicol_t5": {},
"fk_multicol_t6": {},
"fk_multicol_t7": {},
"fk_multicol_t10": {},
"fk_multicol_t11": {},
"fk_multicol_t12": {},
"fk_multicol_t13": {},
"fk_multicol_t15": {},
"fk_multicol_t16": {},
"fk_multicol_t17": {},
"fk_multicol_t18": {},
"fk_multicol_t19": {}
}
}
26 changes: 24 additions & 2 deletions go/test/endtoend/vtgate/foreignkey/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"database/sql"
"fmt"
"math/rand"
"slices"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -75,7 +76,7 @@ func convertIntValueToString(value int) string {

// waitForSchemaTrackingForFkTables waits for schema tracking to have run and seen the tables used
// for foreign key tests.
func waitForSchemaTrackingForFkTables(t *testing.T) {
func waitForSchemaTrackingForFkTables(t testing.TB) {
err := utils.WaitForColumn(t, clusterInstance.VtgateProcess, shardedKs, "fk_t1", "col")
require.NoError(t, err)
err = utils.WaitForColumn(t, clusterInstance.VtgateProcess, shardedKs, "fk_t18", "col")
Expand All @@ -88,6 +89,8 @@ func waitForSchemaTrackingForFkTables(t *testing.T) {
require.NoError(t, err)
err = utils.WaitForColumn(t, clusterInstance.VtgateProcess, unshardedKs, "fk_t11", "col")
require.NoError(t, err)
err = utils.WaitForColumn(t, clusterInstance.VtgateProcess, unshardedUnmanagedKs, "fk_t11", "col")
require.NoError(t, err)
}

// getReplicaTablets gets all the replica tablets.
Expand Down Expand Up @@ -238,7 +241,7 @@ func verifyDataIsCorrect(t *testing.T, mcmp utils.MySQLCompare, concurrency int)
}

// verifyDataMatches verifies that the two list of results are the same.
func verifyDataMatches(t *testing.T, resOne []*sqltypes.Result, resTwo []*sqltypes.Result) {
func verifyDataMatches(t testing.TB, resOne []*sqltypes.Result, resTwo []*sqltypes.Result) {
require.EqualValues(t, len(resTwo), len(resOne), "Res 1 - %v, Res 2 - %v", resOne, resTwo)
for idx, resultOne := range resOne {
resultTwo := resTwo[idx]
Expand All @@ -256,3 +259,22 @@ func collectFkTablesState(conn *mysql.Conn) []*sqltypes.Result {
}
return tablesData
}

func validateReplication(t *testing.T) {
for _, keyspace := range clusterInstance.Keyspaces {
for _, shard := range keyspace.Shards {
for _, vttablet := range shard.Vttablets {
if vttablet.Type != "primary" {
checkReplicationHealthy(t, vttablet)
}
}
}
}
}

// compareResultRows compares the rows of the two results provided.
func compareResultRows(resOne *sqltypes.Result, resTwo *sqltypes.Result) bool {
return slices.EqualFunc(resOne.Rows, resTwo.Rows, func(a, b sqltypes.Row) bool {
return sqltypes.RowEqual(a, b)
})
}
Loading