Skip to content

Commit

Permalink
Online DDL: publish vreplication_lag_seconds from vreplication prog…
Browse files Browse the repository at this point in the history
…ress (#17263)

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Nov 25, 2024
1 parent 0439d89 commit ebf321e
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 45 deletions.
1 change: 1 addition & 0 deletions go/vt/sidecardb/schema/onlineddl/schema_migrations.sql
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ CREATE TABLE IF NOT EXISTS schema_migrations
`message_timestamp` timestamp(6) NULL DEFAULT NULL,
`eta_seconds` bigint NOT NULL DEFAULT '-1',
`rows_copied` bigint unsigned NOT NULL DEFAULT '0',
`vreplication_lag_seconds` bigint unsigned NOT NULL DEFAULT '0',
`table_rows` bigint NOT NULL DEFAULT '0',
`added_unique_keys` int unsigned NOT NULL DEFAULT '0',
`removed_unique_keys` int unsigned NOT NULL DEFAULT '0',
Expand Down
86 changes: 41 additions & 45 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3583,53 +3583,36 @@ func (e *Executor) isPreserveForeignKeySupported(ctx context.Context) (isSupport
// and is up to date with the binlogs.
func (e *Executor) isVReplMigrationReadyToCutOver(ctx context.Context, onlineDDL *schema.OnlineDDL, s *VReplStream) (isReady bool, err error) {
// Check all the cases where migration is still running:
{
// when ready to cut-over, pos must have some value
if s.pos == "" {
return false, nil
}
// when ready to cut-over, pos must have some value
if s.pos == "" {
return false, nil
}
{
// Both time_updated and transaction_timestamp must be in close proximity to each
// other and to the time now, otherwise that means we're lagging and it's not a good time
// to cut-over
durationDiff := func(t1, t2 time.Time) time.Duration {
return t1.Sub(t2).Abs()
}
timeNow := time.Now()
timeUpdated := time.Unix(s.timeUpdated, 0)
if durationDiff(timeNow, timeUpdated) > onlineDDL.CutOverThreshold {
return false, nil
}
// Let's look at transaction timestamp. This gets written by any ongoing
// writes on the server (whether on this table or any other table)
transactionTimestamp := time.Unix(s.transactionTimestamp, 0)
if durationDiff(timeNow, transactionTimestamp) > onlineDDL.CutOverThreshold {
return false, nil
}
// Both time_updated and transaction_timestamp must be in close proximity to each
// other and to the time now, otherwise that means we're lagging and it's not a good time
// to cut-over
if s.Lag() > onlineDDL.CutOverThreshold {
return false, nil
}
{
// copy_state must have no entries for this vreplication id: if entries are
// present that means copy is still in progress
query, err := sqlparser.ParseAndBind(sqlReadCountCopyState,
sqltypes.Int32BindVariable(s.id),
)
if err != nil {
return false, err
}
r, err := e.execQuery(ctx, query)
if err != nil {
return false, err
}
csRow := r.Named().Row()
if csRow == nil {
return false, err
}
count := csRow.AsInt64("cnt", 0)
if count > 0 {
// Still copying
return false, nil
}
// copy_state must have no entries for this vreplication id: if entries are
// present that means copy is still in progress
query, err := sqlparser.ParseAndBind(sqlReadCountCopyState,
sqltypes.Int32BindVariable(s.id),
)
if err != nil {
return false, err
}
r, err := e.execQuery(ctx, query)
if err != nil {
return false, err
}
csRow := r.Named().Row()
if csRow == nil {
return false, err
}
count := csRow.AsInt64("cnt", 0)
if count > 0 {
// Still copying
return false, nil
}

return true, nil
Expand Down Expand Up @@ -3776,6 +3759,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
}
_ = e.updateRowsCopied(ctx, uuid, s.rowsCopied)
_ = e.updateMigrationProgressByRowsCopied(ctx, uuid, s.rowsCopied)
_ = e.updateMigrationVreplicationLagSeconds(ctx, uuid, int64(s.Lag().Seconds()))
_ = e.updateMigrationETASecondsByProgress(ctx, uuid)
if s.timeThrottled != 0 {
// Avoid creating a 0000-00-00 00:00:00 timestamp
Expand Down Expand Up @@ -4534,6 +4518,18 @@ func (e *Executor) updateRowsCopied(ctx context.Context, uuid string, rowsCopied
return err
}

func (e *Executor) updateMigrationVreplicationLagSeconds(ctx context.Context, uuid string, vreplicationLagSeconds int64) error {
query, err := sqlparser.ParseAndBind(sqlUpdateMigrationVreplicationLagSeconds,
sqltypes.Int64BindVariable(vreplicationLagSeconds),
sqltypes.StringBindVariable(uuid),
)
if err != nil {
return err
}
_, err = e.execQuery(ctx, query)
return err
}

func (e *Executor) updateVitessLivenessIndicator(ctx context.Context, uuid string, livenessIndicator int64) error {
query, err := sqlparser.ParseAndBind(sqlUpdateMigrationVitessLivenessIndicator,
sqltypes.Int64BindVariable(livenessIndicator),
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ const (
WHERE
migration_uuid=%a
`
sqlUpdateMigrationVreplicationLagSeconds = `UPDATE _vt.schema_migrations
SET vreplication_lag_seconds=%a
WHERE
migration_uuid=%a
`
sqlUpdateMigrationIsView = `UPDATE _vt.schema_migrations
SET is_view=%a
WHERE
Expand Down
14 changes: 14 additions & 0 deletions go/vt/vttablet/onlineddl/vrepl.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"net/url"
"strconv"
"strings"
"time"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/collations/charset"
Expand Down Expand Up @@ -96,6 +97,19 @@ func (v *VReplStream) hasError() (isTerminal bool, vreplError error) {
return false, nil
}

// Lag returns the vreplication lag, as determined by the higher of the transaction timestamp and the time updated.
func (s *VReplStream) Lag() time.Duration {
durationDiff := func(t1, t2 time.Time) time.Duration {
return t1.Sub(t2).Abs()
}
timeNow := time.Now()
timeUpdated := time.Unix(s.timeUpdated, 0)
// Let's look at transaction timestamp. This gets written by any ongoing
// writes on the server (whether on this table or any other table)
transactionTimestamp := time.Unix(s.transactionTimestamp, 0)
return max(durationDiff(timeNow, timeUpdated), durationDiff(timeNow, transactionTimestamp))
}

// VRepl is an online DDL helper for VReplication based migrations (ddl_strategy="online")
type VRepl struct {
workflow string
Expand Down

0 comments on commit ebf321e

Please sign in to comment.