diff --git a/go/test/endtoend/vreplication/vstreamclient_test.go b/go/test/endtoend/vreplication/vstreamclient_test.go new file mode 100644 index 00000000000..a8102ce3945 --- /dev/null +++ b/go/test/endtoend/vreplication/vstreamclient_test.go @@ -0,0 +1,158 @@ +package vreplication + +import ( + "context" + "fmt" + "reflect" + "slices" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/log" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + "vitess.io/vitess/go/vt/vstreamclient" + "vitess.io/vitess/go/vt/vtgate/vtgateconn" +) + +// Customer is the concrete type that will be built from the stream +type Customer struct { + ID int64 `vstream:"customer_id"` + Email string `vstream:"email"` + DeletedAt time.Time `vstream:"-"` +} + +// To run the tests, this currently expects the local example to be running +// ./101_initial_cluster.sh; mysql < ../common/insert_commerce_data.sql; ./201_customer_tablets.sh; ./202_move_tables.sh; ./203_switch_reads.sh; ./204_switch_writes.sh; ./205_clean_commerce.sh; ./301_customer_sharded.sh; ./302_new_shards.sh; ./303_reshard.sh; ./304_switch_reads.sh; ./305_switch_writes.sh; ./306_down_shard_0.sh; ./307_delete_shard_0.sh +func TestVStreamClient(t *testing.T) { + vc = NewVitessCluster(t, nil) + defer vc.TearDown() + + require.NotNil(t, vc) + defaultReplicas = 2 + defaultRdonly = 0 + + defaultCell := vc.Cells[vc.CellNames[0]] + vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil) + verifyClusterHealth(t, vc) + insertInitialData(t) + + ctx := context.Background() + conn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort)) + if err != nil { + log.Fatal(err) + } + defer conn.Close() + + flushCount := 0 + gotCustomers := make([]*Customer, 0) + + tables := []vstreamclient.TableConfig{{ + Keyspace: "customer", + Table: "customer", + MaxRowsPerFlush: 7, + DataType: &Customer{}, + FlushFn: func(ctx context.Context, rows []vstreamclient.Row, meta vstreamclient.FlushMeta) error { + flushCount++ + + fmt.Printf("upserting %d customers\n", len(rows)) + for i, row := range rows { + switch { + // delete event + case row.RowChange.After == nil: + customer := row.Data.(*Customer) + customer.DeletedAt = time.Now() + + gotCustomers = append(gotCustomers, customer) + fmt.Printf("deleting customer %d: %v\n", i, row) + + // insert event + case row.RowChange.Before == nil: + gotCustomers = append(gotCustomers, row.Data.(*Customer)) + fmt.Printf("inserting customer %d: %v\n", i, row) + + // update event + case row.RowChange.Before != nil: + gotCustomers = append(gotCustomers, row.Data.(*Customer)) + fmt.Printf("updating customer %d: %v\n", i, row) + } + } + + // a real implementation would do something more meaningful here. For a data warehouse type workload, + // it would probably look like streaming rows into the data warehouse, or for more complex versions, + // write newline delimited json or a parquet file to object storage, then trigger a load job. + return nil + }, + }} + + t.Run("first vstream run, should succeed", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + vstreamClient, err := vstreamclient.New(ctx, "bob", conn, tables, + vstreamclient.WithMinFlushDuration(500*time.Millisecond), + vstreamclient.WithHeartbeatSeconds(1), + vstreamclient.WithStateTable("commerce", "vstreams"), + vstreamclient.WithEventFunc(func(ctx context.Context, ev *binlogdatapb.VEvent) error { + fmt.Printf("** FIELD EVENT: %v\n", ev) + return nil + }, binlogdatapb.VEventType_FIELD), + ) + if err != nil { + t.Fatalf("failed to create VStreamClient: %v", err) + } + + err = vstreamClient.Run(ctx) + if err != nil && ctx.Err() == nil { + t.Fatalf("failed to run vstreamclient: %v", err) + } + + slices.SortFunc(gotCustomers, func(a, b *Customer) int { + return int(a.ID - b.ID) + }) + + wantCustomers := []*Customer{ + {ID: 1, Email: "alice@domain.com"}, + {ID: 2, Email: "bob@domain.com"}, + {ID: 3, Email: "charlie@domain.com"}, + {ID: 4, Email: "dan@domain.com"}, + {ID: 5, Email: "eve@domain.com"}, + } + + fmt.Printf("got %d customers | flushed %d times\n", len(gotCustomers), flushCount) + if !reflect.DeepEqual(gotCustomers, wantCustomers) { + t.Fatalf("got %d customers, want %d", len(gotCustomers), len(wantCustomers)) + } + }) + + // this should fail because we're going to restart the stream, but with an additional table + t.Run("second vstream run, should fail", func(t *testing.T) { + withAdditionalTable := append(tables, vstreamclient.TableConfig{ + Keyspace: "customer", + Table: "corder", + DataType: &Customer{}, + }) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + _, err := vstreamclient.New(ctx, "bob", conn, withAdditionalTable, + vstreamclient.WithStateTable("commerce", "vstreams"), + ) + if err == nil { + t.Fatalf("expected VStreamClient error, got nil") + } else if err.Error() != "vstreamclient: provided tables do not match stored tables" { + t.Fatalf("expected error 'vstreamclient: provided tables do not match stored tables', got '%v'", err) + } + }) +} + +func getConn(t *testing.T, ctx context.Context) *vtgateconn.VTGateConn { + t.Helper() + conn, err := vtgateconn.Dial(ctx, "localhost:15991") + if err != nil { + t.Fatal(err) + } + return conn +}