Skip to content

Commit

Permalink
[release-16.0] OnlineDDL: cleanup cancelled migration artifacts; supp…
Browse files Browse the repository at this point in the history
…ort `--retain-artifacts=<duration>` DDL strategy flag (#14029) (#14036)

Signed-off-by: Shlomi Noach <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Shlomi Noach <[email protected]>
  • Loading branch information
vitess-bot[bot] and shlomi-noach authored Sep 20, 2023
1 parent e3f01ed commit cef2617
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 2 deletions.
54 changes: 54 additions & 0 deletions go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,60 @@ func testScheduler(t *testing.T) {
})
})

t.Run("Cleanup artifacts", func(t *testing.T) {
// Create a migration with a low --retain-artifacts value.
// We will cancel the migration and expect the artifact to be cleaned.
t.Run("start migration", func(t *testing.T) {
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --postpone-completion --retain-artifacts=1s", "vtctl", "", "", true)) // skip wait
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning)
})
var artifacts []string
t.Run("validate artifact exists", func(t *testing.T) {
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
row := rs.Named().Row()
require.NotNil(t, row)

artifacts = textutil.SplitDelimitedList(row.AsString("artifacts", ""))
assert.NotEmpty(t, artifacts)
assert.Equal(t, 1, len(artifacts))
checkTable(t, artifacts[0], true)

retainArtifactsSeconds := row.AsInt64("retain_artifacts_seconds", 0)
assert.Equal(t, int64(1), retainArtifactsSeconds) // due to --retain-artifacts=1s
})
t.Run("cancel migration", func(t *testing.T) {
onlineddl.CheckCancelMigration(t, &vtParams, shards, t1uuid, true)
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusCancelled)
})
t.Run("wait for cleanup", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), normalWaitTime)
defer cancel()

for {
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
row := rs.Named().Row()
require.NotNil(t, row)
if !row["cleanup_timestamp"].IsNull() {
// This is what we've been waiting for
break
}
select {
case <-ctx.Done():
assert.Fail(t, "timeout waiting for cleanup")
return
case <-time.After(time.Second):
}
}
})
t.Run("validate artifact does not exist", func(t *testing.T) {
checkTable(t, artifacts[0], false)
})
})

// INSTANT DDL
instantDDLCapable, err := capableOf(mysql.InstantAddLastColumnFlavorCapability)
require.NoError(t, err)
Expand Down
39 changes: 38 additions & 1 deletion go/vt/schema/ddl_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ package schema
import (
"fmt"
"regexp"
"strconv"
"time"

"github.com/google/shlex"
)

var (
strategyParserRegexp = regexp.MustCompile(`^([\S]+)\s+(.*)$`)
strategyParserRegexp = regexp.MustCompile(`^([\S]+)\s+(.*)$`)
retainArtifactsFlagRegexp = regexp.MustCompile(fmt.Sprintf(`^[-]{1,2}%s=(.*?)$`, retainArtifactsFlag))
)

const (
Expand All @@ -39,6 +42,7 @@ const (
allowConcurrentFlag = "allow-concurrent"
preferInstantDDL = "prefer-instant-ddl"
fastRangeRotationFlag = "fast-range-rotation"
retainArtifactsFlag = "retain-artifacts"
vreplicationTestSuite = "vreplication-test-suite"
allowForeignKeysFlag = "unsafe-allow-foreign-keys"
)
Expand Down Expand Up @@ -102,6 +106,9 @@ func ParseDDLStrategy(strategyVariable string) (*DDLStrategySetting, error) {
default:
return nil, fmt.Errorf("Unknown online DDL strategy: '%v'", strategy)
}
if _, err := setting.RetainArtifactsDuration(); err != nil {
return nil, err
}
return setting, nil
}

Expand Down Expand Up @@ -177,6 +184,33 @@ func (setting *DDLStrategySetting) IsFastRangeRotationFlag() bool {
return setting.hasFlag(fastRangeRotationFlag)
}

// isRetainArtifactsFlag returns true when given option denotes a `--retain-artifacts=[...]` flag
func isRetainArtifactsFlag(opt string) (string, bool) {
submatch := retainArtifactsFlagRegexp.FindStringSubmatch(opt)
if len(submatch) == 0 {
return "", false
}
return submatch[1], true
}

// RetainArtifactsDuration returns a the duration indicated by --retain-artifacts
func (setting *DDLStrategySetting) RetainArtifactsDuration() (d time.Duration, err error) {
// We do some ugly manual parsing of --retain-artifacts
opts, _ := shlex.Split(setting.Options)
for _, opt := range opts {
if val, isRetainArtifacts := isRetainArtifactsFlag(opt); isRetainArtifacts {
// value is possibly quoted
if s, err := strconv.Unquote(val); err == nil {
val = s
}
if val != "" {
d, err = time.ParseDuration(val)
}
}
}
return d, err
}

// IsVreplicationTestSuite checks if strategy options include --vreplicatoin-test-suite
func (setting *DDLStrategySetting) IsVreplicationTestSuite() bool {
return setting.hasFlag(vreplicationTestSuite)
Expand All @@ -192,6 +226,9 @@ func (setting *DDLStrategySetting) RuntimeOptions() []string {
opts, _ := shlex.Split(setting.Options)
validOpts := []string{}
for _, opt := range opts {
if _, ok := isRetainArtifactsFlag(opt); ok {
continue
}
switch {
case isFlag(opt, declarativeFlag):
case isFlag(opt, skipTopoFlag):
Expand Down
76 changes: 76 additions & 0 deletions go/vt/schema/ddl_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package schema
import (
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand All @@ -38,6 +39,69 @@ func TestIsDirect(t *testing.T) {
assert.True(t, DDLStrategy("something").IsDirect())
}

func TestIsExpireArtifactsFlag(t *testing.T) {
tt := []struct {
s string
expect bool
val string
d time.Duration
}{
{
s: "something",
},
{
s: "-retain-artifacts",
},
{
s: "--retain-artifacts",
},
{
s: "--retain-artifacts=",
expect: true,
},
{
s: "--retain-artifacts=0",
expect: true,
val: "0",
d: 0,
},
{
s: "-retain-artifacts=0",
expect: true,
val: "0",
d: 0,
},
{
s: "--retain-artifacts=1m",
expect: true,
val: "1m",
d: time.Minute,
},
{
s: `--retain-artifacts="1m"`,
expect: true,
val: `"1m"`,
d: time.Minute,
},
}
for _, ts := range tt {
t.Run(ts.s, func(t *testing.T) {
setting, err := ParseDDLStrategy("online " + ts.s)
assert.NoError(t, err)

val, isRetainArtifacts := isRetainArtifactsFlag(ts.s)
assert.Equal(t, ts.expect, isRetainArtifacts)
assert.Equal(t, ts.val, val)

if ts.expect {
d, err := setting.RetainArtifactsDuration()
assert.NoError(t, err)
assert.Equal(t, ts.d, d)
}
})
}
}

func TestParseDDLStrategy(t *testing.T) {
tt := []struct {
strategyVariable string
Expand All @@ -52,6 +116,7 @@ func TestParseDDLStrategy(t *testing.T) {
fastOverRevertible bool
fastRangeRotation bool
allowForeignKeys bool
expireArtifacts time.Duration
runtimeOptions string
err error
}{
Expand Down Expand Up @@ -166,6 +231,13 @@ func TestParseDDLStrategy(t *testing.T) {
runtimeOptions: "",
allowForeignKeys: true,
},
{
strategyVariable: "vitess --retain-artifacts=4m",
strategy: DDLStrategyVitess,
options: "--retain-artifacts=4m",
runtimeOptions: "",
expireArtifacts: 4 * time.Minute,
},
}
for _, ts := range tt {
t.Run(ts.strategyVariable, func(t *testing.T) {
Expand All @@ -190,4 +262,8 @@ func TestParseDDLStrategy(t *testing.T) {
_, err := ParseDDLStrategy("other")
assert.Error(t, err)
}
{
_, err := ParseDDLStrategy("online --retain-artifacts=3")
assert.Error(t, err)
}
}
5 changes: 5 additions & 0 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4723,6 +4723,11 @@ func (e *Executor) SubmitMigration(

revertedUUID, _ := onlineDDL.GetRevertUUID() // Empty value if the migration is not actually a REVERT. Safe to ignore error.
retainArtifactsSeconds := int64((retainOnlineDDLTables).Seconds())
if retainArtifacts, _ := onlineDDL.StrategySetting().RetainArtifactsDuration(); retainArtifacts != 0 {
// Explicit retention indicated by `--retain-artifact` DDL strategy flag for this migration. Override!
retainArtifactsSeconds = int64((retainArtifacts).Seconds())
}

_, allowConcurrentMigration := e.allowConcurrentMigration(onlineDDL)
submitQuery, err := sqlparser.ParseAndBind(sqlInsertMigration,
sqltypes.StringBindVariable(onlineDDL.UUID),
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ const (
log_path
FROM _vt.schema_migrations
WHERE
migration_status IN ('complete', 'failed')
migration_status IN ('complete', 'cancelled', 'failed')
AND cleanup_timestamp IS NULL
AND completed_timestamp <= IF(retain_artifacts_seconds=0,
NOW() - INTERVAL %a SECOND,
Expand Down

0 comments on commit cef2617

Please sign in to comment.