Skip to content

Commit

Permalink
[release-19.0] projection: Return correct collation information (#15801
Browse files Browse the repository at this point in the history
…) (#15804)

Signed-off-by: Dirkjan Bussink <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Dirkjan Bussink <[email protected]>
  • Loading branch information
vitess-bot[bot] and dbussink authored Apr 27, 2024
1 parent e04b73c commit 0af0297
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 43 deletions.
20 changes: 16 additions & 4 deletions go/test/endtoend/backup/vtbackup/backup_only_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,12 +328,24 @@ func tearDown(t *testing.T, initMysql bool) {
caughtUp := waitForReplicationToCatchup([]cluster.Vttablet{*replica1, *replica2})
require.True(t, caughtUp, "Timed out waiting for all replicas to catch up")
promoteCommands := []string{"STOP SLAVE", "RESET SLAVE ALL", "RESET MASTER"}
disableSemiSyncCommands := []string{"SET GLOBAL rpl_semi_sync_master_enabled = false", " SET GLOBAL rpl_semi_sync_slave_enabled = false"}
disableSemiSyncCommandsSource := []string{"SET GLOBAL rpl_semi_sync_source_enabled = false", " SET GLOBAL rpl_semi_sync_replica_enabled = false"}
disableSemiSyncCommandsMaster := []string{"SET GLOBAL rpl_semi_sync_master_enabled = false", " SET GLOBAL rpl_semi_sync_slave_enabled = false"}

for _, tablet := range []cluster.Vttablet{*primary, *replica1, *replica2} {
err := tablet.VttabletProcess.QueryTabletMultiple(promoteCommands, keyspaceName, true)
require.Nil(t, err)
err = tablet.VttabletProcess.QueryTabletMultiple(disableSemiSyncCommands, keyspaceName, true)
require.Nil(t, err)

require.NoError(t, err)
semisyncType, err := tablet.VttabletProcess.SemiSyncExtensionLoaded()
require.NoError(t, err)

switch semisyncType {
case cluster.SemiSyncTypeSource:
err = tablet.VttabletProcess.QueryTabletMultiple(disableSemiSyncCommandsSource, keyspaceName, true)
require.NoError(t, err)
case cluster.SemiSyncTypeMaster:
err = tablet.VttabletProcess.QueryTabletMultiple(disableSemiSyncCommandsMaster, keyspaceName, true)
require.NoError(t, err)
}
}

for _, tablet := range []cluster.Vttablet{*primary, *replica1, *replica2} {
Expand Down
31 changes: 31 additions & 0 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,37 @@ func (vttablet *VttabletProcess) GetDBStatus(status string, ksName string) (stri
return vttablet.getDBSystemValues("status", status, ksName)
}

type SemiSyncType int8

const (
SemiSyncTypeUnknown SemiSyncType = iota
SemiSyncTypeOff
SemiSyncTypeSource
SemiSyncTypeMaster
)

func (vttablet *VttabletProcess) SemiSyncExtensionLoaded() (SemiSyncType, error) {
conn, err := vttablet.defaultConn("")
if err != nil {
return SemiSyncTypeUnknown, err
}
defer conn.Close()

qr, err := conn.ExecuteFetch("SHOW VARIABLES LIKE 'rpl_semi_sync_%_enabled'", 10, false)
if err != nil {
return SemiSyncTypeUnknown, err
}
for _, row := range qr.Rows {
if row[0].ToString() == "rpl_semi_sync_source_enabled" {
return SemiSyncTypeSource, nil
}
if row[0].ToString() == "rpl_semi_sync_master_enabled" {
return SemiSyncTypeMaster, nil
}
}
return SemiSyncTypeOff, nil
}

func (vttablet *VttabletProcess) getDBSystemValues(placeholder string, value string, ksName string) (string, error) {
output, err := vttablet.QueryTablet(fmt.Sprintf("show %s like '%s'", placeholder, value), ksName, true)
if err != nil || output.Rows == nil {
Expand Down
9 changes: 8 additions & 1 deletion go/test/endtoend/reparent/emergencyreparent/ers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,14 @@ func TestPullFromRdonly(t *testing.T) {
require.NoError(t, err)

// stop semi-sync on the primary so that any transaction now added does not require an ack
utils.RunSQL(ctx, t, "SET GLOBAL rpl_semi_sync_master_enabled = false", tablets[0])
semisyncType, err := tablets[0].VttabletProcess.SemiSyncExtensionLoaded()
require.NoError(t, err)
switch semisyncType {
case cluster.SemiSyncTypeSource:
utils.RunSQL(ctx, t, "SET GLOBAL rpl_semi_sync_source_enabled = false", tablets[0])
case cluster.SemiSyncTypeMaster:
utils.RunSQL(ctx, t, "SET GLOBAL rpl_semi_sync_master_enabled = false", tablets[0])
}

// confirm that rdonly is able to replicate from our primary
// This will also introduce a new transaction into the rdonly tablet which the other 2 replicas don't have
Expand Down
14 changes: 12 additions & 2 deletions go/test/endtoend/reparent/newfeaturetest/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,18 @@ func TestChangeTypeWithoutSemiSync(t *testing.T) {
utils.RunSQL(ctx, t, "set global super_read_only = 0", tablet)
}

utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_slave;", tablet)
utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_master;", tablet)
semisyncType, err := tablet.VttabletProcess.SemiSyncExtensionLoaded()
require.NoError(t, err)
switch semisyncType {
case cluster.SemiSyncTypeSource:
utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_replica", tablet)
utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_source", tablet)
case cluster.SemiSyncTypeMaster:
utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_slave", tablet)
utils.RunSQL(ctx, t, "UNINSTALL PLUGIN rpl_semi_sync_master", tablet)
default:
require.Fail(t, "Unknown semi sync type")
}
}

utils.ValidateTopology(t, clusterInstance, true)
Expand Down
32 changes: 15 additions & 17 deletions go/test/endtoend/reparent/plannedreparent/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/replication"

"google.golang.org/protobuf/encoding/protojson"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/reparent/utils"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -356,38 +354,38 @@ func TestChangeTypeSemiSync(t *testing.T) {
// The flag is only an indication of the value to use next time
// we turn replication on, so also check the status.
// rdonly1 is not replicating, so its status is off.
utils.CheckDBvar(ctx, t, replica, "rpl_semi_sync_slave_enabled", "ON")
utils.CheckDBvar(ctx, t, rdonly1, "rpl_semi_sync_slave_enabled", "OFF")
utils.CheckDBvar(ctx, t, rdonly2, "rpl_semi_sync_slave_enabled", "OFF")
utils.CheckDBstatus(ctx, t, replica, "Rpl_semi_sync_slave_status", "ON")
utils.CheckDBstatus(ctx, t, rdonly1, "Rpl_semi_sync_slave_status", "OFF")
utils.CheckDBstatus(ctx, t, rdonly2, "Rpl_semi_sync_slave_status", "OFF")
utils.CheckSemiSyncEnabled(ctx, t, replica, true)
utils.CheckSemiSyncEnabled(ctx, t, rdonly1, false)
utils.CheckSemiSyncEnabled(ctx, t, rdonly2, false)
utils.CheckSemiSyncStatus(ctx, t, replica, true)
utils.CheckSemiSyncStatus(ctx, t, rdonly1, false)
utils.CheckSemiSyncStatus(ctx, t, rdonly2, false)

// Change replica to rdonly while replicating, should turn off semi-sync, and restart replication.
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replica.Alias, "rdonly")
require.NoError(t, err)
utils.CheckDBvar(ctx, t, replica, "rpl_semi_sync_slave_enabled", "OFF")
utils.CheckDBstatus(ctx, t, replica, "Rpl_semi_sync_slave_status", "OFF")
utils.CheckSemiSyncEnabled(ctx, t, replica, false)
utils.CheckSemiSyncStatus(ctx, t, replica, false)

// Change rdonly1 to replica, should turn on semi-sync, and not start replication.
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", rdonly1.Alias, "replica")
require.NoError(t, err)
utils.CheckDBvar(ctx, t, rdonly1, "rpl_semi_sync_slave_enabled", "ON")
utils.CheckDBstatus(ctx, t, rdonly1, "Rpl_semi_sync_slave_status", "OFF")
utils.CheckSemiSyncEnabled(ctx, t, rdonly1, true)
utils.CheckSemiSyncStatus(ctx, t, rdonly1, false)
utils.CheckReplicaStatus(ctx, t, rdonly1)

// Now change from replica back to rdonly, make sure replication is still not enabled.
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", rdonly1.Alias, "rdonly")
require.NoError(t, err)
utils.CheckDBvar(ctx, t, rdonly1, "rpl_semi_sync_slave_enabled", "OFF")
utils.CheckDBstatus(ctx, t, rdonly1, "Rpl_semi_sync_slave_status", "OFF")
utils.CheckSemiSyncEnabled(ctx, t, rdonly1, false)
utils.CheckSemiSyncStatus(ctx, t, rdonly1, false)
utils.CheckReplicaStatus(ctx, t, rdonly1)

// Change rdonly2 to replica, should turn on semi-sync, and restart replication.
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", rdonly2.Alias, "replica")
require.NoError(t, err)
utils.CheckDBvar(ctx, t, rdonly2, "rpl_semi_sync_slave_enabled", "ON")
utils.CheckDBstatus(ctx, t, rdonly2, "Rpl_semi_sync_slave_status", "ON")
utils.CheckSemiSyncEnabled(ctx, t, rdonly2, true)
utils.CheckSemiSyncStatus(ctx, t, rdonly2, true)
}

// TestCrossCellDurability tests 2 things -
Expand Down
69 changes: 55 additions & 14 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,20 @@ func CheckInsertedValues(ctx context.Context, t *testing.T, tablet *cluster.Vtta
}

func CheckSemiSyncSetupCorrectly(t *testing.T, tablet *cluster.Vttablet, semiSyncVal string) {
dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_slave_enabled", "")
semisyncType, err := tablet.VttabletProcess.SemiSyncExtensionLoaded()
require.NoError(t, err)
require.Equal(t, semiSyncVal, dbVar)
switch semisyncType {
case cluster.SemiSyncTypeSource:
dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_replica_enabled", "")
require.NoError(t, err)
require.Equal(t, semiSyncVal, dbVar)
case cluster.SemiSyncTypeMaster:
dbVar, err := tablet.VttabletProcess.GetDBVar("rpl_semi_sync_slave_enabled", "")
require.NoError(t, err)
require.Equal(t, semiSyncVal, dbVar)
default:
require.Fail(t, "Unknown semi sync type")
}
}

// CheckCountOfInsertedValues checks that the number of inserted values matches the given count on the given tablet
Expand Down Expand Up @@ -669,30 +680,60 @@ func assertNodeCount(t *testing.T, result string, want int) {
assert.Equal(t, want, got)
}

// CheckDBvar checks the db var
func CheckDBvar(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, variable string, status string) {
func CheckSemiSyncEnabled(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, enabled bool) {
tabletParams := getMysqlConnParam(tablet)
conn, err := mysql.Connect(ctx, &tabletParams)
require.NoError(t, err)
defer conn.Close()

qr := execute(t, conn, fmt.Sprintf("show variables like '%s'", variable))
got := fmt.Sprintf("%v", qr.Rows)
want := fmt.Sprintf("[[VARCHAR(\"%s\") VARCHAR(\"%s\")]]", variable, status)
assert.Equal(t, want, got)
status := "OFF"
if enabled {
status = "ON"
}

semisyncType, err := tablet.VttabletProcess.SemiSyncExtensionLoaded()
require.NoError(t, err)
switch semisyncType {
case cluster.SemiSyncTypeSource:
qr := execute(t, conn, "show variables like 'rpl_semi_sync_replica_enabled'")
got := fmt.Sprintf("%v", qr.Rows)
want := fmt.Sprintf("[[VARCHAR(\"%s\") VARCHAR(\"%s\")]]", "rpl_semi_sync_replica_enabled", status)
assert.Equal(t, want, got)
case cluster.SemiSyncTypeMaster:
qr := execute(t, conn, "show variables like 'rpl_semi_sync_slave_enabled'")
got := fmt.Sprintf("%v", qr.Rows)
want := fmt.Sprintf("[[VARCHAR(\"%s\") VARCHAR(\"%s\")]]", "rpl_semi_sync_slave_enabled", status)
assert.Equal(t, want, got)
}
}

// CheckDBstatus checks the db status
func CheckDBstatus(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, variable string, status string) {
func CheckSemiSyncStatus(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, enabled bool) {
tabletParams := getMysqlConnParam(tablet)
conn, err := mysql.Connect(ctx, &tabletParams)
require.NoError(t, err)
defer conn.Close()

qr := execute(t, conn, fmt.Sprintf("show status like '%s'", variable))
got := fmt.Sprintf("%v", qr.Rows)
want := fmt.Sprintf("[[VARCHAR(\"%s\") VARCHAR(\"%s\")]]", variable, status)
assert.Equal(t, want, got)
status := "OFF"
if enabled {
status = "ON"
}

semisyncType, err := tablet.VttabletProcess.SemiSyncExtensionLoaded()
require.NoError(t, err)
switch semisyncType {
case cluster.SemiSyncTypeSource:
qr := execute(t, conn, "show status like 'Rpl_semi_sync_replica_status'")
got := fmt.Sprintf("%v", qr.Rows)
want := fmt.Sprintf("[[VARCHAR(\"%s\") VARCHAR(\"%s\")]]", "Rpl_semi_sync_replica_status", status)
assert.Equal(t, want, got)
case cluster.SemiSyncTypeMaster:
qr := execute(t, conn, "show status like 'Rpl_semi_sync_slave_status'")
got := fmt.Sprintf("%v", qr.Rows)
want := fmt.Sprintf("[[VARCHAR(\"%s\") VARCHAR(\"%s\")]]", "Rpl_semi_sync_slave_status", status)
assert.Equal(t, want, got)
default:
assert.Fail(t, "unknown semi-sync type")
}
}

// SetReplicationSourceFailed returns true if the given output from PRS had failed because the given tablet was
Expand Down
16 changes: 11 additions & 5 deletions go/vt/vtgate/engine/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -75,7 +76,7 @@ func (p *Projection) TryExecute(ctx context.Context, vcursor VCursor, bindVars m
resultRows = append(resultRows, resultRow)
}
if wantfields {
result.Fields, err = p.evalFields(env, result.Fields)
result.Fields, err = p.evalFields(env, result.Fields, vcursor.ConnCollation())
if err != nil {
return nil, err
}
Expand All @@ -96,7 +97,7 @@ func (p *Projection) TryStreamExecute(ctx context.Context, vcursor VCursor, bind
defer mu.Unlock()
if wantfields {
once.Do(func() {
fields, err = p.evalFields(env, qr.Fields)
fields, err = p.evalFields(env, qr.Fields, vcursor.ConnCollation())
if err != nil {
return
}
Expand Down Expand Up @@ -135,14 +136,14 @@ func (p *Projection) GetFields(ctx context.Context, vcursor VCursor, bindVars ma
return nil, err
}
env := evalengine.NewExpressionEnv(ctx, bindVars, vcursor)
qr.Fields, err = p.evalFields(env, qr.Fields)
qr.Fields, err = p.evalFields(env, qr.Fields, vcursor.ConnCollation())
if err != nil {
return nil, err
}
return qr, nil
}

func (p *Projection) evalFields(env *evalengine.ExpressionEnv, infields []*querypb.Field) ([]*querypb.Field, error) {
func (p *Projection) evalFields(env *evalengine.ExpressionEnv, infields []*querypb.Field, coll collations.ID) ([]*querypb.Field, error) {
// TODO: once the evalengine becomes smart enough, we should be able to remove the
// dependency on these fields altogether
env.Fields = infields
Expand All @@ -157,10 +158,15 @@ func (p *Projection) evalFields(env *evalengine.ExpressionEnv, infields []*query
if !sqltypes.IsNull(typ.Type()) && !typ.Nullable() {
fl |= uint32(querypb.MySqlFlag_NOT_NULL_FLAG)
}
typCol := typ.Collation()
if sqltypes.IsTextOrBinary(typ.Type()) && typCol != collations.CollationBinaryID {
typCol = coll
}

fields = append(fields, &querypb.Field{
Name: col,
Type: typ.Type(),
Charset: uint32(typ.Collation()),
Charset: uint32(typCol),
ColumnLength: uint32(typ.Size()),
Decimals: uint32(typ.Scale()),
Flags: fl,
Expand Down
38 changes: 38 additions & 0 deletions go/vt/vtgate/engine/projection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,41 @@ func TestFields(t *testing.T) {
})
}
}

func TestFieldConversion(t *testing.T) {
var testCases = []struct {
name string
expr string
typ querypb.Type
collation collations.ID
}{
{
name: `convert different charset`,
expr: `_latin1 0xFF`,
typ: sqltypes.VarChar,
collation: collations.MySQL8().DefaultConnectionCharset(),
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
arg, err := sqlparser.NewTestParser().ParseExpr(testCase.expr)
require.NoError(t, err)
bindExpr, err := evalengine.Translate(arg, &evalengine.Config{
Environment: vtenv.NewTestEnv(),
Collation: collations.MySQL8().DefaultConnectionCharset(),
})
require.NoError(t, err)
proj := &Projection{
Cols: []string{"col"},
Exprs: []evalengine.Expr{bindExpr},
Input: &SingleRow{},
noTxNeeded: noTxNeeded{},
}
qr, err := proj.TryExecute(context.Background(), &noopVCursor{}, nil, true)
require.NoError(t, err)
assert.Equal(t, testCase.typ, qr.Fields[0].Type)
assert.Equal(t, testCase.collation, collations.ID(qr.Fields[0].Charset))
})
}
}

0 comments on commit 0af0297

Please sign in to comment.