Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Foreign Key Fuzzer Benchmark #14542

Merged
merged 8 commits into from
Nov 27, 2023
2 changes: 1 addition & 1 deletion go/test/endtoend/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func WaitForTableDeletions(ctx context.Context, t *testing.T, vtgateProcess clus
}

// WaitForColumn waits for a table's column to be present
func WaitForColumn(t *testing.T, vtgateProcess cluster.VtgateProcess, ks, tbl, col string) error {
func WaitForColumn(t testing.TB, vtgateProcess cluster.VtgateProcess, ks, tbl, col string) error {
timeout := time.After(60 * time.Second)
for {
select {
Expand Down
69 changes: 61 additions & 8 deletions go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package foreignkey

import (
"context"
"database/sql"
"fmt"
"math/rand"
Expand All @@ -28,6 +29,7 @@ import (
_ "github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -671,14 +673,65 @@ func TestFkFuzzTest(t *testing.T) {
}
}

func validateReplication(t *testing.T) {
for _, keyspace := range clusterInstance.Keyspaces {
for _, shard := range keyspace.Shards {
for _, vttablet := range shard.Vttablets {
if vttablet.Type != "primary" {
checkReplicationHealthy(t, vttablet)
}
}
// BenchmarkFkFuzz benchmarks the performance of Vitess unmanaged, Vitess managed and vanilla MySQL performance on a given set of queries generated by the fuzzer.
func BenchmarkFkFuzz(b *testing.B) {
maxValForCol := 10
maxValForId := 10
insertShare := 50
deleteShare := 50
updateShare := 50
numQueries := 1000
// Wait for schema-tracking to be complete.
waitForSchemaTrackingForFkTables(b)
for i := 0; i < b.N; i++ {
// Clear out all the data to ensure we start with a clean slate.
startBenchmark(b)
// Create a fuzzer to generate and store a certain set of queries.
fz := newFuzzer(1, maxValForId, maxValForCol, insertShare, deleteShare, updateShare, SQLQueries)
var queries []string
for j := 0; j < numQueries; j++ {
genQueries := fz.generateQuery()
require.Len(b, genQueries, 1)
queries = append(queries, genQueries[0])
}

// Connect to MySQL and run all the queries
mysqlConn, err := mysql.Connect(context.Background(), &mysqlParams)
require.NoError(b, err)
b.Run("MySQL", func(b *testing.B) {
runQueries(b, mysqlConn, queries)
})

// Connect to Vitess managed foreign keys keyspace and run all the queries
vtConn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(b, err)
utils.Exec(b, vtConn, fmt.Sprintf("use `%v`", unshardedKs))
b.Run("Vitess Managed Foreign Keys", func(b *testing.B) {
runQueries(b, vtConn, queries)
})

// Connect to Vitess managed foreign keys keyspace and run all the queries
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
vtUnmanagedConn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(b, err)
utils.Exec(b, vtUnmanagedConn, fmt.Sprintf("use `%v`", unshardedUnmanagedKs))
b.Run("Vitess Unmanaged Foreign Keys", func(b *testing.B) {
runQueries(b, vtUnmanagedConn, queries)
})

// Verify that the data in MySQL and Vitess matches. This ensures, we ended up running the same set of queries with the same outputs.
for _, table := range fkTables {
query := fmt.Sprintf("SELECT * FROM %v ORDER BY id", table)
resVitessManaged, _ := vtConn.ExecuteFetch(query, 10000, true)
resMySQL, _ := mysqlConn.ExecuteFetch(query, 10000, true)
resVitessUnmanaged, _ := vtUnmanagedConn.ExecuteFetch(query, 10000, true)
require.True(b, compareResultRows(resVitessManaged, resMySQL), "Results for %v don't match\nVitess Managed\n%v\nMySQL\n%v", table, resVitessManaged, resMySQL)
require.True(b, compareResultRows(resVitessUnmanaged, resMySQL), "Results for %v don't match\nVitess Unmanaged\n%v\nMySQL\n%v", table, resVitessUnmanaged, resMySQL)
}
}
}

func runQueries(t testing.TB, conn *mysql.Conn, queries []string) {
for _, query := range queries {
_, _ = utils.ExecAllowError(t, conn, query)
}
}
83 changes: 60 additions & 23 deletions go/test/endtoend/vtgate/foreignkey/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package foreignkey

import (
"context"
_ "embed"
"flag"
"fmt"
Expand All @@ -31,13 +32,14 @@ import (
)

var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
mysqlParams mysql.ConnParams
vtgateGrpcAddress string
shardedKs = "ks"
unshardedKs = "uks"
Cell = "test"
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
mysqlParams mysql.ConnParams
vtgateGrpcAddress string
shardedKs = "ks"
unshardedKs = "uks"
unshardedUnmanagedKs = "unmanaged_uks"
Cell = "test"

//go:embed schema.sql
schemaSQL string
Expand All @@ -48,6 +50,9 @@ var (
//go:embed unsharded_vschema.json
unshardedVSchema string

//go:embed unsharded_unmanaged_vschema.json
unshardedUnmanagedVSchema string

fkTables = []string{"fk_t1", "fk_t2", "fk_t3", "fk_t4", "fk_t5", "fk_t6", "fk_t7",
"fk_t10", "fk_t11", "fk_t12", "fk_t13", "fk_t15", "fk_t16", "fk_t17", "fk_t18", "fk_t19", "fk_t20",
"fk_multicol_t1", "fk_multicol_t2", "fk_multicol_t3", "fk_multicol_t4", "fk_multicol_t5", "fk_multicol_t6", "fk_multicol_t7",
Expand Down Expand Up @@ -124,6 +129,16 @@ func TestMain(m *testing.M) {
return 1
}

unmanagedKs := &cluster.Keyspace{
Name: unshardedUnmanagedKs,
SchemaSQL: schemaSQL,
VSchema: unshardedUnmanagedVSchema,
}
err = clusterInstance.StartUnshardedKeyspace(*unmanagedKs, 1, false)
if err != nil {
return 1
}

err = clusterInstance.VtctlclientProcess.ExecuteCommand("RebuildVSchemaGraph")
if err != nil {
return 1
Expand Down Expand Up @@ -157,22 +172,7 @@ func start(t *testing.T) (utils.MySQLCompare, func()) {
require.NoError(t, err)

deleteAll := func() {
_ = utils.Exec(t, mcmp.VtConn, "use `ks/-80`")
tables := []string{"t4", "t3", "t2", "t1", "multicol_tbl2", "multicol_tbl1"}
tables = append(tables, fkTables...)
for _, table := range tables {
_, _ = mcmp.ExecAndIgnore("delete /*+ SET_VAR(foreign_key_checks=OFF) */ from " + table)
}
_ = utils.Exec(t, mcmp.VtConn, "use `ks/80-`")
for _, table := range tables {
_, _ = mcmp.ExecAndIgnore("delete /*+ SET_VAR(foreign_key_checks=OFF) */ from " + table)
}
_ = utils.Exec(t, mcmp.VtConn, "use `uks`")
tables = []string{"u_t1", "u_t2", "u_t3"}
tables = append(tables, fkTables...)
for _, table := range tables {
_, _ = mcmp.ExecAndIgnore("delete /*+ SET_VAR(foreign_key_checks=OFF) */ from " + table)
}
clearOutAllData(t, mcmp.VtConn, mcmp.MySQLConn)
_ = utils.Exec(t, mcmp.VtConn, "use `ks`")
}

Expand All @@ -184,3 +184,40 @@ func start(t *testing.T) (utils.MySQLCompare, func()) {
cluster.PanicHandler(t)
}
}

func startBenchmark(b *testing.B) {
ctx := context.Background()
vtConn, err := mysql.Connect(ctx, &vtParams)
require.NoError(b, err)
mysqlConn, err := mysql.Connect(ctx, &mysqlParams)
require.NoError(b, err)

clearOutAllData(b, vtConn, mysqlConn)
}

func clearOutAllData(t testing.TB, vtConn *mysql.Conn, mysqlConn *mysql.Conn) {
_ = utils.Exec(t, vtConn, "use `ks/-80`")
tables := []string{"t4", "t3", "t2", "t1", "multicol_tbl2", "multicol_tbl1"}
tables = append(tables, fkTables...)
for _, table := range tables {
_, _ = utils.ExecAllowError(t, vtConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
_, _ = utils.ExecAllowError(t, mysqlConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
}
_ = utils.Exec(t, vtConn, "use `ks/80-`")
for _, table := range tables {
_, _ = utils.ExecAllowError(t, vtConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
_, _ = utils.ExecAllowError(t, mysqlConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
}
_ = utils.Exec(t, vtConn, "use `uks`")
tables = []string{"u_t1", "u_t2", "u_t3"}
tables = append(tables, fkTables...)
for _, table := range tables {
_, _ = utils.ExecAllowError(t, vtConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
_, _ = utils.ExecAllowError(t, mysqlConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
}
_ = utils.Exec(t, vtConn, "use `unmanaged_uks`")
for _, table := range tables {
_, _ = utils.ExecAllowError(t, vtConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
_, _ = utils.ExecAllowError(t, mysqlConn, "delete /*+ SET_VAR(foreign_key_checks=OFF) */ from "+table)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"sharded": false,
"foreignKeyMode": "unmanaged",
"tables": {
"u_t1": {},
"u_t2": {},
"fk_t1": {},
"fk_t2": {},
"fk_t3": {},
"fk_t4": {},
"fk_t5": {},
"fk_t6": {},
"fk_t7": {},
"fk_t10": {},
"fk_t11": {},
"fk_t12": {},
"fk_t13": {},
"fk_t15": {},
"fk_t16": {},
"fk_t17": {},
"fk_t18": {},
"fk_t19": {},
"fk_t20": {},
"fk_multicol_t1": {},
"fk_multicol_t2": {},
"fk_multicol_t3": {},
"fk_multicol_t4": {},
"fk_multicol_t5": {},
"fk_multicol_t6": {},
"fk_multicol_t7": {},
"fk_multicol_t10": {},
"fk_multicol_t11": {},
"fk_multicol_t12": {},
"fk_multicol_t13": {},
"fk_multicol_t15": {},
"fk_multicol_t16": {},
"fk_multicol_t17": {},
"fk_multicol_t18": {},
"fk_multicol_t19": {}
}
}
26 changes: 24 additions & 2 deletions go/test/endtoend/vtgate/foreignkey/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"database/sql"
"fmt"
"math/rand"
"slices"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -75,7 +76,7 @@ func convertIntValueToString(value int) string {

// waitForSchemaTrackingForFkTables waits for schema tracking to have run and seen the tables used
// for foreign key tests.
func waitForSchemaTrackingForFkTables(t *testing.T) {
func waitForSchemaTrackingForFkTables(t testing.TB) {
err := utils.WaitForColumn(t, clusterInstance.VtgateProcess, shardedKs, "fk_t1", "col")
require.NoError(t, err)
err = utils.WaitForColumn(t, clusterInstance.VtgateProcess, shardedKs, "fk_t18", "col")
Expand All @@ -88,6 +89,8 @@ func waitForSchemaTrackingForFkTables(t *testing.T) {
require.NoError(t, err)
err = utils.WaitForColumn(t, clusterInstance.VtgateProcess, unshardedKs, "fk_t11", "col")
require.NoError(t, err)
err = utils.WaitForColumn(t, clusterInstance.VtgateProcess, unshardedUnmanagedKs, "fk_t11", "col")
require.NoError(t, err)
}

// getReplicaTablets gets all the replica tablets.
Expand Down Expand Up @@ -238,7 +241,7 @@ func verifyDataIsCorrect(t *testing.T, mcmp utils.MySQLCompare, concurrency int)
}

// verifyDataMatches verifies that the two list of results are the same.
func verifyDataMatches(t *testing.T, resOne []*sqltypes.Result, resTwo []*sqltypes.Result) {
func verifyDataMatches(t testing.TB, resOne []*sqltypes.Result, resTwo []*sqltypes.Result) {
require.EqualValues(t, len(resTwo), len(resOne), "Res 1 - %v, Res 2 - %v", resOne, resTwo)
for idx, resultOne := range resOne {
resultTwo := resTwo[idx]
Expand All @@ -256,3 +259,22 @@ func collectFkTablesState(conn *mysql.Conn) []*sqltypes.Result {
}
return tablesData
}

func validateReplication(t *testing.T) {
for _, keyspace := range clusterInstance.Keyspaces {
for _, shard := range keyspace.Shards {
for _, vttablet := range shard.Vttablets {
if vttablet.Type != "primary" {
checkReplicationHealthy(t, vttablet)
}
}
}
}
}

// compareResultRows compares the rows of the two results provided.
func compareResultRows(resOne *sqltypes.Result, resTwo *sqltypes.Result) bool {
return slices.EqualFunc(resOne.Rows, resTwo.Rows, func(a, b sqltypes.Row) bool {
return sqltypes.RowEqual(a, b)
})
}
Loading