Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OnlineDDL: fix scenarios where migration hangs instead of directly failing #14290

Merged
merged 4 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,9 @@ func testScheduler(t *testing.T) {
createViewDependsOnExtraColumn = `
CREATE VIEW t1_test_view AS SELECT id, extra_column FROM t1_test
`
alterNonexistent = `
ALTER TABLE nonexistent FORCE
`
)

testReadTimestamp := func(t *testing.T, uuid string, timestampColumn string) (timestamp string) {
Expand Down Expand Up @@ -960,6 +963,22 @@ func testScheduler(t *testing.T) {
})
})
}
// Failure scenarios
t.Run("fail nonexistent", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, createParams(alterNonexistent, "vitess", "vtgate", "", "", false))

status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed)

rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
message := row["message"].ToString()
require.Contains(t, message, "errno 1146")
}
})

// 'mysql' strategy
t.Run("mysql strategy", func(t *testing.T) {
t.Run("declarative", func(t *testing.T) {
Expand Down
110 changes: 60 additions & 50 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2310,6 +2310,64 @@ func (e *Executor) reviewImmediateOperations(ctx context.Context, capableOf mysq
return false, nil
}

// reviewQueuedMigration investigates a single migration found in `queued` state.
// It analyzes whether the migration can & should be fulfilled immediately (e.g. via INSTANT DDL or just because it's a CREATE or DROP),
// or backfils necessary information if it's a REVERT.
// If all goes well, it sets `reviewed_timestamp` which then allows the state machine to schedule the migration.
func (e *Executor) reviewQueuedMigration(ctx context.Context, uuid string, capableOf mysql.CapableOf) error {
onlineDDL, row, err := e.readMigration(ctx, uuid)
if err != nil {
return err
}
// handle REVERT migrations: populate table name and update ddl action and is_view:
ddlAction := row["ddl_action"].ToString()
isRevert := false
if ddlAction == schema.RevertActionStr {
isRevert = true
rowModified, err := e.reviewEmptyTableRevertMigrations(ctx, onlineDDL)
if err != nil {
return err
}
if rowModified {
// re-read migration and entire row
onlineDDL, row, err = e.readMigration(ctx, uuid)
if err != nil {
return err
}
ddlAction = row["ddl_action"].ToString()
}
}
isView := row.AsBool("is_view", false)
isImmediate, err := e.reviewImmediateOperations(ctx, capableOf, onlineDDL, ddlAction, isRevert, isView)
if err != nil {
return err
}
if isImmediate {
if err := e.updateMigrationSetImmediateOperation(ctx, onlineDDL.UUID); err != nil {
return err
}
}
// Find conditions where the migration cannot take place:
switch onlineDDL.Strategy {
case schema.DDLStrategyMySQL:
strategySetting := onlineDDL.StrategySetting()
if strategySetting.IsPostponeCompletion() {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--postpone-completion not supported in 'mysql' strategy")
}
if strategySetting.IsAllowZeroInDateFlag() {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--allow-zero-in-date not supported in 'mysql' strategy")
}
}

// The review is complete. We've backfilled details on the migration row. We mark
// the migration as having been reviewed. The function scheduleNextMigration() will then
// have access to this row.
if err := e.updateMigrationTimestamp(ctx, "reviewed_timestamp", uuid); err != nil {
return err
}
return nil
}

// reviewQueuedMigrations iterates through queued migrations and sees if any information needs to be updated.
// The function analyzes the queued migration and fills in some blanks:
// - If this is a REVERT migration, what table is affected? What's the operation?
Expand All @@ -2332,57 +2390,9 @@ func (e *Executor) reviewQueuedMigrations(ctx context.Context) error {

for _, uuidRow := range r.Named().Rows {
uuid := uuidRow["migration_uuid"].ToString()
onlineDDL, row, err := e.readMigration(ctx, uuid)
if err != nil {
return err
if err := e.reviewQueuedMigration(ctx, uuid, capableOf); err != nil {
e.failMigration(ctx, &schema.OnlineDDL{UUID: uuid}, err)
}
// handle REVERT migrations: populate table name and update ddl action and is_view:
ddlAction := row["ddl_action"].ToString()
isRevert := false
if ddlAction == schema.RevertActionStr {
isRevert = true
rowModified, err := e.reviewEmptyTableRevertMigrations(ctx, onlineDDL)
if err != nil {
return err
}
if rowModified {
// re-read migration and entire row
onlineDDL, row, err = e.readMigration(ctx, uuid)
if err != nil {
return err
}
ddlAction = row["ddl_action"].ToString()
}
}
isView := row.AsBool("is_view", false)
isImmediate, err := e.reviewImmediateOperations(ctx, capableOf, onlineDDL, ddlAction, isRevert, isView)
if err != nil {
return err
}
if isImmediate {
if err := e.updateMigrationSetImmediateOperation(ctx, onlineDDL.UUID); err != nil {
return err
}
}
// Find conditions where the migration cannot take place:
switch onlineDDL.Strategy {
case schema.DDLStrategyMySQL:
strategySetting := onlineDDL.StrategySetting()
if strategySetting.IsPostponeCompletion() {
e.failMigration(ctx, onlineDDL, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--postpone-completion not supported in 'mysql' strategy"))
}
if strategySetting.IsAllowZeroInDateFlag() {
e.failMigration(ctx, onlineDDL, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--allow-zero-in-date not supported in 'mysql' strategy"))
}
}

// The review is complete. We've backfilled details on the migration row. We mark
// the migration as having been reviewed. The function scheduleNextMigration() will then
// have access to this row.
if err := e.updateMigrationTimestamp(ctx, "reviewed_timestamp", uuid); err != nil {
return err
}

}
return nil
}
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,11 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {

// If this is a mysql error that we know needs manual intervention OR
// we cannot identify this as non-recoverable, but it has persisted
// beyond the retry limit (maxTimeToRetryError).
// beyond the retry limit (maxTimeToRetryError), or is an INTERNAL error
// In addition, we cannot restart a workflow started with AtomicCopy which has _any_ error.
if (err != nil && vr.WorkflowSubType == int32(binlogdatapb.VReplicationWorkflowSubType_AtomicCopy)) ||
isUnrecoverableError(err) || !ct.lastWorkflowError.ShouldRetry() {
isUnrecoverableError(err) ||
!ct.lastWorkflowError.ShouldRetry() {

log.Errorf("vreplication stream %d going into error state due to %+v", ct.id, err)
if errSetState := vr.setState(binlogdatapb.VReplicationWorkflowState_Error, err.Error()); errSetState != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import (
"vitess.io/vitess/go/vt/key"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
)

// This file contains just the builders for ReplicatorPlan and TablePlan.
Expand Down Expand Up @@ -629,7 +631,7 @@ func (tpb *tablePlanBuilder) analyzeExtraSourcePkCols(colInfos []*ColumnInfo, so
if !col.IsGenerated {
// We shouldn't get here in any normal scenario. If a column is part of colInfos,
// then it must also exist in tpb.colExprs.
return fmt.Errorf("column %s not found in table expressions", col.Name)
return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "column %s not found in table expressions", col.Name)
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/vtrpc"
mattlord marked this conversation as resolved.
Show resolved Hide resolved
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
)

const (
Expand Down Expand Up @@ -123,6 +125,9 @@ func isUnrecoverableError(err error) bool {
if err == nil {
return false
}
if vterrors.Code(err) == vtrpc.Code_FAILED_PRECONDITION {
return true
}
sqlErr, isSQLErr := sqlerror.NewSQLErrorFromError(err).(*sqlerror.SQLError)
if !isSQLErr {
return false
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/vt/vtgate/evalengine"

"vitess.io/vitess/go/vt/proto/vtrpc"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
mattlord marked this conversation as resolved.
Show resolved Hide resolved
"vitess.io/vitess/go/vt/vterrors"

Expand Down Expand Up @@ -865,5 +866,5 @@ func findColumn(ti *Table, name sqlparser.IdentifierCI) (int, error) {
return i, nil
}
}
return 0, fmt.Errorf("column %s not found in table %s", sqlparser.String(name), ti.Name)
return 0, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "column %s not found in table %s", sqlparser.String(name), ti.Name)
}
Loading