Skip to content

Commit

Permalink
OnlineDDL: cleanup cancelled migration artifacts; support `--retain-a…
Browse files Browse the repository at this point in the history
…rtifacts=<duration>` DDL strategy flag (#14029)

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Sep 19, 2023
1 parent b847161 commit 0461faf
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 3 deletions.
54 changes: 54 additions & 0 deletions go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,60 @@ func testScheduler(t *testing.T) {
})
})

t.Run("Cleanup artifacts", func(t *testing.T) {
// Create a migration with a low --retain-artifacts value.
// We will cancel the migration and expect the artifact to be cleaned.
t.Run("start migration", func(t *testing.T) {
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --postpone-completion --retain-artifacts=1s", "vtctl", "", "", true)) // skip wait
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning)
})
var artifacts []string
t.Run("validate artifact exists", func(t *testing.T) {
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
row := rs.Named().Row()
require.NotNil(t, row)

artifacts = textutil.SplitDelimitedList(row.AsString("artifacts", ""))
assert.NotEmpty(t, artifacts)
assert.Equal(t, 1, len(artifacts))
checkTable(t, artifacts[0], true)

retainArtifactsSeconds := row.AsInt64("retain_artifacts_seconds", 0)
assert.Equal(t, int64(1), retainArtifactsSeconds) // due to --retain-artifacts=1s
})
t.Run("cancel migration", func(t *testing.T) {
onlineddl.CheckCancelMigration(t, &vtParams, shards, t1uuid, true)
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusCancelled)
})
t.Run("wait for cleanup", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), normalWaitTime)
defer cancel()

for {
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
row := rs.Named().Row()
require.NotNil(t, row)
if !row["cleanup_timestamp"].IsNull() {
// This is what we've been waiting for
break
}
select {
case <-ctx.Done():
assert.Fail(t, "timeout waiting for cleanup")
return
case <-time.After(time.Second):
}
}
})
t.Run("validate artifact does not exist", func(t *testing.T) {
checkTable(t, artifacts[0], false)
})
})

// INSTANT DDL
instantDDLCapable, err := capableOf(mysql.InstantAddLastColumnFlavorCapability)
require.NoError(t, err)
Expand Down
37 changes: 36 additions & 1 deletion go/vt/schema/ddl_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
var (
strategyParserRegexp = regexp.MustCompile(`^([\S]+)\s+(.*)$`)
cutOverThresholdFlagRegexp = regexp.MustCompile(fmt.Sprintf(`^[-]{1,2}%s=(.*?)$`, cutOverThresholdFlag))
retainArtifactsFlagRegexp = regexp.MustCompile(fmt.Sprintf(`^[-]{1,2}%s=(.*?)$`, retainArtifactsFlag))
)

const (
Expand All @@ -43,6 +44,7 @@ const (
preferInstantDDL = "prefer-instant-ddl"
fastRangeRotationFlag = "fast-range-rotation"
cutOverThresholdFlag = "cut-over-threshold"
retainArtifactsFlag = "retain-artifacts"
vreplicationTestSuite = "vreplication-test-suite"
allowForeignKeysFlag = "unsafe-allow-foreign-keys"
analyzeTableFlag = "analyze-table"
Expand Down Expand Up @@ -110,6 +112,9 @@ func ParseDDLStrategy(strategyVariable string) (*DDLStrategySetting, error) {
if _, err := setting.CutOverThreshold(); err != nil {
return nil, err
}
if _, err := setting.RetainArtifactsDuration(); err != nil {
return nil, err
}
return setting, nil
}

Expand Down Expand Up @@ -194,7 +199,16 @@ func isCutOverThresholdFlag(opt string) (string, bool) {
return submatch[1], true
}

// CutOverThreshold returns a list of shards specified in '--shards=...', or an empty slice if unspecified
// isRetainArtifactsFlag returns true when given option denotes a `--retain-artifacts=[...]` flag
func isRetainArtifactsFlag(opt string) (string, bool) {
submatch := retainArtifactsFlagRegexp.FindStringSubmatch(opt)
if len(submatch) == 0 {
return "", false
}
return submatch[1], true
}

// CutOverThreshold returns a the duration threshold indicated by --cut-over-threshold
func (setting *DDLStrategySetting) CutOverThreshold() (d time.Duration, err error) {
// We do some ugly manual parsing of --cut-over-threshold value
opts, _ := shlex.Split(setting.Options)
Expand All @@ -212,6 +226,24 @@ func (setting *DDLStrategySetting) CutOverThreshold() (d time.Duration, err erro
return d, err
}

// RetainArtifactsDuration returns a the duration indicated by --retain-artifacts
func (setting *DDLStrategySetting) RetainArtifactsDuration() (d time.Duration, err error) {
// We do some ugly manual parsing of --retain-artifacts
opts, _ := shlex.Split(setting.Options)
for _, opt := range opts {
if val, isRetainArtifacts := isRetainArtifactsFlag(opt); isRetainArtifacts {
// value is possibly quoted
if s, err := strconv.Unquote(val); err == nil {
val = s
}
if val != "" {
d, err = time.ParseDuration(val)
}
}
}
return d, err
}

// IsVreplicationTestSuite checks if strategy options include --vreplicatoin-test-suite
func (setting *DDLStrategySetting) IsVreplicationTestSuite() bool {
return setting.hasFlag(vreplicationTestSuite)
Expand All @@ -235,6 +267,9 @@ func (setting *DDLStrategySetting) RuntimeOptions() []string {
if _, ok := isCutOverThresholdFlag(opt); ok {
continue
}
if _, ok := isRetainArtifactsFlag(opt); ok {
continue
}
switch {
case isFlag(opt, declarativeFlag):
case isFlag(opt, skipTopoFlag):
Expand Down
77 changes: 76 additions & 1 deletion go/vt/schema/ddl_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestIsCutOverThresholdFlag(t *testing.T) {
setting, err := ParseDDLStrategy("online " + ts.s)
assert.NoError(t, err)

val, isCutOver := isCutOverThresholdFlag((ts.s))
val, isCutOver := isCutOverThresholdFlag(ts.s)
assert.Equal(t, ts.expect, isCutOver)
assert.Equal(t, ts.val, val)

Expand All @@ -102,6 +102,69 @@ func TestIsCutOverThresholdFlag(t *testing.T) {
}
}

func TestIsExpireArtifactsFlag(t *testing.T) {
tt := []struct {
s string
expect bool
val string
d time.Duration
}{
{
s: "something",
},
{
s: "-retain-artifacts",
},
{
s: "--retain-artifacts",
},
{
s: "--retain-artifacts=",
expect: true,
},
{
s: "--retain-artifacts=0",
expect: true,
val: "0",
d: 0,
},
{
s: "-retain-artifacts=0",
expect: true,
val: "0",
d: 0,
},
{
s: "--retain-artifacts=1m",
expect: true,
val: "1m",
d: time.Minute,
},
{
s: `--retain-artifacts="1m"`,
expect: true,
val: `"1m"`,
d: time.Minute,
},
}
for _, ts := range tt {
t.Run(ts.s, func(t *testing.T) {
setting, err := ParseDDLStrategy("online " + ts.s)
assert.NoError(t, err)

val, isRetainArtifacts := isRetainArtifactsFlag(ts.s)
assert.Equal(t, ts.expect, isRetainArtifacts)
assert.Equal(t, ts.val, val)

if ts.expect {
d, err := setting.RetainArtifactsDuration()
assert.NoError(t, err)
assert.Equal(t, ts.d, d)
}
})
}
}

func TestParseDDLStrategy(t *testing.T) {
tt := []struct {
strategyVariable string
Expand All @@ -118,6 +181,7 @@ func TestParseDDLStrategy(t *testing.T) {
allowForeignKeys bool
analyzeTable bool
cutOverThreshold time.Duration
expireArtifacts time.Duration
runtimeOptions string
err error
}{
Expand Down Expand Up @@ -239,6 +303,13 @@ func TestParseDDLStrategy(t *testing.T) {
runtimeOptions: "",
cutOverThreshold: 5 * time.Minute,
},
{
strategyVariable: "vitess --retain-artifacts=4m",
strategy: DDLStrategyVitess,
options: "--retain-artifacts=4m",
runtimeOptions: "",
expireArtifacts: 4 * time.Minute,
},
{
strategyVariable: "vitess --analyze-table",
strategy: DDLStrategyVitess,
Expand Down Expand Up @@ -282,4 +353,8 @@ func TestParseDDLStrategy(t *testing.T) {
_, err := ParseDDLStrategy("online --cut-over-threshold=3")
assert.Error(t, err)
}
{
_, err := ParseDDLStrategy("online --retain-artifacts=3")
assert.Error(t, err)
}
}
5 changes: 5 additions & 0 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4634,6 +4634,11 @@ func (e *Executor) SubmitMigration(

revertedUUID, _ := onlineDDL.GetRevertUUID() // Empty value if the migration is not actually a REVERT. Safe to ignore error.
retainArtifactsSeconds := int64((retainOnlineDDLTables).Seconds())
if retainArtifacts, _ := onlineDDL.StrategySetting().RetainArtifactsDuration(); retainArtifacts != 0 {
// Explicit retention indicated by `--retain-artifact` DDL strategy flag for this migration. Override!
retainArtifactsSeconds = int64((retainArtifacts).Seconds())
}

_, allowConcurrentMigration := e.allowConcurrentMigration(onlineDDL)
submitQuery, err := sqlparser.ParseAndBind(sqlInsertMigration,
sqltypes.StringBindVariable(onlineDDL.UUID),
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ const (
log_path
FROM _vt.schema_migrations
WHERE
migration_status IN ('complete', 'failed')
migration_status IN ('complete', 'cancelled', 'failed')
AND cleanup_timestamp IS NULL
AND completed_timestamp <= IF(retain_artifacts_seconds=0,
NOW() - INTERVAL %a SECOND,
Expand Down

0 comments on commit 0461faf

Please sign in to comment.