diff --git a/go/flags/endtoend/mysqlctld.txt b/go/flags/endtoend/mysqlctld.txt index d60a91ae65e..594329ce073 100644 --- a/go/flags/endtoend/mysqlctld.txt +++ b/go/flags/endtoend/mysqlctld.txt @@ -61,6 +61,7 @@ Flags: --db_tls_min_version string Configures the minimal TLS version negotiated when SSL is enabled. Defaults to TLSv1.2. Options: TLSv1.0, TLSv1.1, TLSv1.2, TLSv1.3. --dba_idle_timeout duration Idle timeout for dba connections (default 1m0s) --dba_pool_size int Size of the connection pool for dba connections (default 20) + --grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024) --grpc_auth_mode string Which auth plugin implementation to use (eg: static) --grpc_auth_mtls_allowed_substrings string List of substrings of at least one of the client certificate names (separated by colon). --grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server. diff --git a/go/flags/endtoend/vtbackup.txt b/go/flags/endtoend/vtbackup.txt index fba7c794fc0..bf3a9eb9690 100644 --- a/go/flags/endtoend/vtbackup.txt +++ b/go/flags/endtoend/vtbackup.txt @@ -128,6 +128,7 @@ Flags: --file_backup_storage_root string Root directory for the file backup storage. --gcs_backup_storage_bucket string Google Cloud Storage bucket to use for backups. --gcs_backup_storage_root string Root prefix for all backup-related object names. + --grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024) --grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server. --grpc_compression string Which protocol to use for compressing gRPC. Default: nothing. Supported: snappy --grpc_enable_tracing Enable gRPC tracing. 
diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 4d1b92ce0d5..8c6eafe9c1c 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -167,7 +167,6 @@ Flags: --grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s) --grpc_use_effective_callerid If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal. --health_check_interval duration Interval between health checks (default 20s) - --healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024) --healthcheck_retry_delay duration health check retry delay (default 2ms) --healthcheck_timeout duration the health check timeout period (default 1m0s) --heartbeat_enable If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the sidecar database's heartbeat table. The result is used to inform the serving state of the vttablet via healthchecks. diff --git a/go/flags/endtoend/vtctlclient.txt b/go/flags/endtoend/vtctlclient.txt index 3c9c0a3cbb0..e7402c0eefd 100644 --- a/go/flags/endtoend/vtctlclient.txt +++ b/go/flags/endtoend/vtctlclient.txt @@ -9,6 +9,7 @@ Usage of vtctlclient: --config-type string Config file type (omit to infer config type from file extension). --datadog-agent-host string host to send spans to. if empty, no tracing will be done --datadog-agent-port string port to send spans to. if empty, no tracing will be done + --grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. 
(default 1024) --grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server. --grpc_compression string Which protocol to use for compressing gRPC. Default: nothing. Supported: snappy --grpc_enable_tracing Enable gRPC tracing. diff --git a/go/flags/endtoend/vtctld.txt b/go/flags/endtoend/vtctld.txt index f8e680da0f0..8b1aa6f4a92 100644 --- a/go/flags/endtoend/vtctld.txt +++ b/go/flags/endtoend/vtctld.txt @@ -57,6 +57,7 @@ Flags: --file_backup_storage_root string Root directory for the file backup storage. --gcs_backup_storage_bucket string Google Cloud Storage bucket to use for backups. --gcs_backup_storage_root string Root prefix for all backup-related object names. + --grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024) --grpc_auth_mode string Which auth plugin implementation to use (eg: static) --grpc_auth_mtls_allowed_substrings string List of substrings of at least one of the client certificate names (separated by colon). --grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server. @@ -85,7 +86,6 @@ Flags: --grpc_server_keepalive_enforcement_policy_permit_without_stream gRPC server permit client keepalive pings even when there are no active streams (RPCs) --grpc_server_keepalive_time duration After a duration of this time, if the server doesn't see any activity, it pings the client to see if the transport is still alive. (default 10s) --grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s) - --healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. 
This should be less than the golang max thread limit of 10000. (default 1024) -h, --help help for vtctld --jaeger-agent-host string host and port to send spans to. if empty, no tracing will be done --keep_logs duration keep logs for this long (using ctime) (zero to keep forever) diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index 37bfadd4938..4cb4cd34148 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -67,6 +67,7 @@ Flags: --foreign_key_mode string This is to provide how to handle foreign key constraint in create/alter table. Valid values are: allow, disallow (default "allow") --gate_query_cache_memory int gate server query cache size in bytes, maximum amount of memory to be cached. vtgate analyzes every incoming query and generate a query plan, these plans are being cached in a lru cache. This config controls the capacity of the lru cache. (default 33554432) --gateway_initial_tablet_timeout duration At startup, the tabletGateway will wait up to this duration to get at least one tablet per keyspace/shard/tablet type (default 30s) + --grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024) --grpc-send-session-in-streaming If set, will send the session as last packet in streaming api to support transactions in streaming --grpc-use-effective-groups If set, and SSL is not used, will set the immediate caller's security groups from the effective caller id's groups. --grpc-use-static-authentication-callerid If set, will set the immediate caller id to the username authenticated by the static auth plugin. @@ -99,7 +100,6 @@ Flags: --grpc_server_keepalive_time duration After a duration of this time, if the server doesn't see any activity, it pings the client to see if the transport is still alive. 
(default 10s) --grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s) --grpc_use_effective_callerid If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal. - --healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024) --healthcheck_retry_delay duration health check retry delay (default 2ms) --healthcheck_timeout duration the health check timeout period (default 1m0s) -h, --help help for vtgate diff --git a/go/flags/endtoend/vtgateclienttest.txt b/go/flags/endtoend/vtgateclienttest.txt index e7d8fc5e177..8a2f18b6b5a 100644 --- a/go/flags/endtoend/vtgateclienttest.txt +++ b/go/flags/endtoend/vtgateclienttest.txt @@ -14,6 +14,7 @@ Flags: --config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s) --config-type string Config file type (omit to infer config type from file extension). --default_tablet_type topodatapb.TabletType The default tablet type to set for queries, when one is not explicitly selected. (default PRIMARY) + --grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024) --grpc_auth_mode string Which auth plugin implementation to use (eg: static) --grpc_auth_mtls_allowed_substrings string List of substrings of at least one of the client certificate names (separated by colon). --grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server. 
diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt index 841cfc8b556..3917522a2f8 100644 --- a/go/flags/endtoend/vtorc.txt +++ b/go/flags/endtoend/vtorc.txt @@ -34,6 +34,7 @@ Flags: --config-type string Config file type (omit to infer config type from file extension). --consul_auth_static_file string JSON File to read the topos/tokens from. --emit_stats If set, emit stats to push-based monitoring and stats backends + --grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024) --grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server. --grpc_compression string Which protocol to use for compressing gRPC. Default: nothing. Supported: snappy --grpc_enable_tracing Enable gRPC tracing. diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 4709fd27870..35a07b265bc 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -166,6 +166,7 @@ Flags: --gcs_backup_storage_bucket string Google Cloud Storage bucket to use for backups. --gcs_backup_storage_root string Root prefix for all backup-related object names. --gh-ost-path string override default gh-ost binary full path (default "gh-ost") + --grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024) --grpc_auth_mode string Which auth plugin implementation to use (eg: static) --grpc_auth_mtls_allowed_substrings string List of substrings of at least one of the client certificate names (separated by colon). --grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server. 
diff --git a/go/flags/endtoend/vttestserver.txt b/go/flags/endtoend/vttestserver.txt index 0aa69dfb204..042ffd37643 100644 --- a/go/flags/endtoend/vttestserver.txt +++ b/go/flags/endtoend/vttestserver.txt @@ -43,6 +43,7 @@ Flags: --external_topo_implementation string the topology implementation to use for vtcombo process --extra_my_cnf string extra files to add to the config, separated by ':' --foreign_key_mode string This is to provide how to handle foreign key constraint in create/alter table. Valid values are: allow, disallow (default "allow") + --grpc-dial-concurrency-limit int Maximum concurrency of grpc dial operations. This should be less than the golang max thread limit of 10000. (default 1024) --grpc_auth_mode string Which auth plugin implementation to use (eg: static) --grpc_auth_mtls_allowed_substrings string List of substrings of at least one of the client certificate names (separated by colon). --grpc_auth_static_client_creds string When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server. diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 38bc998632b..2bd8ef4c18a 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -47,7 +47,6 @@ import ( "github.com/google/safehtml/template" "github.com/google/safehtml/template/uncheckedconversions" "github.com/spf13/pflag" - "golang.org/x/sync/semaphore" "vitess.io/vitess/go/flagutil" "vitess.io/vitess/go/netutil" @@ -93,9 +92,6 @@ var ( // refreshKnownTablets tells us whether to process all tablets or only new tablets. refreshKnownTablets = true - // healthCheckDialConcurrency tells us how many healthcheck connections can be opened to tablets at once. This should be less than the golang max thread limit of 10000. - healthCheckDialConcurrency int64 = 1024 - // How much to sleep between each check. 
waitAvailableTabletInterval = 100 * time.Millisecond @@ -181,7 +177,6 @@ func registerWebUIFlags(fs *pflag.FlagSet) { fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.") fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.") fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.") - fs.Int64Var(&healthCheckDialConcurrency, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.") ParseTabletURLTemplateFromFlag() } @@ -301,8 +296,6 @@ type HealthCheckImpl struct { subscribers map[chan *TabletHealth]struct{} // loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted loadTabletsTrigger chan struct{} - // healthCheckDialSem is used to limit how many healthcheck connections can be opened to tablets at once. - healthCheckDialSem *semaphore.Weighted } // NewVTGateHealthCheckFilters returns healthcheck filters for vtgate. 
@@ -362,7 +355,6 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur cell: localCell, retryDelay: retryDelay, healthCheckTimeout: healthCheckTimeout, - healthCheckDialSem: semaphore.NewWeighted(healthCheckDialConcurrency), healthByAlias: make(map[tabletAliasString]*tabletHealthCheck), healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth), healthy: make(map[KeyspaceShardTabletType][]*TabletHealth), @@ -876,7 +868,7 @@ func (hc *HealthCheckImpl) TabletConnection(ctx context.Context, alias *topodata // TODO: test that throws this error return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias) } - return thc.Connection(ctx, hc), nil + return thc.Connection(ctx), nil } // getAliasByCell should only be called while holding hc.mu diff --git a/go/vt/discovery/tablet_health_check.go b/go/vt/discovery/tablet_health_check.go index 64450f4c8c6..ecadeefdf78 100644 --- a/go/vt/discovery/tablet_health_check.go +++ b/go/vt/discovery/tablet_health_check.go @@ -19,7 +19,6 @@ package discovery import ( "context" "fmt" - "net" "strings" "sync" "sync/atomic" @@ -34,16 +33,12 @@ import ( "vitess.io/vitess/go/vt/vttablet/queryservice" "vitess.io/vitess/go/vt/vttablet/tabletconn" - "google.golang.org/grpc" "google.golang.org/protobuf/proto" "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/proto/topodata" ) -// withDialerContextOnce ensures grpc.WithDialContext() is added once to the options. -var withDialerContextOnce sync.Once - // tabletHealthCheck maintains the health status of a tablet. A map of this // structure is maintained in HealthCheck. type tabletHealthCheck struct { @@ -127,8 +122,8 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason string) { } // stream streams healthcheck responses to callback. 
-func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, callback func(*query.StreamHealthResponse) error) error { - conn := thc.Connection(ctx, hc) +func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.StreamHealthResponse) error) error { + conn := thc.Connection(ctx) if conn == nil { // This signals the caller to retry return nil @@ -141,34 +136,14 @@ func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, c return err } -func (thc *tabletHealthCheck) Connection(ctx context.Context, hc *HealthCheckImpl) queryservice.QueryService { +func (thc *tabletHealthCheck) Connection(ctx context.Context) queryservice.QueryService { thc.connMu.Lock() defer thc.connMu.Unlock() - return thc.connectionLocked(ctx, hc) -} - -func healthCheckDialerFactory(hc *HealthCheckImpl) func(ctx context.Context, addr string) (net.Conn, error) { - return func(ctx context.Context, addr string) (net.Conn, error) { - // Limit the number of healthcheck connections opened in parallel to avoid high OS-thread - // usage due to blocking networking syscalls (eg: DNS lookups, TCP connection opens, - // etc). Without this limit it is possible for vtgates watching >10k tablets to hit - // the panic: 'runtime: program exceeds 10000-thread limit'. 
- if err := hc.healthCheckDialSem.Acquire(ctx, 1); err != nil { - return nil, err - } - defer hc.healthCheckDialSem.Release(1) - var dialer net.Dialer - return dialer.DialContext(ctx, "tcp", addr) - } + return thc.connectionLocked(ctx) } -func (thc *tabletHealthCheck) connectionLocked(ctx context.Context, hc *HealthCheckImpl) queryservice.QueryService { +func (thc *tabletHealthCheck) connectionLocked(ctx context.Context) queryservice.QueryService { if thc.Conn == nil { - withDialerContextOnce.Do(func() { - grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) { - return append(opts, grpc.WithContextDialer(healthCheckDialerFactory(hc))), nil - }) - }) conn, err := tabletconn.GetDialer()(ctx, thc.Tablet, grpcclient.FailFast(true)) if err != nil { thc.LastError = err @@ -297,7 +272,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) { }() // Read stream health responses. - err := thc.stream(streamCtx, hc, func(shr *query.StreamHealthResponse) error { + err := thc.stream(streamCtx, func(shr *query.StreamHealthResponse) error { // We received a message. Reset the back-off. retryDelay = hc.retryDelay // Don't block on send to avoid deadlocks. diff --git a/go/vt/grpcclient/client.go b/go/vt/grpcclient/client.go index b8a8847ac4f..938da95bce6 100644 --- a/go/vt/grpcclient/client.go +++ b/go/vt/grpcclient/client.go @@ -21,12 +21,14 @@ package grpcclient import ( "context" "crypto/tls" + "net" "sync" "time" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/spf13/pflag" + "golang.org/x/sync/semaphore" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -46,6 +48,10 @@ var ( initialConnWindowSize int initialWindowSize int + // `dialConcurrencyLimit` tells us how many tablet grpc connections can be dialed concurrently. 
+ // This should be less than the golang max thread limit of 10000. + dialConcurrencyLimit int64 = 1024 + // every vitess binary that makes grpc client-side calls. grpcclientBinaries = []string{ "mysqlctld", @@ -74,10 +80,33 @@ func RegisterFlags(fs *pflag.FlagSet) { fs.StringVar(&credsFile, "grpc_auth_static_client_creds", credsFile, "When using grpc_static_auth in the server, this file provides the credentials to use to authenticate with server.") } +func RegisterDeprecatedDialConcurrencyFlagsHealthcheck(fs *pflag.FlagSet) { + fs.Int64Var(&dialConcurrencyLimit, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.") + fs.MarkDeprecated("healthcheck-dial-concurrency", "This option is deprecated and will be removed in a future release. Use --grpc-dial-concurrency-limit instead.") +} + +func RegisterDeprecatedDialConcurrencyFlagsHealthcheckForVtcombo(fs *pflag.FlagSet) { + fs.Int64Var(&dialConcurrencyLimit, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.") + fs.MarkDeprecated("healthcheck-dial-concurrency", "This option is deprecated and will be removed in a future release.") +} + +func RegisterDialConcurrencyFlags(fs *pflag.FlagSet) { + fs.Int64Var(&dialConcurrencyLimit, "grpc-dial-concurrency-limit", 1024, "Maximum concurrency of grpc dial operations. 
This should be less than the golang max thread limit of 10000.") +} + func init() { for _, cmd := range grpcclientBinaries { servenv.OnParseFor(cmd, RegisterFlags) + + if cmd == "vtgate" || cmd == "vtctld" { + servenv.OnParseFor(cmd, RegisterDeprecatedDialConcurrencyFlagsHealthcheck) + } + + servenv.OnParseFor(cmd, RegisterDialConcurrencyFlags) } + + // vtcombo doesn't really use grpc, but we need to expose this flag for backwards compat + servenv.OnParseFor("vtcombo", RegisterDeprecatedDialConcurrencyFlagsHealthcheckForVtcombo) } // FailFast is a self-documenting type for the grpc.FailFast. @@ -129,6 +158,10 @@ func DialContext(ctx context.Context, target string, failFast FailFast, opts ... newopts = append(newopts, grpc.WithInitialWindowSize(int32(initialWindowSize))) } + if dialConcurrencyLimit > 0 { + newopts = append(newopts, dialConcurrencyLimitOption()) + } + newopts = append(newopts, opts...) var err error grpcDialOptionsMu.Lock() @@ -175,6 +208,35 @@ func SecureDialOption(cert, key, ca, crl, name string) (grpc.DialOption, error) return grpc.WithTransportCredentials(creds), nil } +var dialConcurrencyLimitOpt grpc.DialOption + +// dialConcurrencyLimitOnce ensures the grpc.WithContextDialer() option and its semaphore are created only once. +var dialConcurrencyLimitOnce sync.Once + +func dialConcurrencyLimitOption() grpc.DialOption { + dialConcurrencyLimitOnce.Do(func() { + // This semaphore is used to limit how many grpc connections can be dialed to tablets simultaneously. + // This does not limit how many tablet connections can be open at the same time. + sem := semaphore.NewWeighted(dialConcurrencyLimit) + + dialConcurrencyLimitOpt = grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { + // Limit the number of grpc connections opened in parallel to avoid high OS-thread + // usage due to blocking networking syscalls (eg: DNS lookups, TCP connection opens, + // etc).
Without this limit it is possible for vtgates watching >10k tablets to hit + // the panic: 'runtime: program exceeds 10000-thread limit'. + if err := sem.Acquire(ctx, 1); err != nil { + return nil, err + } + defer sem.Release(1) + + var dialer net.Dialer + return dialer.DialContext(ctx, "tcp", addr) + }) + }) + + return dialConcurrencyLimitOpt +} + // Allows for building a chain of interceptors without knowing the total size up front type clientInterceptorBuilder struct { unaryInterceptors []grpc.UnaryClientInterceptor