diff --git a/changelog/22.0/22.0.0/summary.md b/changelog/22.0/22.0.0/summary.md
index ebc0c485fc1..cb8372cd60e 100644
--- a/changelog/22.0/22.0.0/summary.md
+++ b/changelog/22.0/22.0.0/summary.md
@@ -10,7 +10,7 @@
- **[VTOrc Config File Changes](#vtorc-config-file-changes)**
- **[Minor Changes](#minor-changes)**
- **[VTTablet Flags](#flags-vttablet)**
-
+ - **[Topology read concurrency behaviour changes](#topo-read-concurrency-changes)**
## Major Changes
@@ -67,3 +67,9 @@ To upgrade to the newer version of the configuration file, first switch to using
- `twopc_abandon_age` flag now supports values in the time.Duration format (e.g., 1s, 2m, 1h).
While the flag will continue to accept float values (interpreted as seconds) for backward compatibility,
**float inputs are deprecated** and will be removed in a future release.
+
+### <a id="topo-read-concurrency-changes"/>`--topo_read_concurrency` behaviour changes
+
+The `--topo_read_concurrency` flag was added to all components that access the topology, and the limit it sets is now applied separately to each global or local cell _(default `32`)_.
+
+All topology read calls _(`Get`, `GetVersion`, `List` and `ListDir`)_ now respect this per-cell limit. Prior to this version, a single limit was shared across calls to all cells, and many topology calls did not respect it at all.
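+
+This change also introduces a new `TopologyConnReadWaits` timings metric, which records how long read calls spend waiting to acquire the semaphore.
+
+As a rough sketch _(illustrative only, not the exact implementation)_, every read on a cell's topology connection is now gated like this:
+
+```go
+import (
+	"context"
+
+	"golang.org/x/sync/semaphore"
+)
+
+// Illustrative: one weighted semaphore per global/local cell,
+// sized by --topo_read_concurrency (default 32).
+var cellReadSem = semaphore.NewWeighted(32)
+
+func gatedRead(ctx context.Context, read func() error) error {
+	if err := cellReadSem.Acquire(ctx, 1); err != nil {
+		return err // context cancelled while waiting for a slot
+	}
+	defer cellReadSem.Release(1)
+	return read() // the actual Get/GetVersion/List/ListDir call
+}
+```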
diff --git a/go/flags/endtoend/vtbackup.txt b/go/flags/endtoend/vtbackup.txt
index bf3a9eb9690..b4405960711 100644
--- a/go/flags/endtoend/vtbackup.txt
+++ b/go/flags/endtoend/vtbackup.txt
@@ -231,6 +231,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
+ --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt
index 7348fcf1753..5a1e4000fea 100644
--- a/go/flags/endtoend/vtcombo.txt
+++ b/go/flags/endtoend/vtcombo.txt
@@ -374,7 +374,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
- --topo_read_concurrency int Concurrency of topo reads. (default 32)
+ --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
diff --git a/go/flags/endtoend/vtctld.txt b/go/flags/endtoend/vtctld.txt
index 8b1aa6f4a92..be0f5114e79 100644
--- a/go/flags/endtoend/vtctld.txt
+++ b/go/flags/endtoend/vtctld.txt
@@ -164,7 +164,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
- --topo_read_concurrency int Concurrency of topo reads. (default 32)
+ --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt
index 4cb4cd34148..fde17f89c49 100644
--- a/go/flags/endtoend/vtgate.txt
+++ b/go/flags/endtoend/vtgate.txt
@@ -223,7 +223,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
- --topo_read_concurrency int Concurrency of topo reads. (default 32)
+ --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt
index efccb0afdfc..c2799a72dc1 100644
--- a/go/flags/endtoend/vtorc.txt
+++ b/go/flags/endtoend/vtorc.txt
@@ -98,7 +98,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
- --topo_read_concurrency int Concurrency of topo reads. (default 32)
+ --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt
index e4c6fde66af..132c2d06344 100644
--- a/go/flags/endtoend/vttablet.txt
+++ b/go/flags/endtoend/vttablet.txt
@@ -376,6 +376,7 @@ Flags:
--topo_global_root string the path of the global topology data in the global topology server
--topo_global_server_address string the address of the global topology server
--topo_implementation string the topology implementation to use
+ --topo_read_concurrency int Maximum concurrency of topo reads per global or local cell. (default 32)
--topo_zk_auth_file string auth to use when connecting to the zk topo server, file contents should be <scheme>:<auth>, e.g., digest:user:pass
--topo_zk_base_timeout duration zk base timeout (see zk.Connect) (default 30s)
--topo_zk_max_concurrency int maximum number of pending requests to send to a Zookeeper server. (default 64)
diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go
index cea972e35a7..2f270bd7518 100644
--- a/go/vt/discovery/healthcheck.go
+++ b/go/vt/discovery/healthcheck.go
@@ -373,7 +373,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
if c == "" {
continue
}
- topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency))
+ topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets))
}
hc.topoWatchers = topoWatchers
diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go
index 64346d524ad..d1e358e1aa5 100644
--- a/go/vt/discovery/topology_watcher.go
+++ b/go/vt/discovery/topology_watcher.go
@@ -26,16 +26,13 @@ import (
"sync"
"time"
- "vitess.io/vitess/go/vt/topo/topoproto"
-
- "vitess.io/vitess/go/vt/key"
-
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/trace"
-
+ "vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
- "vitess.io/vitess/go/vt/proto/topodata"
+ topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
+ "vitess.io/vitess/go/vt/topo/topoproto"
)
const (
@@ -56,7 +53,7 @@ var (
// tabletInfo is used internally by the TopologyWatcher struct.
type tabletInfo struct {
alias string
- tablet *topodata.Tablet
+ tablet *topodatapb.Tablet
}
// TopologyWatcher polls the topology periodically for changes to
@@ -70,7 +67,6 @@ type TopologyWatcher struct {
cell string
refreshInterval time.Duration
refreshKnownTablets bool
- concurrency int
ctx context.Context
cancelFunc context.CancelFunc
// wg keeps track of all launched Go routines.
@@ -92,7 +88,7 @@ type TopologyWatcher struct {
// NewTopologyWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and reloads them as needed.
-func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
+func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool) *TopologyWatcher {
tw := &TopologyWatcher{
topoServer: topoServer,
healthcheck: hc,
@@ -100,7 +96,6 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC
cell: cell,
refreshInterval: refreshInterval,
refreshKnownTablets: refreshKnownTablets,
- concurrency: topoReadConcurrency,
tablets: make(map[string]*tabletInfo),
}
tw.firstLoadChan = make(chan struct{})
@@ -112,7 +107,7 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC
}
func (tw *TopologyWatcher) getTablets() ([]*topo.TabletInfo, error) {
- return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, &topo.GetTabletsByCellOptions{Concurrency: tw.concurrency})
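+	// Per-cell read concurrency is now enforced by the topo connection's
+	// read semaphore (see --topo_read_concurrency), so no per-call option is needed.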
+ return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, nil)
}
// Start starts the topology watcher.
@@ -271,14 +266,14 @@ func (tw *TopologyWatcher) TopoChecksum() uint32 {
// to be applied as an additional filter on the list of tablets returned by its getTablets function.
type TabletFilter interface {
// IsIncluded returns whether tablet is included in this filter
- IsIncluded(tablet *topodata.Tablet) bool
+ IsIncluded(tablet *topodatapb.Tablet) bool
}
// TabletFilters contains filters for tablets.
type TabletFilters []TabletFilter
// IsIncluded returns true if a tablet passes all filters.
-func (tf TabletFilters) IsIncluded(tablet *topodata.Tablet) bool {
+func (tf TabletFilters) IsIncluded(tablet *topodatapb.Tablet) bool {
for _, filter := range tf {
if !filter.IsIncluded(tablet) {
return false
@@ -299,7 +294,7 @@ type FilterByShard struct {
type filterShard struct {
keyspace string
shard string
- keyRange *topodata.KeyRange // only set if shard is also a KeyRange
+ keyRange *topodatapb.KeyRange // only set if shard is also a KeyRange
}
// NewFilterByShard creates a new FilterByShard for use by a
@@ -344,7 +339,7 @@ func NewFilterByShard(filters []string) (*FilterByShard, error) {
}
// IsIncluded returns true iff the tablet's keyspace and shard match what we have.
-func (fbs *FilterByShard) IsIncluded(tablet *topodata.Tablet) bool {
+func (fbs *FilterByShard) IsIncluded(tablet *topodatapb.Tablet) bool {
canonical, kr, err := topo.ValidateShardName(tablet.Shard)
if err != nil {
log.Errorf("Error parsing shard name %v, will ignore tablet: %v", tablet.Shard, err)
@@ -384,7 +379,7 @@ func NewFilterByKeyspace(selectedKeyspaces []string) *FilterByKeyspace {
}
// IsIncluded returns true if the tablet's keyspace matches what we have.
-func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool {
+func (fbk *FilterByKeyspace) IsIncluded(tablet *topodatapb.Tablet) bool {
_, exist := fbk.keyspaces[tablet.Keyspace]
return exist
}
@@ -403,7 +398,7 @@ func NewFilterByTabletTags(tabletTags map[string]string) *FilterByTabletTags {
}
// IsIncluded returns true if the tablet's tags match what we expect.
-func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodata.Tablet) bool {
+func (fbtg *FilterByTabletTags) IsIncluded(tablet *topodatapb.Tablet) bool {
if fbtg.tags == nil {
return true
}
diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go
index cef367c9b74..89a656c0982 100644
--- a/go/vt/discovery/topology_watcher_test.go
+++ b/go/vt/discovery/topology_watcher_test.go
@@ -67,7 +67,7 @@ func TestStartAndCloseTopoWatcher(t *testing.T) {
fhc := NewFakeHealthCheck(nil)
defer fhc.Close()
topologyWatcherOperations.ZeroAll()
- tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true, 5)
+ tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true)
done := make(chan bool, 3)
result := make(chan bool, 1)
@@ -127,7 +127,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
logger := logutil.NewMemoryLogger()
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
- tw := NewTopologyWatcher(context.Background(), ts, fhc, filter, "aa", 10*time.Minute, refreshKnownTablets, 5)
+ tw := NewTopologyWatcher(context.Background(), ts, fhc, filter, "aa", 10*time.Minute, refreshKnownTablets)
counts = checkOpCounts(t, counts, map[string]int64{})
checkChecksum(t, tw, 0)
@@ -421,7 +421,7 @@ func TestFilterByKeyspace(t *testing.T) {
f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)}
ts := memorytopo.NewServer(ctx, testCell)
defer ts.Close()
- tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5)
+ tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true)
for _, test := range testFilterByKeyspace {
// Add a new tablet to the topology.
@@ -502,7 +502,7 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) {
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)}
- tw := NewTopologyWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5)
+ tw := NewTopologyWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/)
counts = checkOpCounts(t, counts, map[string]int64{})
checkChecksum(t, tw, 0)
@@ -639,7 +639,7 @@ func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) {
defer fhc.Close()
topologyWatcherOperations.ZeroAll()
counts := topologyWatcherOperations.Counts()
- tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true, 5)
+ tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, true)
defer tw.Stop()
// Force fallback to getting tablets individually.
diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go
index 743d8fd6dc0..710bbee0653 100755
--- a/go/vt/topo/keyspace.go
+++ b/go/vt/topo/keyspace.go
@@ -22,44 +22,26 @@ import (
"sort"
"sync"
- "github.com/spf13/pflag"
"golang.org/x/sync/errgroup"
"vitess.io/vitess/go/constants/sidecar"
+ "vitess.io/vitess/go/event"
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/vt/key"
- "vitess.io/vitess/go/vt/servenv"
- "vitess.io/vitess/go/vt/vterrors"
-
- "vitess.io/vitess/go/event"
"vitess.io/vitess/go/vt/log"
- "vitess.io/vitess/go/vt/topo/events"
-
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
+ "vitess.io/vitess/go/vt/topo/events"
+ "vitess.io/vitess/go/vt/vterrors"
)
// This file contains keyspace utility functions.
-// Default concurrency to use in order to avoid overhwelming the topo server.
-var DefaultConcurrency = 32
-
// shardKeySuffix is the suffix of a shard key.
// The full key looks like this:
// /vitess/global/keyspaces/customer/shards/80-/Shard
const shardKeySuffix = "Shard"
-func registerFlags(fs *pflag.FlagSet) {
- fs.IntVar(&DefaultConcurrency, "topo_read_concurrency", DefaultConcurrency, "Concurrency of topo reads.")
-}
-
-func init() {
- servenv.OnParseFor("vtcombo", registerFlags)
- servenv.OnParseFor("vtctld", registerFlags)
- servenv.OnParseFor("vtgate", registerFlags)
- servenv.OnParseFor("vtorc", registerFlags)
-}
-
// KeyspaceInfo is a meta struct that contains metadata to give the
// data more context and convenience. This is the main way we interact
// with a keyspace.
@@ -210,7 +192,7 @@ func (ts *Server) UpdateKeyspace(ctx context.Context, ki *KeyspaceInfo) error {
type FindAllShardsInKeyspaceOptions struct {
// Concurrency controls the maximum number of concurrent calls to GetShard.
// If <= 0, Concurrency is set to 1.
- Concurrency int
+ Concurrency int64
}
// FindAllShardsInKeyspace reads and returns all the existing shards in a
@@ -228,7 +210,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string,
opt = &FindAllShardsInKeyspaceOptions{}
}
if opt.Concurrency <= 0 {
- opt.Concurrency = DefaultConcurrency
+ opt.Concurrency = DefaultReadConcurrency
}
// Unescape the keyspace name as this can e.g. come from the VSchema where
diff --git a/go/vt/topo/server.go b/go/vt/topo/server.go
index 4a3c2e6bb27..865dbc4bed8 100644
--- a/go/vt/topo/server.go
+++ b/go/vt/topo/server.go
@@ -49,6 +49,7 @@ import (
"sync"
"github.com/spf13/pflag"
+ "golang.org/x/sync/semaphore"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/topodata"
@@ -181,6 +182,9 @@ var (
FlagBinaries = []string{"vttablet", "vtctl", "vtctld", "vtcombo", "vtgate",
"vtorc", "vtbackup"}
+
+	// Default read concurrency to use in order to avoid overwhelming the topo server.
+ DefaultReadConcurrency int64 = 32
)
func init() {
@@ -193,6 +197,7 @@ func registerTopoFlags(fs *pflag.FlagSet) {
fs.StringVar(&topoImplementation, "topo_implementation", topoImplementation, "the topology implementation to use")
fs.StringVar(&topoGlobalServerAddress, "topo_global_server_address", topoGlobalServerAddress, "the address of the global topology server")
fs.StringVar(&topoGlobalRoot, "topo_global_root", topoGlobalRoot, "the path of the global topology data in the global topology server")
+ fs.Int64Var(&DefaultReadConcurrency, "topo_read_concurrency", DefaultReadConcurrency, "Maximum concurrency of topo reads per global or local cell.")
}
// RegisterFactory registers a Factory for an implementation for a Server.
@@ -208,11 +213,12 @@ func RegisterFactory(name string, factory Factory) {
// NewWithFactory creates a new Server based on the given Factory.
// It also opens the global cell connection.
func NewWithFactory(factory Factory, serverAddress, root string) (*Server, error) {
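+	// The global cell and the read-only global cell share one read semaphore.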
+ globalReadSem := semaphore.NewWeighted(DefaultReadConcurrency)
conn, err := factory.Create(GlobalCell, serverAddress, root)
if err != nil {
return nil, err
}
- conn = NewStatsConn(GlobalCell, conn)
+ conn = NewStatsConn(GlobalCell, conn, globalReadSem)
var connReadOnly Conn
if factory.HasGlobalReadOnlyCell(serverAddress, root) {
@@ -220,7 +226,7 @@ func NewWithFactory(factory Factory, serverAddress, root string) (*Server, error
if err != nil {
return nil, err
}
- connReadOnly = NewStatsConn(GlobalReadOnlyCell, connReadOnly)
+ connReadOnly = NewStatsConn(GlobalReadOnlyCell, connReadOnly, globalReadSem)
} else {
connReadOnly = conn
}
@@ -302,7 +308,8 @@ func (ts *Server) ConnForCell(ctx context.Context, cell string) (Conn, error) {
conn, err := ts.factory.Create(cell, ci.ServerAddress, ci.Root)
switch {
case err == nil:
- conn = NewStatsConn(cell, conn)
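+		// Each cell gets its own read semaphore, so a busy cell cannot
+		// starve reads in other cells.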
+ cellReadSem := semaphore.NewWeighted(DefaultReadConcurrency)
+ conn = NewStatsConn(cell, conn, cellReadSem)
ts.cellConns[cell] = cellConn{ci, conn}
return conn, nil
case IsErrType(err, NoNode):
diff --git a/go/vt/topo/shard.go b/go/vt/topo/shard.go
index 7df6dc64b88..a610cac885a 100644
--- a/go/vt/topo/shard.go
+++ b/go/vt/topo/shard.go
@@ -658,16 +658,8 @@ func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard str
}
}
- // Divide the concurrency limit by the number of cells. If there are more
- // cells than the limit, default to concurrency of 1.
- cellConcurrency := 1
- if len(cells) < DefaultConcurrency {
- cellConcurrency = DefaultConcurrency / len(cells)
- }
-
mu := sync.Mutex{}
eg, ctx := errgroup.WithContext(ctx)
- eg.SetLimit(DefaultConcurrency)
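+	// No errgroup limit is set anymore; reads are throttled per cell by the
+	// topo connection's read semaphore.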
tablets := make([]*TabletInfo, 0, len(cells))
var kss *KeyspaceShard
@@ -678,7 +670,6 @@ func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard str
}
}
options := &GetTabletsByCellOptions{
- Concurrency: cellConcurrency,
KeyspaceShard: kss,
}
for _, cell := range cells {
diff --git a/go/vt/topo/stats_conn.go b/go/vt/topo/stats_conn.go
index 39bc8c9bc43..ded362b9139 100644
--- a/go/vt/topo/stats_conn.go
+++ b/go/vt/topo/stats_conn.go
@@ -20,6 +20,8 @@ import (
"context"
"time"
+ "golang.org/x/sync/semaphore"
+
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
@@ -37,6 +39,11 @@ var (
"TopologyConnErrors",
"TopologyConnErrors errors per operation",
[]string{"Operation", "Cell"})
+
+ topoStatsConnReadWaitTimings = stats.NewMultiTimings(
+ "TopologyConnReadWaits",
+ "TopologyConnReadWait timings",
+ []string{"Operation", "Cell"})
)
const readOnlyErrorStrFormat = "cannot perform %s on %s as the topology server connection is read-only"
@@ -46,14 +53,16 @@ type StatsConn struct {
cell string
conn Conn
readOnly bool
+ readSem *semaphore.Weighted
}
// NewStatsConn returns a StatsConn
-func NewStatsConn(cell string, conn Conn) *StatsConn {
+func NewStatsConn(cell string, conn Conn, readSem *semaphore.Weighted) *StatsConn {
return &StatsConn{
cell: cell,
conn: conn,
readOnly: false,
+ readSem: readSem,
}
}
@@ -61,6 +70,12 @@ func NewStatsConn(cell string, conn Conn) *StatsConn {
func (st *StatsConn) ListDir(ctx context.Context, dirPath string, full bool) ([]DirEntry, error) {
startTime := time.Now()
statsKey := []string{"ListDir", st.cell}
+ if err := st.readSem.Acquire(ctx, 1); err != nil {
+ return nil, err
+ }
+ defer st.readSem.Release(1)
+ topoStatsConnReadWaitTimings.Record(statsKey, startTime)
+ startTime = time.Now() // reset
defer topoStatsConnTimings.Record(statsKey, startTime)
res, err := st.conn.ListDir(ctx, dirPath, full)
if err != nil {
@@ -106,6 +121,12 @@ func (st *StatsConn) Update(ctx context.Context, filePath string, contents []byt
func (st *StatsConn) Get(ctx context.Context, filePath string) ([]byte, Version, error) {
startTime := time.Now()
statsKey := []string{"Get", st.cell}
+ if err := st.readSem.Acquire(ctx, 1); err != nil {
+ return nil, nil, err
+ }
+ defer st.readSem.Release(1)
+ topoStatsConnReadWaitTimings.Record(statsKey, startTime)
+ startTime = time.Now() // reset
defer topoStatsConnTimings.Record(statsKey, startTime)
bytes, version, err := st.conn.Get(ctx, filePath)
if err != nil {
@@ -119,6 +140,12 @@ func (st *StatsConn) Get(ctx context.Context, filePath string) ([]byte, Version,
func (st *StatsConn) GetVersion(ctx context.Context, filePath string, version int64) ([]byte, error) {
startTime := time.Now()
statsKey := []string{"GetVersion", st.cell}
+ if err := st.readSem.Acquire(ctx, 1); err != nil {
+ return nil, err
+ }
+ defer st.readSem.Release(1)
+ topoStatsConnReadWaitTimings.Record(statsKey, startTime)
+ startTime = time.Now() // reset
defer topoStatsConnTimings.Record(statsKey, startTime)
bytes, err := st.conn.GetVersion(ctx, filePath, version)
if err != nil {
@@ -132,6 +159,12 @@ func (st *StatsConn) GetVersion(ctx context.Context, filePath string, version in
func (st *StatsConn) List(ctx context.Context, filePathPrefix string) ([]KVInfo, error) {
startTime := time.Now()
statsKey := []string{"List", st.cell}
+ if err := st.readSem.Acquire(ctx, 1); err != nil {
+ return nil, err
+ }
+ defer st.readSem.Release(1)
+ topoStatsConnReadWaitTimings.Record(statsKey, startTime)
+ startTime = time.Now() // reset
defer topoStatsConnTimings.Record(statsKey, startTime)
bytes, err := st.conn.List(ctx, filePathPrefix)
if err != nil {
diff --git a/go/vt/topo/stats_conn_test.go b/go/vt/topo/stats_conn_test.go
index 605487697cc..9bc1d51d9ed 100644
--- a/go/vt/topo/stats_conn_test.go
+++ b/go/vt/topo/stats_conn_test.go
@@ -23,11 +23,14 @@ import (
"time"
"github.com/stretchr/testify/require"
+ "golang.org/x/sync/semaphore"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
)
+var testStatsConnReadSem = semaphore.NewWeighted(1)
+
// The fakeConn is a wrapper for a Conn that emits stats for every operation
type fakeConn struct {
v Version
@@ -181,18 +184,33 @@ func (st *fakeConn) IsReadOnly() bool {
return st.readOnly
}
+// createTestReadSemaphoreContention simulates semaphore contention on the test read semaphore.
+func createTestReadSemaphoreContention(ctx context.Context, duration time.Duration) {
+ if err := testStatsConnReadSem.Acquire(ctx, 1); err != nil {
+ panic(err)
+ }
+ defer testStatsConnReadSem.Release(1)
+ time.Sleep(duration)
+}
+
// TestStatsConnTopoListDir emits stats on ListDir
func TestStatsConnTopoListDir(t *testing.T) {
conn := &fakeConn{}
- statsConn := NewStatsConn("global", conn)
+ statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
ctx := context.Background()
+ go createTestReadSemaphoreContention(ctx, 100*time.Millisecond)
statsConn.ListDir(ctx, "", true)
timingCounts := topoStatsConnTimings.Counts()["ListDir.global"]
if got, want := timingCounts, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
+ waitTimingsCounts := topoStatsConnReadWaitTimings.Counts()["ListDir.global"]
+ if got := waitTimingsCounts; got != 1 {
+ t.Errorf("stats were not properly recorded: got = %d, want = 1", got)
+ }
+
// error is zero before getting an error
errorCount := topoStatsConnErrors.Counts()["ListDir.global"]
if got, want := errorCount, int64(0); got != want {
@@ -211,7 +229,7 @@ func TestStatsConnTopoListDir(t *testing.T) {
// TestStatsConnTopoCreate emits stats on Create
func TestStatsConnTopoCreate(t *testing.T) {
conn := &fakeConn{}
- statsConn := NewStatsConn("global", conn)
+ statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
ctx := context.Background()
statsConn.Create(ctx, "", []byte{})
@@ -238,7 +256,7 @@ func TestStatsConnTopoCreate(t *testing.T) {
// TestStatsConnTopoUpdate emits stats on Update
func TestStatsConnTopoUpdate(t *testing.T) {
conn := &fakeConn{}
- statsConn := NewStatsConn("global", conn)
+ statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
ctx := context.Background()
statsConn.Update(ctx, "", []byte{}, conn.v)
@@ -265,15 +283,21 @@ func TestStatsConnTopoUpdate(t *testing.T) {
// TestStatsConnTopoGet emits stats on Get
func TestStatsConnTopoGet(t *testing.T) {
conn := &fakeConn{}
- statsConn := NewStatsConn("global", conn)
+ statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
ctx := context.Background()
+	go createTestReadSemaphoreContention(ctx, 100*time.Millisecond)
statsConn.Get(ctx, "")
timingCounts := topoStatsConnTimings.Counts()["Get.global"]
if got, want := timingCounts, int64(1); got != want {
t.Errorf("stats were not properly recorded: got = %d, want = %d", got, want)
}
+ waitTimingsCounts := topoStatsConnReadWaitTimings.Counts()["Get.global"]
+ if got := waitTimingsCounts; got != 1 {
+ t.Errorf("stats were not properly recorded: got = %d, want = 1", got)
+ }
+
// error is zero before getting an error
errorCount := topoStatsConnErrors.Counts()["Get.global"]
if got, want := errorCount, int64(0); got != want {
@@ -292,7 +316,7 @@ func TestStatsConnTopoGet(t *testing.T) {
// TestStatsConnTopoDelete emits stats on Delete
func TestStatsConnTopoDelete(t *testing.T) {
conn := &fakeConn{}
- statsConn := NewStatsConn("global", conn)
+ statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
ctx := context.Background()
statsConn.Delete(ctx, "", conn.v)
@@ -319,7 +343,7 @@ func TestStatsConnTopoDelete(t *testing.T) {
// TestStatsConnTopoLock emits stats on Lock
func TestStatsConnTopoLock(t *testing.T) {
conn := &fakeConn{}
- statsConn := NewStatsConn("global", conn)
+ statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
ctx := context.Background()
statsConn.Lock(ctx, "", "")
@@ -348,7 +372,7 @@ func TestStatsConnTopoLock(t *testing.T) {
// TestStatsConnTopoWatch emits stats on Watch
func TestStatsConnTopoWatch(t *testing.T) {
conn := &fakeConn{}
- statsConn := NewStatsConn("global", conn)
+ statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
ctx := context.Background()
statsConn.Watch(ctx, "")
@@ -362,7 +386,7 @@ func TestStatsConnTopoWatch(t *testing.T) {
// TestStatsConnTopoNewLeaderParticipation emits stats on NewLeaderParticipation
func TestStatsConnTopoNewLeaderParticipation(t *testing.T) {
conn := &fakeConn{}
- statsConn := NewStatsConn("global", conn)
+ statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
_, _ = statsConn.NewLeaderParticipation("", "")
timingCounts := topoStatsConnTimings.Counts()["NewLeaderParticipation.global"]
@@ -388,7 +412,7 @@ func TestStatsConnTopoNewLeaderParticipation(t *testing.T) {
// TestStatsConnTopoClose emits stats on Close
func TestStatsConnTopoClose(t *testing.T) {
conn := &fakeConn{}
- statsConn := NewStatsConn("global", conn)
+ statsConn := NewStatsConn("global", conn, testStatsConnReadSem)
statsConn.Close()
timingCounts := topoStatsConnTimings.Counts()["Close.global"]
diff --git a/go/vt/topo/tablet.go b/go/vt/topo/tablet.go
index e52e753a36b..10ba787a3c1 100644
--- a/go/vt/topo/tablet.go
+++ b/go/vt/topo/tablet.go
@@ -24,21 +24,17 @@ import (
"sync"
"time"
- "golang.org/x/sync/semaphore"
-
- "vitess.io/vitess/go/protoutil"
- "vitess.io/vitess/go/vt/key"
-
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/netutil"
+ "vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/trace"
+ "vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
- "vitess.io/vitess/go/vt/proto/vtrpc"
- "vitess.io/vitess/go/vt/vterrors"
-
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
+ "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo/events"
"vitess.io/vitess/go/vt/topo/topoproto"
+ "vitess.io/vitess/go/vt/vterrors"
)
// IsTrivialTypeChange returns if this db type be trivially reassigned
@@ -234,8 +230,6 @@ func (ts *Server) GetTabletAliasesByCell(ctx context.Context, cell string) ([]*t
// GetTabletsByCellOptions controls the behavior of
// Server.GetTabletsByCell.
type GetTabletsByCellOptions struct {
- // Concurrency controls the maximum number of concurrent calls to GetTablet.
- Concurrency int
// KeyspaceShard is the optional keyspace/shard that tablets must match.
// An empty shard value will match all shards in the keyspace.
KeyspaceShard *KeyspaceShard
@@ -497,29 +491,11 @@ func (ts *Server) GetTabletMap(ctx context.Context, tabletAliases []*topodatapb.
returnErr error
)
- concurrency := DefaultConcurrency
- if opt != nil && opt.Concurrency > 0 {
- concurrency = opt.Concurrency
- }
- var sem = semaphore.NewWeighted(int64(concurrency))
-
for _, tabletAlias := range tabletAliases {
wg.Add(1)
go func(tabletAlias *topodatapb.TabletAlias) {
defer wg.Done()
- if err := sem.Acquire(ctx, 1); err != nil {
- // Only happens if context is cancelled.
- mu.Lock()
- defer mu.Unlock()
- log.Warningf("%v: %v", tabletAlias, err)
- // We only need to set this on the first error.
- if returnErr == nil {
- returnErr = NewError(PartialResult, tabletAlias.GetCell())
- }
- return
- }
tabletInfo, err := ts.GetTablet(ctx, tabletAlias)
- sem.Release(1)
mu.Lock()
defer mu.Unlock()
if err != nil {
diff --git a/go/vt/topo/tablet_test.go b/go/vt/topo/tablet_test.go
index e659a0d01b9..1c242e8778b 100644
--- a/go/vt/topo/tablet_test.go
+++ b/go/vt/topo/tablet_test.go
@@ -69,7 +69,6 @@ func TestServerGetTabletsByCell(t *testing.T) {
},
},
// Ensure this doesn't panic.
- opt: &topo.GetTabletsByCellOptions{Concurrency: -1},
},
{
name: "single",
@@ -151,7 +150,6 @@ func TestServerGetTabletsByCell(t *testing.T) {
Shard: shard,
},
},
- opt: &topo.GetTabletsByCellOptions{Concurrency: 8},
},
{
name: "multiple with list error",
@@ -210,7 +208,6 @@ func TestServerGetTabletsByCell(t *testing.T) {
Shard: shard,
},
},
- opt: &topo.GetTabletsByCellOptions{Concurrency: 8},
listError: topo.NewError(topo.ResourceExhausted, ""),
},
{
@@ -249,7 +246,6 @@ func TestServerGetTabletsByCell(t *testing.T) {
},
},
opt: &topo.GetTabletsByCellOptions{
- Concurrency: 1,
KeyspaceShard: &topo.KeyspaceShard{
Keyspace: keyspace,
Shard: shard,
@@ -317,7 +313,6 @@ func TestServerGetTabletsByCell(t *testing.T) {
},
},
opt: &topo.GetTabletsByCellOptions{
- Concurrency: 1,
KeyspaceShard: &topo.KeyspaceShard{
Keyspace: keyspace,
Shard: "",
diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go
index 990192a23f7..7066229ab06 100644
--- a/go/vt/vtorc/logic/tablet_discovery.go
+++ b/go/vt/vtorc/logic/tablet_discovery.go
@@ -151,7 +151,7 @@ func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), f
}
func refreshTabletsInCell(ctx context.Context, cell string, loader func(tabletAlias string), forceRefresh bool) {
- tablets, err := ts.GetTabletsByCell(ctx, cell, &topo.GetTabletsByCellOptions{Concurrency: topo.DefaultConcurrency})
+ tablets, err := ts.GetTabletsByCell(ctx, cell, nil)
if err != nil {
log.Errorf("Error fetching topo info for cell %v: %v", cell, err)
return