From d93e3ba48eb76c01573fcff1a06fa2cdc013685f Mon Sep 17 00:00:00 2001 From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com> Date: Wed, 20 Sep 2023 06:51:56 +0300 Subject: [PATCH] Cherry-pick adac81020c7a493de3c72b1df3457d54c9478aee with conflicts --- .../tabletmanager/tablegc/tablegc_test.go | 17 +++ go/vt/vttablet/tabletserver/gc/tablegc.go | 101 ++++++++++++++++-- .../vttablet/tabletserver/gc/tablegc_test.go | 87 +++++++++++++++ 3 files changed, 194 insertions(+), 11 deletions(-) diff --git a/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go b/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go index bf8e5310eca..10447c6a746 100644 --- a/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go +++ b/go/test/endtoend/tabletmanager/tablegc/tablegc_test.go @@ -18,6 +18,7 @@ package tablegc import ( "context" "flag" + "fmt" "os" "testing" "time" @@ -415,3 +416,19 @@ func TestPurgeView(t *testing.T) { validateTableExists(t, "t1") validateAnyState(t, 1024, schema.EvacTableGCState, schema.DropTableGCState, schema.TableDroppedGCState) } + +func TestDropView(t *testing.T) { + viewName, err := schema.GenerateGCTableName(schema.DropTableGCState, time.Now().Add(tableTransitionExpiration)) // shortly in the future + require.NoError(t, err) + createStatement := fmt.Sprintf("create or replace view %s as select 1", viewName) + + _, err = primaryTablet.VttabletProcess.QueryTablet(createStatement, keyspaceName, true) + require.NoError(t, err) + + // view should be there, because the timestamp hint is still in the near future. + validateTableExists(t, viewName) + + time.Sleep(tableTransitionExpiration / 2) + // But by now, after the above sleep, the view's timestamp hint is in the past, and we expect TableGC to have dropped the view. + validateTableDoesNotExist(t, viewName) +} diff --git a/go/vt/vttablet/tabletserver/gc/tablegc.go b/go/vt/vttablet/tabletserver/gc/tablegc.go index 535239a741d..97f00b8376a 100644 --- a/go/vt/vttablet/tabletserver/gc/tablegc.go +++ b/go/vt/vttablet/tabletserver/gc/tablegc.go @@ -73,9 +73,15 @@ var ( sqlPurgeTable = `delete from %a limit 50` sqlShowVtTables = `show full tables like '\_vt\_%'` sqlDropTable = "drop table if exists `%a`" + sqlDropView = "drop view if exists `%a`" purgeReentranceFlag int64 ) +type gcTable struct { + tableName string + isBaseTable bool +} + // transitionRequest encapsulates a request to transition a table to next state type transitionRequest struct { fromTableName string @@ -238,6 +244,14 @@ func (collector *TableGC) Close() { // Operate is the main entry point for the table garbage collector operation and logic. func (collector *TableGC) Operate(ctx context.Context) { +<<<<<<< HEAD +======= + dropTablesChan := make(chan *gcTable) + purgeRequestsChan := make(chan bool) + transitionRequestsChan := make(chan *transitionRequest) + + tickers := [](*timer.SuspendableTicker){} +>>>>>>> adac81020c (TableGC: support DROP VIEW (#14020)) addTicker := func(d time.Duration) *timer.SuspendableTicker { collector.initMutex.Lock() defer collector.initMutex.Unlock() @@ -282,7 +296,16 @@ func (collector *TableGC) Operate(ctx context.Context) { } case <-tableCheckTicker.C: { +<<<<<<< HEAD _ = collector.checkTables(ctx) +======= + log.Info("TableGC: tableCheckTicker") + if gcTables, err := collector.readTables(ctx); err != nil { + log.Errorf("TableGC: error while reading tables: %+v", err) + } else { + _ = collector.checkTables(ctx, gcTables, dropTablesChan, transitionRequestsChan) + } +>>>>>>> adac81020c (TableGC: support DROP VIEW (#14020)) } case <-purgeReentranceTicker.C: { @@ -297,10 +320,18 @@ func (collector *TableGC) Operate(ctx context.Context) { } }() } +<<<<<<< HEAD case dropTableName := <-collector.dropTablesChan: { if err := collector.dropTable(ctx, dropTableName); err != nil { log.Errorf("TableGC: error dropping table %s: %+v", dropTableName, err) +======= + case dropTable := <-dropTablesChan: + { + log.Infof("TableGC: found %v in dropTablesChan", dropTable.tableName) + if err := collector.dropTable(ctx, dropTable.tableName, dropTable.isBaseTable); err != nil { + log.Errorf("TableGC: error dropping table %s: %+v", dropTable.tableName, err) +>>>>>>> adac81020c (TableGC: support DROP VIEW (#14020)) } } case transition := <-collector.transitionRequestsChan: @@ -383,6 +414,7 @@ func (collector *TableGC) shouldTransitionTable(tableName string) (shouldTransit return true, state, uuid, nil } +<<<<<<< HEAD // checkTables looks for potential GC tables in the MySQL server+schema. // It lists _vt_% tables, then filters through those which are due-date. // It then applies the necessary operation per table. @@ -390,26 +422,41 @@ func (collector *TableGC) checkTables(ctx context.Context) error { if atomic.LoadInt64(&collector.isPrimary) == 0 { return nil } +======= +// readTables reads the list of _vt_% tables from the database +func (collector *TableGC) readTables(ctx context.Context) (gcTables []*gcTable, err error) { + log.Infof("TableGC: read tables") +>>>>>>> adac81020c (TableGC: support DROP VIEW (#14020)) conn, err := collector.pool.Get(ctx, nil) if err != nil { - return err + return nil, err } defer conn.Recycle() - log.Infof("TableGC: check tables") - res, err := conn.Exec(ctx, sqlShowVtTables, math.MaxInt32, true) if err != nil { - return err + return nil, err } for _, row := range res.Rows { tableName := row[0].ToString() tableType := row[1].ToString() isBaseTable := (tableType == "BASE TABLE") + gcTables = append(gcTables, &gcTable{tableName: tableName, isBaseTable: isBaseTable}) + } + return gcTables, nil +} + +// checkTables looks for potential GC tables in the MySQL server+schema. +// It lists _vt_% tables, then filters through those which are due-date. +// It then applies the necessary operation per table. +func (collector *TableGC) checkTables(ctx context.Context, gcTables []*gcTable, dropTablesChan chan<- *gcTable, transitionRequestsChan chan<- *transitionRequest) error { + log.Infof("TableGC: check tables") - shouldTransition, state, uuid, err := collector.shouldTransitionTable(tableName) + for i := range gcTables { + table := gcTables[i] // we capture as local variable as we will later use this in a goroutine + shouldTransition, state, uuid, err := collector.shouldTransitionTable(table.tableName) if err != nil { log.Errorf("TableGC: error while checking tables: %+v", err) @@ -420,28 +467,51 @@ func (collector *TableGC) checkTables(ctx context.Context) error { continue } - log.Infof("TableGC: will operate on table %s", tableName) + log.Infof("TableGC: will operate on table %s", table.tableName) if state == schema.HoldTableGCState { // Hold period expired. Moving to next state +<<<<<<< HEAD collector.submitTransitionRequest(ctx, state, tableName, isBaseTable, uuid) +======= + collector.submitTransitionRequest(ctx, transitionRequestsChan, state, table.tableName, table.isBaseTable, uuid) +>>>>>>> adac81020c (TableGC: support DROP VIEW (#14020)) } if state == schema.PurgeTableGCState { - if isBaseTable { + if table.isBaseTable { // This table needs to be purged. Make sure to enlist it (we may already have) +<<<<<<< HEAD collector.addPurgingTable(tableName) } else { // This is a view. We don't need to delete rows from views. Just transition into next phase collector.submitTransitionRequest(ctx, state, tableName, isBaseTable, uuid) +======= + if !collector.addPurgingTable(table.tableName) { + collector.submitTransitionRequest(ctx, transitionRequestsChan, state, table.tableName, table.isBaseTable, uuid) + } + } else { + // This is a view. We don't need to delete rows from views. Just transition into next phase + collector.submitTransitionRequest(ctx, transitionRequestsChan, state, table.tableName, table.isBaseTable, uuid) +>>>>>>> adac81020c (TableGC: support DROP VIEW (#14020)) } } if state == schema.EvacTableGCState { // This table was in EVAC state for the required period. It will transition into DROP state +<<<<<<< HEAD collector.submitTransitionRequest(ctx, state, tableName, isBaseTable, uuid) } if state == schema.DropTableGCState { // This table needs to be dropped immediately. go func() { collector.dropTablesChan <- tableName }() +======= + collector.submitTransitionRequest(ctx, transitionRequestsChan, state, table.tableName, table.isBaseTable, uuid) + } + if state == schema.DropTableGCState { + // This table needs to be dropped immediately. + go func() { + dropTablesChan <- table + }() +>>>>>>> adac81020c (TableGC: support DROP VIEW (#14020)) } } @@ -542,25 +612,34 @@ func (collector *TableGC) purge(ctx context.Context) (tableName string, err erro // dropTable runs an actual DROP TABLE statement, and marks the end of the line for the // tables' GC lifecycle. +<<<<<<< HEAD func (collector *TableGC) dropTable(ctx context.Context, tableName string) error { if atomic.LoadInt64(&collector.isPrimary) == 0 { return nil } conn, err := collector.pool.Get(ctx, nil) +======= +func (collector *TableGC) dropTable(ctx context.Context, tableName string, isBaseTable bool) error { + conn, err := dbconnpool.NewDBConnection(ctx, collector.env.Config().DB.DbaWithDB()) +>>>>>>> adac81020c (TableGC: support DROP VIEW (#14020)) if err != nil { return err } - defer conn.Recycle() + defer conn.Close() - parsed := sqlparser.BuildParsedQuery(sqlDropTable, tableName) + sqlDrop := sqlDropTable + if !isBaseTable { + sqlDrop = sqlDropView + } + parsed := sqlparser.BuildParsedQuery(sqlDrop, tableName) log.Infof("TableGC: dropping table: %s", tableName) - _, err = conn.Exec(ctx, parsed.Query, 1, true) + _, err = conn.ExecuteFetch(parsed.Query, 1, false) if err != nil { return err } - log.Infof("TableGC: dropped table: %s", tableName) + log.Infof("TableGC: dropped table: %s, isBaseTable: %v", tableName, isBaseTable) return nil } diff --git a/go/vt/vttablet/tabletserver/gc/tablegc_test.go b/go/vt/vttablet/tabletserver/gc/tablegc_test.go index de6ad401f95..cb3e1d8bb77 100644 --- a/go/vt/vttablet/tabletserver/gc/tablegc_test.go +++ b/go/vt/vttablet/tabletserver/gc/tablegc_test.go @@ -17,11 +17,14 @@ limitations under the License. package gc import ( + "context" "testing" + "time" "vitess.io/vitess/go/vt/schema" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestNextTableToPurge(t *testing.T) { @@ -219,3 +222,87 @@ func TestShouldTransitionTable(t *testing.T) { } } } + +func TestCheckTables(t *testing.T) { + collector := &TableGC{ + isOpen: 0, + purgingTables: map[string]bool{}, + } + var err error + collector.lifecycleStates, err = schema.ParseGCLifecycle("hold,purge,evac,drop") + require.NoError(t, err) + + gcTables := []*gcTable{ + { + tableName: "_vt_something_that_isnt_a_gc_table", + isBaseTable: true, + }, + { + tableName: "_vt_HOLD_11111111111111111111111111111111_20990920093324", // 2099 is in the far future + isBaseTable: true, + }, + { + tableName: "_vt_HOLD_22222222222222222222222222222222_20200920093324", + isBaseTable: true, + }, + { + tableName: "_vt_DROP_33333333333333333333333333333333_20200919083451", + isBaseTable: true, + }, + { + tableName: "_vt_DROP_44444444444444444444444444444444_20200919083451", + isBaseTable: false, + }, + } + // one gcTable above is irrelevant, does not have a GC table name + // one will not transition: its date is 2099 + expectResponses := len(gcTables) - 2 + expectDropTables := []*gcTable{ + { + tableName: "_vt_DROP_33333333333333333333333333333333_20200919083451", + isBaseTable: true, + }, + { + tableName: "_vt_DROP_44444444444444444444444444444444_20200919083451", + isBaseTable: false, + }, + } + expectTransitionRequests := []*transitionRequest{ + { + fromTableName: "_vt_HOLD_22222222222222222222222222222222_20200920093324", + isBaseTable: true, + toGCState: schema.PurgeTableGCState, + uuid: "22222222222222222222222222222222", + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + dropTablesChan := make(chan *gcTable) + transitionRequestsChan := make(chan *transitionRequest) + + err = collector.checkTables(ctx, gcTables, dropTablesChan, transitionRequestsChan) + assert.NoError(t, err) + + var responses int + var foundDropTables []*gcTable + var foundTransitionRequests []*transitionRequest + for { + if responses == expectResponses { + break + } + select { + case <-ctx.Done(): + assert.FailNow(t, "timeout") + return + case gcTable := <-dropTablesChan: + responses++ + foundDropTables = append(foundDropTables, gcTable) + case request := <-transitionRequestsChan: + responses++ + foundTransitionRequests = append(foundTransitionRequests, request) + } + } + assert.ElementsMatch(t, expectDropTables, foundDropTables) + assert.ElementsMatch(t, expectTransitionRequests, foundTransitionRequests) +}