diff --git a/go.mod b/go.mod index ca82576cfaf..32c33cdd66d 100644 --- a/go.mod +++ b/go.mod @@ -114,6 +114,8 @@ require ( modernc.org/sqlite v1.29.2 ) +require github.com/go-chi/chi/v5 v5.0.10 // indirect + require ( cloud.google.com/go v0.112.1 // indirect cloud.google.com/go/compute v1.25.0 // indirect @@ -165,6 +167,7 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect + github.com/rantav/go-grpc-channelz v0.0.4 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect diff --git a/go.sum b/go.sum index 2dcec89ecc8..14840ea1993 100644 --- a/go.sum +++ b/go.sum @@ -137,6 +137,9 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0= github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= +github.com/go-chi/chi v1.5.4 h1:QHdzF2szwjqVV4wmByUnTcsbIg7UGaQ0tPF2t5GcAIs= +github.com/go-chi/chi/v5 v5.0.10 h1:rLz5avzKpjqxrYwXNfmjkrYYXOyLJd37pz53UFHC6vk= +github.com/go-chi/chi/v5 v5.0.10/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= @@ -422,6 +425,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/rantav/go-grpc-channelz v0.0.4 h1:8GvqhA6siQVBsZYzal3yHhyJ9YiHEJx7RtSH2Jvm9Co= +github.com/rantav/go-grpc-channelz v0.0.4/go.mod h1:HodrRmnnH1zXcEEfK7EJrI23YMPMT7uvyAYkq2JUIcI= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/richardartoul/molecule v1.0.1-0.20221107223329-32cfee06a052 h1:Qp27Idfgi6ACvFQat5+VJvlYToylpM/hcyLBI3WaKPA= diff --git a/go/cmd/vtgateproxy/vtgateproxy.go b/go/cmd/vtgateproxy/vtgateproxy.go index eacdb0c13f9..8f14d6c111f 100644 --- a/go/cmd/vtgateproxy/vtgateproxy.go +++ b/go/cmd/vtgateproxy/vtgateproxy.go @@ -17,6 +17,12 @@ limitations under the License. package main import ( + "fmt" + "net/http" + + channelz "github.com/rantav/go-grpc-channelz" + "google.golang.org/grpc/channelz/service" + "vitess.io/vitess/go/exit" "vitess.io/vitess/go/stats/prometheusbackend" "vitess.io/vitess/go/vt/servenv" @@ -36,6 +42,13 @@ func main() { prometheusbackend.Init("vtgateproxy") servenv.OnRun(func() { + // channelz is served over gRPC, so we bind it to the generic servenv server; the http + // handler queries that server locally for observability. + service.RegisterChannelzServiceToServer(servenv.GRPCServer) + + // Register the channelz handler to /channelz/ (note trailing / which is required). + http.Handle("/", channelz.CreateHandler("/", fmt.Sprintf(":%d", servenv.GRPCPort()))) + vtgateproxy.Init() }) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index a4b48340bfd..efb20980b87 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -317,7 +317,7 @@ func (b *JSONGateResolverBuilder) GetPools() []string { return pools } -func (b *JSONGateResolverBuilder) GetTargets(poolType string) []targetHost { +func (b *JSONGateResolverBuilder) getTargets(poolType string) []targetHost { // Copy the target slice b.mu.RLock() targets := []targetHost{} @@ -352,7 +352,7 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) { log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections) - targets := b.GetTargets(r.poolType) + targets := b.getTargets(r.poolType) var addrs []resolver.Address for _, target := range targets { @@ -361,7 +361,7 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) { log.V(100).Infof("updated targets for %s to %v", r.target.URL.String(), targets) - r.clientConn.UpdateState(resolver.State{Addresses: addrs}) + _ = r.clientConn.UpdateState(resolver.State{Addresses: addrs}) } // Build a new Resolver to route to the given target @@ -398,7 +398,7 @@ func (b *JSONGateResolverBuilder) debugTargets() any { pools := b.GetPools() targets := map[string][]targetHost{} for pool := range b.targets { - targets[pool] = b.GetTargets(pool) + targets[pool] = b.getTargets(pool) } return struct { Pools []string diff --git a/go/vt/vtgateproxy/mysql_server.go b/go/vt/vtgateproxy/mysql_server.go index e07b4ff109f..5bc05836ea5 100644 --- a/go/vt/vtgateproxy/mysql_server.go +++ b/go/vt/vtgateproxy/mysql_server.go @@ -55,6 +55,7 @@ var ( mysqlAuthServerImpl = flag.String("mysql_auth_server_impl", "static", "Which auth server implementation to use. Options: none, ldap, clientcert, static, vault.") mysqlAllowClearTextWithoutTLS = flag.Bool("mysql_allow_clear_text_without_tls", false, "If set, the server will allow the use of a clear text password over non-SSL connections.") mysqlProxyProtocol = flag.Bool("proxy_protocol", false, "Enable HAProxy PROXY protocol on MySQL listener socket") + mysqlConnBufferPooling = flag.Bool("mysql_conn_buffer_pooling", false, "Enable mysql conn buffer pooling.") mysqlServerRequireSecureTransport = flag.Bool("mysql_server_require_secure_transport", false, "Reject insecure connections but only if mysql_server_ssl_cert and mysql_server_ssl_key are provided") @@ -439,12 +440,12 @@ func initMySQLProtocol() { proxyHandle = newProxyHandler(vtGateProxy) if *mysqlServerPort >= 0 { log.Infof("Mysql Server listening on Port %d", *mysqlServerPort) - mysqlListener, err = mysql.NewListener(*mysqlTCPVersion, net.JoinHostPort(*mysqlServerBindAddress, fmt.Sprintf("%v", *mysqlServerPort)), authServer, proxyHandle, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, *mysqlProxyProtocol) + mysqlListener, err = mysql.NewListener(*mysqlTCPVersion, net.JoinHostPort(*mysqlServerBindAddress, fmt.Sprintf("%v", *mysqlServerPort)), authServer, proxyHandle, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, *mysqlProxyProtocol, *mysqlConnBufferPooling) if err != nil { log.Exitf("mysql.NewListener failed: %v", err) } - if *servenv.MySQLServerVersion != "" { - mysqlListener.ServerVersion = *servenv.MySQLServerVersion + if servenv.MySQLServerVersion() != "" { + mysqlListener.ServerVersion = servenv.MySQLServerVersion() } if *mysqlSslCert != "" && *mysqlSslKey != "" { tlsVersion, err := vttls.TLSVersionToNumber(*mysqlTLSMinVersion) @@ -482,7 +483,7 @@ func initMySQLProtocol() { // newMysqlUnixSocket creates a new unix socket mysql listener. If a socket file already exists, attempts // to clean it up. func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mysql.Handler) (*mysql.Listener, error) { - listener, err := mysql.NewListener("unix", address, authServer, handler, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, false) + listener, err := mysql.NewListener("unix", address, authServer, handler, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, false, false) switch err := err.(type) { case nil: return listener, nil @@ -503,7 +504,7 @@ func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mys log.Errorf("Couldn't remove existent socket file: %s", address) return nil, err } - listener, listenerErr := mysql.NewListener("unix", address, authServer, handler, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, false) + listener, listenerErr := mysql.NewListener("unix", address, authServer, handler, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, false, false) return listener, listenerErr default: return nil, err diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index 1911f016a88..b94be1b9d8d 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -37,8 +37,10 @@ import ( querypb "vitess.io/vitess/go/vt/proto/query" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" - _ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn" "vitess.io/vitess/go/vt/vtgate/vtgateconn" + + // we're blank importing this for _reasons_ + _ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn" ) const ( @@ -58,7 +60,7 @@ var ( timings = stats.NewTimings("Timings", "proxy timings by operation", "operation") - vtGateProxy *VTGateProxy = &VTGateProxy{ + vtGateProxy = &VTGateProxy{ targetConns: map[string]*vtgateconn.VTGateConn{}, mu: sync.RWMutex{}, } @@ -111,7 +113,7 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt func (proxy *VTGateProxy) NewSession(ctx context.Context, options *querypb.ExecuteOptions, connectionAttributes map[string]string) (*vtgateconn.VTGateSession, error) { - targetUrl := url.URL{ + targetURL := url.URL{ Scheme: "vtgate", Host: "pool", } @@ -134,9 +136,9 @@ func (proxy *VTGateProxy) NewSession(ctx context.Context, options *querypb.Execu } } - targetUrl.RawQuery = values.Encode() + targetURL.RawQuery = values.Encode() - conn, err := proxy.getConnection(ctx, targetUrl.String()) + conn, err := proxy.getConnection(ctx, targetURL.String()) if err != nil { return nil, err } @@ -194,7 +196,7 @@ func (proxy *VTGateProxy) StreamExecute(ctx context.Context, session *vtgateconn if err != nil { return err } - callback(qr) + _ = callback(qr) } return nil @@ -206,7 +208,7 @@ func Init() { return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`)), nil }) - RegisterJSONGateResolver( + _, _ = RegisterJSONGateResolver( *vtgateHostsFile, *addressField, *portField,