Skip to content

Commit

Permalink
VReplication: Move ENUM and SET mappings from vplayer to vstreamer (#15723)
Browse files Browse the repository at this point in the history

Signed-off-by: Matt Lord <[email protected]>
Signed-off-by: Shlomi Noach <[email protected]>
Co-authored-by: Shlomi Noach <[email protected]>
  • Loading branch information
mattlord and shlomi-noach authored May 9, 2024
1 parent f86f131 commit 0353ad4
Show file tree
Hide file tree
Showing 25 changed files with 776 additions and 409 deletions.
68 changes: 68 additions & 0 deletions changelog/20.0/20.0.0/summary.md

Large diffs are not rendered by default.

28 changes: 17 additions & 11 deletions go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ limitations under the License.

package vreplication

import (
"fmt"
"strings"
)

// The product, customer, Lead, Lead-1 tables are used to exercise and test most Workflow variants.
// We violate the NO_ZERO_DATE and NO_ZERO_IN_DATE sql_modes that are enabled by default in
// MySQL 5.7+ and MariaDB 10.2+ to ensure that vreplication still works everywhere and the
Expand All @@ -40,9 +45,10 @@ package vreplication
// default collation as it has to work across versions and the 8.0 default does not exist in 5.7.
var (
// All standard user tables should have a primary key and at least one secondary key.
initialProductSchema = `
customerTypes = []string{"'individual'", "'soho'", "'enterprise'"}
initialProductSchema = fmt.Sprintf(`
create table product(pid int, description varbinary(128), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key(pid), key(date1,date2)) CHARSET=utf8mb4;
create table customer(cid int auto_increment, name varchar(128) collate utf8mb4_bin, meta json default null, typ enum('individual','soho','enterprise'), sport set('football','cricket','baseball'),
create table customer(cid int auto_increment, name varchar(128) collate utf8mb4_bin, meta json default null, typ enum(%s), sport set('football','cricket','baseball'),
ts timestamp not null default current_timestamp, bits bit(2) default b'11', date1 datetime not null default '0000-00-00 00:00:00',
date2 datetime not null default '2021-00-01 00:00:00', dec80 decimal(8,0), blb blob, primary key(cid,typ), key(name)) CHARSET=utf8mb4;
create table customer_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
Expand All @@ -51,19 +57,19 @@ create table orders(oid int, cid int, pid int, mname varchar(128), price int, qt
create table order_seq(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
create table customer2(cid int, name varchar(128), typ enum('individual','soho','enterprise'), sport set('football','cricket','baseball'),ts timestamp not null default current_timestamp, primary key(cid), key(ts)) CHARSET=utf8;
create table customer_seq2(id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence';
create table ` + "`Lead`(`Lead-id`" + ` binary(16), name varbinary(16), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key (` + "`Lead-id`" + `), key (date1));
create table ` + "`Lead-1`(`Lead`" + ` binary(16), name varbinary(16), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key (` + "`Lead`" + `), key (date2));
create table `+"`Lead`(`Lead-id`"+` binary(16), name varbinary(16), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key (`+"`Lead-id`"+`), key (date1));
create table `+"`Lead-1`(`Lead`"+` binary(16), name varbinary(16), date1 datetime not null default '0000-00-00 00:00:00', date2 datetime not null default '2021-00-01 00:00:00', primary key (`+"`Lead`"+`), key (date2));
create table _vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431(id int, val varbinary(128), primary key(id), key(val));
create table db_order_test (c_uuid varchar(64) not null default '', created_at datetime not null, dstuff varchar(128), dtstuff text, dbstuff blob, cstuff char(32), primary key (c_uuid,created_at), key (dstuff)) CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
create table vdiff_order (order_id varchar(50) collate utf8mb4_unicode_ci not null, primary key (order_id), key (order_id)) charset=utf8mb4 COLLATE=utf8mb4_unicode_ci;
create table datze (id int, dt1 datetime not null default current_timestamp, dt2 datetime not null, ts1 timestamp default current_timestamp, primary key (id), key (dt1));
create table json_tbl (id int, j1 json, j2 json, j3 json not null, primary key(id));
create table geom_tbl (id int, g geometry, p point, ls linestring, pg polygon, mp multipoint, mls multilinestring, mpg multipolygon, gc geometrycollection, primary key(id));
create table ` + "`blüb_tbl`" + ` (id int, val1 varchar(20), ` + "`blöb1`" + ` blob, val2 varbinary(20), ` + "`bl@b2`" + ` longblob, txt1 text, blb3 tinyblob, txt2 longtext, blb4 mediumblob, primary key(id));
create table `+"`blüb_tbl`"+` (id int, val1 varchar(20), `+"`blöb1`"+` blob, val2 varbinary(20), `+"`bl@b2`"+` longblob, txt1 text, blb3 tinyblob, txt2 longtext, blb4 mediumblob, primary key(id));
create table reftable (id int, val1 varchar(20), primary key(id), key(val1));
create table loadtest (id int, name varchar(256), primary key(id), key(name));
create table nopk (name varchar(128), age int unsigned);
`
`, strings.Join(customerTypes, ","))
// These should always be ignored in vreplication
internalSchema = `
create table _1e275eef_3b20_11eb_a38f_04ed332e05c2_20201210204529_gho(id int, val varbinary(128), primary key(id));
Expand Down Expand Up @@ -152,7 +158,7 @@ create table nopk (name varchar(128), age int unsigned);
}
]
},
"customer_type": {
"enterprise_customer": {
"column_vindexes": [
{
"column": "cid",
Expand Down Expand Up @@ -434,13 +440,13 @@ create table nopk (name varchar(128), age int unsigned);

materializeCustomerTypeSpec = `
{
"workflow": "customer_type",
"workflow": "enterprise_customer",
"source_keyspace": "customer",
"target_keyspace": "customer",
"table_settings": [{
"target_table": "customer_type",
"source_expression": "select cid, typ from customer",
"create_ddl": "create table if not exists customer_type (cid bigint not null, typ enum('individual','soho','enterprise'), primary key(cid), key(typ))"
"target_table": "enterprise_customer",
"source_expression": "select cid, name, typ from customer where typ = 'enterprise'",
"create_ddl": "create table if not exists enterprise_customer (cid bigint not null, name varchar(128), typ varchar(64), primary key(cid), key(typ))"
}]
}
`
Expand Down
29 changes: 16 additions & 13 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand/v2"
"net"
"strconv"
"strings"
Expand Down Expand Up @@ -539,7 +540,7 @@ func testReshardV2Workflow(t *testing.T) {

// Generate customer records in the background for the rest of the test
// in order to confirm that no writes are lost in either the customer
// table or the customer_name and customer_type materializations
// table or the customer_name and enterprise_customer materializations
// against it during the Reshard and all of the traffic switches.
dataGenCtx, dataGenCancel := context.WithCancel(context.Background())
defer dataGenCancel()
Expand All @@ -555,7 +556,9 @@ func testReshardV2Workflow(t *testing.T) {
case <-dataGenCtx.Done():
return
default:
_ = execVtgateQuery(t, dataGenConn, "customer", fmt.Sprintf("insert into customer (cid, name) values (%d, 'tempCustomer%d')", id, id))
// Use a random customer type for each record.
_ = execVtgateQuery(t, dataGenConn, "customer", fmt.Sprintf("insert into customer (cid, name, typ) values (%d, 'tempCustomer%d', %s)",
id, id, customerTypes[rand.IntN(len(customerTypes))]))
}
time.Sleep(1 * time.Millisecond)
id++
Expand Down Expand Up @@ -591,17 +594,17 @@ func testReshardV2Workflow(t *testing.T) {
cnres := execVtgateQuery(t, dataGenConn, "customer", "select count(*) from customer_name")
require.Len(t, cnres.Rows, 1)
require.EqualValues(t, cres.Rows, cnres.Rows)
waitForNoWorkflowLag(t, vc, "customer", "customer_type")
ctres := execVtgateQuery(t, dataGenConn, "customer", "select count(*) from customer_type")
require.Len(t, ctres.Rows, 1)
require.EqualValues(t, cres.Rows, ctres.Rows)
if debugMode {
t.Logf("Done inserting customer data. Record counts in customer: %s, customer_name: %s, customer_type: %s",
cres.Rows[0][0].ToString(), cnres.Rows[0][0].ToString(), ctres.Rows[0][0].ToString())
// We expect the row count to differ in enterprise_customer because it is
// using a `where typ='enterprise'` filter. So the count is only for debug
// info.
ecres := execVtgateQuery(t, dataGenConn, "customer", "select count(*) from enterprise_customer")
t.Logf("Done inserting customer data. Record counts in customer: %s, customer_name: %s, enterprise_customer: %s",
cres.Rows[0][0].ToString(), cnres.Rows[0][0].ToString(), ecres.Rows[0][0].ToString())
}
// We also do a vdiff on the materialize workflows for good measure.
doVtctldclientVDiff(t, "customer", "customer_name", "", nil)
doVtctldclientVDiff(t, "customer", "customer_type", "", nil)
doVtctldclientVDiff(t, "customer", "enterprise_customer", "", nil)
}

func testMoveTablesV2Workflow(t *testing.T) {
Expand Down Expand Up @@ -669,7 +672,7 @@ func testMoveTablesV2Workflow(t *testing.T) {

output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...)
require.NoError(t, err)
require.True(t, listOutputContainsWorkflow(output, "customer_name") && listOutputContainsWorkflow(output, "customer_type") && !listOutputContainsWorkflow(output, "wf1"))
require.True(t, listOutputContainsWorkflow(output, "customer_name") && listOutputContainsWorkflow(output, "enterprise_customer") && !listOutputContainsWorkflow(output, "wf1"))

testVSchemaForSequenceAfterMoveTables(t)

Expand All @@ -684,14 +687,14 @@ func testMoveTablesV2Workflow(t *testing.T) {
createMoveTablesWorkflow(t, "Lead,Lead-1")
output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...)
require.NoError(t, err)
require.True(t, listOutputContainsWorkflow(output, "wf1") && listOutputContainsWorkflow(output, "customer_name") && listOutputContainsWorkflow(output, "customer_type"))
require.True(t, listOutputContainsWorkflow(output, "wf1") && listOutputContainsWorkflow(output, "customer_name") && listOutputContainsWorkflow(output, "enterprise_customer"))

err = tstWorkflowCancel(t)
require.NoError(t, err)

output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...)
require.NoError(t, err)
require.True(t, listOutputContainsWorkflow(output, "customer_name") && listOutputContainsWorkflow(output, "customer_type") && !listOutputContainsWorkflow(output, "wf1"))
require.True(t, listOutputContainsWorkflow(output, "customer_name") && listOutputContainsWorkflow(output, "enterprise_customer") && !listOutputContainsWorkflow(output, "wf1"))
}

func testPartialSwitches(t *testing.T) {
Expand Down Expand Up @@ -812,7 +815,7 @@ func testRestOfWorkflow(t *testing.T) {
// fully switch and complete
waitForLowLag(t, "customer", "wf1")
waitForLowLag(t, "customer", "customer_name")
waitForLowLag(t, "customer", "customer_type")
waitForLowLag(t, "customer", "enterprise_customer")
tstWorkflowSwitchReadsAndWrites(t)
validateReadsRoute(t, "rdonly", targetRdonlyTab1)
validateReadsRouteToTarget(t, "replica")
Expand Down
Loading

0 comments on commit 0353ad4

Please sign in to comment.