diff --git a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go index 0531af319b4..57e7029f56b 100644 --- a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go +++ b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go @@ -140,6 +140,7 @@ const ( maxConcurrency = 20 singleConnectionSleepInterval = 2 * time.Millisecond countIterations = 5 + migrationWaitTimeout = 60 * time.Second ) func resetOpOrder() { @@ -344,7 +345,7 @@ func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy str assert.NoError(t, err) if !strategySetting.Strategy.IsDirect() { - status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 30*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, migrationWaitTimeout, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) } diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 1a77235ab19..2e4cb822846 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -650,6 +650,22 @@ func (e *Executor) executeDirectly(ctx context.Context, onlineDDL *schema.Online return acceptableErrorCodeFound, nil } +// doesConnectionInfoMatch checks if theres a MySQL connection in PROCESSLIST whose Info matches given text +func (e *Executor) doesConnectionInfoMatch(ctx context.Context, connID int64, submatch string) (bool, error) { + findProcessQuery, err := sqlparser.ParseAndBind(sqlFindProcess, + sqltypes.Int64BindVariable(connID), + sqltypes.StringBindVariable("%"+submatch+"%"), + ) + if err != nil { + return false, err + } + rs, err := e.execQuery(ctx, findProcessQuery) + if err != nil { + return false, err + } + return len(rs.Rows) == 1, nil +} + // validateTableForAlterAction checks whether a table is good to undergo a ALTER operation. It returns detailed error if not. func (e *Executor) validateTableForAlterAction(ctx context.Context, onlineDDL *schema.OnlineDDL) (err error) { // Validate table does not participate in foreign key relationship: @@ -718,6 +734,10 @@ func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string) err // cutOverVReplMigration stops vreplication, then removes the _vt.vreplication entry for the given migration func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) error { + if err := e.incrementCutoverAttempts(ctx, s.workflow); err != nil { + return err + } + tmClient := e.tabletManagerClient() defer tmClient.Close() @@ -739,31 +759,101 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er return err } isVreplicationTestSuite := onlineDDL.StrategySetting().IsVreplicationTestSuite() + e.updateMigrationStage(ctx, onlineDDL.UUID, "starting cut-over") + + var sentryTableName string + + waitForPos := func(s *VReplStream, pos mysql.Position) error { + ctx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold) + defer cancel() + // Wait for target to reach the up-to-date pos + if err := tmClient.VReplicationWaitForPos(ctx, tablet.Tablet, int(s.id), mysql.EncodePosition(pos)); err != nil { + return err + } + // Target is now in sync with source! + return nil + } - // A bit early on, we generate names for stowaway and temporary tables - // We do this here because right now we're in a safe place where nothing happened yet. If there's an error now, bail out - // and no harm done. - // Later on, when traffic is blocked and tables renamed, that's a more dangerous place to be in; we want as little logic - // in that place as possible. - var stowawayTableName string if !isVreplicationTestSuite { - stowawayTableName, err = schema.GenerateGCTableName(schema.HoldTableGCState, newGCTableRetainTime()) + // A bit early on, we generate names for stowaway and temporary tables + // We do this here because right now we're in a safe place where nothing happened yet. If there's an error now, bail out + // and no harm done. + // Later on, when traffic is blocked and tables renamed, that's a more dangerous place to be in; we want as little logic + // in that place as possible. + sentryTableName, err = schema.GenerateGCTableName(schema.HoldTableGCState, newGCTableRetainTime()) if err != nil { + return nil + } + + // We create the sentry table before toggling writes, because this involves a WaitForPos, which takes some time. We + // don't want to overload the buffering time with this excessive wait. + + if err := e.updateArtifacts(ctx, onlineDDL.UUID, sentryTableName); err != nil { return err } - // Audit stowawayTableName. If operation is complete, we remove the audit. But if this tablet fails while - // the original table is renamed (into stowaway table), then this will be both the evidence and the information we need - // to restore the table back into existence. This can (and will) be done by a different vttablet process - if err := e.updateMigrationStowawayTable(ctx, onlineDDL.UUID, stowawayTableName); err != nil { + parsed := sqlparser.BuildParsedQuery(sqlCreateSentryTable, sentryTableName) + if _, err := e.execQuery(ctx, parsed.Query); err != nil { + return err + } + e.updateMigrationStage(ctx, onlineDDL.UUID, "sentry table created: %s", sentryTableName) + + postSentryPos, err := e.primaryPosition(ctx) + if err != nil { return err } - defer e.updateMigrationStowawayTable(ctx, onlineDDL.UUID, "") + e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for post-sentry pos: %v", mysql.EncodePosition(postSentryPos)) + if err := waitForPos(s, postSentryPos); err != nil { + return err + } + e.updateMigrationStage(ctx, onlineDDL.UUID, "post-sentry pos reached") + } + + lockConn, err := e.pool.Get(ctx, nil) + if err != nil { + return err } + defer lockConn.Recycle() + defer lockConn.Exec(ctx, sqlUnlockTables, 1, false) + + renameConn, err := e.pool.Get(ctx, nil) + if err != nil { + return err + } + defer renameConn.Recycle() + defer renameConn.Kill("premature exit while renaming tables", 0) + renameQuery := sqlparser.BuildParsedQuery(sqlSwapTables, onlineDDL.Table, sentryTableName, vreplTable, onlineDDL.Table, sentryTableName, vreplTable) + + waitForRenameProcess := func() error { + // This function waits until it finds the RENAME TABLE... query running in MySQL's PROCESSLIST, or until timeout + // The function assumes that one of the renamed tables is locked, thus causing the RENAME to block. If nothing + // is locked, then the RENAME will be near-instantaneious and it's unlikely that the function will find it. + renameWaitCtx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold) + defer cancel() + + for { + renameProcessFound, err := e.doesConnectionInfoMatch(renameWaitCtx, renameConn.ID(), "rename") + if err != nil { + return err + } + if renameProcessFound { + return nil + } + select { + case <-renameWaitCtx.Done(): + return vterrors.Errorf(vtrpcpb.Code_ABORTED, "timeout for rename query: %s", renameQuery.Query) + case <-time.After(time.Second): + // sleep + } + } + } + + renameCompleteChan := make(chan error) bufferingCtx, bufferingContextCancel := context.WithCancel(ctx) defer bufferingContextCancel() // Preparation is complete. We proceed to cut-over. toggleBuffering := func(bufferQueries bool) error { + log.Infof("toggling buffering: %t in migration %v", bufferQueries, onlineDDL.UUID) e.toggleBufferTableFunc(bufferingCtx, onlineDDL.Table, bufferQueries) if !bufferQueries { // called after new table is in place. @@ -774,27 +864,31 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er return err } } + log.Infof("toggled buffering: %t in migration %v", bufferQueries, onlineDDL.UUID) return nil } + var reenableOnce sync.Once reenableWritesOnce := func() { reenableOnce.Do(func() { + log.Infof("re-enabling writes in migration %v", onlineDDL.UUID) toggleBuffering(false) }) } + e.updateMigrationStage(ctx, onlineDDL.UUID, "buffering queries") // stop writes on source: err = toggleBuffering(true) defer reenableWritesOnce() if err != nil { return err } - - // swap out the table // Give a fraction of a second for a scenario where a query is in // query executor, it passed the ACLs and is _about to_ execute. This will be nicer to those queries: // they will be able to complete before the rename, rather than block briefly on the rename only to find // the table no longer exists. + e.updateMigrationStage(ctx, onlineDDL.UUID, "graceful wait for buffering") time.Sleep(100 * time.Millisecond) + if isVreplicationTestSuite { // The testing suite may inject queries internally from the server via a recurring EVENT. // Those queries are unaffected by query rules (ACLs) because they don't go through Vitess. @@ -805,30 +899,41 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er if _, err := e.execQuery(ctx, parsed.Query); err != nil { return err } + e.updateMigrationStage(ctx, onlineDDL.UUID, "test suite 'before' table renamed") } else { // real production - parsed := sqlparser.BuildParsedQuery(sqlRenameTable, onlineDDL.Table, stowawayTableName) - if _, err := e.execQuery(ctx, parsed.Query); err != nil { + + e.updateMigrationStage(ctx, onlineDDL.UUID, "locking tables") + lockCtx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold) + defer cancel() + lockTableQuery := sqlparser.BuildParsedQuery(sqlLockTwoTablesWrite, sentryTableName, onlineDDL.Table) + if _, err := lockConn.Exec(lockCtx, lockTableQuery.Query, 1, false); err != nil { return err } - } - // We have just created a gaping hole, the original table does not exist. - // we expect to fill that hole by swapping in the vrepl table. But if anything goes wrong we prepare - // to rename the table back: - defer func() { - if _, err := e.renameTableIfApplicable(ctx, stowawayTableName, onlineDDL.Table); err != nil { - vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "cannot rename back swapped table: %v into %v: %v", stowawayTableName, onlineDDL.Table, err) + e.updateMigrationStage(ctx, onlineDDL.UUID, "renaming tables") + go func() { + _, err := renameConn.Exec(ctx, renameQuery.Query, 1, false) + renameCompleteChan <- err + }() + // the rename should block, because of the LOCK. Wait for it to show up. + e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for RENAME to block") + if err := waitForRenameProcess(); err != nil { + return err } - }() - // Right now: new queries are buffered, any existing query will have executed, and worst case scenario is - // that some leftover query finds the table is not actually there anymore... - // At any case, there's definitely no more writes to the table since it does not exist. We can - // safely take the (GTID) pos now. + e.updateMigrationStage(ctx, onlineDDL.UUID, "RENAME found") + } + + e.updateMigrationStage(ctx, onlineDDL.UUID, "reading post-lock pos") postWritesPos, err := e.primaryPosition(ctx) if err != nil { return err } + + // Right now: new queries are buffered, any existing query will have executed, and worst case scenario is + // that some leftover query finds the table is not actually there anymore... + // At any case, there's definitely no more writes to the table since it does not exist. We can + // safely take the (GTID) pos now. _ = e.updateMigrationTimestamp(ctx, "liveness_timestamp", s.workflow) // Writes are now disabled on table. Read up-to-date vreplication info, specifically to get latest (and fixed) pos: @@ -837,21 +942,13 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er return err } - waitForPos := func() error { - ctx, cancel := context.WithTimeout(ctx, 2*vreplicationCutOverThreshold) - defer cancel() - // Wait for target to reach the up-to-date pos - if err := tmClient.VReplicationWaitForPos(ctx, tablet.Tablet, int(s.id), mysql.EncodePosition(postWritesPos)); err != nil { - return err - } - // Target is now in sync with source! - return nil - } - log.Infof("VReplication migration %v waiting for position %v", s.workflow, mysql.EncodePosition(postWritesPos)) - if err := waitForPos(); err != nil { + e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for post-lock pos: %v", mysql.EncodePosition(postWritesPos)) + if err := waitForPos(s, postWritesPos); err != nil { + e.updateMigrationStage(ctx, onlineDDL.UUID, "timeout while waiting for post-lock pos: %v", err) return err } // Stop vreplication + e.updateMigrationStage(ctx, onlineDDL.UUID, "stopping vreplication") if _, err := e.vreplicationExec(ctx, tablet.Tablet, binlogplayer.StopVReplication(uint32(s.id), "stopped for online DDL cutover")); err != nil { return err } @@ -865,23 +962,43 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er if _, err := e.execQuery(ctx, parsed.Query); err != nil { return err } + e.updateMigrationStage(ctx, onlineDDL.UUID, "test suite 'after' table renamed") } else { - // Normal (non-testing) alter table - conn, err := dbconnpool.NewDBConnection(ctx, e.env.Config().DB.DbaWithDB()) - if err != nil { + e.updateMigrationStage(ctx, onlineDDL.UUID, "validating rename is still in place") + if err := waitForRenameProcess(); err != nil { return err } - defer conn.Close() - parsed := sqlparser.BuildParsedQuery(sqlRenameTwoTables, - vreplTable, onlineDDL.Table, - stowawayTableName, vreplTable, - ) - if _, err := e.execQuery(ctx, parsed.Query); err != nil { - return err + // Normal (non-testing) alter table + e.updateMigrationStage(ctx, onlineDDL.UUID, "dropping sentry table") + + { + dropTableQuery := sqlparser.BuildParsedQuery(sqlDropTable, sentryTableName) + lockCtx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold) + defer cancel() + if _, err := lockConn.Exec(lockCtx, dropTableQuery.Query, 1, false); err != nil { + return err + } + } + { + lockCtx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold) + defer cancel() + e.updateMigrationStage(ctx, onlineDDL.UUID, "unlocking tables") + if _, err := lockConn.Exec(lockCtx, sqlUnlockTables, 1, false); err != nil { + return err + } + } + { + lockCtx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold) + defer cancel() + e.updateMigrationStage(lockCtx, onlineDDL.UUID, "waiting for RENAME to complete") + if err := <-renameCompleteChan; err != nil { + return err + } } } } + e.updateMigrationStage(ctx, onlineDDL.UUID, "cut-over complete") e.ownedRunningMigrations.Delete(onlineDDL.UUID) go func() { @@ -896,12 +1013,12 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er }() // Tables are now swapped! Migration is successful + e.updateMigrationStage(ctx, onlineDDL.UUID, "re-enabling writes") reenableWritesOnce() // this function is also deferred, in case of early return; but now would be a good time to resume writes, before we publish the migration as "complete" _ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull, etaSecondsNow, s.rowsCopied, emptyHint) return nil // deferred function will re-enable writes now - // deferred function will unlock keyspace } // initMigrationSQLMode sets sql_mode according to DDL strategy, and returns a function that @@ -3336,6 +3453,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i if isReady { if err := e.cutOverVReplMigration(ctx, s); err != nil { _ = e.updateMigrationMessage(ctx, uuid, err.Error()) + log.Errorf("cutOverVReplMigration failed: err=%v", err) if merr, ok := err.(*mysql.SQLError); ok { switch merr.Num { case mysql.ERTooLongIdent: @@ -3772,6 +3890,31 @@ func (e *Executor) updateMigrationSpecialPlan(ctx context.Context, uuid string, return err } +func (e *Executor) updateMigrationStage(ctx context.Context, uuid string, stage string, args ...interface{}) error { + msg := fmt.Sprintf(stage, args...) + log.Infof("updateMigrationStage: uuid=%s, stage=%s", uuid, msg) + query, err := sqlparser.ParseAndBind(sqlUpdateStage, + sqltypes.StringBindVariable(msg), + sqltypes.StringBindVariable(uuid), + ) + if err != nil { + return err + } + _, err = e.execQuery(ctx, query) + return err +} + +func (e *Executor) incrementCutoverAttempts(ctx context.Context, uuid string) error { + query, err := sqlparser.ParseAndBind(sqlIncrementCutoverAttempts, + sqltypes.StringBindVariable(uuid), + ) + if err != nil { + return err + } + _, err = e.execQuery(ctx, query) + return err +} + // updateMigrationTablet sets 'tablet' column to be this executor's tablet alias for given migration func (e *Executor) updateMigrationTablet(ctx context.Context, uuid string) error { query, err := sqlparser.ParseAndBind(sqlUpdateTablet, diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index cbae743ac7f..b6a7532378a 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -81,6 +81,8 @@ const ( alterSchemaMigrationsComponentThrottled = "ALTER TABLE _vt.schema_migrations add column component_throttled tinytext NOT NULL" alterSchemaMigrationsCancelledTimestamp = "ALTER TABLE _vt.schema_migrations add column cancelled_timestamp timestamp NULL DEFAULT NULL" alterSchemaMigrationsTablePostponeLaunch = "ALTER TABLE _vt.schema_migrations add column postpone_launch tinyint unsigned NOT NULL DEFAULT 0" + alterSchemaMigrationsStage = "ALTER TABLE _vt.schema_migrations add column stage text not null" + alterSchemaMigrationsCutoverAttempts = "ALTER TABLE _vt.schema_migrations add column cutover_attempts int unsigned NOT NULL DEFAULT 0" sqlInsertMigration = `INSERT IGNORE INTO _vt.schema_migrations ( migration_uuid, @@ -202,6 +204,16 @@ const ( WHERE migration_uuid=%a ` + sqlUpdateStage = `UPDATE _vt.schema_migrations + SET stage=%a + WHERE + migration_uuid=%a + ` + sqlIncrementCutoverAttempts = `UPDATE _vt.schema_migrations + SET cutover_attempts=cutover_attempts+1 + WHERE + migration_uuid=%a + ` sqlUpdateReadyForCleanup = `UPDATE _vt.schema_migrations SET retain_artifacts_seconds=-1 WHERE @@ -284,6 +296,8 @@ const ( retries=retries + 1, tablet_failure=0, message='', + stage='', + cutover_attempts=0, ready_timestamp=NULL, started_timestamp=NULL, liveness_timestamp=NULL, @@ -302,6 +316,8 @@ const ( retries=retries + 1, tablet_failure=0, message='', + stage='', + cutover_attempts=0, ready_timestamp=NULL, started_timestamp=NULL, liveness_timestamp=NULL, @@ -587,9 +603,12 @@ const ( _vt.copy_state WHERE vrepl_id=%a ` - sqlSwapTables = "RENAME TABLE `%a` TO `%a`, `%a` TO `%a`, `%a` TO `%a`" - sqlRenameTable = "RENAME TABLE `%a` TO `%a`" - sqlRenameTwoTables = "RENAME TABLE `%a` TO `%a`, `%a` TO `%a`" + sqlSwapTables = "RENAME TABLE `%a` TO `%a`, `%a` TO `%a`, `%a` TO `%a`" + sqlRenameTable = "RENAME TABLE `%a` TO `%a`" + sqlLockTwoTablesWrite = "LOCK TABLES `%a` WRITE, `%a` WRITE" + sqlUnlockTables = "UNLOCK TABLES" + sqlCreateSentryTable = "CREATE TABLE IF NOT EXISTS `%a` (id INT PRIMARY KEY)" + sqlFindProcess = "SELECT id, Info as info FROM information_schema.processlist WHERE id=%a AND Info LIKE %a" ) const ( @@ -661,4 +680,6 @@ var ApplyDDL = []string{ alterSchemaMigrationsComponentThrottled, alterSchemaMigrationsCancelledTimestamp, alterSchemaMigrationsTablePostponeLaunch, + alterSchemaMigrationsStage, + alterSchemaMigrationsCutoverAttempts, }