Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport fix for Vitess Online DDL atomic cut-over #106

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ const (
maxConcurrency = 20
singleConnectionSleepInterval = 2 * time.Millisecond
countIterations = 5
migrationWaitTimeout = 60 * time.Second
)

func resetOpOrder() {
Expand Down Expand Up @@ -344,7 +345,7 @@ func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy str
assert.NoError(t, err)

if !strategySetting.Strategy.IsDirect() {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 30*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, migrationWaitTimeout, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
}

Expand Down
245 changes: 194 additions & 51 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,22 @@ func (e *Executor) executeDirectly(ctx context.Context, onlineDDL *schema.Online
return acceptableErrorCodeFound, nil
}

// doesConnectionInfoMatch checks whether there is a MySQL connection in PROCESSLIST,
// identified by connID, whose Info matches the given text.
//
// The lookup runs the sqlFindProcess statement with two bind variables: the
// connection ID, and submatch wrapped in '%' on both sides (presumably used as
// a LIKE pattern -- the statement text is defined elsewhere; confirm there).
//
// It returns true only when the query yields exactly one row. Any error from
// binding or executing the query is returned as-is, together with false.
func (e *Executor) doesConnectionInfoMatch(ctx context.Context, connID int64, submatch string) (bool, error) {
	connIDVar := sqltypes.Int64BindVariable(connID)
	patternVar := sqltypes.StringBindVariable("%" + submatch + "%")

	query, err := sqlparser.ParseAndBind(sqlFindProcess, connIDVar, patternVar)
	if err != nil {
		return false, err
	}

	result, err := e.execQuery(ctx, query)
	if err != nil {
		return false, err
	}

	matched := len(result.Rows) == 1
	return matched, nil
}

// validateTableForAlterAction checks whether a table is good to undergo an ALTER operation. It returns a detailed error if not.
func (e *Executor) validateTableForAlterAction(ctx context.Context, onlineDDL *schema.OnlineDDL) (err error) {
// Validate table does not participate in foreign key relationship:
Expand Down Expand Up @@ -718,6 +734,10 @@ func (e *Executor) terminateVReplMigration(ctx context.Context, uuid string) err

// cutOverVReplMigration stops vreplication, then removes the _vt.vreplication entry for the given migration
func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) error {
if err := e.incrementCutoverAttempts(ctx, s.workflow); err != nil {
return err
}

tmClient := e.tabletManagerClient()
defer tmClient.Close()

Expand All @@ -739,31 +759,101 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
return err
}
isVreplicationTestSuite := onlineDDL.StrategySetting().IsVreplicationTestSuite()
e.updateMigrationStage(ctx, onlineDDL.UUID, "starting cut-over")

var sentryTableName string

waitForPos := func(s *VReplStream, pos mysql.Position) error {
ctx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold)
defer cancel()
// Wait for target to reach the up-to-date pos
if err := tmClient.VReplicationWaitForPos(ctx, tablet.Tablet, int(s.id), mysql.EncodePosition(pos)); err != nil {
return err
}
// Target is now in sync with source!
return nil
}

// A bit early on, we generate names for stowaway and temporary tables
// We do this here because right now we're in a safe place where nothing happened yet. If there's an error now, bail out
// and no harm done.
// Later on, when traffic is blocked and tables renamed, that's a more dangerous place to be in; we want as little logic
// in that place as possible.
var stowawayTableName string
if !isVreplicationTestSuite {
stowawayTableName, err = schema.GenerateGCTableName(schema.HoldTableGCState, newGCTableRetainTime())
// A bit early on, we generate names for stowaway and temporary tables
// We do this here because right now we're in a safe place where nothing happened yet. If there's an error now, bail out
// and no harm done.
// Later on, when traffic is blocked and tables renamed, that's a more dangerous place to be in; we want as little logic
// in that place as possible.
sentryTableName, err = schema.GenerateGCTableName(schema.HoldTableGCState, newGCTableRetainTime())
if err != nil {
return nil
}

// We create the sentry table before toggling writes, because this involves a WaitForPos, which takes some time. We
// don't want to overload the buffering time with this excessive wait.

if err := e.updateArtifacts(ctx, onlineDDL.UUID, sentryTableName); err != nil {
return err
}
// Audit stowawayTableName. If operation is complete, we remove the audit. But if this tablet fails while
// the original table is renamed (into stowaway table), then this will be both the evidence and the information we need
// to restore the table back into existence. This can (and will) be done by a different vttablet process
if err := e.updateMigrationStowawayTable(ctx, onlineDDL.UUID, stowawayTableName); err != nil {
parsed := sqlparser.BuildParsedQuery(sqlCreateSentryTable, sentryTableName)
if _, err := e.execQuery(ctx, parsed.Query); err != nil {
return err
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "sentry table created: %s", sentryTableName)

postSentryPos, err := e.primaryPosition(ctx)
if err != nil {
return err
}
defer e.updateMigrationStowawayTable(ctx, onlineDDL.UUID, "")
e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for post-sentry pos: %v", mysql.EncodePosition(postSentryPos))
if err := waitForPos(s, postSentryPos); err != nil {
return err
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "post-sentry pos reached")
}

lockConn, err := e.pool.Get(ctx, nil)
if err != nil {
return err
}
defer lockConn.Recycle()
defer lockConn.Exec(ctx, sqlUnlockTables, 1, false)

renameConn, err := e.pool.Get(ctx, nil)
if err != nil {
return err
}
defer renameConn.Recycle()
defer renameConn.Kill("premature exit while renaming tables", 0)
renameQuery := sqlparser.BuildParsedQuery(sqlSwapTables, onlineDDL.Table, sentryTableName, vreplTable, onlineDDL.Table, sentryTableName, vreplTable)

waitForRenameProcess := func() error {
// This function waits until it finds the RENAME TABLE... query running in MySQL's PROCESSLIST, or until timeout
// The function assumes that one of the renamed tables is locked, thus causing the RENAME to block. If nothing
// is locked, then the RENAME will be near-instantaneous and it's unlikely that the function will find it.
renameWaitCtx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold)
defer cancel()

for {
renameProcessFound, err := e.doesConnectionInfoMatch(renameWaitCtx, renameConn.ID(), "rename")
if err != nil {
return err
}
if renameProcessFound {
return nil
}
select {
case <-renameWaitCtx.Done():
return vterrors.Errorf(vtrpcpb.Code_ABORTED, "timeout for rename query: %s", renameQuery.Query)
case <-time.After(time.Second):
// sleep
}
}
}

renameCompleteChan := make(chan error)

bufferingCtx, bufferingContextCancel := context.WithCancel(ctx)
defer bufferingContextCancel()
// Preparation is complete. We proceed to cut-over.
toggleBuffering := func(bufferQueries bool) error {
log.Infof("toggling buffering: %t in migration %v", bufferQueries, onlineDDL.UUID)
e.toggleBufferTableFunc(bufferingCtx, onlineDDL.Table, bufferQueries)
if !bufferQueries {
// called after new table is in place.
Expand All @@ -774,27 +864,31 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
return err
}
}
log.Infof("toggled buffering: %t in migration %v", bufferQueries, onlineDDL.UUID)
return nil
}

var reenableOnce sync.Once
reenableWritesOnce := func() {
reenableOnce.Do(func() {
log.Infof("re-enabling writes in migration %v", onlineDDL.UUID)
toggleBuffering(false)
})
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "buffering queries")
// stop writes on source:
err = toggleBuffering(true)
defer reenableWritesOnce()
if err != nil {
return err
}

// swap out the table
// Give a fraction of a second for a scenario where a query is in
// query executor, it passed the ACLs and is _about to_ execute. This will be nicer to those queries:
// they will be able to complete before the rename, rather than block briefly on the rename only to find
// the table no longer exists.
e.updateMigrationStage(ctx, onlineDDL.UUID, "graceful wait for buffering")
time.Sleep(100 * time.Millisecond)

if isVreplicationTestSuite {
// The testing suite may inject queries internally from the server via a recurring EVENT.
// Those queries are unaffected by query rules (ACLs) because they don't go through Vitess.
Expand All @@ -805,30 +899,41 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
if _, err := e.execQuery(ctx, parsed.Query); err != nil {
return err
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "test suite 'before' table renamed")
} else {
// real production
parsed := sqlparser.BuildParsedQuery(sqlRenameTable, onlineDDL.Table, stowawayTableName)
if _, err := e.execQuery(ctx, parsed.Query); err != nil {

e.updateMigrationStage(ctx, onlineDDL.UUID, "locking tables")
lockCtx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold)
defer cancel()
lockTableQuery := sqlparser.BuildParsedQuery(sqlLockTwoTablesWrite, sentryTableName, onlineDDL.Table)
if _, err := lockConn.Exec(lockCtx, lockTableQuery.Query, 1, false); err != nil {
return err
}
}

// We have just created a gaping hole, the original table does not exist.
// we expect to fill that hole by swapping in the vrepl table. But if anything goes wrong we prepare
// to rename the table back:
defer func() {
if _, err := e.renameTableIfApplicable(ctx, stowawayTableName, onlineDDL.Table); err != nil {
vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "cannot rename back swapped table: %v into %v: %v", stowawayTableName, onlineDDL.Table, err)
e.updateMigrationStage(ctx, onlineDDL.UUID, "renaming tables")
go func() {
_, err := renameConn.Exec(ctx, renameQuery.Query, 1, false)
renameCompleteChan <- err
}()
// the rename should block, because of the LOCK. Wait for it to show up.
e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for RENAME to block")
if err := waitForRenameProcess(); err != nil {
return err
}
}()
// Right now: new queries are buffered, any existing query will have executed, and worst case scenario is
// that some leftover query finds the table is not actually there anymore...
// At any case, there's definitely no more writes to the table since it does not exist. We can
// safely take the (GTID) pos now.
e.updateMigrationStage(ctx, onlineDDL.UUID, "RENAME found")
}

e.updateMigrationStage(ctx, onlineDDL.UUID, "reading post-lock pos")
postWritesPos, err := e.primaryPosition(ctx)
if err != nil {
return err
}

// Right now: new queries are buffered, any existing query will have executed, and worst case scenario is
// that some leftover query finds the table is not actually there anymore...
// At any case, there's definitely no more writes to the table since it does not exist. We can
// safely take the (GTID) pos now.
_ = e.updateMigrationTimestamp(ctx, "liveness_timestamp", s.workflow)

// Writes are now disabled on table. Read up-to-date vreplication info, specifically to get latest (and fixed) pos:
Expand All @@ -837,21 +942,13 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
return err
}

waitForPos := func() error {
ctx, cancel := context.WithTimeout(ctx, 2*vreplicationCutOverThreshold)
defer cancel()
// Wait for target to reach the up-to-date pos
if err := tmClient.VReplicationWaitForPos(ctx, tablet.Tablet, int(s.id), mysql.EncodePosition(postWritesPos)); err != nil {
return err
}
// Target is now in sync with source!
return nil
}
log.Infof("VReplication migration %v waiting for position %v", s.workflow, mysql.EncodePosition(postWritesPos))
if err := waitForPos(); err != nil {
e.updateMigrationStage(ctx, onlineDDL.UUID, "waiting for post-lock pos: %v", mysql.EncodePosition(postWritesPos))
if err := waitForPos(s, postWritesPos); err != nil {
e.updateMigrationStage(ctx, onlineDDL.UUID, "timeout while waiting for post-lock pos: %v", err)
return err
}
// Stop vreplication
e.updateMigrationStage(ctx, onlineDDL.UUID, "stopping vreplication")
if _, err := e.vreplicationExec(ctx, tablet.Tablet, binlogplayer.StopVReplication(uint32(s.id), "stopped for online DDL cutover")); err != nil {
return err
}
Expand All @@ -865,23 +962,43 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
if _, err := e.execQuery(ctx, parsed.Query); err != nil {
return err
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "test suite 'after' table renamed")
} else {
// Normal (non-testing) alter table
conn, err := dbconnpool.NewDBConnection(ctx, e.env.Config().DB.DbaWithDB())
if err != nil {
e.updateMigrationStage(ctx, onlineDDL.UUID, "validating rename is still in place")
if err := waitForRenameProcess(); err != nil {
return err
}
defer conn.Close()

parsed := sqlparser.BuildParsedQuery(sqlRenameTwoTables,
vreplTable, onlineDDL.Table,
stowawayTableName, vreplTable,
)
if _, err := e.execQuery(ctx, parsed.Query); err != nil {
return err
// Normal (non-testing) alter table
e.updateMigrationStage(ctx, onlineDDL.UUID, "dropping sentry table")

{
dropTableQuery := sqlparser.BuildParsedQuery(sqlDropTable, sentryTableName)
lockCtx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold)
defer cancel()
if _, err := lockConn.Exec(lockCtx, dropTableQuery.Query, 1, false); err != nil {
return err
}
}
{
lockCtx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold)
defer cancel()
e.updateMigrationStage(ctx, onlineDDL.UUID, "unlocking tables")
if _, err := lockConn.Exec(lockCtx, sqlUnlockTables, 1, false); err != nil {
return err
}
}
{
lockCtx, cancel := context.WithTimeout(ctx, vreplicationCutOverThreshold)
defer cancel()
e.updateMigrationStage(lockCtx, onlineDDL.UUID, "waiting for RENAME to complete")
if err := <-renameCompleteChan; err != nil {
return err
}
}
}
}
e.updateMigrationStage(ctx, onlineDDL.UUID, "cut-over complete")
e.ownedRunningMigrations.Delete(onlineDDL.UUID)

go func() {
Expand All @@ -896,12 +1013,12 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream) er
}()

// Tables are now swapped! Migration is successful
e.updateMigrationStage(ctx, onlineDDL.UUID, "re-enabling writes")
reenableWritesOnce() // this function is also deferred, in case of early return; but now would be a good time to resume writes, before we publish the migration as "complete"
_ = e.onSchemaMigrationStatus(ctx, onlineDDL.UUID, schema.OnlineDDLStatusComplete, false, progressPctFull, etaSecondsNow, s.rowsCopied, emptyHint)
return nil

// deferred function will re-enable writes now
// deferred function will unlock keyspace
}

// initMigrationSQLMode sets sql_mode according to DDL strategy, and returns a function that
Expand Down Expand Up @@ -3336,6 +3453,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
if isReady {
if err := e.cutOverVReplMigration(ctx, s); err != nil {
_ = e.updateMigrationMessage(ctx, uuid, err.Error())
log.Errorf("cutOverVReplMigration failed: err=%v", err)
if merr, ok := err.(*mysql.SQLError); ok {
switch merr.Num {
case mysql.ERTooLongIdent:
Expand Down Expand Up @@ -3772,6 +3890,31 @@ func (e *Executor) updateMigrationSpecialPlan(ctx context.Context, uuid string,
return err
}

// updateMigrationStage records a human-readable stage message for the
// migration identified by uuid.
//
// stage is treated as a fmt.Sprintf format string and args as its operands.
// The formatted message is logged at info level, then bound together with
// uuid into the sqlUpdateStage statement, which is executed via execQuery.
//
// It returns the error from binding or from executing the statement, or nil
// when both succeed.
func (e *Executor) updateMigrationStage(ctx context.Context, uuid string, stage string, args ...interface{}) error {
	stageMessage := fmt.Sprintf(stage, args...)
	log.Infof("updateMigrationStage: uuid=%s, stage=%s", uuid, stageMessage)

	messageVar := sqltypes.StringBindVariable(stageMessage)
	uuidVar := sqltypes.StringBindVariable(uuid)

	boundQuery, bindErr := sqlparser.ParseAndBind(sqlUpdateStage, messageVar, uuidVar)
	if bindErr != nil {
		return bindErr
	}

	if _, execErr := e.execQuery(ctx, boundQuery); execErr != nil {
		return execErr
	}
	return nil
}

// incrementCutoverAttempts executes the sqlIncrementCutoverAttempts statement
// for the migration identified by uuid (bound as the statement's only
// variable).
//
// It returns the error from binding or from executing the statement, or nil
// when both succeed.
func (e *Executor) incrementCutoverAttempts(ctx context.Context, uuid string) error {
	uuidVar := sqltypes.StringBindVariable(uuid)

	boundQuery, bindErr := sqlparser.ParseAndBind(sqlIncrementCutoverAttempts, uuidVar)
	if bindErr != nil {
		return bindErr
	}

	if _, execErr := e.execQuery(ctx, boundQuery); execErr != nil {
		return execErr
	}
	return nil
}

// updateMigrationTablet sets 'tablet' column to be this executor's tablet alias for given migration
func (e *Executor) updateMigrationTablet(ctx context.Context, uuid string) error {
query, err := sqlparser.ParseAndBind(sqlUpdateTablet,
Expand Down
Loading