Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Online DDL: publish vreplication_lag_seconds from vreplication progress #17263

Merged
merged 1 commit into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/vt/sidecardb/schema/onlineddl/schema_migrations.sql
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ CREATE TABLE IF NOT EXISTS schema_migrations
`message_timestamp` timestamp(6) NULL DEFAULT NULL,
`eta_seconds` bigint NOT NULL DEFAULT '-1',
`rows_copied` bigint unsigned NOT NULL DEFAULT '0',
`vreplication_lag_seconds` bigint unsigned NOT NULL DEFAULT '0',
`table_rows` bigint NOT NULL DEFAULT '0',
`added_unique_keys` int unsigned NOT NULL DEFAULT '0',
`removed_unique_keys` int unsigned NOT NULL DEFAULT '0',
Expand Down
86 changes: 41 additions & 45 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3583,53 +3583,36 @@ func (e *Executor) isPreserveForeignKeySupported(ctx context.Context) (isSupport
// and is up to date with the binlogs.
func (e *Executor) isVReplMigrationReadyToCutOver(ctx context.Context, onlineDDL *schema.OnlineDDL, s *VReplStream) (isReady bool, err error) {
// Check all the cases where migration is still running:
{
// when ready to cut-over, pos must have some value
if s.pos == "" {
return false, nil
}
// when ready to cut-over, pos must have some value
if s.pos == "" {
return false, nil
}
{
// Both time_updated and transaction_timestamp must be in close proximity to each
// other and to the time now, otherwise that means we're lagging and it's not a good time
// to cut-over
durationDiff := func(t1, t2 time.Time) time.Duration {
return t1.Sub(t2).Abs()
}
timeNow := time.Now()
timeUpdated := time.Unix(s.timeUpdated, 0)
if durationDiff(timeNow, timeUpdated) > onlineDDL.CutOverThreshold {
return false, nil
}
// Let's look at transaction timestamp. This gets written by any ongoing
// writes on the server (whether on this table or any other table)
transactionTimestamp := time.Unix(s.transactionTimestamp, 0)
if durationDiff(timeNow, transactionTimestamp) > onlineDDL.CutOverThreshold {
return false, nil
}
// Both time_updated and transaction_timestamp must be in close proximity to each
// other and to the time now, otherwise that means we're lagging and it's not a good time
// to cut-over
if s.Lag() > onlineDDL.CutOverThreshold {
return false, nil
}
{
// copy_state must have no entries for this vreplication id: if entries are
// present that means copy is still in progress
query, err := sqlparser.ParseAndBind(sqlReadCountCopyState,
sqltypes.Int32BindVariable(s.id),
)
if err != nil {
return false, err
}
r, err := e.execQuery(ctx, query)
if err != nil {
return false, err
}
csRow := r.Named().Row()
if csRow == nil {
return false, err
}
count := csRow.AsInt64("cnt", 0)
if count > 0 {
// Still copying
return false, nil
}
// copy_state must have no entries for this vreplication id: if entries are
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe in a future refactor, similar to the extraction into Lag(), you could extract the code to check if copy is in progress too into a utility function ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. To do that I'd need to issue a query against copy_state. Or I can converge the two queries (vreplication+copy_state into one. I guess that would be wasteful for most of the time though.

// present that means copy is still in progress
query, err := sqlparser.ParseAndBind(sqlReadCountCopyState,
sqltypes.Int32BindVariable(s.id),
)
if err != nil {
return false, err
}
r, err := e.execQuery(ctx, query)
if err != nil {
return false, err
}
csRow := r.Named().Row()
if csRow == nil {
return false, err
}
count := csRow.AsInt64("cnt", 0)
if count > 0 {
// Still copying
return false, nil
}

return true, nil
Expand Down Expand Up @@ -3776,6 +3759,7 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i
}
_ = e.updateRowsCopied(ctx, uuid, s.rowsCopied)
_ = e.updateMigrationProgressByRowsCopied(ctx, uuid, s.rowsCopied)
_ = e.updateMigrationVreplicationLagSeconds(ctx, uuid, int64(s.Lag().Seconds()))
_ = e.updateMigrationETASecondsByProgress(ctx, uuid)
if s.timeThrottled != 0 {
// Avoid creating a 0000-00-00 00:00:00 timestamp
Expand Down Expand Up @@ -4534,6 +4518,18 @@ func (e *Executor) updateRowsCopied(ctx context.Context, uuid string, rowsCopied
return err
}

func (e *Executor) updateMigrationVreplicationLagSeconds(ctx context.Context, uuid string, vreplicationLagSeconds int64) error {
query, err := sqlparser.ParseAndBind(sqlUpdateMigrationVreplicationLagSeconds,
sqltypes.Int64BindVariable(vreplicationLagSeconds),
sqltypes.StringBindVariable(uuid),
)
if err != nil {
return err
}
_, err = e.execQuery(ctx, query)
return err
}

func (e *Executor) updateVitessLivenessIndicator(ctx context.Context, uuid string, livenessIndicator int64) error {
query, err := sqlparser.ParseAndBind(sqlUpdateMigrationVitessLivenessIndicator,
sqltypes.Int64BindVariable(livenessIndicator),
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ const (
WHERE
migration_uuid=%a
`
sqlUpdateMigrationVreplicationLagSeconds = `UPDATE _vt.schema_migrations
SET vreplication_lag_seconds=%a
WHERE
migration_uuid=%a
`
sqlUpdateMigrationIsView = `UPDATE _vt.schema_migrations
SET is_view=%a
WHERE
Expand Down
14 changes: 14 additions & 0 deletions go/vt/vttablet/onlineddl/vrepl.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"net/url"
"strconv"
"strings"
"time"

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/collations/charset"
Expand Down Expand Up @@ -96,6 +97,19 @@ func (v *VReplStream) hasError() (isTerminal bool, vreplError error) {
return false, nil
}

// Lag returns the vreplication lag, as determined by the higher of the transaction timestamp and the time updated.
func (s *VReplStream) Lag() time.Duration {
durationDiff := func(t1, t2 time.Time) time.Duration {
return t1.Sub(t2).Abs()
}
timeNow := time.Now()
timeUpdated := time.Unix(s.timeUpdated, 0)
// Let's look at transaction timestamp. This gets written by any ongoing
// writes on the server (whether on this table or any other table)
transactionTimestamp := time.Unix(s.transactionTimestamp, 0)
return max(durationDiff(timeNow, timeUpdated), durationDiff(timeNow, transactionTimestamp))
}

// VRepl is an online DDL helper for VReplication based migrations (ddl_strategy="online")
type VRepl struct {
workflow string
Expand Down
Loading