Skip to content

Commit

Permalink
add support for vtgate traffic mirroring (query serving)
Browse files Browse the repository at this point in the history
Signed-off-by: Max Englander <[email protected]>
  • Loading branch information
maxenglander committed Jun 24, 2024
1 parent 446445c commit 2a786e5
Show file tree
Hide file tree
Showing 38 changed files with 2,631 additions and 100 deletions.
14 changes: 14 additions & 0 deletions go/test/vschemawrapper/vschema_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,3 +342,17 @@ func (vw *VSchemaWrapper) FindRoutedShard(keyspace, shard string) (string, error
func (vw *VSchemaWrapper) IsViewsEnabled() bool {
return vw.EnableViews
}

// FindMirrorRule finds the mirror rule for the requested keyspace, table
// name, and the tablet type in the VSchema.
func (vs *VSchemaWrapper) FindMirrorRule(tab sqlparser.TableName) (*vindexes.MirrorRule, string, topodatapb.TabletType, key.Destination, error) {
destKeyspace, destTabletType, dest, err := topoproto.ParseDestination(tab.Qualifier.String(), topodatapb.TabletType_PRIMARY)
if err != nil {
return nil, "", destTabletType, nil, err
}
mirrorRule, err := vs.V.FindMirrorRule(destKeyspace, tab.Name.String(), destTabletType)
if err != nil {
return nil, "", destTabletType, nil, err
}
return mirrorRule, destKeyspace, destTabletType, dest, err
}
1 change: 0 additions & 1 deletion go/vt/vtgate/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func init() {
fmt.Fprintf(buf, " and v%d = '%d%s'", i, i, baseval.String())
}
benchQuery = buf.String()
// fmt.Printf("len: %d\n", len(benchQuery))
}

func BenchmarkWithNormalizer(b *testing.B) {
Expand Down
72 changes: 72 additions & 0 deletions go/vt/vtgate/endtoend/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,18 @@ var (
Type: sqltypes.Char,
}},
},
"mirror_tbl1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id",
Name: "hash",
}},
},
"mirror_tbl2": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id",
Name: "hash",
}},
},
},
}

Expand All @@ -175,6 +187,11 @@ create table t1_copy_all_ks2(
id2 bigint,
primary key(id1)
) Engine=InnoDB;
create table mirror_tbl1(
id bigint not null,
primary key(id)
) Engine=InnoDB;
`

vschema2 = &vschemapb.Keyspace{
Expand All @@ -191,6 +208,48 @@ create table t1_copy_all_ks2(
Name: "hash",
}},
},
"mirror_tbl1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id",
Name: "hash",
}},
},
},
}

schema3 = `
create table t1_copy_all_ks3(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;
create table mirror_tbl2(
id bigint not null,
primary key(id)
) Engine=InnoDB;
`

vschema3 = &vschemapb.Keyspace{
Sharded: true,
Vindexes: map[string]*vschemapb.Vindex{
"hash": {
Type: "hash",
},
},
Tables: map[string]*vschemapb.Table{
"t1_copy_all_ks3": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
Name: "hash",
}},
},
"mirror_tbl2": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id",
Name: "hash",
}},
},
},
}
)
Expand Down Expand Up @@ -218,6 +277,14 @@ func TestMain(m *testing.M) {
Name: "80-",
}},
},
{
Name: "ks3",
Shards: []*vttestpb.Shard{{
Name: "-80",
}, {
Name: "80-",
}},
},
},
}
if err := cfg.InitSchemas("ks", Schema, vschema); err != nil {
Expand All @@ -231,6 +298,11 @@ func TestMain(m *testing.M) {
os.RemoveAll(cfg.SchemaDir)
return 1
}
if err := cfg.InitSchemas("ks3", schema3, vschema3); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.RemoveAll(cfg.SchemaDir)
return 1
}

cluster = &vttest.LocalCluster{
Config: cfg,
Expand Down
180 changes: 180 additions & 0 deletions go/vt/vtgate/endtoend/mirror_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package endtoend

import (
"bytes"
"context"
"fmt"
"math/rand"
osExec "os/exec"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
)

var mirrorInitOnce sync.Once

func BenchmarkMirror(b *testing.B) {
const numRows = 10000

ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
if err != nil {
b.Fatal(err)
}
defer conn.Close()

var query bytes.Buffer

mirrorInitOnce.Do(func() {
b.Logf("seeding database for benchmark...")

for i := 0; i < numRows; i++ {
query.Reset()
query.WriteString(fmt.Sprintf("INSERT INTO ks.mirror_tbl1(id) VALUES(%d)", i))
_, err = conn.ExecuteFetch(query.String(), -1, false)
if err != nil {
b.Fatal(err)
}

query.Reset()
query.WriteString(fmt.Sprintf("INSERT INTO ks2.mirror_tbl1(id) VALUES(%d)", i))
_, err = conn.ExecuteFetch(query.String(), -1, false)
if err != nil {
b.Fatal(err)
}

query.Reset()
query.WriteString(fmt.Sprintf("INSERT INTO ks.mirror_tbl2(id) VALUES(%d)", i))
_, err = conn.ExecuteFetch(query.String(), -1, false)
if err != nil {
b.Fatal(err)
}

query.Reset()
query.WriteString(fmt.Sprintf("INSERT INTO ks3.mirror_tbl2(id) VALUES(%d)", i))
_, err = conn.ExecuteFetch(query.String(), -1, false)
if err != nil {
b.Fatal(err)
}
}

b.Logf("finished (inserted %d rows)", numRows)
})

testCases := []struct {
name string
run func(*testing.B, *rand.Rand)
}{
{
name: "point select, { ks => ks1 }.tbl1",
run: func(b *testing.B, rnd *rand.Rand) {
for i := 0; i < b.N; i++ {
id := rnd.Intn(numRows)
query.Reset()
_, _ = fmt.Fprintf(
&query,
"SELECT t1.id, t2.id FROM ks.mirror_tbl1 AS t1, ks.mirror_tbl2 AS t2 WHERE t1.id = %d AND t2.id = %d",
id, id,
)
_, err := conn.ExecuteFetch(query.String(), 1, false)
if err != nil {
b.Error(err)
}
}
},
},
{
name: "point select, { ks => ks2 }.tbl1, { ks => ks3 }.tbl2",
run: func(b *testing.B, rnd *rand.Rand) {
for i := 0; i < b.N; i++ {
id := rnd.Intn(numRows)
query.Reset()
_, _ = fmt.Fprintf(
&query,
"SELECT t1.id, t2.id FROM ks.mirror_tbl1 AS t1, ks.mirror_tbl2 AS t2 WHERE t1.id = %d AND t2.id = %d",
id, id,
)
_, err := conn.ExecuteFetch(query.String(), 1, false)
if err != nil {
b.Error(err)
}
}
},
},
}

// Each time this BenchmarkMirror runs, use a different source of
// random-ness. But use the same source of randomness across test cases and
// mirror percentages sub-tests.
randSeed := time.Now().UnixNano()

for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
b.Run("mirror 0%", func(b *testing.B) {
mirrorTraffic(b, 0)
b.ResetTimer()
tc.run(b, rand.New(rand.NewSource(randSeed)))
})

b.Run("mirror 1%", func(b *testing.B) {
mirrorTraffic(b, 1)
b.ResetTimer()
tc.run(b, rand.New(rand.NewSource(randSeed)))
})

b.Run("mirror 5%", func(b *testing.B) {
mirrorTraffic(b, 5)
b.ResetTimer()
tc.run(b, rand.New(rand.NewSource(randSeed)))
})

b.Run("mirror 10%", func(b *testing.B) {
mirrorTraffic(b, 10)
b.ResetTimer()
tc.run(b, rand.New(rand.NewSource(randSeed)))
})

b.Run("mirror 25%", func(b *testing.B) {
mirrorTraffic(b, 25)
b.ResetTimer()
tc.run(b, rand.New(rand.NewSource(randSeed)))
})

b.Run("mirror 50%", func(b *testing.B) {
mirrorTraffic(b, 50)
b.ResetTimer()
tc.run(b, rand.New(rand.NewSource(randSeed)))
})

b.Run("mirror 100%", func(b *testing.B) {
mirrorTraffic(b, 100)
b.ResetTimer()
tc.run(b, rand.New(rand.NewSource(randSeed)))
})
})
}
}

func mirrorTraffic(b *testing.B, percent float32) {
server := fmt.Sprintf("localhost:%v", cluster.VTProcess().PortGrpc)
rules := fmt.Sprintf(`{
"rules": [
{
"from_table": "ks.mirror_tbl1",
"to_table": "ks2.mirror_tbl1",
"percent": %f
},
{
"from_table": "ks.mirror_tbl2",
"to_table": "ks3.mirror_tbl2",
"percent": %f
}
]
}`, percent, percent)
_, err := osExec.Command("vtctldclient", "--server", server, "ApplyMirrorRules", "--rules", rules).CombinedOutput()
require.NoError(b, err)
}
12 changes: 11 additions & 1 deletion go/vt/vtgate/endtoend/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,14 @@ create table oltp_test(
c char(120) default '' not null,
pad char(60) default '' not null,
primary key (id)
) Engine=InnoDB;
) Engine=InnoDB;

create table mirror_tbl1(
id bigint not null,
primary key(id)
) Engine=InnoDB;

create table mirror_tbl2(
id bigint not null,
primary key(id)
) Engine=InnoDB;
10 changes: 9 additions & 1 deletion go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,11 @@ func TestVStreamCopyUnspecifiedShardGtid(t *testing.T) {
require.NoError(t, err)
}

_, err = conn.ExecuteFetch("insert into t1_copy_all_ks3(id1,id2) values(30,30), (40,40)", 1, false)
if err != nil {
require.NoError(t, err)
}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "/t1_copy_all.*/",
Expand All @@ -303,6 +308,7 @@ func TestVStreamCopyUnspecifiedShardGtid(t *testing.T) {
// copy phase operations in the vstream.
expectedKs1EventNum := 2 /* num shards */ * (9 /* begin/field/vgtid:pos/4 rowevents avg/vgitd: lastpk/commit) */ + 3 /* begin/vgtid/commit for completed table */ + 1 /* copy operation completed */)
expectedKs2EventNum := 2 /* num shards */ * (6 /* begin/field/vgtid:pos/1 rowevents avg/vgitd: lastpk/commit) */ + 3 /* begin/vgtid/commit for completed table */ + 1 /* copy operation completed */)
expectedKs3EventNum := 2 /* num shards */ * (6 /* begin/field/vgtid:pos/1 rowevents avg/vgitd: lastpk/commit) */ + 3 /* begin/vgtid/commit for completed table */ + 1 /* copy operation completed */)
expectedFullyCopyCompletedNum := 1

cases := []struct {
Expand All @@ -316,12 +322,14 @@ func TestVStreamCopyUnspecifiedShardGtid(t *testing.T) {
shardGtid: &binlogdatapb.ShardGtid{
Keyspace: "/.*",
},
expectedEventNum: expectedKs1EventNum + expectedKs2EventNum + expectedFullyCopyCompletedNum,
expectedEventNum: expectedKs1EventNum + expectedKs2EventNum + expectedKs3EventNum + expectedFullyCopyCompletedNum,
expectedCompletedEvents: []string{
`type:COPY_COMPLETED keyspace:"ks" shard:"-80"`,
`type:COPY_COMPLETED keyspace:"ks" shard:"80-"`,
`type:COPY_COMPLETED keyspace:"ks2" shard:"-80"`,
`type:COPY_COMPLETED keyspace:"ks2" shard:"80-"`,
`type:COPY_COMPLETED keyspace:"ks3" shard:"-80"`,
`type:COPY_COMPLETED keyspace:"ks3" shard:"80-"`,
`type:COPY_COMPLETED`,
},
},
Expand Down
Loading

0 comments on commit 2a786e5

Please sign in to comment.