From 26a505ef7fba57aeee83aef18c22fbcf43592b7d Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 3 Dec 2024 07:08:16 +0100 Subject: [PATCH 01/10] WIP Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/logic/tablet_discovery.go | 75 +++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 1ee9e2cfefa..b8e92d29eb6 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -60,6 +60,81 @@ func RegisterFlags(fs *pflag.FlagSet) { fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM") } +// tabletFilter represents a filter for tablet records. +type tabletFilter interface { + IsIncluded(tablet *topodatapb.Tablet) bool +} + +// keyspaceShardTabletFilter filters tablet records +// by keyspace and optionally shard. +type keyspaceShardTabletFilter struct { + keyspace, shard string +} + +// NewKeyspaceTabletFilter creates a tablet record filter +// by keyspace. +func NewKeyspaceTabletFilter(keyspace string) tabletFilter { + return &keyspaceShardTabletFilter{keyspace} +} + +// NewKeyspaceShardTabletFilter creates a tablet record filter +// by keyspace and shard. +func NewKeyspaceShardTabletFilter(keyspace, shard string) tabletFilter { + return &keyspaceShardTabletFilter{keyspace, shard} +} + +// IsIncluded returns true if a tablet record passes the filter. +func (f keyspaceShardTabletFilter) IsIncluded(tablet *topodatapb.Tablet) bool { + if f.shard == "" { + return tablet.Keyspace == f.keyspace + } + return tablet.Keyspace == f.keyspace && tablet.Shard == f.shard +} + +// tabletFilters represents one or many tabletFilter objects. +type tabletFilters []tabletFilter + +// IsIncluded returns true if a tablet record passes the filter. +func (f tabletFilters) IsIncluded(tablet *topodatapb.Tablet) bool { + for _, filter := range f { + if !filter.IsIncluded(tablet) { + return false + } + } + return true +} + +// GetAllTablets gets all tablets from all cells using a goroutine per cell and a +// concurrency limit enforced by errgroup. +func GetAllTablets(ctx context.Context, cells []string) ([]*topo.TabletInfo, error) { + maxConcurrency := 4 + cellConcurrency := 1 + if maxConcurrency < topo.DefaultConcurrency { + cellConcurrency = topo.DefaultConcurrency / maxConcurrency + } + + var tabletsMu sync.Mutex + tablets := make([]*topo.TabletInfo, 0) + eg, ctx := errgroup.WaitContext(ctx) + eg.SetLimit(maxConcurrency) // limit to 4 concurrency + for _, cell := range cells { + eg.Go(func() error { + t, err := ts.GetTabletsByCell(ctx, cell, &topo.GetTabletsByCellOptions{ + Concurrency: cellConcurrency, + }) + if err != nil { + log.Errorf("Failed to load tablets from cell %s: %+v", cell, err) + return nil + } + tabletsMu.Lock() + defer tabletsMu.Unlock() + tablets = append(tablets, t) + return nil + }) + } + return tablets, eg.Wait() +} + // OpenTabletDiscovery opens the vitess topo if enables and returns a ticker // channel for polling. func OpenTabletDiscovery() <-chan time.Time { From 6e6f23a7f6e0a8a460de4328375d03a56e723e15 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 12 Dec 2024 23:05:06 +0100 Subject: [PATCH 02/10] WIP Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/logic/tablet_discovery.go | 206 ++++++++++---------------- 1 file changed, 77 insertions(+), 129 deletions(-) diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index e36182b518b..e5aed1fb41e 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -27,19 +27,20 @@ import ( "time" "github.com/spf13/pflag" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" "vitess.io/vitess/go/vt/external/golib/sqlutils" "vitess.io/vitess/go/vt/log" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/db" "vitess.io/vitess/go/vt/vtorc/inst" "vitess.io/vitess/go/vt/vttablet/tmclient" - - topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) var ( @@ -48,77 +49,64 @@ var ( clustersToWatch []string shutdownWaitTime = 30 * time.Second shardsLockCounter int32 + shardsToWatch map[string]bool + // ErrNoPrimaryTablet is a fixed error message. ErrNoPrimaryTablet = errors.New("no primary tablet found") ) -// RegisterFlags registers the flags required by VTOrc -func RegisterFlags(fs *pflag.FlagSet) { - fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"") - fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM") -} - -// tabletFilter represents a filter for tablet records. -type tabletFilter interface { - IsIncluded(tablet *topodatapb.Tablet) bool +func init() { + servenv.OnInit(parseClustersToWatch) } -// keyspaceShardTabletFilter filters tablet records -// by keyspace and optionally shard. -type keyspaceShardTabletFilter struct { - keyspace, shard string -} - -// NewKeyspaceTabletFilter creates a tablet record filter -// by keyspace. -func NewKeyspaceTabletFilter(keyspace string) tabletFilter { - return &keyspaceShardTabletFilter{keyspace} -} - -// NewKeyspaceShardTabletFilter creates a tablet record filter -// by keyspace and shard. -func NewKeyspaceShardTabletFilter(keyspace, shard string) tabletFilter { - return &keyspaceShardTabletFilter{keyspace, shard} -} - -// IsIncluded returns true if a tablet record passes the filter. -func (f keyspaceShardTabletFilter) IsIncluded(tablet *topodatapb.Tablet) bool { - if f.shard == "" { - return tablet.Keyspace == f.keyspace - } - return tablet.Keyspace == f.keyspace && tablet.Shard == f.shard -} - -// tabletFilters represents one or many tabletFilter objects. -type tabletFilters []tabletFilter - -// IsIncluded returns true if a tablet record passes the filter. -func (f tabletFilters) IsIncluded(tablet *topodatapb.Tablet) bool { - for _, filter := range f { - if !filter.IsIncluded(tablet) { - return false +// parseClustersToWatch parses the --clusters_to_watch flag-value +// into a map of keyspace/shards. This is called once at init +// time because the list never changes. +func parseClustersToWatch() { + for _, ks := range clustersToWatch { + if strings.Contains(ks, "/") { + // Validate keyspace/shard parses. + if _, _, err := topoproto.ParseKeyspaceShard(ks); err != nil { + log.Errorf("Could not parse keyspace/shard %q: %+v", ks, err) + continue + } + shardsToWatch[ks] = true + } else { + // Assume this is a keyspace and find all shards in keyspace. + ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + defer cancel() + shards, err := ts.GetShardNames(ctx, ks) + if err != nil { + // Log the errr and continue + log.Errorf("Error fetching shards for keyspace: %v", ks) + continue + } + if len(shards) == 0 { + log.Errorf("Topo has no shards for ks: %v", ks) + continue + } + for _, s := range shards { + shardsToWatch[topoproto.KeyspaceShardString(ks, s)] = true + } } } - return true } -// GetAllTablets gets all tablets from all cells using a goroutine per cell and a -// concurrency limit enforced by errgroup. -func GetAllTablets(ctx context.Context, cells []string) ([]*topo.TabletInfo, error) { - maxConcurrency := 4 - cellConcurrency := 1 - if maxConcurrency < topo.DefaultConcurrency { - cellConcurrency = topo.DefaultConcurrency / maxConcurrency - } +// RegisterFlags registers the flags required by VTOrc +func RegisterFlags(fs *pflag.FlagSet) { + fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"") + fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM") +} +// GetAllTablets gets all tablets from all cells using a goroutine per cell. +func GetAllTablets(ctx context.Context, cells []string) []*topo.TabletInfo { var tabletsMu sync.Mutex tablets := make([]*topo.TabletInfo, 0) - eg, ctx := errgroup.WaitContext(ctx) - eg.SetLimit(maxConcurrency) // limit to 4 concurrency + eg, ctx := errgroup.WithContext(ctx) for _, cell := range cells { eg.Go(func() error { t, err := ts.GetTabletsByCell(ctx, cell, &topo.GetTabletsByCellOptions{ - Concurrency: cellConcurrency, + Concurrency: 1, }) if err != nil { log.Errorf("Failed to load tablets from cell %s: %+v", cell, err) @@ -126,11 +114,12 @@ func GetAllTablets(ctx context.Context, cells []string) ([]*topo.TabletInfo, err } tabletsMu.Lock() defer tabletsMu.Unlock() - tablets = append(tablets, t) + tablets = append(tablets, t...) return nil }) } - return tablets, eg.Wait() + _ = eg.Wait() // always nil + return tablets } // OpenTabletDiscovery opens the vitess topo if enables and returns a ticker @@ -159,81 +148,40 @@ func refreshAllTablets(ctx context.Context) error { }, false /* forceRefresh */) } +// refreshTabletsUsing refreshes tablets using a provided loader. func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), forceRefresh bool) error { - if len(clustersToWatch) == 0 { // all known clusters - ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) - defer cancel() - cells, err := ts.GetKnownCells(ctx) - if err != nil { - return err - } + // Get all cells. + ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) + defer cancel() + cells, err := ts.GetKnownCells(ctx) + if err != nil { + return err + } - refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) - defer refreshCancel() - var wg sync.WaitGroup - for _, cell := range cells { - wg.Add(1) - go func(cell string) { - defer wg.Done() - refreshTabletsInCell(refreshCtx, cell, loader, forceRefresh) - }(cell) - } - wg.Wait() - } else { - // Parse input and build list of keyspaces / shards - var keyspaceShards []*topo.KeyspaceShard - for _, ks := range clustersToWatch { - if strings.Contains(ks, "/") { - // This is a keyspace/shard specification - input := strings.Split(ks, "/") - keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: input[0], Shard: input[1]}) - } else { - // Assume this is a keyspace and find all shards in keyspace - ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) - defer cancel() - shards, err := ts.GetShardNames(ctx, ks) - if err != nil { - // Log the errr and continue - log.Errorf("Error fetching shards for keyspace: %v", ks) - continue - } - if len(shards) == 0 { - log.Errorf("Topo has no shards for ks: %v", ks) - continue - } - for _, s := range shards { - keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: ks, Shard: s}) - } - } - } - if len(keyspaceShards) == 0 { - log.Errorf("Found no keyspaceShards for input: %+v", clustersToWatch) - return nil - } - refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) - defer refreshCancel() - var wg sync.WaitGroup - for _, ks := range keyspaceShards { - wg.Add(1) - go func(ks *topo.KeyspaceShard) { - defer wg.Done() - refreshTabletsInKeyspaceShard(refreshCtx, ks.Keyspace, ks.Shard, loader, forceRefresh, nil) - }(ks) - } - wg.Wait() + // Get all tablets from all cells. + getTabletsCtx, getTabletsCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) + defer getTabletsCancel() + tablets := GetAllTablets(getTabletsCtx, cells) + if len(tablets) == 0 { + log.Error("Found no tablets") + return nil } - return nil -} -func refreshTabletsInCell(ctx context.Context, cell string, loader func(tabletAlias string), forceRefresh bool) { - tablets, err := ts.GetTabletsByCell(ctx, cell, &topo.GetTabletsByCellOptions{Concurrency: topo.DefaultConcurrency}) - if err != nil { - log.Errorf("Error fetching topo info for cell %v: %v", cell, err) - return + // Filter tablets that should not be watched using shardsToWatch map. + filteredTablets := make([]*topo.TabletInfo, 0, len(tablets)) + for _, t := range tablets { + shardKey := topoproto.KeyspaceShardString(t.Tablet.Keyspace, t.Tablet.Shard) + if len(shardsToWatch) > 0 && !shardsToWatch[shardKey] { + continue // filter + } + filteredTablets = append(filteredTablets, t) } - query := "select alias from vitess_tablet where cell = ?" - args := sqlutils.Args(cell) - refreshTablets(tablets, query, args, loader, forceRefresh, nil) + + // Refresh the filtered tablets. + query := "select alias from vitess_tablet" + refreshTablets(filteredTablets, query, nil, loader, forceRefresh, nil) + + return nil } // forceRefreshAllTabletsInShard is used to refresh all the tablet's information (both MySQL information and topo records) From 4babcaf2e3b49ff51b635dd95df5d1a9f0016e76 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 12 Dec 2024 23:31:30 +0100 Subject: [PATCH 03/10] add test Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/logic/tablet_discovery.go | 9 ++-- go/vt/vtorc/logic/tablet_discovery_test.go | 50 ++++++++++++++++++++++ 2 files changed, 53 insertions(+), 6 deletions(-) diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index e5aed1fb41e..c56836f089d 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -34,7 +34,6 @@ import ( "vitess.io/vitess/go/vt/external/golib/sqlutils" "vitess.io/vitess/go/vt/log" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtorc/config" @@ -49,16 +48,12 @@ var ( clustersToWatch []string shutdownWaitTime = 30 * time.Second shardsLockCounter int32 - shardsToWatch map[string]bool + shardsToWatch = make(map[string]bool, 0) // ErrNoPrimaryTablet is a fixed error message. ErrNoPrimaryTablet = errors.New("no primary tablet found") ) -func init() { - servenv.OnInit(parseClustersToWatch) -} - // parseClustersToWatch parses the --clusters_to_watch flag-value // into a map of keyspace/shards. This is called once at init // time because the list never changes. @@ -131,6 +126,8 @@ func OpenTabletDiscovery() <-chan time.Time { if _, err := db.ExecVTOrc("DELETE FROM vitess_tablet"); err != nil { log.Error(err) } + // Parse --clusters_to_watch into a filter. + parseClustersToWatch() // We refresh all information from the topo once before we start the ticks to do // it on a timer. ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index f6a7af38382..cbb39bc8c71 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -19,6 +19,7 @@ package logic import ( "context" "fmt" + "strings" "sync/atomic" "testing" "time" @@ -101,6 +102,55 @@ var ( } ) +func TestParseClustersToWatch(t *testing.T) { + oldClustersToWatch := clustersToWatch + oldTs := ts + defer func() { + clustersToWatch = oldClustersToWatch + shardsToWatch = nil + ts = oldTs + }() + + // Create a memory topo-server and create the keyspace and shard records + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ts = memorytopo.NewServer(ctx, cell1) + _, err := ts.GetOrCreateShard(context.Background(), keyspace, shard) + require.NoError(t, err) + + testCases := []struct { + in []string + expected map[string]bool + }{ + { + in: []string{"test/"}, + expected: map[string]bool{"test/": true}, + }, + { + in: []string{"test/-"}, + expected: map[string]bool{"test/-": true}, + }, + { + in: []string{keyspace}, + expected: map[string]bool{ + topoproto.KeyspaceShardString(keyspace, shard): true, + }, + }, + } + + for _, testCase := range testCases { + t.Run(strings.Join(testCase.in, ","), func(t *testing.T) { + defer func() { + shardsToWatch = make(map[string]bool, 0) + }() + clustersToWatch = testCase.in + parseClustersToWatch() + require.Equal(t, testCase.expected, shardsToWatch) + }) + } +} + func TestRefreshTabletsInKeyspaceShard(t *testing.T) { // Store the old flags and restore on test completion oldTs := ts From eab79e97529ff1a375cb40413c56bcf26ea6569d Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 12 Dec 2024 23:36:00 +0100 Subject: [PATCH 04/10] more tests Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/logic/tablet_discovery_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index cbb39bc8c71..07bcad80cbb 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -123,6 +123,14 @@ func TestParseClustersToWatch(t *testing.T) { in []string expected map[string]bool }{ + { + in: []string{}, + expected: map[string]bool{}, + }, + { + in: []string{""}, + expected: map[string]bool{}, + }, { in: []string{"test/"}, expected: map[string]bool{"test/": true}, @@ -132,6 +140,7 @@ func TestParseClustersToWatch(t *testing.T) { expected: map[string]bool{"test/-": true}, }, { + // confirm shards fetch from topo in: []string{keyspace}, expected: map[string]bool{ topoproto.KeyspaceShardString(keyspace, shard): true, From ddf751488abc42ef15689232d5cdcd12020c0190 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 12 Dec 2024 23:50:51 +0100 Subject: [PATCH 05/10] add test Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/logic/tablet_discovery_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index 07bcad80cbb..ac7bc26bf86 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -139,6 +139,14 @@ func TestParseClustersToWatch(t *testing.T) { in: []string{"test/-"}, expected: map[string]bool{"test/-": true}, }, + { + in: []string{"test/-", "test2/-80", "test2/80-"}, + expected: map[string]bool{ + "test/-": true, + "test2/-80": true, + "test2/80-": true, + }, + }, { // confirm shards fetch from topo in: []string{keyspace}, From 2bec10c50da59d3bcc5608a03eac043d50cdd05c Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 13 Dec 2024 00:42:32 +0100 Subject: [PATCH 06/10] handle keyspace with trailing-slash, no shards Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/logic/tablet_discovery.go | 7 ++++++- go/vt/vtorc/logic/tablet_discovery_test.go | 17 +++++++++++------ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index c56836f089d..08b3b9cc7a2 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -59,7 +59,7 @@ var ( // time because the list never changes. func parseClustersToWatch() { for _, ks := range clustersToWatch { - if strings.Contains(ks, "/") { + if strings.Contains(ks, "/") && !strings.HasSuffix(ks, "/") { // Validate keyspace/shard parses. if _, _, err := topoproto.ParseKeyspaceShard(ks); err != nil { log.Errorf("Could not parse keyspace/shard %q: %+v", ks, err) @@ -67,6 +67,11 @@ func parseClustersToWatch() { } shardsToWatch[ks] = true } else { + // Remove trailing slash, if exists + if strings.HasSuffix(ks, "/") { + ks = strings.TrimSuffix(ks, "/") + } + // Assume this is a keyspace and find all shards in keyspace. ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) defer cancel() diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index ac7bc26bf86..a03f69906a4 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -132,12 +132,10 @@ func TestParseClustersToWatch(t *testing.T) { expected: map[string]bool{}, }, { - in: []string{"test/"}, - expected: map[string]bool{"test/": true}, - }, - { - in: []string{"test/-"}, - expected: map[string]bool{"test/-": true}, + in: []string{"test/-"}, + expected: map[string]bool{ + "test/-": true, + }, }, { in: []string{"test/-", "test2/-80", "test2/80-"}, @@ -154,6 +152,13 @@ func TestParseClustersToWatch(t *testing.T) { topoproto.KeyspaceShardString(keyspace, shard): true, }, }, + { + // confirm shards fetch from topo when keyspace has trailing-slash + in: []string{keyspace + "/"}, + expected: map[string]bool{ + topoproto.KeyspaceShardString(keyspace, shard): true, + }, + }, } for _, testCase := range testCases { From 71d84d5479a006cf4ea97d99a32be31d1642eb18 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 13 Dec 2024 00:52:02 +0100 Subject: [PATCH 07/10] simplify Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/logic/tablet_discovery.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 08b3b9cc7a2..2a9103c0ae9 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -67,17 +67,15 @@ func parseClustersToWatch() { } shardsToWatch[ks] = true } else { - // Remove trailing slash, if exists - if strings.HasSuffix(ks, "/") { - ks = strings.TrimSuffix(ks, "/") - } + // Remove trailing slash, if exists. + ks = strings.TrimSuffix(ks, "/") // Assume this is a keyspace and find all shards in keyspace. ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) defer cancel() shards, err := ts.GetShardNames(ctx, ks) if err != nil { - // Log the errr and continue + // Log the err and continue. log.Errorf("Error fetching shards for keyspace: %v", ks) continue } From a1c4167b20a23f4ac4f39ada2e93f96cf703c573 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 13 Dec 2024 04:31:43 +0100 Subject: [PATCH 08/10] tweaks Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/logic/tablet_discovery.go | 54 +++++++++++++--------- go/vt/vtorc/logic/tablet_discovery_test.go | 4 +- go/vt/vtorc/logic/vtorc.go | 6 +++ 3 files changed, 40 insertions(+), 24 deletions(-) diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 2a9103c0ae9..8096eeccff2 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -49,15 +49,27 @@ var ( shutdownWaitTime = 30 * time.Second shardsLockCounter int32 shardsToWatch = make(map[string]bool, 0) + shardsToWatchMu sync.Mutex // ErrNoPrimaryTablet is a fixed error message. ErrNoPrimaryTablet = errors.New("no primary tablet found") ) -// parseClustersToWatch parses the --clusters_to_watch flag-value -// into a map of keyspace/shards. This is called once at init -// time because the list never changes. -func parseClustersToWatch() { +// RegisterFlags registers the flags required by VTOrc +func RegisterFlags(fs *pflag.FlagSet) { + fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"") + fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM") +} + +// updateShardsToWatch parses the --clusters_to_watch flag-value +// into a map of keyspace/shards. +func updateShardsToWatch() { + if ts == nil { + return + } + shardsToWatchMu.Lock() + defer shardsToWatchMu.Unlock() + for _, ks := range clustersToWatch { if strings.Contains(ks, "/") && !strings.HasSuffix(ks, "/") { // Validate keyspace/shard parses. @@ -90,14 +102,8 @@ func parseClustersToWatch() { } } -// RegisterFlags registers the flags required by VTOrc -func RegisterFlags(fs *pflag.FlagSet) { - fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"") - fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM") -} - -// GetAllTablets gets all tablets from all cells using a goroutine per cell. -func GetAllTablets(ctx context.Context, cells []string) []*topo.TabletInfo { +// getAllTablets gets all tablets from all cells using a goroutine per cell. +func getAllTablets(ctx context.Context, cells []string) []*topo.TabletInfo { var tabletsMu sync.Mutex tablets := make([]*topo.TabletInfo, 0) eg, ctx := errgroup.WithContext(ctx) @@ -130,7 +136,7 @@ func OpenTabletDiscovery() <-chan time.Time { log.Error(err) } // Parse --clusters_to_watch into a filter. - parseClustersToWatch() + updateShardsToWatch() // We refresh all information from the topo once before we start the ticks to do // it on a timer. ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) @@ -161,25 +167,29 @@ func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), f // Get all tablets from all cells. getTabletsCtx, getTabletsCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer getTabletsCancel() - tablets := GetAllTablets(getTabletsCtx, cells) + tablets := getAllTablets(getTabletsCtx, cells) if len(tablets) == 0 { log.Error("Found no tablets") return nil } // Filter tablets that should not be watched using shardsToWatch map. - filteredTablets := make([]*topo.TabletInfo, 0, len(tablets)) - for _, t := range tablets { - shardKey := topoproto.KeyspaceShardString(t.Tablet.Keyspace, t.Tablet.Shard) - if len(shardsToWatch) > 0 && !shardsToWatch[shardKey] { - continue // filter + matchedTablets := make([]*topo.TabletInfo, 0, len(tablets)) + func() { + shardsToWatchMu.Lock() + defer shardsToWatchMu.Unlock() + for _, t := range tablets { + shardKey := topoproto.KeyspaceShardString(t.Tablet.Keyspace, t.Tablet.Shard) + if len(shardsToWatch) > 0 && !shardsToWatch[shardKey] { + continue // filter + } + matchedTablets = append(matchedTablets, t) } - filteredTablets = append(filteredTablets, t) - } + }() // Refresh the filtered tablets. query := "select alias from vitess_tablet" - refreshTablets(filteredTablets, query, nil, loader, forceRefresh, nil) + refreshTablets(matchedTablets, query, nil, loader, forceRefresh, nil) return nil } diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index a03f69906a4..15e981177f2 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -102,7 +102,7 @@ var ( } ) -func TestParseClustersToWatch(t *testing.T) { +func TestUpdateShardsToWatch(t *testing.T) { oldClustersToWatch := clustersToWatch oldTs := ts defer func() { @@ -167,7 +167,7 @@ func TestParseClustersToWatch(t *testing.T) { shardsToWatch = make(map[string]bool, 0) }() clustersToWatch = testCase.in - parseClustersToWatch() + updateShardsToWatch() require.Equal(t, testCase.expected, shardsToWatch) }) } diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index 39326525ce2..c6a4a4ee46d 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -326,6 +326,12 @@ func refreshAllInformation(ctx context.Context) error { return RefreshAllKeyspacesAndShards(ctx) }) + // Refresh shards to watch. + eg.Go(func() error { + updateShardsToWatch() + return nil + }) + // Refresh all tablets. eg.Go(func() error { return refreshAllTablets(ctx) From 0df08482515e55fda3db617ad15611da1b0e2a68 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 13 Dec 2024 22:14:11 +0100 Subject: [PATCH 09/10] reload keyspace shards if not specified Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/logic/tablet_discovery.go | 82 ++++++++++++---------- go/vt/vtorc/logic/tablet_discovery_test.go | 27 ++++--- 2 files changed, 57 insertions(+), 52 deletions(-) diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 8096eeccff2..d87612f2dc3 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -48,7 +48,7 @@ var ( clustersToWatch []string shutdownWaitTime = 30 * time.Second shardsLockCounter int32 - shardsToWatch = make(map[string]bool, 0) + shardsToWatch map[string][]string shardsToWatchMu sync.Mutex // ErrNoPrimaryTablet is a fixed error message. @@ -64,27 +64,26 @@ func RegisterFlags(fs *pflag.FlagSet) { // updateShardsToWatch parses the --clusters_to_watch flag-value // into a map of keyspace/shards. func updateShardsToWatch() { - if ts == nil { + if len(clustersToWatch) == 0 { return } - shardsToWatchMu.Lock() - defer shardsToWatchMu.Unlock() + newShardsToWatch := make(map[string][]string, 0) for _, ks := range clustersToWatch { if strings.Contains(ks, "/") && !strings.HasSuffix(ks, "/") { // Validate keyspace/shard parses. - if _, _, err := topoproto.ParseKeyspaceShard(ks); err != nil { + k, s, err := topoproto.ParseKeyspaceShard(ks) + if err != nil { log.Errorf("Could not parse keyspace/shard %q: %+v", ks, err) continue } - shardsToWatch[ks] = true + newShardsToWatch[k] = append(newShardsToWatch[k], s) } else { - // Remove trailing slash, if exists. - ks = strings.TrimSuffix(ks, "/") - - // Assume this is a keyspace and find all shards in keyspace. ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) defer cancel() + // Assume this is a keyspace and find all shards in keyspace. + // Remove trailing slash if exists. + ks = strings.TrimSuffix(ks, "/") shards, err := ts.GetShardNames(ctx, ks) if err != nil { // Log the err and continue. @@ -95,11 +94,38 @@ func updateShardsToWatch() { log.Errorf("Topo has no shards for ks: %v", ks) continue } - for _, s := range shards { - shardsToWatch[topoproto.KeyspaceShardString(ks, s)] = true - } + newShardsToWatch[ks] = shards } } + if len(newShardsToWatch) == 0 { + log.Error("No keyspace/shards to watch") + return + } + + shardsToWatchMu.Lock() + defer shardsToWatchMu.Unlock() + shardsToWatch = newShardsToWatch +} + +// OpenTabletDiscovery opens the vitess topo if enables and returns a ticker +// channel for polling. +func OpenTabletDiscovery() <-chan time.Time { + ts = topo.Open() + tmc = inst.InitializeTMC() + // Clear existing cache and perform a new refresh. + if _, err := db.ExecVTOrc("DELETE FROM vitess_tablet"); err != nil { + log.Error(err) + } + // Parse --clusters_to_watch into a filter. + updateShardsToWatch() + // We refresh all information from the topo once before we start the ticks to do + // it on a timer. + ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) + defer cancel() + if err := refreshAllInformation(ctx); err != nil { + log.Errorf("failed to initialize topo information: %+v", err) + } + return time.Tick(config.GetTopoInformationRefreshDuration()) //nolint SA1015: using time.Tick leaks the underlying ticker } // getAllTablets gets all tablets from all cells using a goroutine per cell. @@ -126,27 +152,6 @@ func getAllTablets(ctx context.Context, cells []string) []*topo.TabletInfo { return tablets } -// OpenTabletDiscovery opens the vitess topo if enables and returns a ticker -// channel for polling. -func OpenTabletDiscovery() <-chan time.Time { - ts = topo.Open() - tmc = inst.InitializeTMC() - // Clear existing cache and perform a new refresh. - if _, err := db.ExecVTOrc("DELETE FROM vitess_tablet"); err != nil { - log.Error(err) - } - // Parse --clusters_to_watch into a filter. - updateShardsToWatch() - // We refresh all information from the topo once before we start the ticks to do - // it on a timer. - ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) - defer cancel() - if err := refreshAllInformation(ctx); err != nil { - log.Errorf("failed to initialize topo information: %+v", err) - } - return time.Tick(config.GetTopoInformationRefreshDuration()) //nolint SA1015: using time.Tick leaks the underlying ticker -} - // refreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while func refreshAllTablets(ctx context.Context) error { return refreshTabletsUsing(ctx, func(tabletAlias string) { @@ -179,9 +184,11 @@ func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), f shardsToWatchMu.Lock() defer shardsToWatchMu.Unlock() for _, t := range tablets { - shardKey := topoproto.KeyspaceShardString(t.Tablet.Keyspace, t.Tablet.Shard) - if len(shardsToWatch) > 0 && !shardsToWatch[shardKey] { - continue // filter + if len(shardsToWatch) > 0 { + _, ok := shardsToWatch[t.Tablet.Keyspace] + if !ok || !slices.Contains(shardsToWatch[t.Tablet.Keyspace], t.Tablet.Shard) { + continue // filter + } } matchedTablets = append(matchedTablets, t) } @@ -190,7 +197,6 @@ func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), f // Refresh the filtered tablets. query := "select alias from vitess_tablet" refreshTablets(matchedTablets, query, nil, loader, forceRefresh, nil) - return nil } diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index 15e981177f2..fff2bf28eef 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -121,42 +121,41 @@ func TestUpdateShardsToWatch(t *testing.T) { testCases := []struct { in []string - expected map[string]bool + expected map[string][]string }{ { in: []string{}, - expected: map[string]bool{}, + expected: nil, }, { in: []string{""}, - expected: map[string]bool{}, + expected: map[string][]string{}, }, { in: []string{"test/-"}, - expected: map[string]bool{ - "test/-": true, + expected: map[string][]string{ + "test": []string{"-"}, }, }, { in: []string{"test/-", "test2/-80", "test2/80-"}, - expected: map[string]bool{ - "test/-": true, - "test2/-80": true, - "test2/80-": true, + expected: map[string][]string{ + "test": []string{"-"}, + "test2": []string{"-80", "80-"}, }, }, { // confirm shards fetch from topo in: []string{keyspace}, - expected: map[string]bool{ - topoproto.KeyspaceShardString(keyspace, shard): true, + expected: map[string][]string{ + keyspace: []string{shard}, }, }, { // confirm shards fetch from topo when keyspace has trailing-slash in: []string{keyspace + "/"}, - expected: map[string]bool{ - topoproto.KeyspaceShardString(keyspace, shard): true, + expected: map[string][]string{ + keyspace: []string{shard}, }, }, } @@ -164,7 +163,7 @@ func TestUpdateShardsToWatch(t *testing.T) { for _, testCase := range testCases { t.Run(strings.Join(testCase.in, ","), func(t *testing.T) { defer func() { - shardsToWatch = make(map[string]bool, 0) + shardsToWatch = make(map[string][]string, 0) }() clustersToWatch = testCase.in updateShardsToWatch() From 189b445793959d8990e6274b5f14fd561c59734f Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 13 Dec 2024 22:25:26 +0100 Subject: [PATCH 10/10] gofmt Signed-off-by: Tim Vaillancourt --- go/vt/vtorc/logic/tablet_discovery_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index fff2bf28eef..54284e8a017 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -134,28 +134,28 @@ func TestUpdateShardsToWatch(t *testing.T) { { in: []string{"test/-"}, expected: map[string][]string{ - "test": []string{"-"}, + "test": {"-"}, }, }, { in: []string{"test/-", "test2/-80", "test2/80-"}, expected: map[string][]string{ - "test": []string{"-"}, - "test2": []string{"-80", "80-"}, + "test": {"-"}, + "test2": {"-80", "80-"}, }, }, { // confirm shards fetch from topo in: []string{keyspace}, expected: map[string][]string{ - keyspace: []string{shard}, + keyspace: {shard}, }, }, { // confirm shards fetch from topo when keyspace has trailing-slash in: []string{keyspace + "/"}, expected: map[string][]string{ - keyspace: []string{shard}, + keyspace: {shard}, }, }, }