Skip to content

Commit

Permalink
[release-17.0] Rewrite USING to ON condition for joins (#13931) (#…
Browse files Browse the repository at this point in the history
…13941)

Signed-off-by: Florent Poinsard <[email protected]>
Co-authored-by: Florent Poinsard <[email protected]>
Co-authored-by: Andres Taylor <[email protected]>
Co-authored-by: Florent Poinsard <[email protected]>
  • Loading branch information
4 people authored Sep 22, 2023
1 parent f731bec commit 3b20a40
Show file tree
Hide file tree
Showing 17 changed files with 194 additions and 93 deletions.
16 changes: 12 additions & 4 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,13 +334,12 @@ func (cluster *LocalProcessCluster) startKeyspace(keyspace Keyspace, shardNames
}
// Create the keyspace if it doesn't already exist.
_ = cluster.VtctlProcess.CreateKeyspace(keyspace.Name, keyspace.SidecarDBName)
var mysqlctlProcessList []*exec.Cmd
for _, shardName := range shardNames {
shard := &Shard{
Name: shardName,
}
log.Infof("Starting shard: %v", shardName)
mysqlctlProcessList = []*exec.Cmd{}
var mysqlctlProcessList []*exec.Cmd
for i := 0; i < totalTabletsRequired; i++ {
// instantiate vttablet object with reserved ports
tabletUID := cluster.GetAndReserveTabletUID()
Expand Down Expand Up @@ -1276,8 +1275,16 @@ func (cluster *LocalProcessCluster) VtprocessInstanceFromVttablet(tablet *Vttabl
}

// StartVttablet starts a new tablet
func (cluster *LocalProcessCluster) StartVttablet(tablet *Vttablet, servingStatus string,
supportBackup bool, cell string, keyspaceName string, hostname string, shardName string) error {
func (cluster *LocalProcessCluster) StartVttablet(
tablet *Vttablet,
explicitServingStatus bool,
servingStatus string,
supportBackup bool,
cell string,
keyspaceName string,
hostname string,
shardName string,
) error {
tablet.VttabletProcess = VttabletProcessInstance(
tablet.HTTPPort,
tablet.GrpcPort,
Expand All @@ -1295,6 +1302,7 @@ func (cluster *LocalProcessCluster) StartVttablet(tablet *Vttablet, servingStatu

tablet.VttabletProcess.SupportsBackup = supportBackup
tablet.VttabletProcess.ServingStatus = servingStatus
tablet.VttabletProcess.ExplicitServingStatus = explicitServingStatus
return tablet.VttabletProcess.Setup()
}

Expand Down
15 changes: 12 additions & 3 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
"vitess.io/vitess/go/vt/sqlparser"
)

const vttabletStateTimeout = 30 * time.Second
const vttabletStateTimeout = 60 * time.Second

// VttabletProcess is a generic handle for a running vttablet .
// It can be spawned manually
Expand Down Expand Up @@ -71,6 +71,7 @@ type VttabletProcess struct {
QueryzURL string
StatusDetailsURL string
SupportsBackup bool
ExplicitServingStatus bool
ServingStatus string
DbPassword string
DbPort int
Expand All @@ -79,7 +80,7 @@ type VttabletProcess struct {
Charset string
ConsolidationsURL string

//Extra Args to be set before starting the vttablet process
// Extra Args to be set before starting the vttablet process
ExtraArgs []string

proc *exec.Cmd
Expand Down Expand Up @@ -149,7 +150,15 @@ func (vttablet *VttabletProcess) Setup() (err error) {
}()

if vttablet.ServingStatus != "" {
if err = vttablet.WaitForTabletStatus(vttablet.ServingStatus); err != nil {
// If the tablet has an explicit serving status we use the serving status
// otherwise we wait for any serving status to show up in the healthcheck.
var servingStatus []string
if vttablet.ExplicitServingStatus {
servingStatus = append(servingStatus, vttablet.ServingStatus)
} else {
servingStatus = append(servingStatus, "SERVING", "NOT_SERVING")
}
if err = vttablet.WaitForTabletStatuses(servingStatus); err != nil {
errFileContent, _ := os.ReadFile(fname)
if errFileContent != nil {
log.Infof("vttablet error:\n%s\n", string(errFileContent))
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/tabletmanager/custom_rule_topo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestTopoCustomRule(t *testing.T) {
require.Nil(t, err, "error should be Nil")

// Start Vttablet
err = clusterInstance.StartVttablet(rTablet, "SERVING", false, cell, keyspaceName, hostname, shardName)
err = clusterInstance.StartVttablet(rTablet, false, "SERVING", false, cell, keyspaceName, hostname, shardName)
require.Nil(t, err, "error should be Nil")

err = clusterInstance.VtctlclientProcess.ExecuteCommand("Validate")
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/tabletmanager/primary/tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func TestPrimaryRestartSetsTERTimestamp(t *testing.T) {
require.NoError(t, err)

// Start Vttablet
err = clusterInstance.StartVttablet(&replicaTablet, "SERVING", false, cell, keyspaceName, hostname, shardName)
err = clusterInstance.StartVttablet(&replicaTablet, false, "SERVING", false, cell, keyspaceName, hostname, shardName)
require.NoError(t, err)

// Make sure that the TER did not change
Expand Down
8 changes: 4 additions & 4 deletions go/test/endtoend/tabletmanager/tablet_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestTabletReshuffle(t *testing.T) {

// SupportsBackup=False prevents vttablet from trying to restore
// Start vttablet process
err = clusterInstance.StartVttablet(rTablet, "SERVING", false, cell, keyspaceName, hostname, shardName)
err = clusterInstance.StartVttablet(rTablet, false, "SERVING", false, cell, keyspaceName, hostname, shardName)
require.NoError(t, err)

sql := "select value from t1"
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestHealthCheck(t *testing.T) {
defer replicaConn.Close()

// start vttablet process, should be in SERVING state as we already have a primary
err = clusterInstance.StartVttablet(rTablet, "SERVING", false, cell, keyspaceName, hostname, shardName)
err = clusterInstance.StartVttablet(rTablet, true, "SERVING", false, cell, keyspaceName, hostname, shardName)
require.NoError(t, err)

conn, err := mysql.Connect(ctx, &primaryTabletParams)
Expand Down Expand Up @@ -227,7 +227,7 @@ func TestHealthCheckSchemaChangeSignal(t *testing.T) {
clusterInstance.VtTabletExtraArgs = oldArgs
}()
// start vttablet process, should be in SERVING state as we already have a primary.
err = clusterInstance.StartVttablet(tempTablet, "SERVING", false, cell, keyspaceName, hostname, shardName)
err = clusterInstance.StartVttablet(tempTablet, false, "SERVING", false, cell, keyspaceName, hostname, shardName)
require.NoError(t, err)

defer func() {
Expand Down Expand Up @@ -381,7 +381,7 @@ func TestHealthCheckDrainedStateDoesNotShutdownQueryService(t *testing.T) {
// - the second tablet will be set to 'drained' and we expect that
// - the query service won't be shutdown

//Wait if tablet is not in service state
// Wait if tablet is not in service state
defer cluster.PanicHandler(t)
clusterInstance.DisableVTOrcRecoveries(t)
defer clusterInstance.EnableVTOrcRecoveries(t)
Expand Down
6 changes: 3 additions & 3 deletions go/test/endtoend/tabletmanager/tablet_security_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestFallbackSecurityPolicy(t *testing.T) {

// Requesting an unregistered security_policy should fallback to deny-all.
clusterInstance.VtTabletExtraArgs = []string{"--security_policy", "bogus"}
err = clusterInstance.StartVttablet(mTablet, "SERVING", false, cell, keyspaceName, hostname, shardName)
err = clusterInstance.StartVttablet(mTablet, false, "SERVING", false, cell, keyspaceName, hostname, shardName)
require.NoError(t, err)

// It should deny ADMIN role.
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestDenyAllSecurityPolicy(t *testing.T) {

// Requesting a deny-all security_policy.
clusterInstance.VtTabletExtraArgs = []string{"--security_policy", "deny-all"}
err = clusterInstance.StartVttablet(mTablet, "SERVING", false, cell, keyspaceName, hostname, shardName)
err = clusterInstance.StartVttablet(mTablet, false, "SERVING", false, cell, keyspaceName, hostname, shardName)
require.NoError(t, err)

// It should deny ADMIN role.
Expand Down Expand Up @@ -126,7 +126,7 @@ func TestReadOnlySecurityPolicy(t *testing.T) {

// Requesting a read-only security_policy.
clusterInstance.VtTabletExtraArgs = []string{"--security_policy", "read-only"}
err = clusterInstance.StartVttablet(mTablet, "SERVING", false, cell, keyspaceName, hostname, shardName)
err = clusterInstance.StartVttablet(mTablet, false, "SERVING", false, cell, keyspaceName, hostname, shardName)
require.NoError(t, err)

// It should deny ADMIN role.
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/tabletmanager/tablet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestEnsureDB(t *testing.T) {

log.Info(fmt.Sprintf("Started vttablet %v", tablet))
// Start vttablet process as replica. It won't be able to serve because there's no db.
err = clusterInstance.StartVttablet(tablet, "NOT_SERVING", false, cell, "dbtest", hostname, "0")
err = clusterInstance.StartVttablet(tablet, false, "NOT_SERVING", false, cell, "dbtest", hostname, "0")
require.NoError(t, err)

// Make it the primary.
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestResetReplicationParameters(t *testing.T) {

log.Info(fmt.Sprintf("Started vttablet %v", tablet))
// Start vttablet process as replica. It won't be able to serve because there's no db.
err = clusterInstance.StartVttablet(tablet, "NOT_SERVING", false, cell, "dbtest", hostname, "0")
err = clusterInstance.StartVttablet(tablet, false, "NOT_SERVING", false, cell, "dbtest", hostname, "0")
require.NoError(t, err)

// Set a replication source on the tablet and start replication
Expand Down
10 changes: 5 additions & 5 deletions go/test/endtoend/vtgate/queries/aggregation/fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestFuzzAggregations(t *testing.T) {
mcmp, closer := start(t)
defer closer()

noOfRows := rand.Intn(20)
noOfRows := rand.Intn(20) + 1
var values []string
for i := 0; i < noOfRows; i++ {
values = append(values, fmt.Sprintf("(%d, 'name%d', 'value%d', %d)", i, i, i, i))
Expand Down Expand Up @@ -160,10 +160,10 @@ func createAggregations(tables []tableT, maxAggrs int, randomCol func(tblIdx int
aggregations := []func(string) string{
func(_ string) string { return "count(*)" },
func(e string) string { return fmt.Sprintf("count(%s)", e) },
//func(e string) string { return fmt.Sprintf("sum(%s)", e) },
//func(e string) string { return fmt.Sprintf("avg(%s)", e) },
//func(e string) string { return fmt.Sprintf("min(%s)", e) },
//func(e string) string { return fmt.Sprintf("max(%s)", e) },
// func(e string) string { return fmt.Sprintf("sum(%s)", e) },
// func(e string) string { return fmt.Sprintf("avg(%s)", e) },
// func(e string) string { return fmt.Sprintf("min(%s)", e) },
// func(e string) string { return fmt.Sprintf("max(%s)", e) },
}

noOfAggrs := rand.Intn(maxAggrs) + 1
Expand Down
8 changes: 8 additions & 0 deletions go/test/endtoend/vtgate/queries/misc/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,11 @@ func TestBuggyOuterJoin(t *testing.T) {

mcmp.Exec("select t1.id1, t2.id1 from t1 left join t1 as t2 on t2.id1 = t2.id2")
}

func TestLeftJoinUsingUnsharded(t *testing.T) {
mcmp, closer := start(t)
defer closer()

utils.Exec(t, mcmp.VtConn, "insert /*vt+ QUERY_TIMEOUT_MS=2000 */ into uks.unsharded(id1) values (1),(2),(3),(4),(5)")
utils.Exec(t, mcmp.VtConn, "select /*vt+ QUERY_TIMEOUT_MS=2000 */ * from uks.unsharded as A left join uks.unsharded as B using(id1)")
}
2 changes: 1 addition & 1 deletion go/vt/schemadiff/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ func TestViewReferences(t *testing.T) {
"create table t2(id int primary key, n int, info int)",
"create view v1 as select id, c as ch from t1 where id > 0",
"create view v2 as select n as num, info from t2",
"create view v3 as select num, v1.id, ch from v1 join v2 using (id) where info > 5",
"create view v3 as select num, v1.id, ch from v1 join v2 on v1.id = v2.num where info > 5",
},
},
{
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vterrors/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ var (
VT09012 = errorWithoutState("VT09012", vtrpcpb.Code_FAILED_PRECONDITION, "%s statement with %s tablet not allowed", "This type of statement is not allowed on the given tablet.")
VT09013 = errorWithoutState("VT09013", vtrpcpb.Code_FAILED_PRECONDITION, "semi-sync plugins are not loaded", "Durability policy wants Vitess to use semi-sync, but the MySQL instances don't have the semi-sync plugin loaded.")
VT09014 = errorWithoutState("VT09014", vtrpcpb.Code_FAILED_PRECONDITION, "vindex cannot be modified", "The vindex cannot be used as table in DML statement")
VT09015 = errorWithoutState("VT09015", vtrpcpb.Code_FAILED_PRECONDITION, "schema tracking required", "This query cannot be planned without more information on the SQL schema. Please turn on schema tracking or add authoritative columns information to your VSchema.")

VT10001 = errorWithoutState("VT10001", vtrpcpb.Code_ABORTED, "foreign key constraints are not allowed", "Foreign key constraints are not allowed, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/.")

Expand Down Expand Up @@ -136,6 +137,7 @@ var (
VT09012,
VT09013,
VT09014,
VT09015,
VT10001,
VT12001,
VT13001,
Expand Down
22 changes: 22 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/from_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -6533,5 +6533,27 @@
"user.multicol_tbl"
]
}
},
{
"comment": "left join with using has to be transformed into inner join with on condition",
"query": "SELECT * FROM unsharded_authoritative as A LEFT JOIN unsharded_authoritative as B USING(col1)",
"plan": {
"QueryType": "SELECT",
"Original": "SELECT * FROM unsharded_authoritative as A LEFT JOIN unsharded_authoritative as B USING(col1)",
"Instructions": {
"OperatorType": "Route",
"Variant": "Unsharded",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select A.col1 as col1, A.col2 as col2, B.col2 as col2 from unsharded_authoritative as A left join unsharded_authoritative as B on A.col1 = B.col1 where 1 != 1",
"Query": "select A.col1 as col1, A.col2 as col2, B.col2 as col2 from unsharded_authoritative as A left join unsharded_authoritative as B on A.col1 = B.col1",
"Table": "unsharded_authoritative"
},
"TablesUsed": [
"main.unsharded_authoritative"
]
}
}
]
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/testdata/unsupported_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@
"comment": "join with USING construct",
"query": "select * from user join user_extra using(id)",
"v3-plan": "VT12001: unsupported: JOIN with USING(column_list) clause for complex queries",
"gen4-plan": "can't handle JOIN USING without authoritative tables"
"gen4-plan": "VT09015: schema tracking required"
},
{
"comment": "join with USING construct with 3 tables",
"query": "select user.id from user join user_extra using(id) join music using(id2)",
"v3-plan": "VT12001: unsupported: JOIN with USING(column_list) clause for complex queries",
"gen4-plan": "can't handle JOIN USING without authoritative tables"
"gen4-plan": "VT09015: schema tracking required"
},
{
"comment": "natural left join",
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vtgate/semantics/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ func (a *analyzer) analyzeUp(cursor *sqlparser.Cursor) bool {
return false
}

if err := a.rewriter.up(cursor); err != nil {
a.setError(err)
return true
}

a.leaveProjection(cursor)
return a.shouldContinue()
}
Expand Down
7 changes: 0 additions & 7 deletions go/vt/vtgate/semantics/binder.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,6 @@ func (b *binder) up(cursor *sqlparser.Cursor) error {
}
currScope.joinUsing[ident.Lowered()] = deps.direct
}
if len(node.Using) > 0 {
err := rewriteJoinUsing(currScope, node.Using, b.org)
if err != nil {
return err
}
node.Using = nil
}
case *sqlparser.ColName:
currentScope := b.scoper.currentScope()
deps, err := b.resolveColumn(node, currentScope, false)
Expand Down
Loading

0 comments on commit 3b20a40

Please sign in to comment.