diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go index 4362069af66..5b88b1f8678 100644 --- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go +++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go @@ -198,7 +198,7 @@ func waitForReadyToComplete(t *testing.T, uuid string, expected bool) { case <-ticker.C: case <-ctx.Done(): } - require.NoError(t, ctx.Err()) + require.NoError(t, ctx.Err(), "waiting for ready_to_complete=%t for %v", expected, uuid) } } @@ -422,6 +422,14 @@ func testScheduler(t *testing.T) { assert.GreaterOrEqual(t, endTime2, endTime1) }) } + testTableCompletionAndStartTimes := func(t *testing.T, uuid1, uuid2 string) { + // expect uuid1 to complete before uuid2 + t.Run("Compare t1, t2 completion times", func(t *testing.T) { + endTime1 := testReadTimestamp(t, uuid1, "completed_timestamp") + startedTime2 := testReadTimestamp(t, uuid2, "started_timestamp") + assert.GreaterOrEqual(t, startedTime2, endTime1) + }) + } testAllowConcurrent := func(t *testing.T, name string, uuid string, expect int64) { t.Run("verify allow_concurrent: "+name, func(t *testing.T) { rs := onlineddl.ReadMigrations(t, &vtParams, uuid) @@ -434,7 +442,7 @@ func testScheduler(t *testing.T) { } // CREATE - t.Run("CREATE TABLEs t1, t1", func(t *testing.T) { + t.Run("CREATE TABLEs t1, t2", func(t *testing.T) { { // The table does not exist t1uuid = testOnlineDDLStatement(t, createParams(createT1Statement, ddlStrategy, "vtgate", "just-created", "", false)) onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete) @@ -1183,6 +1191,36 @@ func testScheduler(t *testing.T) { }) }) // in-order-completion + t.Run("in-order-completion: multiple drops for nonexistent tables and views, sequential", func(t *testing.T) { + u, err := schema.CreateOnlineDDLUUID() + require.NoError(t, err) + + sqls := []string{ + fmt.Sprintf("drop table if exists t4_%s", u), + fmt.Sprintf("drop view if exists t1_%s", u), + fmt.Sprintf("drop table if exists t2_%s", u), + fmt.Sprintf("drop view if exists t3_%s", u), + } + sql := strings.Join(sqls, ";") + var vuuids []string + t.Run("drop multiple tables and views, in-order-completion", func(t *testing.T) { + uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --in-order-completion", "vtctl", "", "", true)) // skip wait + vuuids = strings.Split(uuidList, "\n") + assert.Len(t, vuuids, 4) + for _, uuid := range vuuids { + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + } + }) + require.Len(t, vuuids, 4) + for i := range vuuids { + if i > 0 { + testTableCompletionTimes(t, vuuids[i-1], vuuids[i]) + testTableCompletionAndStartTimes(t, vuuids[i-1], vuuids[i]) + } + } + }) t.Run("in-order-completion: multiple drops for nonexistent tables and views", func(t *testing.T) { u, err := schema.CreateOnlineDDLUUID() require.NoError(t, err) @@ -1198,20 +1236,93 @@ func testScheduler(t *testing.T) { t.Run("drop multiple tables and views, in-order-completion", func(t *testing.T) { uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --allow-concurrent --in-order-completion", "vtctl", "", "", true)) // skip wait vuuids = strings.Split(uuidList, "\n") - assert.Equal(t, 4, len(vuuids)) + assert.Len(t, vuuids, 4) for _, uuid := range vuuids { status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) } }) - require.Equal(t, 4, len(vuuids)) + require.Len(t, vuuids, 4) for i := range vuuids { if i > 0 { testTableCompletionTimes(t, vuuids[i-1], vuuids[i]) } } }) + + t.Run("in-order-completion: bail out on first error", func(t *testing.T) { + u, err := schema.CreateOnlineDDLUUID() + require.NoError(t, err) + + sqls := []string{ + fmt.Sprintf("drop table if exists t4_%s", u), + fmt.Sprintf("drop view if exists t1_%s", u), + fmt.Sprintf("drop table t2_%s", u), // non existent + fmt.Sprintf("drop view if exists t3_%s", u), + } + sql := strings.Join(sqls, ";") + var vuuids []string + t.Run("apply schema", func(t *testing.T) { + uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --in-order-completion", "vtctl", "", "", true)) // skip wait + vuuids = strings.Split(uuidList, "\n") + assert.Len(t, vuuids, 4) + for _, uuid := range vuuids[0:2] { + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + } + { + uuid := vuuids[2] // the failed one + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed) + } + { + uuid := vuuids[3] // should consequently fail without even running + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusFailed) + + rs := onlineddl.ReadMigrations(t, &vtParams, uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + message := row["message"].ToString() + require.Contains(t, message, vuuids[2]) // Indicating this migration failed due to vuuids[2] failure + } + } + }) + testTableCompletionTimes(t, vuuids[0], vuuids[1]) + testTableCompletionAndStartTimes(t, vuuids[0], vuuids[1]) + testTableCompletionAndStartTimes(t, vuuids[1], vuuids[2]) + }) + t.Run("in-order-completion concurrent: bail out on first error", func(t *testing.T) { + sqls := []string{ + `alter table t1_test force`, + `alter table t2_test force`, + } + sql := strings.Join(sqls, ";") + var vuuids []string + t.Run("apply schema", func(t *testing.T) { + uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --in-order-completion --postpone-completion --allow-concurrent", "vtctl", "", "", true)) // skip wait + vuuids = strings.Split(uuidList, "\n") + assert.Len(t, vuuids, 2) + for _, uuid := range vuuids { + waitForReadyToComplete(t, uuid, true) + } + t.Run("cancel 1st migration", func(t *testing.T) { + onlineddl.CheckCancelMigration(t, &vtParams, shards, vuuids[0], true) + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, vuuids[0], normalWaitTime, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, vuuids[0], schema.OnlineDDLStatusCancelled) + }) + t.Run("expect 2nd migration to fail", func(t *testing.T) { + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, vuuids[1], normalWaitTime, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, vuuids[1], schema.OnlineDDLStatusFailed) + }) + }) + }) t.Run("in-order-completion: two new views, one depends on the other", func(t *testing.T) { u, err := schema.CreateOnlineDDLUUID() require.NoError(t, err) @@ -1225,14 +1336,14 @@ func testScheduler(t *testing.T) { t.Run("create two views, expect both complete", func(t *testing.T) { uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --allow-concurrent --in-order-completion", "vtctl", "", "", true)) // skip wait vuuids = strings.Split(uuidList, "\n") - assert.Equal(t, 2, len(vuuids)) + assert.Len(t, vuuids, 2) for _, uuid := range vuuids { status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) } }) - require.Equal(t, 2, len(vuuids)) + require.Len(t, vuuids, 2) testTableCompletionTimes(t, vuuids[0], vuuids[1]) }) t.Run("in-order-completion: new table column, new view depends on said column", func(t *testing.T) { diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 42b2a4f827b..ebce8f0619c 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -2854,6 +2854,31 @@ func (e *Executor) getCompletedMigrationByContextAndSQL(ctx context.Context, onl return completedUUID, nil } +// readFailedCancelledMigrationsInContextBeforeMigration returns UUIDs for migrations that are failed/cancelled +// and are in the same context as given migration and _precede_ it chronologically (have lower `id` value) +func (e *Executor) readFailedCancelledMigrationsInContextBeforeMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) (uuids []string, err error) { + if onlineDDL.MigrationContext == "" { + // only applies to migrations with an explicit context + return nil, nil + } + query, err := sqlparser.ParseAndBind(sqlSelectFailedCancelledMigrationsInContextBeforeMigration, + sqltypes.StringBindVariable(onlineDDL.MigrationContext), + sqltypes.StringBindVariable(onlineDDL.UUID), + ) + if err != nil { + return nil, err + } + r, err := e.execQuery(ctx, query) + if err != nil { + return uuids, err + } + for _, row := range r.Named().Rows { + uuid := row["migration_uuid"].ToString() + uuids = append(uuids, uuid) + } + return uuids, err +} + // failMigration marks a migration as failed func (e *Executor) failMigration(ctx context.Context, onlineDDL *schema.OnlineDDL, withError error) error { defer e.triggerNextCheckInterval() @@ -2865,6 +2890,23 @@ func (e *Executor) failMigration(ctx context.Context, onlineDDL *schema.OnlineDD return withError } +// validateInOrderMigration checks whether an in-order migration should be forced to fail, either before running or +// while running. +// This may happen if a prior migration in the same context has failed or was cancelled. +func (e *Executor) validateInOrderMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) (wasFailed bool, err error) { + if !onlineDDL.StrategySetting().IsInOrderCompletion() { + return false, nil + } + uuids, err := e.readFailedCancelledMigrationsInContextBeforeMigration(ctx, onlineDDL) + if err != nil { + return false, err + } + if len(uuids) == 0 { + return false, err + } + return true, e.failMigration(ctx, onlineDDL, fmt.Errorf("migration %v cannot run because prior migration %v in same context has failed/was cancelled", onlineDDL.UUID, uuids[0])) +} + // analyzeDropDDLActionMigration analyzes a DROP migration. func (e *Executor) analyzeDropDDLActionMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) error { // Schema analysis: @@ -3384,6 +3426,58 @@ func (e *Executor) executeMigration(ctx context.Context, onlineDDL *schema.Onlin return nil } +// getNonConflictingMigration finds a single 'ready' migration which does not conflict with running migrations. +// Conflicts are: +// - a migration is 'ready' but is not set to run _concurrently_, and there's a running migration that is also non-concurrent +// - a migration is 'ready' but there's another migration 'running' on the exact same table +func (e *Executor) getNonConflictingMigration(ctx context.Context) (*schema.OnlineDDL, error) { + pendingMigrationsUUIDs, err := e.readPendingMigrationsUUIDs(ctx) + if err != nil { + return nil, err + } + r, err := e.execQuery(ctx, sqlSelectReadyMigrations) + if err != nil { + return nil, err + } + for _, row := range r.Named().Rows { + uuid := row["migration_uuid"].ToString() + onlineDDL, migrationRow, err := e.readMigration(ctx, uuid) + if err != nil { + return nil, err + } + isImmediateOperation := migrationRow.AsBool("is_immediate_operation", false) + + if conflictFound, _ := e.isAnyConflictingMigrationRunning(onlineDDL); conflictFound { + continue // this migration conflicts with a running one + } + if e.countOwnedRunningMigrations() >= maxConcurrentOnlineDDLs { + return nil, nil // too many running migrations + } + if isImmediateOperation && onlineDDL.StrategySetting().IsInOrderCompletion() { + // This migration is immediate: if we run it now, it will complete within a second or two at most. + if len(pendingMigrationsUUIDs) > 0 && pendingMigrationsUUIDs[0] != onlineDDL.UUID { + continue + } + } + // We will fail an in-order migration if there's _prior_ migrations within the same migration-context + // which have failed. + if onlineDDL.StrategySetting().IsInOrderCompletion() { + wasFailed, err := e.validateInOrderMigration(ctx, onlineDDL) + if err != nil { + return nil, err + } + if wasFailed { + continue + } + } + // This migration seems good to go + return onlineDDL, err + } + // no non-conflicting migration found... + // Either all ready migrations are conflicting, or there are no ready migrations... + return nil, nil +} + // runNextMigration picks up to one 'ready' migration that is able to run, and executes it. // Possible scenarios: // - no migration is in 'ready' state -- nothing to be done @@ -3405,47 +3499,7 @@ func (e *Executor) runNextMigration(ctx context.Context) error { return nil } - // getNonConflictingMigration finds a single 'ready' migration which does not conflict with running migrations. - // Conflicts are: - // - a migration is 'ready' but is not set to run _concurrently_, and there's a running migration that is also non-concurrent - // - a migration is 'ready' but there's another migration 'running' on the exact same table - getNonConflictingMigration := func() (*schema.OnlineDDL, error) { - pendingMigrationsUUIDs, err := e.readPendingMigrationsUUIDs(ctx) - if err != nil { - return nil, err - } - r, err := e.execQuery(ctx, sqlSelectReadyMigrations) - if err != nil { - return nil, err - } - for _, row := range r.Named().Rows { - uuid := row["migration_uuid"].ToString() - onlineDDL, migrationRow, err := e.readMigration(ctx, uuid) - if err != nil { - return nil, err - } - isImmediateOperation := migrationRow.AsBool("is_immediate_operation", false) - - if conflictFound, _ := e.isAnyConflictingMigrationRunning(onlineDDL); conflictFound { - continue // this migration conflicts with a running one - } - if e.countOwnedRunningMigrations() >= maxConcurrentOnlineDDLs { - continue // too many running migrations - } - if isImmediateOperation && onlineDDL.StrategySetting().IsInOrderCompletion() { - // This migration is immediate: if we run it now, it will complete within a second or two at most. - if len(pendingMigrationsUUIDs) > 0 && pendingMigrationsUUIDs[0] != onlineDDL.UUID { - continue - } - } - // This migration seems good to go - return onlineDDL, err - } - // no non-conflicting migration found... - // Either all ready migrations are conflicting, or there are no ready migrations... - return nil, nil - } - onlineDDL, err := getNonConflictingMigration() + onlineDDL, err := e.getNonConflictingMigration(ctx) if err != nil { return err } @@ -3792,6 +3846,19 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i _ = e.updateMigrationETASecondsByProgress(ctx, uuid) _ = e.updateMigrationLastThrottled(ctx, uuid, time.Unix(s.timeThrottled, 0), s.componentThrottled) + if onlineDDL.StrategySetting().IsInOrderCompletion() { + // We will fail an in-order migration if there's _prior_ migrations within the same migration-context + // which have failed. + wasFailed, err := e.validateInOrderMigration(ctx, onlineDDL) + if err != nil { + return err + } + if wasFailed { + return nil + } + } + + // Check if the migration is ready to cut-over, and proceed to do so if it is. isReady, err := e.isVReplMigrationReadyToCutOver(ctx, onlineDDL, s) if err != nil { _ = e.updateMigrationMessage(ctx, uuid, err.Error()) diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index 2ba566703e5..30f132bd0e3 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -303,6 +303,7 @@ const ( FROM _vt.schema_migrations WHERE migration_status='running' + ORDER BY id ` sqlSelectCompleteMigrationsOnTable = `SELECT migration_uuid, @@ -333,6 +334,18 @@ const ( WHERE migration_status='running' AND liveness_timestamp < NOW() - INTERVAL %a MINUTE + ORDER BY id + ` + sqlSelectFailedCancelledMigrationsInContextBeforeMigration = `SELECT + migration_uuid + FROM _vt.schema_migrations + WHERE + migration_context=%a + AND migration_status IN ('failed', 'cancelled') + AND id < ( + SELECT id FROM _vt.schema_migrations WHERE migration_uuid=%a + ) + ORDER BY id ` sqlSelectPendingMigrations = `SELECT migration_uuid, @@ -365,6 +378,7 @@ const ( NOW() - INTERVAL %a SECOND, NOW() - INTERVAL retain_artifacts_seconds SECOND ) + ORDER BY id ` sqlFixCompletedTimestamp = `UPDATE _vt.schema_migrations SET