Skip to content

Commit

Permalink
descriptive errors returned from cutOverVReplMigration()
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach committed Oct 23, 2024
1 parent dcbcd59 commit b54e53e
Showing 1 changed file with 36 additions and 32 deletions.
68 changes: 36 additions & 32 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa
}
rs, err := conn.Conn.ExecuteFetch(query, -1, true)
if err != nil {
return err
return vterrors.Wrapf(err, "finding queries potentially operating on table")
}

log.Infof("killTableLockHoldersAndAccessors: found %v potential queries", len(rs.Rows))
Expand Down Expand Up @@ -841,7 +841,7 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa
}
rs, err := conn.Conn.ExecuteFetch(query, -1, true)
if err != nil {
return err
return vterrors.Wrapf(err, "finding transactions locking table")
}
log.Infof("killTableLockHoldersAndAccessors: found %v locking transactions", len(rs.Rows))
for _, row := range rs.Named().Rows {
Expand All @@ -861,7 +861,7 @@ func (e *Executor) killTableLockHoldersAndAccessors(ctx context.Context, tableNa
// cutOverVReplMigration stops vreplication, then removes the _vt.vreplication entry for the given migration
func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, shouldForceCutOver bool) error {
if err := e.incrementCutoverAttempts(ctx, s.workflow); err != nil {
return err
return vterrors.Wrapf(err, "cutover: failed incrementing cutover attempts")
}

tmClient := e.tabletManagerClient()
Expand All @@ -870,19 +870,19 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
// sanity checks:
vreplTable, err := getVreplTable(s)
if err != nil {
return err
return vterrors.Wrapf(err, "cutover: failed getting vreplication table")
}

// get topology client & entities:
tablet, err := e.ts.GetTablet(ctx, e.tabletAlias)
if err != nil {
return err
return vterrors.Wrapf(err, "cutover: failed reading vreplication table")
}

// information about source tablet
onlineDDL, _, err := e.readMigration(ctx, s.workflow)
if err != nil {
return err
return vterrors.Wrapf(err, "cutover: failed reading migration")
}
isVreplicationTestSuite := onlineDDL.StrategySetting().IsVreplicationTestSuite()
e.updateMigrationStage(ctx, onlineDDL.UUID, "starting cut-over")
Expand All @@ -896,7 +896,10 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
defer cancel()
// Wait for target to reach the up-to-date pos
if err := tmClient.VReplicationWaitForPos(ctx, tablet.Tablet, s.id, replication.EncodePosition(pos)); err != nil {
return err
if s, _ := e.readVReplStream(ctx, s.workflow, true); s != nil {
err = vterrors.Wrapf(err, "read vrepl position %v", s.pos)
}
return vterrors.Wrapf(err, "failed waiting for position %v", replication.EncodePosition(pos))
}
// Target is now in sync with source!
return nil
Expand All @@ -910,14 +913,14 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
// in that place as possible.
sentryTableName, err = schema.GenerateGCTableName(schema.HoldTableGCState, newGCTableRetainTime())
if err != nil {
return nil
return vterrors.Wrapf(err, "failed creating sentry table name")
}

// We create the sentry table before toggling writes, because this involves a WaitForPos, which takes some time. We
// don't want to overload the buffering time with this excessive wait.

if err := e.updateArtifacts(ctx, onlineDDL.UUID, sentryTableName); err != nil {
return err
return vterrors.Wrapf(err, "failed updating artifacts with sentry table name")
}

dropSentryTableQuery := sqlparser.BuildParsedQuery(sqlDropTableIfExists, sentryTableName)
Expand All @@ -941,30 +944,30 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
}()
parsed := sqlparser.BuildParsedQuery(sqlCreateSentryTable, sentryTableName)
if _, err := e.execQuery(ctx, parsed.Query); err != nil {
return err
return vterrors.Wrapf(err, "failed creating sentry table")
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "sentry table created: %s", sentryTableName)

postSentryPos, err := e.primaryPosition(ctx)
if err != nil {
return err
return vterrors.Wrapf(err, "failed getting primary pos after sentry creation")
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for post-sentry pos: %v", replication.EncodePosition(postSentryPos))
if err := waitForPos(s, postSentryPos); err != nil {
return err
return vterrors.Wrapf(err, "failed waiting for pos after sentry creation")
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "post-sentry pos reached")
}

lockConn, err := e.pool.Get(ctx, nil)
if err != nil {
return err
return vterrors.Wrapf(err, "failed getting locking connection")
}
// Set large enough `@@lock_wait_timeout` so that it does not interfere with the cut-over operation.
// The code will ensure everything that needs to be terminated by `migrationCutOverThreshold` will be terminated.
lockConnRestoreLockWaitTimeout, err := e.initConnectionLockWaitTimeout(ctx, lockConn.Conn, 5*migrationCutOverThreshold)
if err != nil {
return err
return vterrors.Wrapf(err, "failed setting lock_wait_timeout on locking connection")
}
defer lockConn.Recycle()
defer lockConnRestoreLockWaitTimeout()
Expand All @@ -974,13 +977,13 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
renameWasSuccessful := false
renameConn, err := e.pool.Get(ctx, nil)
if err != nil {
return err
return vterrors.Wrapf(err, "failed getting rename connection")
}
// Set large enough `@@lock_wait_timeout` so that it does not interfere with the cut-over operation.
// The code will ensure everything that needs to be terminated by `migrationCutOverThreshold` will be terminated.
renameConnRestoreLockWaitTimeout, err := e.initConnectionLockWaitTimeout(ctx, renameConn.Conn, 5*migrationCutOverThreshold*4)
if err != nil {
return err
return vterrors.Wrapf(err, "failed setting lock_wait_timeout on rename connection")
}
defer renameConn.Recycle()
defer func() {
Expand All @@ -996,7 +999,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
// See if backend MySQL server supports 'rename_table_preserve_foreign_key' variable
preserveFKSupported, err := e.isPreserveForeignKeySupported(ctx)
if err != nil {
return err
return vterrors.Wrapf(err, "failed checking for 'rename_table_preserve_foreign_key' support")
}
if preserveFKSupported {
// This code is only applicable when MySQL supports the 'rename_table_preserve_foreign_key' variable. This variable
Expand All @@ -1019,7 +1022,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
for {
renameProcessFound, err := e.doesConnectionInfoMatch(renameWaitCtx, renameConn.Conn.ID(), "rename")
if err != nil {
return err
return vterrors.Wrapf(err, "searching for rename process")
}
if renameProcessFound {
return nil
Expand Down Expand Up @@ -1053,7 +1056,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
bufferingContextCancel()
// force re-read of tables
if err := tmClient.RefreshState(grpcCtx, tablet.Tablet); err != nil {
return err
return vterrors.Wrapf(err, "refreshing table state")
}
}
log.Infof("toggled buffering: %t in migration %v", bufferQueries, onlineDDL.UUID)
Expand All @@ -1073,7 +1076,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
err = toggleBuffering(true)
defer reenableWritesOnce()
if err != nil {
return err
return vterrors.Wrapf(err, "failed enabling buffering")
}
// Give a fraction of a second for a scenario where a query is in
// query executor, it passed the ACLs and is _about to_ execute. This will be nicer to those queries:
Expand All @@ -1086,10 +1089,10 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
// We should only proceed with forceful cut over if there is no pending atomic transaction for the table.
// This will help in keeping the atomicity guarantee of a prepared transaction.
if err := e.checkOnPreparedPool(ctx, onlineDDL.Table, 100*time.Millisecond); err != nil {
return err
return vterrors.Wrapf(err, "checking prepared pool for table")
}
if err := e.killTableLockHoldersAndAccessors(ctx, onlineDDL.Table); err != nil {
return err
return vterrors.Wrapf(err, "failed killing table lock holders and accessors")
}
}

Expand All @@ -1112,7 +1115,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
defer cancel()
lockTableQuery := sqlparser.BuildParsedQuery(sqlLockTwoTablesWrite, sentryTableName, onlineDDL.Table)
if _, err := lockConn.Conn.Exec(lockCtx, lockTableQuery.Query, 1, false); err != nil {
return err
return vterrors.Wrapf(err, "failed locking tables")
}

e.updateMigrationStage(ctx, onlineDDL.UUID, "renaming tables")
Expand All @@ -1124,15 +1127,15 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
// the rename should block, because of the LOCK. Wait for it to show up.
e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for RENAME to block")
if err := waitForRenameProcess(); err != nil {
return err
return vterrors.Wrapf(err, "failed waiting for rename process")
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "RENAME found")
}

e.updateMigrationStage(ctx, onlineDDL.UUID, "reading post-lock pos")
postWritesPos, err := e.primaryPosition(ctx)
if err != nil {
return err
return vterrors.Wrapf(err, "failed reading pos after locking")
}

// Right now: new queries are buffered, any existing query will have executed, and worst case scenario is
Expand All @@ -1144,19 +1147,19 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
// Writes are now disabled on table. Read up-to-date vreplication info, specifically to get latest (and fixed) pos:
s, err = e.readVReplStream(ctx, s.workflow, false)
if err != nil {
return err
return vterrors.Wrapf(err, "failed reading vreplication table after locking")
}

e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for post-lock pos: %v", replication.EncodePosition(postWritesPos))
if err := waitForPos(s, postWritesPos); err != nil {
e.updateMigrationStage(ctx, onlineDDL.UUID, "timeout while waiting for post-lock pos: %v", err)
return err
return vterrors.Wrapf(err, "failed waiting for pos after locking")
}
go log.Infof("cutOverVReplMigration %v: done waiting for position %v", s.workflow, replication.EncodePosition(postWritesPos))
// Stop vreplication
e.updateMigrationStage(ctx, onlineDDL.UUID, "stopping vreplication")
if _, err := e.vreplicationExec(ctx, tablet.Tablet, binlogplayer.StopVReplication(s.id, "stopped for online DDL cutover")); err != nil {
return err
return vterrors.Wrapf(err, "failed stopping vreplication")
}
go log.Infof("cutOverVReplMigration %v: stopped vreplication", s.workflow)

Expand All @@ -1173,7 +1176,7 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
} else {
e.updateMigrationStage(ctx, onlineDDL.UUID, "validating rename is still in place")
if err := waitForRenameProcess(); err != nil {
return err
return vterrors.Wrapf(err, "failed waiting for rename process before dropping sentry table")
}

// Normal (non-testing) alter table
Expand All @@ -1184,23 +1187,23 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
lockCtx, cancel := context.WithTimeout(ctx, migrationCutOverThreshold)
defer cancel()
if _, err := lockConn.Conn.Exec(lockCtx, dropTableQuery.Query, 1, false); err != nil {
return err
return vterrors.Wrapf(err, "failed dropping sentry table")
}
}
{
lockCtx, cancel := context.WithTimeout(ctx, migrationCutOverThreshold)
defer cancel()
e.updateMigrationStage(ctx, onlineDDL.UUID, "unlocking tables")
if _, err := lockConn.Conn.Exec(lockCtx, sqlUnlockTables, 1, false); err != nil {
return err
return vterrors.Wrapf(err, "failed unlocking tables")
}
}
{
lockCtx, cancel := context.WithTimeout(ctx, migrationCutOverThreshold)
defer cancel()
e.updateMigrationStage(lockCtx, onlineDDL.UUID, "waiting for RENAME to complete")
if err := <-renameCompleteChan; err != nil {
return err
return vterrors.Wrapf(err, "failed waiting for rename to complete")
}
renameWasSuccessful = true
}
Expand Down Expand Up @@ -5162,6 +5165,7 @@ func (e *Executor) OnSchemaMigrationStatus(ctx context.Context,
return e.onSchemaMigrationStatus(ctx, uuidParam, status, dryRun, progressPct, etaSeconds, rowsCopied, hint)
}

// checkOnPreparedPool checks if there are any cross-shard prepared transactions on the given table
func (e *Executor) checkOnPreparedPool(ctx context.Context, table string, waitTime time.Duration) error {
if e.isPreparedPoolEmpty(table) {
return nil
Expand Down

0 comments on commit b54e53e

Please sign in to comment.