From ebf321ef957d52eb760ccecf8780ebda50c12045 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 25 Nov 2024 20:17:45 +0200 Subject: [PATCH] Online DDL: publish `vreplication_lag_seconds` from vreplication progress (#17263) Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../schema/onlineddl/schema_migrations.sql | 1 + go/vt/vttablet/onlineddl/executor.go | 86 +++++++++---------- go/vt/vttablet/onlineddl/schema.go | 5 ++ go/vt/vttablet/onlineddl/vrepl.go | 14 +++ 4 files changed, 61 insertions(+), 45 deletions(-) diff --git a/go/vt/sidecardb/schema/onlineddl/schema_migrations.sql b/go/vt/sidecardb/schema/onlineddl/schema_migrations.sql index 91c0323e54b..3e2a72d6ae5 100644 --- a/go/vt/sidecardb/schema/onlineddl/schema_migrations.sql +++ b/go/vt/sidecardb/schema/onlineddl/schema_migrations.sql @@ -45,6 +45,7 @@ CREATE TABLE IF NOT EXISTS schema_migrations `message_timestamp` timestamp(6) NULL DEFAULT NULL, `eta_seconds` bigint NOT NULL DEFAULT '-1', `rows_copied` bigint unsigned NOT NULL DEFAULT '0', + `vreplication_lag_seconds` bigint unsigned NOT NULL DEFAULT '0', `table_rows` bigint NOT NULL DEFAULT '0', `added_unique_keys` int unsigned NOT NULL DEFAULT '0', `removed_unique_keys` int unsigned NOT NULL DEFAULT '0', diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 002cd1fb6d0..f8b5cfd9b8d 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -3583,53 +3583,36 @@ func (e *Executor) isPreserveForeignKeySupported(ctx context.Context) (isSupport // and is up to date with the binlogs. func (e *Executor) isVReplMigrationReadyToCutOver(ctx context.Context, onlineDDL *schema.OnlineDDL, s *VReplStream) (isReady bool, err error) { // Check all the cases where migration is still running: - { - // when ready to cut-over, pos must have some value - if s.pos == "" { - return false, nil - } + // when ready to cut-over, pos must have some value + if s.pos == "" { + return false, nil } - { - // Both time_updated and transaction_timestamp must be in close proximity to each - // other and to the time now, otherwise that means we're lagging and it's not a good time - // to cut-over - durationDiff := func(t1, t2 time.Time) time.Duration { - return t1.Sub(t2).Abs() - } - timeNow := time.Now() - timeUpdated := time.Unix(s.timeUpdated, 0) - if durationDiff(timeNow, timeUpdated) > onlineDDL.CutOverThreshold { - return false, nil - } - // Let's look at transaction timestamp. This gets written by any ongoing - // writes on the server (whether on this table or any other table) - transactionTimestamp := time.Unix(s.transactionTimestamp, 0) - if durationDiff(timeNow, transactionTimestamp) > onlineDDL.CutOverThreshold { - return false, nil - } + // Both time_updated and transaction_timestamp must be in close proximity to each + // other and to the time now, otherwise that means we're lagging and it's not a good time + // to cut-over + if s.Lag() > onlineDDL.CutOverThreshold { + return false, nil } - { - // copy_state must have no entries for this vreplication id: if entries are - // present that means copy is still in progress - query, err := sqlparser.ParseAndBind(sqlReadCountCopyState, - sqltypes.Int32BindVariable(s.id), - ) - if err != nil { - return false, err - } - r, err := e.execQuery(ctx, query) - if err != nil { - return false, err - } - csRow := r.Named().Row() - if csRow == nil { - return false, err - } - count := csRow.AsInt64("cnt", 0) - if count > 0 { - // Still copying - return false, nil - } + // copy_state must have no entries for this vreplication id: if entries are + // present that means copy is still in progress + query, err := sqlparser.ParseAndBind(sqlReadCountCopyState, + sqltypes.Int32BindVariable(s.id), + ) + if err != nil { + return false, err + } + r, err := e.execQuery(ctx, query) + if err != nil { + return false, err + } + csRow := r.Named().Row() + if csRow == nil { + return false, err + } + count := csRow.AsInt64("cnt", 0) + if count > 0 { + // Still copying + return false, nil } return true, nil @@ -3776,6 +3759,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i } _ = e.updateRowsCopied(ctx, uuid, s.rowsCopied) _ = e.updateMigrationProgressByRowsCopied(ctx, uuid, s.rowsCopied) + _ = e.updateMigrationVreplicationLagSeconds(ctx, uuid, int64(s.Lag().Seconds())) _ = e.updateMigrationETASecondsByProgress(ctx, uuid) if s.timeThrottled != 0 { // Avoid creating a 0000-00-00 00:00:00 timestamp @@ -4534,6 +4518,18 @@ func (e *Executor) updateRowsCopied(ctx context.Context, uuid string, rowsCopied return err } +func (e *Executor) updateMigrationVreplicationLagSeconds(ctx context.Context, uuid string, vreplicationLagSeconds int64) error { + query, err := sqlparser.ParseAndBind(sqlUpdateMigrationVreplicationLagSeconds, + sqltypes.Int64BindVariable(vreplicationLagSeconds), + sqltypes.StringBindVariable(uuid), + ) + if err != nil { + return err + } + _, err = e.execQuery(ctx, query) + return err +} + func (e *Executor) updateVitessLivenessIndicator(ctx context.Context, uuid string, livenessIndicator int64) error { query, err := sqlparser.ParseAndBind(sqlUpdateMigrationVitessLivenessIndicator, sqltypes.Int64BindVariable(livenessIndicator), diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index bc3fa4f2bc9..943a3b1df07 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -87,6 +87,11 @@ const ( WHERE migration_uuid=%a ` + sqlUpdateMigrationVreplicationLagSeconds = `UPDATE _vt.schema_migrations + SET vreplication_lag_seconds=%a + WHERE + migration_uuid=%a + ` sqlUpdateMigrationIsView = `UPDATE _vt.schema_migrations SET is_view=%a WHERE diff --git a/go/vt/vttablet/onlineddl/vrepl.go b/go/vt/vttablet/onlineddl/vrepl.go index 26eb614e95a..2761c27c801 100644 --- a/go/vt/vttablet/onlineddl/vrepl.go +++ b/go/vt/vttablet/onlineddl/vrepl.go @@ -30,6 +30,7 @@ import ( "net/url" "strconv" "strings" + "time" "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/collations/charset" @@ -96,6 +97,19 @@ func (v *VReplStream) hasError() (isTerminal bool, vreplError error) { return false, nil } +// Lag returns the vreplication lag, as determined by the higher of the transaction timestamp and the time updated. +func (s *VReplStream) Lag() time.Duration { + durationDiff := func(t1, t2 time.Time) time.Duration { + return t1.Sub(t2).Abs() + } + timeNow := time.Now() + timeUpdated := time.Unix(s.timeUpdated, 0) + // Let's look at transaction timestamp. This gets written by any ongoing + // writes on the server (whether on this table or any other table) + transactionTimestamp := time.Unix(s.transactionTimestamp, 0) + return max(durationDiff(timeNow, timeUpdated), durationDiff(timeNow, transactionTimestamp)) +} + // VRepl is an online DDL helper for VReplication based migrations (ddl_strategy="online") type VRepl struct { workflow string