Skip to content

Commit

Permalink
test: added test to check binlogs to contain the cascade events
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Sep 14, 2023
1 parent 98c754a commit 57d30a1
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 10 deletions.
128 changes: 127 additions & 1 deletion go/test/endtoend/vtgate/foreignkey/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,19 @@ limitations under the License.
package foreignkey

import (
"context"
"io"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)

// TestInsertWithFK tests that insertions work as expected when foreign key management is enabled in Vitess.
Expand Down Expand Up @@ -101,7 +108,7 @@ func TestDeleteWithFK(t *testing.T) {
utils.AssertMatches(t, conn, `select * from u_t2`, `[[INT64(342) NULL] [INT64(19) INT64(1234)]]`)
}

// TestUpdations tests that update work as expected when foreign key management is enabled in Vitess.
// TestUpdateWithFK tests that update work as expected when foreign key management is enabled in Vitess.
func TestUpdateWithFK(t *testing.T) {
mcmp, closer := start(t)
conn := mcmp.VtConn
Expand Down Expand Up @@ -162,6 +169,125 @@ func TestUpdateWithFK(t *testing.T) {
utils.AssertMatches(t, conn, `select * from u_t3 order by id`, `[[INT64(1) INT64(12)] [INT64(32) INT64(13)]]`)
}

// TestVstreamForFKBinLog tests that dml queries with fks are written with child row first approach in the binary logs.
func TestVstreamForFKBinLog(t *testing.T) {
vtgateConn, err := cluster.DialVTGate(context.Background(), t.Name(), vtgateGrpcAddress, "fk_user", "")
require.NoError(t, err)
defer vtgateConn.Close()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ch := make(chan *binlogdatapb.VEvent)
runVStream(t, ctx, ch, vtgateConn)

mcmp, closer := start(t)
conn := mcmp.VtConn
defer closer()
defer cancel()

utils.Exec(t, conn, `use uks`)

// insert some data.
utils.Exec(t, conn, `insert into u_t1(id, col1) values (1,2), (11,4), (111,6)`)
utils.Exec(t, conn, `insert into u_t2(id, col2) values (2,2), (22,4)`)
utils.Exec(t, conn, `insert into u_t3(id, col3) values (33,4), (333,6)`)
// drain 3 row events.
_ = drainEvents(t, ch, 3)

tcases := []struct {
query string
events int
rowEvents []string
}{{
query: `update u_t1 set col1 = 3 where id = 11`,
events: 3,
rowEvents: []string{
`table_name:"uks.u_t3" row_changes:{before:{lengths:2 lengths:1 values:"334"} after:{lengths:2 lengths:1 values:"333"}} keyspace:"uks" shard:"0" flags:3`,
`table_name:"uks.u_t2" row_changes:{before:{lengths:2 lengths:1 values:"224"} after:{lengths:2 lengths:-1 values:"22"}} keyspace:"uks" shard:"0" flags:1`,
`table_name:"uks.u_t1" row_changes:{before:{lengths:2 lengths:1 values:"114"} after:{lengths:2 lengths:1 values:"113"}} keyspace:"uks" shard:"0" flags:1`,
},
}, {
query: `update u_t1 set col1 = 5 where id = 11`,
events: 2,
rowEvents: []string{
`table_name:"uks.u_t3" row_changes:{before:{lengths:2 lengths:1 values:"333"} after:{lengths:2 lengths:1 values:"335"}} keyspace:"uks" shard:"0" flags:3`,
`table_name:"uks.u_t1" row_changes:{before:{lengths:2 lengths:1 values:"113"} after:{lengths:2 lengths:1 values:"115"}} keyspace:"uks" shard:"0" flags:1`,
},
}, {
query: `delete from u_t1 where col1 = 6`,
events: 2,
rowEvents: []string{
`table_name:"uks.u_t3" row_changes:{before:{lengths:3 lengths:1 values:"3336"}} keyspace:"uks" shard:"0" flags:1`,
`table_name:"uks.u_t1" row_changes:{before:{lengths:3 lengths:1 values:"1116"}} keyspace:"uks" shard:"0" flags:1`,
},
}, {
query: `update u_t1 set col1 = null where id = 11`,
events: 2,
rowEvents: []string{
`table_name:"uks.u_t3" row_changes:{before:{lengths:2 lengths:1 values:"335"} after:{lengths:2 lengths:-1 values:"33"}} keyspace:"uks" shard:"0" flags:3`,
`table_name:"uks.u_t1" row_changes:{before:{lengths:2 lengths:1 values:"115"} after:{lengths:2 lengths:-1 values:"11"}} keyspace:"uks" shard:"0" flags:1`,
},
}, {
query: `delete from u_t1 where id = 11`,
events: 1,
rowEvents: []string{
`table_name:"uks.u_t1" row_changes:{before:{lengths:2 lengths:-1 values:"11"}} keyspace:"uks" shard:"0" flags:1`,
},
}}
for _, tcase := range tcases {
t.Run(tcase.query, func(t *testing.T) {
utils.Exec(t, conn, tcase.query)
// drain row events.
rowEvents := drainEvents(t, ch, tcase.events)
assert.ElementsMatch(t, tcase.rowEvents, rowEvents)
})
}
}

func runVStream(t *testing.T, ctx context.Context, ch chan *binlogdatapb.VEvent, vtgateConn *vtgateconn.VTGateConn) {
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{
{Keyspace: unshardedKs, Shard: "0", Gtid: "current"},
}}
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "/u.*",
}},
}
vReader, err := vtgateConn.VStream(ctx, topodatapb.TabletType_REPLICA, vgtid, filter, nil)
require.NoError(t, err)

go func() {
for {
evs, err := vReader.Recv()
if err == io.EOF || ctx.Err() != nil {
return
}
require.NoError(t, err)

for _, ev := range evs {
if ev.Type == binlogdatapb.VEventType_ROW {
ch <- ev
}
}
}
}()
}

func drainEvents(t *testing.T, ch chan *binlogdatapb.VEvent, count int) []string {
var rowEvents []string
for i := 0; i < count; i++ {
select {
case re := <-ch:
rowEvents = append(rowEvents, re.RowEvent.String())
case <-time.After(10 * time.Second):
t.Fatalf("timeout waiting for event number: %d", i+1)
}
}
return rowEvents
}

// TestFkScenarios tests the various foreign key scenarios with different constraints
// and makes sure that Vitess works with them as expected. All the tables are present in both sharded and unsharded keyspace
// and all the foreign key constraints are cross-shard ones for the sharded keyspace.
Expand Down
19 changes: 10 additions & 9 deletions go/test/endtoend/vtgate/foreignkey/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@ import (

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/utils"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
)

var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
mysqlParams mysql.ConnParams
shardedKs = "ks"
unshardedKs = "uks"
Cell = "test"
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
mysqlParams mysql.ConnParams
vtgateGrpcAddress string
shardedKs = "ks"
unshardedKs = "uks"
Cell = "test"
//go:embed sharded_schema.sql
shardedSchemaSQL string

Expand Down Expand Up @@ -82,7 +82,7 @@ func TestMain(m *testing.M) {
SchemaSQL: unshardedSchemaSQL,
VSchema: unshardedVSchema,
}
err = clusterInstance.StartUnshardedKeyspace(*uKs, 0, false)
err = clusterInstance.StartUnshardedKeyspace(*uKs, 1, false)
if err != nil {
return 1
}
Expand All @@ -101,6 +101,7 @@ func TestMain(m *testing.M) {
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort)

connParams, closer, err := utils.NewMySQL(clusterInstance, shardedKs, shardedSchemaSQL)
if err != nil {
Expand Down

0 comments on commit 57d30a1

Please sign in to comment.