Skip to content

Commit

Permalink
TableGC: speed up GC process via RequestChecks(). Utilized by Onlin…
Browse files Browse the repository at this point in the history
…e DDL for artifact cleanup (#14431)

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Dec 4, 2023
1 parent 69e65bd commit 17a7e59
Show file tree
Hide file tree
Showing 7 changed files with 359 additions and 133 deletions.
168 changes: 99 additions & 69 deletions go/test/endtoend/tabletmanager/tablegc/tablegc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/gc"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
Expand Down Expand Up @@ -128,13 +129,18 @@ func TestMain(m *testing.M) {
os.Exit(exitCode)
}

func checkTableRows(t *testing.T, tableName string, expect int64) {
func getTableRows(t *testing.T, tableName string) int64 {
require.NotEmpty(t, tableName)
query := `select count(*) as c from %a`
parsed := sqlparser.BuildParsedQuery(query, tableName)
rs, err := primaryTablet.VttabletProcess.QueryTablet(parsed.Query, keyspaceName, true)
require.NoError(t, err)
count := rs.Named().Row().AsInt64("c", 0)
return count
}

func checkTableRows(t *testing.T, tableName string, expect int64) {
count := getTableRows(t, tableName)
assert.Equal(t, expect, count)
}

Expand Down Expand Up @@ -176,19 +182,18 @@ func validateTableDoesNotExist(t *testing.T, tableExpr string) {
defer cancel()

ticker := time.NewTicker(time.Second)
var foundTableName string
var exists bool
var err error
defer ticker.Stop()

for {
exists, foundTableName, err := tableExists(tableExpr)
require.NoError(t, err)
if !exists {
return
}
select {
case <-ticker.C:
exists, foundTableName, err = tableExists(tableExpr)
require.NoError(t, err)
if !exists {
return
}
case <-ctx.Done():
assert.NoError(t, ctx.Err(), "validateTableDoesNotExist timed out, table %v still exists (%v)", tableExpr, foundTableName)
assert.Failf(t, "validateTableDoesNotExist timed out, table %v still exists (%v)", tableExpr, foundTableName)
return
}
}
Expand All @@ -199,59 +204,78 @@ func validateTableExists(t *testing.T, tableExpr string) {
defer cancel()

ticker := time.NewTicker(time.Second)
var exists bool
var err error
defer ticker.Stop()

for {
exists, _, err := tableExists(tableExpr)
require.NoError(t, err)
if exists {
return
}
select {
case <-ticker.C:
exists, _, err = tableExists(tableExpr)
require.NoError(t, err)
if exists {
return
}
case <-ctx.Done():
assert.NoError(t, ctx.Err(), "validateTableExists timed out, table %v still does not exist", tableExpr)
assert.Failf(t, "validateTableExists timed out, table %v still does not exist", tableExpr)
return
}
}
}

func validateAnyState(t *testing.T, expectNumRows int64, states ...schema.TableGCState) {
for _, state := range states {
expectTableToExist := true
searchExpr := ""
switch state {
case schema.HoldTableGCState:
searchExpr = `\_vt\_HOLD\_%`
case schema.PurgeTableGCState:
searchExpr = `\_vt\_PURGE\_%`
case schema.EvacTableGCState:
searchExpr = `\_vt\_EVAC\_%`
case schema.DropTableGCState:
searchExpr = `\_vt\_DROP\_%`
case schema.TableDroppedGCState:
searchExpr = `\_vt\_%`
expectTableToExist = false
default:
t.Log("Unknown state")
t.Fail()
}
exists, tableName, err := tableExists(searchExpr)
require.NoError(t, err)

if exists {
if expectNumRows >= 0 {
checkTableRows(t, tableName, expectNumRows)
t.Run(fmt.Sprintf("validateAnyState: expectNumRows=%v, states=%v", expectNumRows, states), func(t *testing.T) {
timeout := gc.NextChecksIntervals[len(gc.NextChecksIntervals)-1] + 5*time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
// Attempt validation:
for _, state := range states {
expectTableToExist := true
searchExpr := ""
switch state {
case schema.HoldTableGCState:
searchExpr = `\_vt\_HOLD\_%`
case schema.PurgeTableGCState:
searchExpr = `\_vt\_PURGE\_%`
case schema.EvacTableGCState:
searchExpr = `\_vt\_EVAC\_%`
case schema.DropTableGCState:
searchExpr = `\_vt\_DROP\_%`
case schema.TableDroppedGCState:
searchExpr = `\_vt\_%`
expectTableToExist = false
default:
require.Failf(t, "unknown state", "%v", state)
}
exists, tableName, err := tableExists(searchExpr)
require.NoError(t, err)

var foundRows int64
if exists {
foundRows = getTableRows(t, tableName)
// Now that the table is validated, we can drop it (test cleanup)
dropTable(t, tableName)
}
t.Logf("=== exists: %v, tableName: %v, rows: %v", exists, tableName, foundRows)
if exists == expectTableToExist {
// expectNumRows < 0 means "don't care"
if expectNumRows < 0 || (expectNumRows == foundRows) {
// All conditions are met
return
}
}
}
select {
case <-ticker.C:
case <-ctx.Done():
assert.Failf(t, "timeout in validateAnyState", " waiting for any of these states: %v, expecting rows: %v", states, expectNumRows)
return
}
// Now that the table is validated, we can drop it
dropTable(t, tableName)
}
if exists == expectTableToExist {
// condition met
return
}
}
assert.Failf(t, "could not match any of the states", "states=%v", states)
})
}

// dropTable drops a table
Expand Down Expand Up @@ -309,17 +333,22 @@ func TestHold(t *testing.T) {
}

func TestEvac(t *testing.T) {
populateTable(t)
query, tableName, err := schema.GenerateRenameStatement("t1", schema.EvacTableGCState, time.Now().UTC().Add(tableTransitionExpiration))
assert.NoError(t, err)

_, err = primaryTablet.VttabletProcess.QueryTablet(query, keyspaceName, true)
assert.NoError(t, err)

validateTableDoesNotExist(t, "t1")

time.Sleep(tableTransitionExpiration / 2)
{
var tableName string
t.Run("setting up EVAC table", func(t *testing.T) {
populateTable(t)
var query string
var err error
query, tableName, err = schema.GenerateRenameStatement("t1", schema.EvacTableGCState, time.Now().UTC().Add(tableTransitionExpiration))
assert.NoError(t, err)

_, err = primaryTablet.VttabletProcess.QueryTablet(query, keyspaceName, true)
assert.NoError(t, err)

validateTableDoesNotExist(t, "t1")
})

t.Run("validating before expiration", func(t *testing.T) {
time.Sleep(tableTransitionExpiration / 2)
// Table was created with +10s timestamp, so it should still exist
if fastDropTable {
// EVAC state is skipped in mysql 8.0.23 and beyond
Expand All @@ -328,13 +357,14 @@ func TestEvac(t *testing.T) {
validateTableExists(t, tableName)
checkTableRows(t, tableName, 1024)
}
}

time.Sleep(tableTransitionExpiration)
// We're now both beyond table's timestamp as well as a tableGC interval
validateTableDoesNotExist(t, tableName)
// Table should be renamed as _vt_DROP_... and then dropped!
validateAnyState(t, 0, schema.DropTableGCState, schema.TableDroppedGCState)
})

t.Run("validating rows evacuated", func(t *testing.T) {
// We're now both beyond table's timestamp as well as a tableGC interval
validateTableDoesNotExist(t, tableName)
// Table should be renamed as _vt_DROP_... and then dropped!
validateAnyState(t, 0, schema.DropTableGCState, schema.TableDroppedGCState)
})
}

func TestDrop(t *testing.T) {
Expand Down
20 changes: 14 additions & 6 deletions go/timer/suspendable_ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type SuspendableTicker struct {
// C is user facing
C chan time.Time

suspended int64
suspended atomic.Bool
}

// NewSuspendableTicker creates a new suspendable ticker, indicating whether the ticker should start
Expand All @@ -39,7 +39,7 @@ func NewSuspendableTicker(d time.Duration, initiallySuspended bool) *Suspendable
C: make(chan time.Time),
}
if initiallySuspended {
s.suspended = 1
s.suspended.Store(true)
}
go s.loop()
return s
Expand All @@ -48,12 +48,12 @@ func NewSuspendableTicker(d time.Duration, initiallySuspended bool) *Suspendable
// Suspend stops sending time events on the channel C
// time events sent during suspended time are lost
func (s *SuspendableTicker) Suspend() {
atomic.StoreInt64(&s.suspended, 1)
s.suspended.Store(true)
}

// Resume re-enables time events on channel C
func (s *SuspendableTicker) Resume() {
atomic.StoreInt64(&s.suspended, 0)
s.suspended.Store(false)
}

// Stop completely stops the timer, like time.Timer
Expand All @@ -64,15 +64,23 @@ func (s *SuspendableTicker) Stop() {
// TickNow generates a tick at this point in time. It may block
// if nothing consumes the tick.
func (s *SuspendableTicker) TickNow() {
if atomic.LoadInt64(&s.suspended) == 0 {
if !s.suspended.Load() {
// not suspended
s.C <- time.Now()
}
}

// TickAfter generates a tick after given duration has passed.
// It runs asynchronously and returns immediately.
func (s *SuspendableTicker) TickAfter(d time.Duration) {
time.AfterFunc(d, func() {
s.TickNow()
})
}

func (s *SuspendableTicker) loop() {
for t := range s.ticker.C {
if atomic.LoadInt64(&s.suspended) == 0 {
if !s.suspended.Load() {
// not suspended
s.C <- t
}
Expand Down
Loading

0 comments on commit 17a7e59

Please sign in to comment.