Skip to content

Commit

Permalink
resolved conflict
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach committed Aug 15, 2024
1 parent 93307a2 commit c8082c7
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
MODIFY `t1` varchar(128) CHARACTER SET utf8mb4 NOT NULL, MODIFY `t2` varchar(128) CHARACTER SET latin2 NOT NULL, MODIFY `tutf8` varchar(128) CHARACTER SET latin1 NOT NULL
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
drop table if exists onlineddl_test;
create table onlineddl_test (
id int auto_increment,
t1 varchar(128) charset latin1 collate latin1_swedish_ci,
t2 varchar(128) charset latin1 collate latin1_swedish_ci,
tutf8 varchar(128) charset utf8,
tutf8mb4 varchar(128) charset utf8mb4,
tlatin1 varchar(128) charset latin1 collate latin1_swedish_ci,
primary key(id)
) auto_increment=1;

insert into onlineddl_test values (null, md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand()));
insert into onlineddl_test values (null, 'átesting', 'átesting', 'átesting', 'átesting', 'átesting');
insert into onlineddl_test values (null, 'testátest', 'testátest', 'testátest', '🍻😀', 'átesting');
insert into onlineddl_test values (null, 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog');
insert into onlineddl_test values (null, 'testátest-binlog', 'testátest-binlog', 'testátest-binlog', '🍻😀', 'átesting-binlog');
insert into onlineddl_test values (null, 'átesting-bnull', 'átesting-bnull', 'átesting-bnull', null, null);

drop event if exists onlineddl_test;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
(5.5|5.6|5.7)
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
drop table if exists onlineddl_test;
create table onlineddl_test (
id varchar(128) charset latin1 collate latin1_swedish_ci,
t1 varchar(128) charset latin1 collate latin1_swedish_ci,
t2 varchar(128) charset latin1 collate latin1_swedish_ci,
tutf8 varchar(128) charset utf8,
tutf8mb4 varchar(128) charset utf8mb4,
tlatin1 varchar(128) charset latin1 collate latin1_swedish_ci,
primary key(id)
) auto_increment=1;

insert into onlineddl_test values (concat('átesting-', md5(rand())), md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand()));
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting', 'átesting', 'átesting', 'átesting', 'átesting');
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'testátest', 'testátest', 'testátest', '🍻😀', 'átesting');

drop event if exists onlineddl_test;
delimiter ;;
create event onlineddl_test
on schedule every 1 second
starts current_timestamp
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
insert into onlineddl_test values (concat('átesting-', md5(rand())), md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand()));
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog');
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'testátest-binlog', 'testátest-binlog', 'testátest-binlog', '🍻😀', 'átesting-binlog');
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting-bnull', 'átesting-bnull', 'átesting-bnull', null, null);
end ;;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
(5.5|5.6|5.7)
15 changes: 9 additions & 6 deletions go/vt/vttablet/onlineddl/vrepl.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,9 @@ func (v *VRepl) generateFilterQuery(ctx context.Context) error {
sb.WriteString(fmt.Sprintf("CONCAT(%s)", escapeName(name)))
case sourceCol.Type == vrepl.JSONColumnType:
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
case targetCol.Type == vrepl.JSONColumnType:
// Convert any type to JSON: encode the type as utf8mb4 text
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
case sourceCol.Type == vrepl.StringColumnType:
// Check source and target charset/encoding. If needed, create
// a binlogdatapb.CharsetConversion entry (later written to vreplication)
Expand All @@ -583,19 +586,19 @@ func (v *VRepl) generateFilterQuery(ctx context.Context) error {
if targetCol.Type == vrepl.StringColumnType && toCollation == collations.Unknown {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Character set %s not supported for column %s", targetCol.Charset, targetCol.Name)
}

if trivialCharset(fromCollation) && trivialCharset(toCollation) && targetCol.Type != vrepl.JSONColumnType {
if trivialCharset(fromCollation) && trivialCharset(toCollation) {
sb.WriteString(escapeName(name))
} else if fromCollation == toCollation {
// No need for charset conversions as both have the same collation.
sb.WriteString(escapeName(name))
} else {
// Charset conversion required:
v.convertCharset[targetName] = &binlogdatapb.CharsetConversion{
FromCharset: sourceCol.Charset,
ToCharset: targetCol.Charset,
}
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
sb.WriteString(escapeName(name))
}
case targetCol.Type == vrepl.JSONColumnType && sourceCol.Type != vrepl.JSONColumnType:
// Convert any type to JSON: encode the type as utf8mb4 text
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
default:
sb.WriteString(escapeName(name))
}
Expand Down
66 changes: 51 additions & 15 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"vitess.io/vitess/go/mysql/collations/charset"
"vitess.io/vitess/go/mysql/collations/colldata"
vjson "vitess.io/vitess/go/mysql/json"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -258,7 +259,7 @@ func (tp *TablePlan) applyBulkInsert(sqlbuffer *bytes2.Buffer, rows []*querypb.R
if i > 0 {
sqlbuffer.WriteString(", ")
}
if err := appendFromRow(tp.BulkInsertValues, sqlbuffer, tp.Fields, row, tp.FieldsToSkip); err != nil {
if err := tp.appendFromRow(sqlbuffer, row); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -313,6 +314,30 @@ func (tp *TablePlan) isOutsidePKRange(bindvars map[string]*querypb.BindVariable,
return false
}

// convertStringCharset does a charset conversion given raw data and an applicable conversion rule.
// In case of a conversion error, it returns an equivalent of MySQL error 1366, which is what you'd
// get in a failed `CONVERT()` function, e.g.:
//
// > create table tascii(v varchar(100) charset ascii);
// > insert into tascii values ('€');
// ERROR 1366 (HY000): Incorrect string value: '\xE2\x82\xAC' for column 'v' at row 1
func (tp *TablePlan) convertStringCharset(raw []byte, conversion *binlogdatapb.CharsetConversion, fieldName string) ([]byte, error) {
fromCollation := tp.CollationEnv.DefaultCollationForCharset(conversion.FromCharset)
if fromCollation == collations.Unknown {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "character set %s not supported for column %s", conversion.FromCharset, fieldName)
}
toCollation := tp.CollationEnv.DefaultCollationForCharset(conversion.ToCharset)
if toCollation == collations.Unknown {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "character set %s not supported for column %s", conversion.ToCharset, fieldName)
}

out, err := charset.Convert(nil, colldata.Lookup(toCollation).Charset(), raw, colldata.Lookup(fromCollation).Charset())
if err != nil {
return nil, sqlerror.NewSQLError(sqlerror.ERTruncatedWrongValueForField, sqlerror.SSUnknownSQLState, "Incorrect string value: %s", err.Error())
}
return out, nil
}

// bindFieldVal returns a bind variable based on given field and value.
// Most values will just bind directly. But some values may need manipulation:
// - text values with charset conversion
Expand All @@ -321,11 +346,7 @@ func (tp *TablePlan) isOutsidePKRange(bindvars map[string]*querypb.BindVariable,
func (tp *TablePlan) bindFieldVal(field *querypb.Field, val *sqltypes.Value) (*querypb.BindVariable, error) {
if conversion, ok := tp.ConvertCharset[field.Name]; ok && !val.IsNull() {
// Non-null string value, for which we have a charset conversion instruction
fromCollation := tp.CollationEnv.DefaultCollationForCharset(conversion.FromCharset)
if fromCollation == collations.Unknown {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Character set %s not supported for column %s", conversion.FromCharset, field.Name)
}
out, err := charset.Convert(nil, charset.Charset_utf8mb4{}, val.Raw(), colldata.Lookup(fromCollation).Charset())
out, err := tp.convertStringCharset(val.Raw(), conversion, field.Name)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -619,28 +640,30 @@ func valsEqual(v1, v2 sqltypes.Value) bool {
// note: there can be more fields than bind locations since extra columns might be requested from the source if not all
// primary keys columns are present in the target table, for example. Also some values in the row may not correspond for
// values from the database on the source: sum/count for aggregation queries, for example
func appendFromRow(pq *sqlparser.ParsedQuery, buf *bytes2.Buffer, fields []*querypb.Field, row *querypb.Row, skipFields map[string]bool) error {
bindLocations := pq.BindLocations()
if len(fields) < len(bindLocations) {
func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error {
bindLocations := tp.BulkInsertValues.BindLocations()
if len(tp.Fields) < len(bindLocations) {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations ",
len(fields), len(bindLocations))
len(tp.Fields), len(bindLocations))
}

type colInfo struct {
typ querypb.Type
length int64
offset int64
field *querypb.Field
}
rowInfo := make([]*colInfo, 0)

offset := int64(0)
for i, field := range fields { // collect info required for fields to be bound
for i, field := range tp.Fields { // collect info required for fields to be bound
length := row.Lengths[i]
if !skipFields[strings.ToLower(field.Name)] {
if !tp.FieldsToSkip[strings.ToLower(field.Name)] {
rowInfo = append(rowInfo, &colInfo{
typ: field.Type,
length: length,
offset: offset,
field: field,
})
}
if length > 0 {
Expand All @@ -652,7 +675,7 @@ func appendFromRow(pq *sqlparser.ParsedQuery, buf *bytes2.Buffer, fields []*quer
var offsetQuery int
for i, loc := range bindLocations {
col := rowInfo[i]
buf.WriteString(pq.Query[offsetQuery:loc.Offset])
buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:loc.Offset])
typ := col.typ

switch typ {
Expand All @@ -674,12 +697,25 @@ func appendFromRow(pq *sqlparser.ParsedQuery, buf *bytes2.Buffer, fields []*quer
// -1 means a null variable; serialize it directly
buf.WriteString(sqltypes.NullStr)
} else {
vv := sqltypes.MakeTrusted(typ, row.Values[col.offset:col.offset+col.length])
raw := row.Values[col.offset : col.offset+col.length]
var vv sqltypes.Value

if conversion, ok := tp.ConvertCharset[col.field.Name]; ok && col.length > 0 {
// Non-null string value, for which we have a charset conversion instruction
out, err := tp.convertStringCharset(raw, conversion, col.field.Name)
if err != nil {
return err
}
vv = sqltypes.MakeTrusted(typ, out)
} else {
vv = sqltypes.MakeTrusted(typ, raw)
}

vv.EncodeSQLBytes2(buf)
}
}
offsetQuery = loc.Offset + loc.Length
}
buf.WriteString(pq.Query[offsetQuery:])
buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:])
return nil
}

0 comments on commit c8082c7

Please sign in to comment.