From 17a7e59db92bc99d1c36a8196299149ae5ecb4a9 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 4 Dec 2023 14:04:49 +0200 Subject: [PATCH] TableGC: speed up GC process via `RequestChecks()`. Utilized by Online DDL for artifact cleanup (#14431) Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../tabletmanager/tablegc/tablegc_test.go | 168 +++++++++++------- go/timer/suspendable_ticker.go | 20 ++- go/timer/suspendable_ticker_test.go | 144 +++++++++++++++ go/vt/vttablet/onlineddl/executor.go | 5 + go/vt/vttablet/tabletserver/gc/tablegc.go | 145 +++++++++------ .../vttablet/tabletserver/gc/tablegc_test.go | 8 +- go/vt/vttablet/tabletserver/tabletserver.go | 2 +- 7 files changed, 359 insertions(+), 133 deletions(-) create mode 100644 go/timer/suspendable_ticker_test.go diff --git a/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go b/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go index c6f7253c791..c21a4fe2d99 100644 --- a/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go +++ b/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go @@ -26,6 +26,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vttablet/tabletserver/gc" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/onlineddl" @@ -128,13 +129,18 @@ func TestMain(m *testing.M) { os.Exit(exitCode) } -func checkTableRows(t *testing.T, tableName string, expect int64) { +func getTableRows(t *testing.T, tableName string) int64 { require.NotEmpty(t, tableName) query := `select count(*) as c from %a` parsed := sqlparser.BuildParsedQuery(query, tableName) rs, err := primaryTablet.VttabletProcess.QueryTablet(parsed.Query, keyspaceName, true) require.NoError(t, err) count := rs.Named().Row().AsInt64("c", 0) + return count +} + +func checkTableRows(t *testing.T, tableName string, expect int64) { + count := 
getTableRows(t, tableName) assert.Equal(t, expect, count) } @@ -176,19 +182,18 @@ func validateTableDoesNotExist(t *testing.T, tableExpr string) { defer cancel() ticker := time.NewTicker(time.Second) - var foundTableName string - var exists bool - var err error + defer ticker.Stop() + for { + exists, foundTableName, err := tableExists(tableExpr) + require.NoError(t, err) + if !exists { + return + } select { case <-ticker.C: - exists, foundTableName, err = tableExists(tableExpr) - require.NoError(t, err) - if !exists { - return - } case <-ctx.Done(): - assert.NoError(t, ctx.Err(), "validateTableDoesNotExist timed out, table %v still exists (%v)", tableExpr, foundTableName) + assert.Failf(t, "validateTableDoesNotExist timed out, table %v still exists (%v)", tableExpr, foundTableName) return } } @@ -199,59 +204,78 @@ func validateTableExists(t *testing.T, tableExpr string) { defer cancel() ticker := time.NewTicker(time.Second) - var exists bool - var err error + defer ticker.Stop() + for { + exists, _, err := tableExists(tableExpr) + require.NoError(t, err) + if exists { + return + } select { case <-ticker.C: - exists, _, err = tableExists(tableExpr) - require.NoError(t, err) - if exists { - return - } case <-ctx.Done(): - assert.NoError(t, ctx.Err(), "validateTableExists timed out, table %v still does not exist", tableExpr) + assert.Failf(t, "validateTableExists timed out, table %v still does not exist", tableExpr) return } } } func validateAnyState(t *testing.T, expectNumRows int64, states ...schema.TableGCState) { - for _, state := range states { - expectTableToExist := true - searchExpr := "" - switch state { - case schema.HoldTableGCState: - searchExpr = `\_vt\_HOLD\_%` - case schema.PurgeTableGCState: - searchExpr = `\_vt\_PURGE\_%` - case schema.EvacTableGCState: - searchExpr = `\_vt\_EVAC\_%` - case schema.DropTableGCState: - searchExpr = `\_vt\_DROP\_%` - case schema.TableDroppedGCState: - searchExpr = `\_vt\_%` - expectTableToExist = false - default: - 
t.Log("Unknown state") - t.Fail() - } - exists, tableName, err := tableExists(searchExpr) - require.NoError(t, err) - - if exists { - if expectNumRows >= 0 { - checkTableRows(t, tableName, expectNumRows) + t.Run(fmt.Sprintf("validateAnyState: expectNumRows=%v, states=%v", expectNumRows, states), func(t *testing.T) { + timeout := gc.NextChecksIntervals[len(gc.NextChecksIntervals)-1] + 5*time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + // Attempt validation: + for _, state := range states { + expectTableToExist := true + searchExpr := "" + switch state { + case schema.HoldTableGCState: + searchExpr = `\_vt\_HOLD\_%` + case schema.PurgeTableGCState: + searchExpr = `\_vt\_PURGE\_%` + case schema.EvacTableGCState: + searchExpr = `\_vt\_EVAC\_%` + case schema.DropTableGCState: + searchExpr = `\_vt\_DROP\_%` + case schema.TableDroppedGCState: + searchExpr = `\_vt\_%` + expectTableToExist = false + default: + require.Failf(t, "unknown state", "%v", state) + } + exists, tableName, err := tableExists(searchExpr) + require.NoError(t, err) + + var foundRows int64 + if exists { + foundRows = getTableRows(t, tableName) + // Now that the table is validated, we can drop it (test cleanup) + dropTable(t, tableName) + } + t.Logf("=== exists: %v, tableName: %v, rows: %v", exists, tableName, foundRows) + if exists == expectTableToExist { + // expectNumRows < 0 means "don't care" + if expectNumRows < 0 || (expectNumRows == foundRows) { + // All conditions are met + return + } + } + } + select { + case <-ticker.C: + case <-ctx.Done(): + assert.Failf(t, "timeout in validateAnyState", " waiting for any of these states: %v, expecting rows: %v", states, expectNumRows) + return } - // Now that the table is validated, we can drop it - dropTable(t, tableName) - } - if exists == expectTableToExist { - // condition met - return } - } - assert.Failf(t, "could not match any of 
the states", "states=%v", states) + }) } // dropTable drops a table @@ -309,17 +333,22 @@ func TestHold(t *testing.T) { } func TestEvac(t *testing.T) { - populateTable(t) - query, tableName, err := schema.GenerateRenameStatement("t1", schema.EvacTableGCState, time.Now().UTC().Add(tableTransitionExpiration)) - assert.NoError(t, err) - - _, err = primaryTablet.VttabletProcess.QueryTablet(query, keyspaceName, true) - assert.NoError(t, err) - - validateTableDoesNotExist(t, "t1") - - time.Sleep(tableTransitionExpiration / 2) - { + var tableName string + t.Run("setting up EVAC table", func(t *testing.T) { + populateTable(t) + var query string + var err error + query, tableName, err = schema.GenerateRenameStatement("t1", schema.EvacTableGCState, time.Now().UTC().Add(tableTransitionExpiration)) + assert.NoError(t, err) + + _, err = primaryTablet.VttabletProcess.QueryTablet(query, keyspaceName, true) + assert.NoError(t, err) + + validateTableDoesNotExist(t, "t1") + }) + + t.Run("validating before expiration", func(t *testing.T) { + time.Sleep(tableTransitionExpiration / 2) // Table was created with +10s timestamp, so it should still exist if fastDropTable { // EVAC state is skipped in mysql 8.0.23 and beyond @@ -328,13 +357,14 @@ func TestEvac(t *testing.T) { validateTableExists(t, tableName) checkTableRows(t, tableName, 1024) } - } - - time.Sleep(tableTransitionExpiration) - // We're now both beyond table's timestamp as well as a tableGC interval - validateTableDoesNotExist(t, tableName) - // Table should be renamed as _vt_DROP_... and then dropped! - validateAnyState(t, 0, schema.DropTableGCState, schema.TableDroppedGCState) + }) + + t.Run("validating rows evacuated", func(t *testing.T) { + // We're now both beyond table's timestamp as well as a tableGC interval + validateTableDoesNotExist(t, tableName) + // Table should be renamed as _vt_DROP_... and then dropped! 
+ validateAnyState(t, 0, schema.DropTableGCState, schema.TableDroppedGCState) + }) } func TestDrop(t *testing.T) { diff --git a/go/timer/suspendable_ticker.go b/go/timer/suspendable_ticker.go index 5257626b85f..f2694a5cab3 100644 --- a/go/timer/suspendable_ticker.go +++ b/go/timer/suspendable_ticker.go @@ -28,7 +28,7 @@ type SuspendableTicker struct { // C is user facing C chan time.Time - suspended int64 + suspended atomic.Bool } // NewSuspendableTicker creates a new suspendable ticker, indicating whether the ticker should start @@ -39,7 +39,7 @@ func NewSuspendableTicker(d time.Duration, initiallySuspended bool) *Suspendable C: make(chan time.Time), } if initiallySuspended { - s.suspended = 1 + s.suspended.Store(true) } go s.loop() return s @@ -48,12 +48,12 @@ func NewSuspendableTicker(d time.Duration, initiallySuspended bool) *Suspendable // Suspend stops sending time events on the channel C // time events sent during suspended time are lost func (s *SuspendableTicker) Suspend() { - atomic.StoreInt64(&s.suspended, 1) + s.suspended.Store(true) } // Resume re-enables time events on channel C func (s *SuspendableTicker) Resume() { - atomic.StoreInt64(&s.suspended, 0) + s.suspended.Store(false) } // Stop completely stops the timer, like time.Timer @@ -64,15 +64,23 @@ func (s *SuspendableTicker) Stop() { // TickNow generates a tick at this point in time. It may block // if nothing consumes the tick. func (s *SuspendableTicker) TickNow() { - if atomic.LoadInt64(&s.suspended) == 0 { + if !s.suspended.Load() { // not suspended s.C <- time.Now() } } +// TickAfter generates a tick after given duration has passed. +// It runs asynchronously and returns immediately. 
+func (s *SuspendableTicker) TickAfter(d time.Duration) { + time.AfterFunc(d, func() { + s.TickNow() + }) +} + func (s *SuspendableTicker) loop() { for t := range s.ticker.C { - if atomic.LoadInt64(&s.suspended) == 0 { + if !s.suspended.Load() { // not suspended s.C <- t } diff --git a/go/timer/suspendable_ticker_test.go b/go/timer/suspendable_ticker_test.go new file mode 100644 index 00000000000..64c468a0edc --- /dev/null +++ b/go/timer/suspendable_ticker_test.go @@ -0,0 +1,144 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package timer + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +const ( + fastTickerInterval = 10 * time.Millisecond +) + +func TestInitiallySuspended(t *testing.T) { + ctx := context.Background() + t.Run("true", func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + ticker := NewSuspendableTicker(fastTickerInterval, true) + defer ticker.Stop() + select { + case <-ticker.C: + assert.Fail(t, "unexpected tick. Was supposed to be suspended") + case <-ctx.Done(): + return + } + }) + t.Run("false", func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + ticker := NewSuspendableTicker(fastTickerInterval, false) + defer ticker.Stop() + select { + case <-ticker.C: + return + case <-ctx.Done(): + assert.Fail(t, "unexpected timeout. 
Expected tick") + } + }) +} + +func TestSuspendableTicker(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ticker := NewSuspendableTicker(fastTickerInterval, false) + defer ticker.Stop() + + var ticks atomic.Int64 + go func() { + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + ticks.Add(1) + } + } + }() + t.Run("ticks running", func(t *testing.T) { + time.Sleep(time.Second) + after := ticks.Load() + assert.Greater(t, after, int64(10)) // should be about 100 + }) + t.Run("ticks suspended", func(t *testing.T) { + ticker.Suspend() + before := ticks.Load() + time.Sleep(time.Second) + after := ticks.Load() + assert.Less(t, after-before, int64(10)) + }) + t.Run("ticks resumed", func(t *testing.T) { + ticker.Resume() + before := ticks.Load() + time.Sleep(time.Second) + after := ticks.Load() + assert.Greater(t, after-before, int64(10)) + }) + t.Run("ticker stopped", func(t *testing.T) { + ticker.Stop() + before := ticks.Load() + time.Sleep(time.Second) + after := ticks.Load() + assert.Less(t, after-before, int64(10)) + }) +} + +func TestSuspendableTickerTick(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ticker := NewSuspendableTicker(time.Hour, false) + defer ticker.Stop() + + var ticks atomic.Int64 + go func() { + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + ticks.Add(1) + } + } + }() + t.Run("nothing going on", func(t *testing.T) { + time.Sleep(time.Second) + after := ticks.Load() + assert.Zero(t, after) + }) + t.Run("tick now", func(t *testing.T) { + before := ticks.Load() + ticker.TickNow() + time.Sleep(time.Second) + after := ticks.Load() + assert.Equal(t, int64(1), after-before) + }) + t.Run("tick after", func(t *testing.T) { + before := ticks.Load() + ticker.TickAfter(1 * time.Second) + time.Sleep(time.Second) + after := ticks.Load() + assert.Zero(t, after-before) + time.Sleep(3 * time.Second) + after = ticks.Load() + assert.Equal(t, 
int64(1), after-before) + }) +} diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index dca3186f7b3..2c78069e962 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -176,6 +176,7 @@ type Executor struct { ts *topo.Server lagThrottler *throttle.Throttler toggleBufferTableFunc func(cancelCtx context.Context, tableName string, timeout time.Duration, bufferQueries bool) + requestGCChecksFunc func() tabletAlias *topodatapb.TabletAlias keyspace string @@ -251,6 +252,7 @@ func NewExecutor(env tabletenv.Env, tabletAlias *topodatapb.TabletAlias, ts *top lagThrottler *throttle.Throttler, tabletTypeFunc func() topodatapb.TabletType, toggleBufferTableFunc func(cancelCtx context.Context, tableName string, timeout time.Duration, bufferQueries bool), + requestGCChecksFunc func(), ) *Executor { // sanitize flags if maxConcurrentOnlineDDLs < 1 { @@ -268,6 +270,7 @@ func NewExecutor(env tabletenv.Env, tabletAlias *topodatapb.TabletAlias, ts *top ts: ts, lagThrottler: lagThrottler, toggleBufferTableFunc: toggleBufferTableFunc, + requestGCChecksFunc: requestGCChecksFunc, ticks: timer.NewTimer(migrationCheckInterval), // Gracefully return an error if any caller tries to execute // a query before the executor has been fully opened. @@ -3932,6 +3935,7 @@ func (e *Executor) gcArtifacts(ctx context.Context) error { if err == nil { // artifact was renamed away and is gone. There' no need to list it in `artifacts` column. 
e.clearSingleArtifact(ctx, uuid, artifactTable) + e.requestGCChecksFunc() } else { return vterrors.Wrapf(err, "in gcArtifacts() for %s", artifactTable) } @@ -4502,6 +4506,7 @@ func (e *Executor) CleanupMigration(ctx context.Context, uuid string) (result *s return nil, err } log.Infof("CleanupMigration: migration %s marked as ready to clean up", uuid) + defer e.triggerNextCheckInterval() return rs, nil } diff --git a/go/vt/vttablet/tabletserver/gc/tablegc.go b/go/vt/vttablet/tabletserver/gc/tablegc.go index 8658d7c3a3b..d8d12611e43 100644 --- a/go/vt/vttablet/tabletserver/gc/tablegc.go +++ b/go/vt/vttablet/tabletserver/gc/tablegc.go @@ -49,9 +49,12 @@ const ( ) var ( - checkInterval = 1 * time.Hour - purgeReentranceInterval = 1 * time.Minute - gcLifecycle = "hold,purge,evac,drop" + checkInterval = 1 * time.Hour + purgeReentranceInterval = 1 * time.Minute + nextPurgeReentry = 1 * time.Second + checkTablesReentryMinInterval = 10 * time.Second + NextChecksIntervals = []time.Duration{time.Second, checkTablesReentryMinInterval + 5*time.Second} + gcLifecycle = "hold,purge,evac,drop" ) func init() { @@ -69,11 +72,10 @@ func registerGCFlags(fs *pflag.FlagSet) { } var ( - sqlPurgeTable = `delete from %a limit 50` - sqlShowVtTables = `show full tables like '\_vt\_%'` - sqlDropTable = "drop table if exists `%a`" - sqlDropView = "drop view if exists `%a`" - purgeReentranceFlag int64 + sqlPurgeTable = `delete from %a limit 50` + sqlShowVtTables = `show full tables like '\_vt\_%'` + sqlDropTable = "drop table if exists `%a`" + sqlDropView = "drop view if exists `%a`" ) type gcTable struct { @@ -105,6 +107,10 @@ type TableGC struct { isOpen int64 cancelOperation context.CancelFunc + purgeReentranceFlag atomic.Int64 + readReentranceFlag atomic.Int64 + checkRequestChan chan bool + throttlerClient *throttle.Client env tabletenv.Env @@ -143,7 +149,8 @@ func NewTableGC(env tabletenv.Env, ts *topo.Server, lagThrottler *throttle.Throt IdleTimeoutSeconds: 
env.Config().OltpReadPool.IdleTimeoutSeconds, }), - purgingTables: map[string]bool{}, + purgingTables: map[string]bool{}, + checkRequestChan: make(chan bool), } return collector @@ -226,6 +233,16 @@ func (collector *TableGC) Close() { log.Infof("TableGC - finished execution of Close") } +// RequestChecks requests that the GC will do a table check right away, as well as in a few seconds. +// Calling this function is useful to modules that are performing operations that affect GC tables. Those modules +// _know_ that changes have been made, and now have a way to tell TableGC: "please take a look asap rather +// than in the next hour". +func (collector *TableGC) RequestChecks() { + for _, d := range NextChecksIntervals { + time.AfterFunc(d, func() { collector.checkRequestChan <- true }) + } +} + // operate is the main entry point for the table garbage collector operation and logic. func (collector *TableGC) operate(ctx context.Context) { @@ -254,55 +271,47 @@ func (collector *TableGC) operate(ctx context.Context) { case <-ctx.Done(): log.Info("TableGC: done operating") return + case <-collector.checkRequestChan: + // Got a request to check tables. Probably some event took place and we will + // find something new to do. 
+ go tableCheckTicker.TickNow() case <-tableCheckTicker.C: - { - log.Info("TableGC: tableCheckTicker") - if gcTables, err := collector.readTables(ctx); err != nil { - log.Errorf("TableGC: error while reading tables: %+v", err) - } else { - _ = collector.checkTables(ctx, gcTables, dropTablesChan, transitionRequestsChan) - } + log.Info("TableGC: tableCheckTicker") + if err := collector.readAndCheckTables(ctx, dropTablesChan, transitionRequestsChan); err != nil { + log.Error(err) } case <-purgeReentranceTicker.C: - { - // relay the request - go func() { purgeRequestsChan <- true }() - } + // relay the request + go func() { purgeRequestsChan <- true }() case <-purgeRequestsChan: - { - go func() { - tableName, err := collector.purge(ctx) - if err != nil { - log.Errorf("TableGC: error purging table %s: %+v", tableName, err) - return - } - if tableName == "" { - // No table purged (or at least not to completion) - // Either because there _is_ nothing to purge, or because PURGE isn't a handled state - return - } - // The table has been purged! Let's move the table into the next phase: - _, _, uuid, _, _ := schema.AnalyzeGCTableName(tableName) - collector.submitTransitionRequest(ctx, transitionRequestsChan, schema.PurgeTableGCState, tableName, true, uuid) - collector.removePurgingTable(tableName) - // Chances are, there's more tables waiting to be purged. 
Let's speed things by - // requesting another purge, instead of waiting a full purgeReentranceInterval cycle - time.AfterFunc(time.Second, func() { purgeRequestsChan <- true }) - }() - } - case dropTable := <-dropTablesChan: - { - log.Infof("TableGC: found %v in dropTablesChan", dropTable.tableName) - if err := collector.dropTable(ctx, dropTable.tableName, dropTable.isBaseTable); err != nil { - log.Errorf("TableGC: error dropping table %s: %+v", dropTable.tableName, err) + go func() { + tableName, err := collector.purge(ctx) + if err != nil { + log.Errorf("TableGC: error purging table %s: %+v", tableName, err) + return + } + if tableName == "" { + // No table purged (or at least not to completion) + // Either because there _is_ nothing to purge, or because PURGE isn't a handled state + return } + // The table has been purged! Let's move the table into the next phase: + _, _, uuid, _, _ := schema.AnalyzeGCTableName(tableName) + collector.submitTransitionRequest(ctx, transitionRequestsChan, schema.PurgeTableGCState, tableName, true, uuid) + collector.removePurgingTable(tableName) + // Chances are, there's more tables waiting to be purged. 
Let's speed things by + // requesting another purge, instead of waiting a full purgeReentranceInterval cycle + purgeReentranceTicker.TickAfter(nextPurgeReentry) + }() + case dropTable := <-dropTablesChan: + log.Infof("TableGC: found %v in dropTablesChan", dropTable.tableName) + if err := collector.dropTable(ctx, dropTable.tableName, dropTable.isBaseTable); err != nil { + log.Errorf("TableGC: error dropping table %s: %+v", dropTable.tableName, err) } case transition := <-transitionRequestsChan: - { - log.Info("TableGC: transitionRequestsChan, transition=%v", transition) - if err := collector.transitionTable(ctx, transition); err != nil { - log.Errorf("TableGC: error transitioning table %s to %+v: %+v", transition.fromTableName, transition.toGCState, err) - } + log.Info("TableGC: transitionRequestsChan, transition=%v", transition) + if err := collector.transitionTable(ctx, transition); err != nil { + log.Errorf("TableGC: error transitioning table %s to %+v: %+v", transition.fromTableName, transition.toGCState, err) } } } @@ -378,6 +387,32 @@ func (collector *TableGC) shouldTransitionTable(tableName string) (shouldTransit return true, state, uuid, nil } +// readAndCheckTables is the routine check for which GC tables exist, and which of those need to transition +// into the next state. The function is non-reentrant, and enforces a minimum interval between any two executions. 
+func (collector *TableGC) readAndCheckTables( + ctx context.Context, + dropTablesChan chan<- *gcTable, + transitionRequestsChan chan<- *transitionRequest, +) (err error) { + if !collector.readReentranceFlag.CompareAndSwap(0, 1) { + // An instance of this function is already running + return nil + } + defer time.AfterFunc(checkTablesReentryMinInterval, func() { + collector.readReentranceFlag.Store(0) + }) + + log.Info("TableGC: readAndCheckTables") + gcTables, err := collector.readTables(ctx) + if err != nil { + return fmt.Errorf("TableGC: error while reading tables: %+v", err) + } + if err := collector.checkTables(ctx, gcTables, dropTablesChan, transitionRequestsChan); err != nil { + return err + } + return nil +} + // readTables reads the list of _vt_% tables from the database func (collector *TableGC) readTables(ctx context.Context) (gcTables []*gcTable, err error) { log.Infof("TableGC: read tables") @@ -457,12 +492,11 @@ func (collector *TableGC) checkTables(ctx context.Context, gcTables []*gcTable, // This function is non-reentrant: there's only one instance of this function running at any given time. // A timer keeps calling this function, so if it bails out (e.g. on error) it will later resume work func (collector *TableGC) purge(ctx context.Context) (tableName string, err error) { - if atomic.CompareAndSwapInt64(&purgeReentranceFlag, 0, 1) { - defer atomic.StoreInt64(&purgeReentranceFlag, 0) - } else { + if !collector.purgeReentranceFlag.CompareAndSwap(0, 1) { // An instance of this function is already running return "", nil } + defer collector.purgeReentranceFlag.Store(0) tableName, found := collector.nextTableToPurge() if !found { @@ -598,6 +632,9 @@ func (collector *TableGC) transitionTable(ctx context.Context, transition *trans return err } log.Infof("TableGC: renamed table: %s", transition.fromTableName) + // Since the table has transitioned, there is a potential for more work on this table or on other tables, + // let's kick a check request. 
+ collector.RequestChecks() return nil } diff --git a/go/vt/vttablet/tabletserver/gc/tablegc_test.go b/go/vt/vttablet/tabletserver/gc/tablegc_test.go index 446f6e6ff85..12ee5e2a28b 100644 --- a/go/vt/vttablet/tabletserver/gc/tablegc_test.go +++ b/go/vt/vttablet/tabletserver/gc/tablegc_test.go @@ -60,7 +60,8 @@ func TestNextTableToPurge(t *testing.T) { } for _, ts := range tt { collector := &TableGC{ - purgingTables: make(map[string]bool), + purgingTables: make(map[string]bool), + checkRequestChan: make(chan bool), } for _, table := range ts.tables { collector.purgingTables[table] = true @@ -256,8 +257,9 @@ func TestShouldTransitionTable(t *testing.T) { func TestCheckTables(t *testing.T) { collector := &TableGC{ - isOpen: 0, - purgingTables: map[string]bool{}, + isOpen: 0, + purgingTables: map[string]bool{}, + checkRequestChan: make(chan bool), } var err error collector.lifecycleStates, err = schema.ParseGCLifecycle("hold,purge,evac,drop") diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 2e1831e0acd..3b0ac598ad0 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -187,8 +187,8 @@ func NewTabletServer(ctx context.Context, name string, config *tabletenv.TabletC tsv.te = NewTxEngine(tsv) tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer) - tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tsv.lagThrottler, tabletTypeFunc, tsv.onlineDDLExecutorToggleTableBuffer) tsv.tableGC = gc.NewTableGC(tsv, topoServer, tsv.lagThrottler) + tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tsv.lagThrottler, tabletTypeFunc, tsv.onlineDDLExecutorToggleTableBuffer, tsv.tableGC.RequestChecks) tsv.sm = &stateManager{ statelessql: tsv.statelessql,