diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go index 2b068df643d..e3254817c18 100644 --- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go +++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go @@ -870,6 +870,60 @@ func testScheduler(t *testing.T) { }) }) + t.Run("Cleanup artifacts", func(t *testing.T) { + // Create a migration with a low --retain-artifacts value. + // We will cancel the migration and expect the artifact to be cleaned. + t.Run("start migration", func(t *testing.T) { + t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --postpone-completion --retain-artifacts=1s", "vtctl", "", "", true)) // skip wait + onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning) + }) + var artifacts []string + t.Run("validate artifact exists", func(t *testing.T) { + rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid) + require.NotNil(t, rs) + row := rs.Named().Row() + require.NotNil(t, row) + + artifacts = textutil.SplitDelimitedList(row.AsString("artifacts", "")) + assert.NotEmpty(t, artifacts) + assert.Equal(t, 1, len(artifacts)) + checkTable(t, artifacts[0], true) + + retainArtifactsSeconds := row.AsInt64("retain_artifacts_seconds", 0) + assert.Equal(t, int64(1), retainArtifactsSeconds) // due to --retain-artifacts=1s + }) + t.Run("cancel migration", func(t *testing.T) { + onlineddl.CheckCancelMigration(t, &vtParams, shards, t1uuid, true) + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusCancelled) + }) + t.Run("wait for cleanup", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), normalWaitTime) + defer cancel() + + for { + rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid) + require.NotNil(t, rs) + row := rs.Named().Row() + require.NotNil(t, row) + if !row["cleanup_timestamp"].IsNull() { + // This is what we've been waiting for + break + } + select { + case <-ctx.Done(): + assert.Fail(t, "timeout waiting for cleanup") + return + case <-time.After(time.Second): + } + } + }) + t.Run("validate artifact does not exist", func(t *testing.T) { + checkTable(t, artifacts[0], false) + }) + }) + // INSTANT DDL instantDDLCapable, err := capableOf(mysql.InstantAddLastColumnFlavorCapability) require.NoError(t, err) diff --git a/go/vt/schema/ddl_strategy.go b/go/vt/schema/ddl_strategy.go index d56b8004ab8..c75de3ad7e7 100644 --- a/go/vt/schema/ddl_strategy.go +++ b/go/vt/schema/ddl_strategy.go @@ -19,12 +19,15 @@ package schema import ( "fmt" "regexp" + "strconv" + "time" "github.com/google/shlex" ) var ( - strategyParserRegexp = regexp.MustCompile(`^([\S]+)\s+(.*)$`) + strategyParserRegexp = regexp.MustCompile(`^([\S]+)\s+(.*)$`) + retainArtifactsFlagRegexp = regexp.MustCompile(fmt.Sprintf(`^[-]{1,2}%s=(.*?)$`, retainArtifactsFlag)) ) const ( @@ -39,6 +42,7 @@ const ( allowConcurrentFlag = "allow-concurrent" preferInstantDDL = "prefer-instant-ddl" fastRangeRotationFlag = "fast-range-rotation" + retainArtifactsFlag = "retain-artifacts" vreplicationTestSuite = "vreplication-test-suite" allowForeignKeysFlag = "unsafe-allow-foreign-keys" ) @@ -102,6 +106,9 @@ func ParseDDLStrategy(strategyVariable string) (*DDLStrategySetting, error) { default: return nil, fmt.Errorf("Unknown online DDL strategy: '%v'", strategy) } + if _, err := setting.RetainArtifactsDuration(); err != nil { + return nil, err + } return setting, nil } @@ -177,6 +184,33 @@ func (setting *DDLStrategySetting) IsFastRangeRotationFlag() bool { return setting.hasFlag(fastRangeRotationFlag) } +// isRetainArtifactsFlag returns true when given option denotes a `--retain-artifacts=[...]` flag +func isRetainArtifactsFlag(opt string) (string, bool) { + submatch := retainArtifactsFlagRegexp.FindStringSubmatch(opt) + if len(submatch) == 0 { + return "", false + } + return submatch[1], true +} + +// RetainArtifactsDuration returns a the duration indicated by --retain-artifacts +func (setting *DDLStrategySetting) RetainArtifactsDuration() (d time.Duration, err error) { + // We do some ugly manual parsing of --retain-artifacts + opts, _ := shlex.Split(setting.Options) + for _, opt := range opts { + if val, isRetainArtifacts := isRetainArtifactsFlag(opt); isRetainArtifacts { + // value is possibly quoted + if s, err := strconv.Unquote(val); err == nil { + val = s + } + if val != "" { + d, err = time.ParseDuration(val) + } + } + } + return d, err +} + // IsVreplicationTestSuite checks if strategy options include --vreplicatoin-test-suite func (setting *DDLStrategySetting) IsVreplicationTestSuite() bool { return setting.hasFlag(vreplicationTestSuite) @@ -192,6 +226,9 @@ func (setting *DDLStrategySetting) RuntimeOptions() []string { opts, _ := shlex.Split(setting.Options) validOpts := []string{} for _, opt := range opts { + if _, ok := isRetainArtifactsFlag(opt); ok { + continue + } switch { case isFlag(opt, declarativeFlag): case isFlag(opt, skipTopoFlag): diff --git a/go/vt/schema/ddl_strategy_test.go b/go/vt/schema/ddl_strategy_test.go index 610cb8b9ed3..b42df37024e 100644 --- a/go/vt/schema/ddl_strategy_test.go +++ b/go/vt/schema/ddl_strategy_test.go @@ -19,6 +19,7 @@ package schema import ( "strings" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -38,6 +39,69 @@ func TestIsDirect(t *testing.T) { assert.True(t, DDLStrategy("something").IsDirect()) } +func TestIsExpireArtifactsFlag(t *testing.T) { + tt := []struct { + s string + expect bool + val string + d time.Duration + }{ + { + s: "something", + }, + { + s: "-retain-artifacts", + }, + { + s: "--retain-artifacts", + }, + { + s: "--retain-artifacts=", + expect: true, + }, + { + s: "--retain-artifacts=0", + expect: true, + val: "0", + d: 0, + }, + { + s: "-retain-artifacts=0", + expect: true, + val: "0", + d: 0, + }, + { + s: "--retain-artifacts=1m", + expect: true, + val: "1m", + d: time.Minute, + }, + { + s: `--retain-artifacts="1m"`, + expect: true, + val: `"1m"`, + d: time.Minute, + }, + } + for _, ts := range tt { + t.Run(ts.s, func(t *testing.T) { + setting, err := ParseDDLStrategy("online " + ts.s) + assert.NoError(t, err) + + val, isRetainArtifacts := isRetainArtifactsFlag(ts.s) + assert.Equal(t, ts.expect, isRetainArtifacts) + assert.Equal(t, ts.val, val) + + if ts.expect { + d, err := setting.RetainArtifactsDuration() + assert.NoError(t, err) + assert.Equal(t, ts.d, d) + } + }) + } +} + func TestParseDDLStrategy(t *testing.T) { tt := []struct { strategyVariable string @@ -52,6 +116,7 @@ func TestParseDDLStrategy(t *testing.T) { fastOverRevertible bool fastRangeRotation bool allowForeignKeys bool + expireArtifacts time.Duration runtimeOptions string err error }{ @@ -166,6 +231,13 @@ func TestParseDDLStrategy(t *testing.T) { runtimeOptions: "", allowForeignKeys: true, }, + { + strategyVariable: "vitess --retain-artifacts=4m", + strategy: DDLStrategyVitess, + options: "--retain-artifacts=4m", + runtimeOptions: "", + expireArtifacts: 4 * time.Minute, + }, } for _, ts := range tt { t.Run(ts.strategyVariable, func(t *testing.T) { @@ -190,4 +262,8 @@ func TestParseDDLStrategy(t *testing.T) { _, err := ParseDDLStrategy("other") assert.Error(t, err) } + { + _, err := ParseDDLStrategy("online --retain-artifacts=3") + assert.Error(t, err) + } } diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 62e22cf3fda..071deb99aa9 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -4723,6 +4723,11 @@ func (e *Executor) SubmitMigration( revertedUUID, _ := onlineDDL.GetRevertUUID() // Empty value if the migration is not actually a REVERT. Safe to ignore error. retainArtifactsSeconds := int64((retainOnlineDDLTables).Seconds()) + if retainArtifacts, _ := onlineDDL.StrategySetting().RetainArtifactsDuration(); retainArtifacts != 0 { + // Explicit retention indicated by `--retain-artifact` DDL strategy flag for this migration. Override! + retainArtifactsSeconds = int64((retainArtifacts).Seconds()) + } + _, allowConcurrentMigration := e.allowConcurrentMigration(onlineDDL) submitQuery, err := sqlparser.ParseAndBind(sqlInsertMigration, sqltypes.StringBindVariable(onlineDDL.UUID), diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index c4a6d96953b..45d5f46e4a6 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -339,7 +339,7 @@ const ( log_path FROM _vt.schema_migrations WHERE - migration_status IN ('complete', 'failed') + migration_status IN ('complete', 'cancelled', 'failed') AND cleanup_timestamp IS NULL AND completed_timestamp <= IF(retain_artifacts_seconds=0, NOW() - INTERVAL %a SECOND,