Skip to content

Commit

Permalink
Online DDL: ensure high lock_wait_timeout in Vreplication cut-over (#…
Browse files Browse the repository at this point in the history
…16601)

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Aug 18, 2024
1 parent 0d6b768 commit 44e48ca
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 0 deletions.
23 changes: 23 additions & 0 deletions go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,19 @@ func testScheduler(t *testing.T) {
})
}

var originalLockWaitTimeout int64
t.Run("set low lock_wait_timeout", func(t *testing.T) {
rs, err := primaryTablet.VttabletProcess.QueryTablet("select @@lock_wait_timeout as lock_wait_timeout", keyspaceName, false)
require.NoError(t, err)
row := rs.Named().Row()
require.NotNil(t, row)
originalLockWaitTimeout = row.AsInt64("lock_wait_timeout", 0)
require.NotZero(t, originalLockWaitTimeout)

_, err = primaryTablet.VttabletProcess.QueryTablet("set global lock_wait_timeout=1", keyspaceName, false)
require.NoError(t, err)
})

// CREATE
t.Run("CREATE TABLEs t1, t2", func(t *testing.T) {
{ // The table does not exist
Expand Down Expand Up @@ -578,6 +591,16 @@ func testScheduler(t *testing.T) {
assert.NotEmpty(t, rs.Rows)
})

t.Run("low @@lock_wait_timeout", func(t *testing.T) {
defer primaryTablet.VttabletProcess.QueryTablet(fmt.Sprintf("set global lock_wait_timeout=%d", originalLockWaitTimeout), keyspaceName, false)

t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy, "vtgate", "", "", false)) // wait
t.Run("trivial t1 migration", func(t *testing.T) {
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
checkTable(t, t1Name, true)
})
})

forceCutoverCapable, err := capableOf(capabilities.PerformanceSchemaDataLocksTableCapability) // 8.0
require.NoError(t, err)
if forceCutoverCapable {
Expand Down
33 changes: 33 additions & 0 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,14 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
if err != nil {
return err
}
// Set large enough `@@lock_wait_timeout` so that it does not interfere with the cut-over operation.
// The code will ensure everything that needs to be terminated by `migrationCutOverThreshold` will be terminated.
lockConnRestoreLockWaitTimeout, err := e.initConnectionLockWaitTimeout(ctx, lockConn.Conn, 5*migrationCutOverThreshold)
if err != nil {
return err
}
defer lockConn.Recycle()
defer lockConnRestoreLockWaitTimeout()
defer lockConn.Conn.Exec(ctx, sqlUnlockTables, 1, false)

renameCompleteChan := make(chan error)
Expand All @@ -988,6 +995,12 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
if err != nil {
return err
}
// Set large enough `@@lock_wait_timeout` so that it does not interfere with the cut-over operation.
// The code will ensure everything that needs to be terminated by `migrationCutOverThreshold` will be terminated.
renameConnRestoreLockWaitTimeout, err := e.initConnectionLockWaitTimeout(ctx, renameConn.Conn, 5*migrationCutOverThreshold*4)
if err != nil {
return err
}
defer renameConn.Recycle()
defer func() {
if !renameWasSuccessful {
Expand All @@ -997,6 +1010,8 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
}
}
}()
defer renameConnRestoreLockWaitTimeout()

// See if backend MySQL server supports 'rename_table_preserve_foreign_key' variable
preserveFKSupported, err := e.isPreserveForeignKeySupported(ctx)
if err != nil {
Expand Down Expand Up @@ -1260,6 +1275,24 @@ func (e *Executor) initMigrationSQLMode(ctx context.Context, onlineDDL *schema.O
return deferFunc, nil
}

// initConnectionLockWaitTimeout sets the session-level `lock_wait_timeout` of the given connection
// to lockWaitTimeout and returns a function that restores the value the session had before the call.
//
// The previous value is stashed in the `@lock_wait_timeout` user variable on conn, so the restore
// function is only meaningful when it runs on that same connection.
//
// lockWaitTimeout is truncated to whole seconds and then clamped to the range MySQL accepts for
// `lock_wait_timeout` (1 to 31536000 seconds). Without the clamp, a sub-second duration would be
// sent as 0, which is outside the valid range.
//
// On error the returned function is a no-op, so it is always safe to call or defer.
// The restore function reuses ctx and discards any error from the restore statement (best effort).
func (e *Executor) initConnectionLockWaitTimeout(ctx context.Context, conn *connpool.Conn, lockWaitTimeout time.Duration) (deferFunc func(), err error) {
	// Valid bounds of MySQL's `lock_wait_timeout` system variable, in seconds.
	const (
		minLockWaitTimeoutSeconds int64 = 1
		maxLockWaitTimeoutSeconds int64 = 31536000
	)
	noop := func() {}

	// Remember the current session value in a user variable so it can be restored later.
	if _, err := conn.Exec(ctx, `set @lock_wait_timeout=@@session.lock_wait_timeout`, 0, false); err != nil {
		return noop, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "could not read lock_wait_timeout: %v", err)
	}

	timeoutSeconds := int64(lockWaitTimeout.Seconds())
	if timeoutSeconds < minLockWaitTimeoutSeconds {
		timeoutSeconds = minLockWaitTimeoutSeconds
	}
	if timeoutSeconds > maxLockWaitTimeoutSeconds {
		timeoutSeconds = maxLockWaitTimeoutSeconds
	}

	setQuery := fmt.Sprintf("set @@session.lock_wait_timeout=%d", timeoutSeconds)
	if _, err := conn.Exec(ctx, setQuery, 0, false); err != nil {
		return noop, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "could not set lock_wait_timeout: %v", err)
	}

	restore := func() {
		// Best effort: the result and error of the restore statement are intentionally ignored.
		conn.Exec(ctx, "set @@session.lock_wait_timeout=@lock_wait_timeout", 0, false)
	}
	return restore, nil
}

// newConstraintName generates a new, unique name for a constraint. Our problem is that a MySQL
// constraint's name is unique in the schema (!). And so as we duplicate the original table, we must
// create completely new names for all constraints.
Expand Down

0 comments on commit 44e48ca

Please sign in to comment.