Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for vtgate traffic mirroring (queryserving) #15992

Merged
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
79983de
add support for vtgate traffic mirroring (topo, workflow)
maxenglander May 22, 2024
c9430d0
cr: rename var, extract common test code to helper fn
maxenglander May 28, 2024
02113ff
merge <- main
maxenglander Jun 18, 2024
a9dbf34
update tests
maxenglander Jun 18, 2024
a0cffe5
Update go/cmd/vtctldclient/command/vreplication/common/mirrortraffic.go
maxenglander Jun 21, 2024
859b421
Update go/cmd/vtctldclient/command/vreplication/common/mirrortraffic.go
maxenglander Jun 21, 2024
71b9a00
cr: test repeat calls to mirror traffic
maxenglander Jun 21, 2024
446445c
cr: remove ApplyMirrorRules vtctldclient command
maxenglander Jun 24, 2024
2a786e5
add support for vtgate traffic mirroring (query serving)
maxenglander Jun 21, 2024
4007bfe
merge <- main
maxenglander Jul 1, 2024
8297086
cr: require.Error, ctx with timeout, license, lightweight
maxenglander Jul 13, 2024
ec7d174
cr: go/vt/vtgate/endtoend => go/test/endtoend/vtgate
maxenglander Jul 13, 2024
fb1d605
merge <- main
maxenglander Jul 13, 2024
099c6ec
rm vtg/mirror test
maxenglander Jul 13, 2024
6f5829b
fix test
maxenglander Jul 13, 2024
2cd6230
checkpoint
maxenglander Jul 14, 2024
0666e68
revert unintended changes
maxenglander Jul 15, 2024
c8ada40
cr: move mirror planning into operators
maxenglander Jul 17, 2024
8bbae3f
cr: require.NoError
maxenglander Jul 17, 2024
4177ba8
cr: rm unintentional space
maxenglander Jul 17, 2024
a7c3da2
revert unintended changes
maxenglander Jul 17, 2024
bfaadf9
cleanup
maxenglander Jul 17, 2024
4c01614
test: add TestOneMirror
systay Jul 17, 2024
6e7b063
merge <- main
maxenglander Jul 29, 2024
a330972
cr: use vcursor.Execute, rm explicit mirrorCtxCancel, fix mirror case…
maxenglander Aug 8, 2024
bb0cbc1
cr: panic(vterrors), mirror cases percentages, zero-percentage mirror…
maxenglander Aug 8, 2024
7d1cdaf
cr: rm mirror_vschema.go
maxenglander Aug 9, 2024
5b03958
cr: check inputs len
maxenglander Aug 9, 2024
531869a
merge <- main
maxenglander Aug 13, 2024
ec1994a
cr: command semantics pkg mirrorInfo()
maxenglander Aug 13, 2024
9239ab7
Apply suggestions from code review
maxenglander Aug 13, 2024
83dce0c
fix
maxenglander Aug 13, 2024
43b8c1a
merge <- main
maxenglander Aug 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 183 additions & 3 deletions go/test/endtoend/vtgate/queries/benchmark/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@ package dml

import (
"fmt"
"maps"
"math/rand/v2"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
mapsx "golang.org/x/exp/maps"
"google.golang.org/protobuf/encoding/protojson"

"vitess.io/vitess/go/test/endtoend/utils"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

type testQuery struct {
Expand Down Expand Up @@ -127,7 +135,7 @@ func BenchmarkShardedTblNoLookup(b *testing.B) {
}
for _, rows := range []int{1, 10, 100, 500, 1000, 5000, 10000} {
insStmt := tq.getInsertQuery(rows)
b.Run(fmt.Sprintf("16-shards-%d-rows", rows), func(b *testing.B) {
b.Run(fmt.Sprintf("4-shards-%d-rows", rows), func(b *testing.B) {
harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved
for i := 0; i < b.N; i++ {
_ = utils.Exec(b, conn, insStmt)
}
Expand All @@ -150,7 +158,7 @@ func BenchmarkShardedTblUpdateIn(b *testing.B) {
_ = utils.Exec(b, conn, insStmt)
for _, rows := range []int{1, 10, 100, 500, 1000, 5000, 10000} {
updStmt := tq.getUpdateQuery(rows)
b.Run(fmt.Sprintf("16-shards-%d-rows", rows), func(b *testing.B) {
b.Run(fmt.Sprintf("4-shards-%d-rows", rows), func(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = utils.Exec(b, conn, updStmt)
}
Expand All @@ -168,7 +176,7 @@ func BenchmarkShardedTblDeleteIn(b *testing.B) {
insStmt := tq.getInsertQuery(rows)
_ = utils.Exec(b, conn, insStmt)
delStmt := tq.getDeleteQuery(rows)
b.Run(fmt.Sprintf("16-shards-%d-rows", rows), func(b *testing.B) {
b.Run(fmt.Sprintf("4-shards-%d-rows", rows), func(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = utils.Exec(b, conn, delStmt)
}
Expand Down Expand Up @@ -197,3 +205,175 @@ func BenchmarkShardedAggrPushDown(b *testing.B) {
}
}
}

var mirrorInitOnce sync.Once

func BenchmarkMirror(b *testing.B) {
const numRows = 10000

conn, closer := start(b)
defer closer()

// Each time this BenchmarkMirror runs, use a different source of
// randomness. But use the same source of randomness across test cases and
// mirror percentages sub test cases.
pcg := rand.NewPCG(rand.Uint64(), rand.Uint64())

ksTables := map[string]string{
sKs2: "mirror_tbl1",
sKs3: "mirror_tbl2",
}
targetKeyspaces := mapsx.Keys(ksTables)

mirrorInitOnce.Do(func() {
b.Logf("seeding database for benchmark...")

for i := 0; i < numRows; i++ {
_, err := conn.ExecuteFetch(
fmt.Sprintf("INSERT INTO %s.mirror_tbl1(id) VALUES(%d)", sKs1, i), -1, false)
require.NoError(b, err)

_, err = conn.ExecuteFetch(
fmt.Sprintf("INSERT INTO %s.mirror_tbl2(id) VALUES(%d)", sKs1, i), -1, false)
require.NoError(b, err)
}

_, err := conn.ExecuteFetch(
fmt.Sprintf("SELECT COUNT(id) FROM %s.%s", sKs1, "mirror_tbl1"), 1, false)
require.NoError(b, err)

b.Logf("finished (inserted %d rows)", numRows)

b.Logf("using MoveTables to copy data from source keyspace to target keyspaces")

// Set up MoveTables workflows, which is (at present) the only way to set up
// mirror rules.
for tks, tbl := range ksTables {
output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
"MoveTables", "--target-keyspace", tks, "--workflow", fmt.Sprintf("%s2%s", sKs1, tks),
"create", "--source-keyspace", sKs1, "--tables", tbl)
require.NoError(b, err, output)
}

// Wait for tables to be copied from source to targets.
pending := make(map[string]string, len(ksTables))
maps.Copy(pending, ksTables)
for len(pending) > 0 {
for tks := range ksTables {
output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
"Workflow", "--keyspace", tks, "show", "--workflow", fmt.Sprintf("%s2%s", sKs1, tks))
require.NoError(b, err, output)

var response vtctldatapb.GetWorkflowsResponse
require.NoError(b, protojson.Unmarshal([]byte(output), &response))

require.Len(b, response.Workflows, 1)
workflow := response.Workflows[0]

require.Len(b, workflow.ShardStreams, 4 /*shards*/)
for _, ss := range workflow.ShardStreams {
for _, s := range ss.Streams {
if s.State == "Running" {
delete(pending, tks)
} else {
b.Logf("waiting for workflow %s.%s stream %s=>%s to be running; last state: %s",
workflow.Target, workflow.Name, s.BinlogSource.Shard, s.Shard, s.State)
time.Sleep(1 * time.Second)
}
}
}
}
}
})

testCases := []struct {
name string
run func(*testing.B, *rand.Rand)
}{
{
name: "point select, { sks1 => sks2 }.mirror_tbl1",
run: func(b *testing.B, rnd *rand.Rand) {
for i := 0; i < b.N; i++ {
id := rnd.Int32N(numRows)
_, err := conn.ExecuteFetch(fmt.Sprintf(
"SELECT t1.id FROM %s.mirror_tbl1 AS t1 WHERE t1.id = %d",
sKs1, id,
), 1, false)
if err != nil {
b.Error(err)
}
}
},
},
{
name: "point select, { sks1 => sks2 }.mirror_tbl1, { sks1 => sks3 }.mirror_tbl2",
run: func(b *testing.B, rnd *rand.Rand) {
for i := 0; i < b.N; i++ {
id := rnd.Int32N(numRows)
_, err := conn.ExecuteFetch(fmt.Sprintf(
"SELECT t1.id, t2.id FROM %s.mirror_tbl1 AS t1, %s.mirror_tbl2 AS t2 WHERE t1.id = %d AND t2.id = %d",
sKs1, sKs1, id, id,
), 1, false)
if err != nil {
b.Error(err)
}
}
},
},
}

for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
b.Run("mirror 0%", func(b *testing.B) {
mirrorTraffic(b, targetKeyspaces, 0)
b.ResetTimer()
tc.run(b, rand.New(pcg))
})

b.Run("mirror 1%", func(b *testing.B) {
mirrorTraffic(b, targetKeyspaces, 1)
b.ResetTimer()
tc.run(b, rand.New(pcg))
})

b.Run("mirror 5%", func(b *testing.B) {
mirrorTraffic(b, targetKeyspaces, 5)
b.ResetTimer()
tc.run(b, rand.New(pcg))
})

b.Run("mirror 10%", func(b *testing.B) {
mirrorTraffic(b, targetKeyspaces, 10)
b.ResetTimer()
tc.run(b, rand.New(pcg))
})

b.Run("mirror 25%", func(b *testing.B) {
mirrorTraffic(b, targetKeyspaces, 25)
b.ResetTimer()
tc.run(b, rand.New(pcg))
})

b.Run("mirror 50%", func(b *testing.B) {
mirrorTraffic(b, targetKeyspaces, 50)
b.ResetTimer()
tc.run(b, rand.New(pcg))
})

b.Run("mirror 100%", func(b *testing.B) {
mirrorTraffic(b, targetKeyspaces, 100)
b.ResetTimer()
tc.run(b, rand.New(pcg))
})
})
}
}

func mirrorTraffic(b *testing.B, targetKeyspaces []string, percent float32) {
for _, tks := range targetKeyspaces {
output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
"MoveTables", "--target-keyspace", tks, "--workflow", fmt.Sprintf("%s2%s", sKs1, tks),
"mirrortraffic", "--percent", fmt.Sprintf("%.02f", percent))
require.NoError(b, err, output)
}
}
108 changes: 79 additions & 29 deletions go/test/endtoend/vtgate/queries/benchmark/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"context"
_ "embed"
"flag"
"fmt"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand All @@ -34,36 +36,34 @@ var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
mysqlParams mysql.ConnParams
sKs = "sks"
uKs = "uks"
sKs1 = "sks1"
sKs2 = "sks2"
sKs3 = "sks3"
cell = "test"

//go:embed sharded_schema.sql
sSchemaSQL string
//go:embed sharded_schema1.sql
sSchemaSQL1 string

//go:embed vschema.json
sVSchema string
)
//go:embed vschema1.json
sVSchema1 string

var (
shards4 = []string{
"-40", "40-80", "80-c0", "c0-",
}
//go:embed sharded_schema2.sql
sSchemaSQL2 string

shards8 = []string{
"-20", "20-40", "40-60", "60-80", "80-a0", "a0-c0", "c0-e0", "e0-",
}
//go:embed vschema2.json
sVSchema2 string

shards16 = []string{
"-10", "10-20", "20-30", "30-40", "40-50", "50-60", "60-70", "70-80", "80-90", "90-a0", "a0-b0", "b0-c0", "c0-d0", "d0-e0", "e0-f0", "f0-",
}
//go:embed sharded_schema3.sql
sSchemaSQL3 string

shards32 = []string{
"-05", "05-10", "10-15", "15-20", "20-25", "25-30", "30-35", "35-40", "40-45", "45-50", "50-55", "55-60", "60-65", "65-70", "70-75", "75-80",
"80-85", "85-90", "90-95", "95-a0", "a0-a5", "a5-b0", "b0-b5", "b5-c0", "c0-c5", "c5-d0", "d0-d5", "d5-e0", "e0-e5", "e5-f0", "f0-f5", "f5-",
}
//go:embed vschema3.json
sVSchema3 string
)

var shards4 = []string{
"-40", "40-80", "80-c0", "c0-",
}

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()
Expand All @@ -78,14 +78,38 @@ func TestMain(m *testing.M) {
return 1
}

// Start sharded keyspace
sKeyspace := &cluster.Keyspace{
Name: sKs,
SchemaSQL: sSchemaSQL,
VSchema: sVSchema,
// Start sharded keyspace 1
sKeyspace1 := &cluster.Keyspace{
Name: sKs1,
SchemaSQL: sSchemaSQL1,
VSchema: sVSchema1,
}

err = clusterInstance.StartKeyspace(*sKeyspace, shards4, 0, false)
err = clusterInstance.StartKeyspace(*sKeyspace1, shards4, 0, false)
if err != nil {
return 1
}

// Start sharded keyspace 2
sKeyspace2 := &cluster.Keyspace{
Name: sKs2,
SchemaSQL: sSchemaSQL2,
VSchema: sVSchema2,
}

err = clusterInstance.StartKeyspace(*sKeyspace2, shards4, 0, false)
if err != nil {
return 1
}

// Start sharded keyspace 3
sKeyspace3 := &cluster.Keyspace{
Name: sKs3,
SchemaSQL: sSchemaSQL3,
VSchema: sVSchema3,
}

err = clusterInstance.StartKeyspace(*sKeyspace3, shards4, 0, false)
if err != nil {
return 1
}
Expand All @@ -96,7 +120,7 @@ func TestMain(m *testing.M) {
return 1
}

vtParams = clusterInstance.GetVTParams(sKs)
vtParams = clusterInstance.GetVTParams("@primary")

return m.Run()
}()
Expand All @@ -108,12 +132,38 @@ func start(b *testing.B) (*mysql.Conn, func()) {
require.NoError(b, err)

deleteAll := func() {
tables := []string{"tbl_no_lkp_vdx"}
tables := []string{
fmt.Sprintf("%s.tbl_no_lkp_vdx", sKs1),
fmt.Sprintf("%s.mirror_tbl1", sKs1),
fmt.Sprintf("%s.mirror_tbl2", sKs1),
fmt.Sprintf("%s.mirror_tbl1", sKs2),
fmt.Sprintf("%s.mirror_tbl2", sKs3),
}
for _, table := range tables {
_, _ = utils.ExecAllowError(b, conn, "delete from "+table)
}
}

// Make sure all keyspaces are serving.
pending := map[string]string{
sKs1: "mirror_tbl1",
sKs2: "mirror_tbl1",
sKs3: "mirror_tbl2",
}
for len(pending) > 0 {
for ks, tbl := range pending {
_, err := conn.ExecuteFetch(
fmt.Sprintf("SELECT COUNT(id) FROM %s.%s", ks, tbl), 1, false)
if err != nil {
b.Logf("waiting for keyspace %s to be serving; last error: %v", ks, err)
time.Sleep(1 * time.Second)
} else {
delete(pending, ks)
}
}
}

// Delete any pre-existing data.
deleteAll()

return conn, func() {
Expand Down
Loading
Loading