Skip to content

Commit

Permalink
Enable Replace Into engine and Fix Foreign key locking issue (#14532)
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
Signed-off-by: Shlomi Noach <[email protected]>
Co-authored-by: Shlomi Noach <[email protected]>
  • Loading branch information
harshit-gangal and shlomi-noach authored Nov 20, 2023
1 parent 543e761 commit 82bd8ee
Show file tree
Hide file tree
Showing 16 changed files with 267 additions and 220 deletions.
1 change: 1 addition & 0 deletions go/mysql/sqlerror/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ const (
ERJSONValueTooBig = ErrorCode(3150)
ERJSONDocumentTooDeep = ErrorCode(3157)

ERLockNowait = ErrorCode(3572)
ERRegexpStringNotTerminated = ErrorCode(3684)
ERRegexpBufferOverflow = ErrorCode(3684)
ERRegexpIllegalArgument = ErrorCode(3685)
Expand Down
55 changes: 25 additions & 30 deletions go/test/endtoend/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,51 +256,46 @@ func WaitForAuthoritative(t *testing.T, ks, tbl string, readVSchema func() (*int

// WaitForKsError waits for the ks error field to be populated and returns it.
func WaitForKsError(t *testing.T, vtgateProcess cluster.VtgateProcess, ks string) string {
var errString string
WaitForVschemaCondition(t, vtgateProcess, ks, func(t *testing.T, keyspace map[string]interface{}) bool {
ksErr, fieldPresent := keyspace["error"]
if !fieldPresent {
return false
}
var ok bool
errString, ok = ksErr.(string)
return ok
})
return errString
}

// WaitForVschemaCondition waits for the condition to be true
func WaitForVschemaCondition(t *testing.T, vtgateProcess cluster.VtgateProcess, ks string, conditionMet func(t *testing.T, keyspace map[string]interface{}) bool) {
timeout := time.After(60 * time.Second)
for {
select {
case <-timeout:
t.Fatalf("schema tracking did not find error in '%s'", ks)
return ""
t.Fatalf("schema tracking did not met the condition within the time for keyspace: %s", ks)
default:
res, err := vtgateProcess.ReadVSchema()
require.NoError(t, err, res)
kss := convertToMap(*res)["keyspaces"]
ksMap := convertToMap(convertToMap(kss)[ks])
ksErr, fieldPresent := ksMap["error"]
if !fieldPresent {
time.Sleep(100 * time.Millisecond)
continue
if conditionMet(t, ksMap) {
return
}
errString, isErr := ksErr.(string)
if !isErr {
time.Sleep(100 * time.Millisecond)
continue
}
return errString
time.Sleep(100 * time.Millisecond)
}
}
}

// WaitForTableDeletions waits for a table to be deleted
func WaitForTableDeletions(ctx context.Context, t *testing.T, vtgateProcess cluster.VtgateProcess, ks, tbl string) error {
for {
select {
case <-ctx.Done():
return fmt.Errorf("schema tracking still found the table '%s'", tbl)
default:
res, err := vtgateProcess.ReadVSchema()
require.NoError(t, err, res)
keyspacesMap := convertToMap(*res)["keyspaces"]
ksMap := convertToMap(keyspacesMap)[ks]
tablesMap := convertToMap(ksMap)["tables"]
_, isPresent := convertToMap(tablesMap)[tbl]
if !isPresent {
return nil
}
time.Sleep(100 * time.Millisecond)
}
}
func WaitForTableDeletions(ctx context.Context, t *testing.T, vtgateProcess cluster.VtgateProcess, ks, tbl string) {
WaitForVschemaCondition(t, vtgateProcess, ks, func(t *testing.T, keyspace map[string]interface{}) bool {
tablesMap := keyspace["tables"]
_, isPresent := convertToMap(tablesMap)[tbl]
return !isPresent
})
}

// WaitForColumn waits for a table's column to be present
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vtgate/foreignkey/fk_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (fz *fuzzer) generateQuery() []string {
}

func getInsertType() string {
return "insert"
return []string{"insert", "replace"}[rand.Intn(2)]
}

// generateInsertDMLQuery generates an INSERT query from the parameters for the fuzzer.
Expand Down
36 changes: 29 additions & 7 deletions go/test/endtoend/vtgate/foreignkey/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,20 +1000,26 @@ func TestCyclicFks(t *testing.T) {

// Create a cyclic foreign key constraint.
utils.Exec(t, mcmp.VtConn, "alter table fk_t10 add constraint test_cyclic_fks foreign key (col) references fk_t12 (col) on delete cascade on update cascade")
defer func() {
utils.Exec(t, mcmp.VtConn, "alter table fk_t10 drop foreign key test_cyclic_fks")
}()

// Wait for schema-tracking to be complete.
ksErr := utils.WaitForKsError(t, clusterInstance.VtgateProcess, unshardedKs)
// Make sure Vschema has the error for cyclic foreign keys.
assert.Contains(t, ksErr, "VT09019: uks has cyclic foreign keys")
errString := utils.WaitForKsError(t, clusterInstance.VtgateProcess, unshardedKs)
assert.Contains(t, errString, "VT09019: keyspace 'uks' has cyclic foreign keys")

// Ensure that the Vitess database is originally empty
ensureDatabaseState(t, mcmp.VtConn, true)

_, err := utils.ExecAllowError(t, mcmp.VtConn, "insert into fk_t10(id, col) values (1, 1)")
require.ErrorContains(t, err, "VT09019: uks has cyclic foreign keys")
require.ErrorContains(t, err, "VT09019: keyspace 'uks' has cyclic foreign keys")

// Drop the cyclic foreign key constraint.
utils.Exec(t, mcmp.VtConn, "alter table fk_t10 drop foreign key test_cyclic_fks")

// Wait for schema-tracking to be complete.
utils.WaitForVschemaCondition(t, clusterInstance.VtgateProcess, unshardedKs, func(t *testing.T, keyspace map[string]interface{}) bool {
_, fieldPresent := keyspace["error"]
return !fieldPresent
})

}

func TestReplace(t *testing.T) {
Expand Down Expand Up @@ -1175,4 +1181,20 @@ func TestReplaceWithFK(t *testing.T) {
// replace some data.
_, err := utils.ExecAllowError(t, conn, `replace into t1(id, col) values (1, 1)`)
require.ErrorContains(t, err, "VT12001: unsupported: REPLACE INTO with sharded keyspace (errno 1235) (sqlstate 42000)")

_ = utils.Exec(t, conn, `use uks`)

_ = utils.Exec(t, conn, `replace into u_t1(id, col1) values (1, 1), (2, 1)`)
// u_t1: (1,1) (2,1)

_ = utils.Exec(t, conn, `replace into u_t2(id, col2) values (1, 1), (2, 1)`)
// u_t1: (1,1) (2,1)
// u_t2: (1,1) (2,1)

_ = utils.Exec(t, conn, `replace into u_t1(id, col1) values (2, 2)`)
// u_t1: (1,1) (2,2)
// u_t2: (1,null) (2,null)

utils.AssertMatches(t, conn, `select * from u_t1`, `[[INT64(1) INT64(1)] [INT64(2) INT64(2)]]`)
utils.AssertMatches(t, conn, `select * from u_t2`, `[[INT64(1) NULL] [INT64(2) NULL]]`)
}
5 changes: 3 additions & 2 deletions go/test/endtoend/vtgate/foreignkey/stress/fk_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,8 +707,7 @@ func createInitialSchema(t *testing.T, tcase *testCase) {
timeoutCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
for _, tableName := range tableNames {
err := utils.WaitForTableDeletions(timeoutCtx, t, clusterInstance.VtgateProcess, keyspaceName, tableName)
require.NoError(t, err)
utils.WaitForTableDeletions(timeoutCtx, t, clusterInstance.VtgateProcess, keyspaceName, tableName)
}
})
t.Run("creating tables", func(t *testing.T) {
Expand Down Expand Up @@ -922,6 +921,8 @@ func isFKError(err error) bool {
return false
case sqlerror.ERLockDeadlock:
return false // bummer, but deadlocks can happen, it's a legit error.
case sqlerror.ERLockNowait:
return false // For some queries we use NOWAIT. Bummer, but this can happen, it's a legit error.
case sqlerror.ERNoReferencedRow,
sqlerror.ERRowIsReferenced,
sqlerror.ERRowIsReferenced2,
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vterrors/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ var (
VT09016 = errorWithState("VT09016", vtrpcpb.Code_FAILED_PRECONDITION, RowIsReferenced2, "Cannot delete or update a parent row: a foreign key constraint fails", "SET DEFAULT is not supported by InnoDB")
VT09017 = errorWithoutState("VT09017", vtrpcpb.Code_FAILED_PRECONDITION, "%s", "Invalid syntax for the statement type.")
VT09018 = errorWithoutState("VT09018", vtrpcpb.Code_FAILED_PRECONDITION, "%s", "Invalid syntax for the vindex function statement.")
VT09019 = errorWithoutState("VT09019", vtrpcpb.Code_FAILED_PRECONDITION, "%s has cyclic foreign keys", "Vitess doesn't support cyclic foreign keys.")
VT09019 = errorWithoutState("VT09019", vtrpcpb.Code_FAILED_PRECONDITION, "keyspace '%s' has cyclic foreign keys", "Vitess doesn't support cyclic foreign keys.")

VT10001 = errorWithoutState("VT10001", vtrpcpb.Code_ABORTED, "foreign key constraints are not allowed", "Foreign key constraints are not allowed, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/.")
VT10002 = errorWithoutState("VT10002", vtrpcpb.Code_ABORTED, "'replace into' with foreign key constraints are not allowed", "Foreign key constraints sometimes are not written in binary logs and will cause issue with vreplication workflows like online-ddl.")
Expand Down
22 changes: 20 additions & 2 deletions go/vt/vtgate/engine/sequential.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,30 @@ func (s *Sequential) GetTableName() string {

// TryExecute performs a non-streaming exec.
func (s *Sequential) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantFields bool) (*sqltypes.Result, error) {
return nil, vterrors.VT10002()
finalRes := &sqltypes.Result{}
for _, source := range s.Sources {
res, err := vcursor.ExecutePrimitive(ctx, source, bindVars, wantFields)
if err != nil {
return nil, err
}
finalRes.RowsAffected += res.RowsAffected
if finalRes.InsertID == 0 {
finalRes.InsertID = res.InsertID
}
if res.Info != "" {
finalRes.Info = res.Info
}
}
return finalRes, nil
}

// TryStreamExecute performs a streaming exec.
func (s *Sequential) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantFields bool, callback func(*sqltypes.Result) error) error {
return vterrors.VT10002()
qr, err := s.TryExecute(ctx, vcursor, bindVars, wantFields)
if err != nil {
return err
}
return callback(qr)
}

// GetFields fetches the field info.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func createFkCascadeOpForDelete(ctx *plancontext.PlanningContext, parentOp ops.O
}
fkChildren = append(fkChildren, fkChild)
}
selectionOp, err := createSelectionOp(ctx, selectExprs, delStmt.TableExprs, delStmt.Where, nil, nil, sqlparser.ForUpdateLock)
selectionOp, err := createSelectionOp(ctx, selectExprs, delStmt.TableExprs, delStmt.Where, nil, nil, sqlparser.ForUpdateLockNoWait)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/planbuilder/operators/query_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func exposeColumnsThroughDerivedTable(ctx *plancontext.PlanningContext, p *Proje
}

expr = semantics.RewriteDerivedTableExpression(expr, derivedTbl)
out := prefixColNames(tblName, expr)
out := prefixColNames(ctx, tblName, expr)

alias := sqlparser.UnescapedString(out)
predicate.LHSExprs[idx].Expr = sqlparser.NewColNameWithQualifier(alias, derivedTblName)
Expand All @@ -450,14 +450,14 @@ func exposeColumnsThroughDerivedTable(ctx *plancontext.PlanningContext, p *Proje

// prefixColNames adds qualifier prefixes to all ColName:s.
// We want to be more explicit than the user was to make sure we never produce invalid SQL
func prefixColNames(tblName sqlparser.TableName, e sqlparser.Expr) sqlparser.Expr {
func prefixColNames(ctx *plancontext.PlanningContext, tblName sqlparser.TableName, e sqlparser.Expr) sqlparser.Expr {
return sqlparser.CopyOnRewrite(e, nil, func(cursor *sqlparser.CopyOnWriteCursor) {
col, ok := cursor.Node().(*sqlparser.ColName)
if !ok {
return
}
col.Qualifier = tblName
}, nil).(sqlparser.Expr)
cursor.Replace(sqlparser.NewColNameWithQualifier(col.Name.String(), tblName))
}, ctx.SemTable.CopySemanticInfo).(sqlparser.Expr)
}

func createProjectionWithTheseColumns(
Expand Down
32 changes: 19 additions & 13 deletions go/vt/vtgate/planbuilder/operators/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func createFKCascadeOp(ctx *plancontext.PlanningContext, parentOp ops.Operator,
fkChildren = append(fkChildren, fkChild)
}

selectionOp, err := createSelectionOp(ctx, selectExprs, updStmt.TableExprs, updStmt.Where, updStmt.OrderBy, nil, sqlparser.ForUpdateLock)
selectionOp, err := createSelectionOp(ctx, selectExprs, updStmt.TableExprs, updStmt.Where, updStmt.OrderBy, nil, sqlparser.ForUpdateLockNoWait)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -482,7 +482,9 @@ func buildChildUpdOpForSetNull(
// For example, if we are setting `update parent cola = :v1 and colb = :v2`, then on the child, the where condition would look something like this -
// `:v1 IS NULL OR :v2 IS NULL OR (child_cola, child_colb) NOT IN ((:v1,:v2))`
// So, if either of :v1 or :v2 is NULL, then the entire condition is true (which is the same as not having the condition when :v1 or :v2 is NULL).
compExpr := nullSafeNotInComparison(ctx.SemTable.GetUpdateExpressionsForFk(fk.String(updatedTable)), fk, updatedTable.GetTableName(), nonLiteralUpdateInfo)
updateExprs := ctx.SemTable.GetUpdateExpressionsForFk(fk.String(updatedTable))
compExpr := nullSafeNotInComparison(ctx,
updateExprs, fk, updatedTable.GetTableName(), nonLiteralUpdateInfo, false /* appendQualifier */)
if compExpr != nil {
childWhereExpr = &sqlparser.AndExpr{
Left: childWhereExpr,
Expand Down Expand Up @@ -605,8 +607,8 @@ func createFkVerifyOpForParentFKForUpdate(ctx *plancontext.PlanningContext, updS
Right: sqlparser.NewColNameWithQualifier(pFK.ChildColumns[idx].String(), childTbl),
}
} else {
notEqualColNames = append(notEqualColNames, prefixColNames(childTbl, matchedExpr.Name))
prefixedMatchExpr := prefixColNames(childTbl, matchedExpr.Expr)
notEqualColNames = append(notEqualColNames, prefixColNames(ctx, childTbl, matchedExpr.Name))
prefixedMatchExpr := prefixColNames(ctx, childTbl, matchedExpr.Expr)
notEqualExprs = append(notEqualExprs, prefixedMatchExpr)
joinExpr = &sqlparser.ComparisonExpr{
Operator: sqlparser.EqualOp,
Expand Down Expand Up @@ -641,7 +643,7 @@ func createFkVerifyOpForParentFKForUpdate(ctx *plancontext.PlanningContext, updS
}
// add existing where condition on the update statement
if updStmt.Where != nil {
whereCond = &sqlparser.AndExpr{Left: whereCond, Right: prefixColNames(childTbl, updStmt.Where.Expr)}
whereCond = &sqlparser.AndExpr{Left: whereCond, Right: prefixColNames(ctx, childTbl, updStmt.Where.Expr)}
}
return createSelectionOp(ctx,
sqlparser.SelectExprs{sqlparser.NewAliasedExpr(sqlparser.NewIntLiteral("1"), "")},
Expand All @@ -655,7 +657,7 @@ func createFkVerifyOpForParentFKForUpdate(ctx *plancontext.PlanningContext, updS
sqlparser.NewWhere(sqlparser.WhereClause, whereCond),
nil,
sqlparser.NewLimitWithoutOffset(1),
sqlparser.ShareModeLock)
sqlparser.ForShareLockNoWait)
}

// Each child foreign key constraint is verified by a join query of the form:
Expand Down Expand Up @@ -696,7 +698,7 @@ func createFkVerifyOpForChildFKForUpdate(ctx *plancontext.PlanningContext, updSt
var whereCond sqlparser.Expr
// add existing where condition on the update statement
if updStmt.Where != nil {
whereCond = prefixColNames(parentTbl, updStmt.Where.Expr)
whereCond = prefixColNames(ctx, parentTbl, updStmt.Where.Expr)
}

// We don't want to fail the RESTRICT for the case where the parent columns remains unchanged on the update.
Expand All @@ -708,7 +710,7 @@ func createFkVerifyOpForChildFKForUpdate(ctx *plancontext.PlanningContext, updSt
// For example, if we are setting `update child cola = :v1 and colb = :v2`, then on the parent, the where condition would look something like this -
// `:v1 IS NULL OR :v2 IS NULL OR (cola, colb) NOT IN ((:v1,:v2))`
// So, if either of :v1 or :v2 is NULL, then the entire condition is true (which is the same as not having the condition when :v1 or :v2 is NULL).
compExpr := nullSafeNotInComparison(updStmt.Exprs, cFk, parentTbl, nil)
compExpr := nullSafeNotInComparison(ctx, updStmt.Exprs, cFk, parentTbl, nil /* nonLiteralUpdateInfo */, true /* appendQualifier */)
if compExpr != nil {
whereCond = sqlparser.AndExpressions(whereCond, compExpr)
}
Expand All @@ -725,15 +727,15 @@ func createFkVerifyOpForChildFKForUpdate(ctx *plancontext.PlanningContext, updSt
sqlparser.NewWhere(sqlparser.WhereClause, whereCond),
nil,
sqlparser.NewLimitWithoutOffset(1),
sqlparser.ShareModeLock)
sqlparser.ForShareLockNoWait)
}

// nullSafeNotInComparison is used to compare the child columns in the foreign key constraint aren't the same as the updateExpressions exactly.
// This comparison has to be null safe so we create an expression which looks like the following for a query like `update child cola = :v1 and colb = :v2` -
// This comparison has to be null safe, so we create an expression which looks like the following for a query like `update child cola = :v1 and colb = :v2` -
// `:v1 IS NULL OR :v2 IS NULL OR (cola, colb) NOT IN ((:v1,:v2))`
// So, if either of :v1 or :v2 is NULL, then the entire condition is true (which is the same as not having the condition when :v1 or :v2 is NULL)
// This expression is used in cascading SET NULLs and in verifying whether an update should be restricted.
func nullSafeNotInComparison(updateExprs sqlparser.UpdateExprs, cFk vindexes.ChildFKInfo, parentTbl sqlparser.TableName, nonLiteralUpdateInfo []engine.NonLiteralUpdateInfo) sqlparser.Expr {
func nullSafeNotInComparison(ctx *plancontext.PlanningContext, updateExprs sqlparser.UpdateExprs, cFk vindexes.ChildFKInfo, parentTbl sqlparser.TableName, nonLiteralUpdateInfo []engine.NonLiteralUpdateInfo, appendQualifier bool) sqlparser.Expr {
var valTuple sqlparser.ValTuple
var updateValues sqlparser.ValTuple
for idx, updateExpr := range updateExprs {
Expand All @@ -742,12 +744,16 @@ func nullSafeNotInComparison(updateExprs sqlparser.UpdateExprs, cFk vindexes.Chi
if sqlparser.IsNull(updateExpr.Expr) {
return nil
}
childUpdateExpr := prefixColNames(parentTbl, updateExpr.Expr)
childUpdateExpr := prefixColNames(ctx, parentTbl, updateExpr.Expr)
if len(nonLiteralUpdateInfo) > 0 && nonLiteralUpdateInfo[idx].UpdateExprBvName != "" {
childUpdateExpr = sqlparser.NewArgument(nonLiteralUpdateInfo[idx].UpdateExprBvName)
}
updateValues = append(updateValues, childUpdateExpr)
valTuple = append(valTuple, sqlparser.NewColNameWithQualifier(cFk.ChildColumns[colIdx].String(), cFk.Table.GetTableName()))
if appendQualifier {
valTuple = append(valTuple, sqlparser.NewColNameWithQualifier(cFk.ChildColumns[colIdx].String(), cFk.Table.GetTableName()))
} else {
valTuple = append(valTuple, sqlparser.NewColName(cFk.ChildColumns[colIdx].String()))
}
}
}

Expand Down
Loading

0 comments on commit 82bd8ee

Please sign in to comment.