Skip to content

Commit

Permalink
DDL allowed outside of transaction in vttablet (#16661)
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal authored Sep 10, 2024
1 parent a513601 commit e0a5069
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 99 deletions.
18 changes: 18 additions & 0 deletions go/test/endtoend/vtgate/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,24 @@ func TestRowCountExceed(t *testing.T) {
utils.AssertContainsError(t, conn, "select id1 from t1 where id1 < 1000", `Row count exceeded 100`)
}

func TestDDLTargeted(t *testing.T) {
ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer conn.Close()

utils.Exec(t, conn, "use `ks/-80`")
utils.Exec(t, conn, `begin`)
utils.Exec(t, conn, `create table ddl_targeted (id bigint primary key)`)
// implicit commit on ddl would have closed the open transaction
// so this execution should happen as autocommit.
utils.Exec(t, conn, `insert into ddl_targeted (id) values (1)`)
// this will have not impact and the row would have inserted.
utils.Exec(t, conn, `rollback`)
// validating the row
utils.AssertMatches(t, conn, `select id from ddl_targeted`, `[[INT64(1)]]`)
}

func TestLookupErrorMetric(t *testing.T) {
conn, closer := start(t)
defer closer()
Expand Down
18 changes: 18 additions & 0 deletions go/vt/vtgate/engine/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type Send struct {
// IsDML specifies how to deal with autocommit behaviour
IsDML bool

IsDDL bool

// SingleShardOnly specifies that the query must be send to only single shard
SingleShardOnly bool

Expand Down Expand Up @@ -94,6 +96,10 @@ func (s *Send) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[str
ctx, cancelFunc := addQueryTimeout(ctx, vcursor, s.QueryTimeout)
defer cancelFunc()

if err := s.commitIfDDL(ctx, vcursor); err != nil {
return nil, err
}

rss, err := s.checkAndReturnShards(ctx, vcursor)
if err != nil {
return nil, err
Expand Down Expand Up @@ -158,6 +164,10 @@ func copyBindVars(in map[string]*querypb.BindVariable) map[string]*querypb.BindV

// TryStreamExecute implements Primitive interface
func (s *Send) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
if err := s.commitIfDDL(ctx, vcursor); err != nil {
return err
}

rss, err := s.checkAndReturnShards(ctx, vcursor)
if err != nil {
return err
Expand Down Expand Up @@ -204,3 +214,11 @@ func (s *Send) description() PrimitiveDescription {
Other: other,
}
}

// commitIfDDL commits any open transaction before executing the ddl query.
func (s *Send) commitIfDDL(ctx context.Context, vcursor VCursor) error {
if s.IsDDL {
return vcursor.Session().Commit(ctx)
}
return nil
}
5 changes: 3 additions & 2 deletions go/vt/vtgate/planbuilder/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (fk *fkContraint) FkWalk(node sqlparser.SQLNode) (kontinue bool, err error)
// and which chooses which of the two to invoke at runtime.
func buildGeneralDDLPlan(ctx context.Context, sql string, ddlStatement sqlparser.DDLStatement, reservedVars *sqlparser.ReservedVars, vschema plancontext.VSchema, enableOnlineDDL, enableDirectDDL bool) (*planResult, error) {
if vschema.Destination() != nil {
return buildByPassPlan(sql, vschema)
return buildByPassPlan(sql, vschema, true)
}
normalDDLPlan, onlineDDLPlan, err := buildDDLPlans(ctx, sql, ddlStatement, reservedVars, vschema, enableOnlineDDL, enableDirectDDL)
if err != nil {
Expand Down Expand Up @@ -80,7 +80,7 @@ func buildGeneralDDLPlan(ctx context.Context, sql string, ddlStatement sqlparser
return newPlanResult(eddl, tc.getTables()...), nil
}

func buildByPassPlan(sql string, vschema plancontext.VSchema) (*planResult, error) {
func buildByPassPlan(sql string, vschema plancontext.VSchema, isDDL bool) (*planResult, error) {
keyspace, err := vschema.DefaultKeyspace()
if err != nil {
return nil, err
Expand All @@ -89,6 +89,7 @@ func buildByPassPlan(sql string, vschema plancontext.VSchema) (*planResult, erro
Keyspace: keyspace,
TargetDestination: vschema.Destination(),
Query: sql,
IsDDL: isDDL,
}
return newPlanResult(send), nil
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const (

func buildShowPlan(sql string, stmt *sqlparser.Show, _ *sqlparser.ReservedVars, vschema plancontext.VSchema) (*planResult, error) {
if vschema.Destination() != nil {
return buildByPassPlan(sql, vschema)
return buildByPassPlan(sql, vschema, false)
}

var prim engine.Primitive
Expand Down
36 changes: 9 additions & 27 deletions go/vt/vttablet/endtoend/settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,16 +138,9 @@ func TestDDLNoConnectionReservationOnSettings(t *testing.T) {
query := "create table temp(c_date datetime default '0000-00-00')"
setting := "set sql_mode='TRADITIONAL'"

for _, withTx := range []bool{false, true} {
if withTx {
err := client.Begin(false)
require.NoError(t, err)
}
_, err := client.ReserveExecute(query, []string{setting}, nil)
require.Error(t, err, "create table should have failed with TRADITIONAL mode")
require.Contains(t, err.Error(), "Invalid default value")
assert.Zero(t, client.ReservedID())
}
_, err := client.ReserveExecute(query, []string{setting}, nil)
assert.ErrorContains(t, err, "Invalid default value for 'c_date'", "create table should have failed with TRADITIONAL mode")
assert.Zero(t, client.ReservedID())
}

func TestDMLNoConnectionReservationOnSettings(t *testing.T) {
Expand All @@ -156,7 +149,10 @@ func TestDMLNoConnectionReservationOnSettings(t *testing.T) {

_, err := client.Execute("create table temp(c_date datetime)", nil)
require.NoError(t, err)
defer client.Execute("drop table temp", nil)
defer func() {
client.Rollback()
client.Execute("drop table temp", nil)
}()

_, err = client.Execute("insert into temp values ('2022-08-25')", nil)
require.NoError(t, err)
Expand Down Expand Up @@ -211,9 +207,8 @@ func TestDDLNoConnectionReservationOnSettingsWithTx(t *testing.T) {
query := "create table temp(c_date datetime default '0000-00-00')"
setting := "set sql_mode='TRADITIONAL'"

_, err := client.ReserveBeginExecute(query, []string{setting}, nil, nil)
require.Error(t, err, "create table should have failed with TRADITIONAL mode")
require.Contains(t, err.Error(), "Invalid default value")
_, err := client.ReserveExecute(query, []string{setting}, nil)
require.ErrorContains(t, err, "Invalid default value for 'c_date'", "create table should have failed with TRADITIONAL mode")
assert.Zero(t, client.ReservedID())
}

Expand Down Expand Up @@ -297,12 +292,6 @@ func TestTempTableOnReserveExecute(t *testing.T) {
require.NoError(t,
client.Release())

_, err = client.ReserveBeginExecute(tempTblQuery, nil, nil, nil)
require.NoError(t, err)
assert.NotZero(t, client.ReservedID())
require.NoError(t,
client.Release())

// drop the table
_, err = client.Execute("drop table if exists temp", nil)
require.NoError(t, err)
Expand All @@ -318,13 +307,6 @@ func TestTempTableOnReserveExecute(t *testing.T) {
assert.NotZero(t, client.ReservedID(), "as this goes through fallback path of reserving a connection due to temporary tables")
require.NoError(t,
client.Release())

_, err = client.ReserveBeginExecute(tempTblQuery, []string{setting}, nil, nil)
require.Error(t, err, "create table should have failed with TRADITIONAL mode")
require.Contains(t, err.Error(), "Invalid default value")
assert.NotZero(t, client.ReservedID(), "as this goes through fallback path of reserving a connection due to temporary tables")
require.NoError(t,
client.Release())
}

func TestInfiniteSessions(t *testing.T) {
Expand Down
58 changes: 24 additions & 34 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ import (
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/pools/smartconnpool"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/callinfo"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/tableacl"
Expand All @@ -45,10 +47,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
eschema "vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
)

// QueryExecutor is used for executing a query request.
Expand Down Expand Up @@ -192,8 +191,10 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) {
return qr, nil
case p.PlanOtherRead, p.PlanOtherAdmin, p.PlanFlush, p.PlanSavepoint, p.PlanRelease, p.PlanSRollback:
return qre.execOther()
case p.PlanInsert, p.PlanUpdate, p.PlanDelete, p.PlanInsertMessage, p.PlanDDL, p.PlanLoad:
case p.PlanInsert, p.PlanUpdate, p.PlanDelete, p.PlanInsertMessage, p.PlanLoad:
return qre.execAutocommit(qre.txConnExec)
case p.PlanDDL:
return qre.execDDL(nil)
case p.PlanUpdateLimit, p.PlanDeleteLimit:
return qre.execAsTransaction(qre.txConnExec)
case p.PlanCallProc:
Expand Down Expand Up @@ -538,7 +539,7 @@ func (qre *QueryExecutor) checkAccess(authorized *tableacl.ACLResult, tableName
return nil
}

func (qre *QueryExecutor) execDDL(conn *StatefulConnection) (*sqltypes.Result, error) {
func (qre *QueryExecutor) execDDL(conn *StatefulConnection) (result *sqltypes.Result, err error) {
// Let's see if this is a normal DDL statement or an Online DDL statement.
// An Online DDL statement is identified by /*vt+ .. */ comment with expected directives, like uuid etc.
if onlineDDL, err := schema.OnlineDDLFromCommentedStatement(qre.plan.FullStmt); err == nil {
Expand All @@ -549,6 +550,21 @@ func (qre *QueryExecutor) execDDL(conn *StatefulConnection) (*sqltypes.Result, e
}
}

if conn == nil {
conn, err = qre.tsv.te.txPool.createConn(qre.ctx, qre.options, qre.setting)
if err != nil {
return nil, err
}
defer conn.Release(tx.ConnRelease)
}

// A DDL statement should commit the current transaction in the VTGate.
// The change was made in PR: https://github.com/vitessio/vitess/pull/14110 in v18.
// DDL statement received by vttablet will be outside of a transaction.
if conn.txProps != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "DDL statement executed inside a transaction")
}

isTemporaryTable := false
if ddlStmt, ok := qre.plan.FullStmt.(sqlparser.DDLStatement); ok {
isTemporaryTable = ddlStmt.IsTemporary()
Expand Down Expand Up @@ -580,19 +596,7 @@ func (qre *QueryExecutor) execDDL(conn *StatefulConnection) (*sqltypes.Result, e
return nil, err
}
}
result, err := qre.execStatefulConn(conn, sql, true)
if err != nil {
return nil, err
}
// Only perform this operation when the connection has transaction open.
// TODO: This actually does not retain the old transaction. We should see how to provide correct behaviour to client.
if conn.txProps != nil {
err = qre.BeginAgain(qre.ctx, conn)
if err != nil {
return nil, err
}
}
return result, nil
return qre.execStatefulConn(conn, sql, true)
}

func (qre *QueryExecutor) execLoad(conn *StatefulConnection) (*sqltypes.Result, error) {
Expand All @@ -603,20 +607,6 @@ func (qre *QueryExecutor) execLoad(conn *StatefulConnection) (*sqltypes.Result,
return result, nil
}

// BeginAgain commits the existing transaction and begins a new one
func (*QueryExecutor) BeginAgain(ctx context.Context, dc *StatefulConnection) error {
if dc.IsClosed() || dc.TxProperties().Autocommit {
return nil
}
if _, err := dc.Exec(ctx, "commit", 1, false); err != nil {
return err
}
if _, err := dc.Exec(ctx, "begin", 1, false); err != nil {
return err
}
return nil
}

func (qre *QueryExecutor) execNextval() (*sqltypes.Result, error) {
env := evalengine.NewExpressionEnv(qre.ctx, qre.bindVars, evalengine.NewEmptyVCursor(qre.tsv.Environment(), time.Local))
result, err := env.Evaluate(qre.plan.NextCount)
Expand Down
Loading

0 comments on commit e0a5069

Please sign in to comment.