Skip to content

Commit

Permalink
Move concurrent connection dial limit out of healthcheck. (#16378)
Browse files Browse the repository at this point in the history
Signed-off-by: Arthur Schreiber <[email protected]>
  • Loading branch information
arthurschreiber authored Oct 7, 2024
1 parent 45462ca commit 056afc9
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 43 deletions.
1 change: 1 addition & 0 deletions go/flags/endtoend/mysqlctld.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Flags:
--db_tls_min_version string Configures the minimal TLS version negotiated when SSL is enabled. Defaults to TLSv1.2. Options: TLSv1.0, TLSv1.1, TLSv1.2, TLSv1.3.
--dba_idle_timeout duration Idle timeout for dba connections (default 1m0s)
--dba_pool_size int Size of the connection pool for dba connections (default 20)
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
--grpc_auth_mode string Which auth plugin implementation to use (eg: static)
--grpc_auth_mtls_allowed_substrings string List of substrings of at least one of the client certificate names (separated by colon).
--grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtbackup.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ Flags:
--file_backup_storage_root string Root directory for the file backup storage.
--gcs_backup_storage_bucket string Google Cloud Storage bucket to use for backups.
--gcs_backup_storage_root string Root prefix for all backup-related object names.
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
--grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.
--grpc_compression string Which protocol to use for compressing gRPC. Default: nothing. Supported: snappy
--grpc_enable_tracing Enable gRPC tracing.
Expand Down
1 change: 0 additions & 1 deletion go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ Flags:
--grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s)
--grpc_use_effective_callerid If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal.
--health_check_interval duration Interval between health checks (default 20s)
--healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024)
--healthcheck_retry_delay duration health check retry delay (default 2ms)
--healthcheck_timeout duration the health check timeout period (default 1m0s)
--heartbeat_enable If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the sidecar database's heartbeat table. The result is used to inform the serving state of the vttablet via healthchecks.
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtctlclient.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Usage of vtctlclient:
--config-type string Config file type (omit to infer config type from file extension).
--datadog-agent-host string host to send spans to. if empty, no tracing will be done
--datadog-agent-port string port to send spans to. if empty, no tracing will be done
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
--grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.
--grpc_compression string Which protocol to use for compressing gRPC. Default: nothing. Supported: snappy
--grpc_enable_tracing Enable gRPC tracing.
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtctld.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Flags:
--file_backup_storage_root string Root directory for the file backup storage.
--gcs_backup_storage_bucket string Google Cloud Storage bucket to use for backups.
--gcs_backup_storage_root string Root prefix for all backup-related object names.
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
--grpc_auth_mode string Which auth plugin implementation to use (eg: static)
--grpc_auth_mtls_allowed_substrings string List of substrings of at least one of the client certificate names (separated by colon).
--grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.
Expand Down Expand Up @@ -85,7 +86,6 @@ Flags:
--grpc_server_keepalive_enforcement_policy_permit_without_stream gRPC server permit client keepalive pings even when there are no active streams (RPCs)
--grpc_server_keepalive_time duration After a duration of this time, if the server doesn't see any activity, it pings the client to see if the transport is still alive. (default 10s)
--grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s)
--healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024)
-h, --help help for vtctld
--jaeger-agent-host string host and port to send spans to. if empty, no tracing will be done
--keep_logs duration keep logs for this long (using ctime) (zero to keep forever)
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ Flags:
--foreign_key_mode string This is to provide how to handle foreign key constraint in create/alter table. Valid values are: allow, disallow (default "allow")
--gate_query_cache_memory int gate server query cache size in bytes, maximum amount of memory to be cached. vtgate analyzes every incoming query and generate a query plan, these plans are being cached in a lru cache. This config controls the capacity of the lru cache. (default 33554432)
--gateway_initial_tablet_timeout duration At startup, the tabletGateway will wait up to this duration to get at least one tablet per keyspace/shard/tablet type (default 30s)
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
--grpc-send-session-in-streaming If set, will send the session as last packet in streaming api to support transactions in streaming
--grpc-use-effective-groups If set, and SSL is not used, will set the immediate caller's security groups from the effective caller id's groups.
--grpc-use-static-authentication-callerid If set, will set the immediate caller id to the username authenticated by the static auth plugin.
Expand Down Expand Up @@ -99,7 +100,6 @@ Flags:
--grpc_server_keepalive_time duration After a duration of this time, if the server doesn't see any activity, it pings the client to see if the transport is still alive. (default 10s)
--grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s)
--grpc_use_effective_callerid If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal.
--healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024)
--healthcheck_retry_delay duration health check retry delay (default 2ms)
--healthcheck_timeout duration the health check timeout period (default 1m0s)
-h, --help help for vtgate
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgateclienttest.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Flags:
--config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s)
--config-type string Config file type (omit to infer config type from file extension).
--default_tablet_type topodatapb.TabletType The default tablet type to set for queries, when one is not explicitly selected. (default PRIMARY)
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
--grpc_auth_mode string Which auth plugin implementation to use (eg: static)
--grpc_auth_mtls_allowed_substrings string List of substrings of at least one of the client certificate names (separated by colon).
--grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Flags:
--config-type string Config file type (omit to infer config type from file extension).
--consul_auth_static_file string JSON File to read the topos/tokens from.
--emit_stats If set, emit stats to push-based monitoring and stats backends
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
--grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.
--grpc_compression string Which protocol to use for compressing gRPC. Default: nothing. Supported: snappy
--grpc_enable_tracing Enable gRPC tracing.
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ Flags:
--gcs_backup_storage_bucket string Google Cloud Storage bucket to use for backups.
--gcs_backup_storage_root string Root prefix for all backup-related object names.
--gh-ost-path string override default gh-ost binary full path (default "gh-ost")
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
--grpc_auth_mode string Which auth plugin implementation to use (eg: static)
--grpc_auth_mtls_allowed_substrings string List of substrings of at least one of the client certificate names (separated by colon).
--grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vttestserver.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Flags:
--external_topo_implementation string the topology implementation to use for vtcombo process
--extra_my_cnf string extra files to add to the config, separated by ':'
--foreign_key_mode string This is to provide how to handle foreign key constraint in create/alter table. Valid values are: allow, disallow (default "allow")
--grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024)
--grpc_auth_mode string Which auth plugin implementation to use (eg: static)
--grpc_auth_mtls_allowed_substrings string List of substrings of at least one of the client certificate names (separated by colon).
--grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.
Expand Down
10 changes: 1 addition & 9 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"github.com/google/safehtml/template"
"github.com/google/safehtml/template/uncheckedconversions"
"github.com/spf13/pflag"
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/netutil"
Expand Down Expand Up @@ -93,9 +92,6 @@ var (
// refreshKnownTablets tells us whether to process all tablets or only new tablets.
refreshKnownTablets = true

// healthCheckDialConcurrency tells us how many healthcheck connections can be opened to tablets at once. This should be less than the golang max thread limit of 10000.
healthCheckDialConcurrency int64 = 1024

// How much to sleep between each check.
waitAvailableTabletInterval = 100 * time.Millisecond

Expand Down Expand Up @@ -181,7 +177,6 @@ func registerWebUIFlags(fs *pflag.FlagSet) {
fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.")
fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.")
fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.")
fs.Int64Var(&healthCheckDialConcurrency, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.")
ParseTabletURLTemplateFromFlag()
}

Expand Down Expand Up @@ -301,8 +296,6 @@ type HealthCheckImpl struct {
subscribers map[chan *TabletHealth]struct{}
// loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted
loadTabletsTrigger chan struct{}
// healthCheckDialSem is used to limit how many healthcheck connections can be opened to tablets at once.
healthCheckDialSem *semaphore.Weighted
}

// NewVTGateHealthCheckFilters returns healthcheck filters for vtgate.
Expand Down Expand Up @@ -362,7 +355,6 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
cell: localCell,
retryDelay: retryDelay,
healthCheckTimeout: healthCheckTimeout,
healthCheckDialSem: semaphore.NewWeighted(healthCheckDialConcurrency),
healthByAlias: make(map[tabletAliasString]*tabletHealthCheck),
healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth),
healthy: make(map[KeyspaceShardTabletType][]*TabletHealth),
Expand Down Expand Up @@ -876,7 +868,7 @@ func (hc *HealthCheckImpl) TabletConnection(ctx context.Context, alias *topodata
// TODO: test that throws this error
return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias)
}
return thc.Connection(ctx, hc), nil
return thc.Connection(ctx), nil
}

// getAliasByCell should only be called while holding hc.mu
Expand Down
37 changes: 6 additions & 31 deletions go/vt/discovery/tablet_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package discovery
import (
"context"
"fmt"
"net"
"strings"
"sync"
"sync/atomic"
Expand All @@ -34,16 +33,12 @@ import (
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

"google.golang.org/grpc"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
)

// withDialerContextOnce ensures grpc.WithDialContext() is added once to the options.
var withDialerContextOnce sync.Once

// tabletHealthCheck maintains the health status of a tablet. A map of this
// structure is maintained in HealthCheck.
type tabletHealthCheck struct {
Expand Down Expand Up @@ -127,8 +122,8 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason string) {
}

// stream streams healthcheck responses to callback.
func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, callback func(*query.StreamHealthResponse) error) error {
conn := thc.Connection(ctx, hc)
func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.StreamHealthResponse) error) error {
conn := thc.Connection(ctx)
if conn == nil {
// This signals the caller to retry
return nil
Expand All @@ -141,34 +136,14 @@ func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, c
return err
}

func (thc *tabletHealthCheck) Connection(ctx context.Context, hc *HealthCheckImpl) queryservice.QueryService {
func (thc *tabletHealthCheck) Connection(ctx context.Context) queryservice.QueryService {
thc.connMu.Lock()
defer thc.connMu.Unlock()
return thc.connectionLocked(ctx, hc)
}

func healthCheckDialerFactory(hc *HealthCheckImpl) func(ctx context.Context, addr string) (net.Conn, error) {
return func(ctx context.Context, addr string) (net.Conn, error) {
// Limit the number of healthcheck connections opened in parallel to avoid high OS-thread
// usage due to blocking networking syscalls (eg: DNS lookups, TCP connection opens,
// etc). Without this limit it is possible for vtgates watching >10k tablets to hit
// the panic: 'runtime: program exceeds 10000-thread limit'.
if err := hc.healthCheckDialSem.Acquire(ctx, 1); err != nil {
return nil, err
}
defer hc.healthCheckDialSem.Release(1)
var dialer net.Dialer
return dialer.DialContext(ctx, "tcp", addr)
}
return thc.connectionLocked(ctx)
}

func (thc *tabletHealthCheck) connectionLocked(ctx context.Context, hc *HealthCheckImpl) queryservice.QueryService {
func (thc *tabletHealthCheck) connectionLocked(ctx context.Context) queryservice.QueryService {
if thc.Conn == nil {
withDialerContextOnce.Do(func() {
grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) {
return append(opts, grpc.WithContextDialer(healthCheckDialerFactory(hc))), nil
})
})
conn, err := tabletconn.GetDialer()(ctx, thc.Tablet, grpcclient.FailFast(true))
if err != nil {
thc.LastError = err
Expand Down Expand Up @@ -297,7 +272,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {
}()

// Read stream health responses.
err := thc.stream(streamCtx, hc, func(shr *query.StreamHealthResponse) error {
err := thc.stream(streamCtx, func(shr *query.StreamHealthResponse) error {
// We received a message. Reset the back-off.
retryDelay = hc.retryDelay
// Don't block on send to avoid deadlocks.
Expand Down
Loading

0 comments on commit 056afc9

Please sign in to comment.