Skip to content

Commit

Permalink
Add channelz/ (#326)
Browse files Browse the repository at this point in the history
Signed-off-by: Henry Robinson <[email protected]>
Signed-off-by: Esme Lamb <[email protected]>
  • Loading branch information
henryr authored and dedelala committed Jan 8, 2025
1 parent 8e6cb89 commit 6b0586e
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 16 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ require (
modernc.org/sqlite v1.29.2
)

require github.com/go-chi/chi/v5 v5.0.10 // indirect

require (
cloud.google.com/go v0.112.1 // indirect
cloud.google.com/go/compute v1.25.0 // indirect
Expand Down Expand Up @@ -165,6 +167,7 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/rantav/go-grpc-channelz v0.0.4
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0=
github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
github.com/go-chi/chi v1.5.4 h1:QHdzF2szwjqVV4wmByUnTcsbIg7UGaQ0tPF2t5GcAIs=
github.com/go-chi/chi/v5 v5.0.10 h1:rLz5avzKpjqxrYwXNfmjkrYYXOyLJd37pz53UFHC6vk=
github.com/go-chi/chi/v5 v5.0.10/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
Expand Down Expand Up @@ -422,6 +425,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/rantav/go-grpc-channelz v0.0.4 h1:8GvqhA6siQVBsZYzal3yHhyJ9YiHEJx7RtSH2Jvm9Co=
github.com/rantav/go-grpc-channelz v0.0.4/go.mod h1:HodrRmnnH1zXcEEfK7EJrI23YMPMT7uvyAYkq2JUIcI=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/richardartoul/molecule v1.0.1-0.20221107223329-32cfee06a052 h1:Qp27Idfgi6ACvFQat5+VJvlYToylpM/hcyLBI3WaKPA=
Expand Down
13 changes: 13 additions & 0 deletions go/cmd/vtgateproxy/vtgateproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ limitations under the License.
package main

import (
"fmt"
"net/http"

channelz "github.com/rantav/go-grpc-channelz"
"google.golang.org/grpc/channelz/service"

"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/stats/prometheusbackend"
"vitess.io/vitess/go/vt/servenv"
Expand All @@ -36,6 +42,13 @@ func main() {
prometheusbackend.Init("vtgateproxy")

servenv.OnRun(func() {
// channelz is served over gRPC, so we bind it to the generic servenv server; the http
// handler queries that server locally for observability.
service.RegisterChannelzServiceToServer(servenv.GRPCServer)

// Register the channelz handler to /channelz/ (note trailing / which is required).
http.Handle("/", channelz.CreateHandler("/", fmt.Sprintf(":%d", servenv.GRPCPort())))

vtgateproxy.Init()
})

Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgateproxy/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (b *JSONGateResolverBuilder) GetPools() []string {
return pools
}

func (b *JSONGateResolverBuilder) GetTargets(poolType string) []targetHost {
func (b *JSONGateResolverBuilder) getTargets(poolType string) []targetHost {
// Copy the target slice
b.mu.RLock()
targets := []targetHost{}
Expand Down Expand Up @@ -352,7 +352,7 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) {

log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections)

targets := b.GetTargets(r.poolType)
targets := b.getTargets(r.poolType)

var addrs []resolver.Address
for _, target := range targets {
Expand All @@ -361,7 +361,7 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) {

log.V(100).Infof("updated targets for %s to %v", r.target.URL.String(), targets)

r.clientConn.UpdateState(resolver.State{Addresses: addrs})
_ = r.clientConn.UpdateState(resolver.State{Addresses: addrs})
}

// Build a new Resolver to route to the given target
Expand Down Expand Up @@ -398,7 +398,7 @@ func (b *JSONGateResolverBuilder) debugTargets() any {
pools := b.GetPools()
targets := map[string][]targetHost{}
for pool := range b.targets {
targets[pool] = b.GetTargets(pool)
targets[pool] = b.getTargets(pool)
}
return struct {
Pools []string
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vtgateproxy/mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (
mysqlAuthServerImpl = flag.String("mysql_auth_server_impl", "static", "Which auth server implementation to use. Options: none, ldap, clientcert, static, vault.")
mysqlAllowClearTextWithoutTLS = flag.Bool("mysql_allow_clear_text_without_tls", false, "If set, the server will allow the use of a clear text password over non-SSL connections.")
mysqlProxyProtocol = flag.Bool("proxy_protocol", false, "Enable HAProxy PROXY protocol on MySQL listener socket")
mysqlConnBufferPooling = flag.Bool("mysql_conn_buffer_pooling", false, "Enable mysql conn buffer pooling.")

mysqlServerRequireSecureTransport = flag.Bool("mysql_server_require_secure_transport", false, "Reject insecure connections but only if mysql_server_ssl_cert and mysql_server_ssl_key are provided")

Expand Down Expand Up @@ -439,12 +440,12 @@ func initMySQLProtocol() {
proxyHandle = newProxyHandler(vtGateProxy)
if *mysqlServerPort >= 0 {
log.Infof("Mysql Server listening on Port %d", *mysqlServerPort)
mysqlListener, err = mysql.NewListener(*mysqlTCPVersion, net.JoinHostPort(*mysqlServerBindAddress, fmt.Sprintf("%v", *mysqlServerPort)), authServer, proxyHandle, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, *mysqlProxyProtocol)
mysqlListener, err = mysql.NewListener(*mysqlTCPVersion, net.JoinHostPort(*mysqlServerBindAddress, fmt.Sprintf("%v", *mysqlServerPort)), authServer, proxyHandle, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, *mysqlProxyProtocol, *mysqlConnBufferPooling)
if err != nil {
log.Exitf("mysql.NewListener failed: %v", err)
}
if *servenv.MySQLServerVersion != "" {
mysqlListener.ServerVersion = *servenv.MySQLServerVersion
if servenv.MySQLServerVersion() != "" {
mysqlListener.ServerVersion = servenv.MySQLServerVersion()
}
if *mysqlSslCert != "" && *mysqlSslKey != "" {
tlsVersion, err := vttls.TLSVersionToNumber(*mysqlTLSMinVersion)
Expand Down Expand Up @@ -482,7 +483,7 @@ func initMySQLProtocol() {
// newMysqlUnixSocket creates a new unix socket mysql listener. If a socket file already exists, attempts
// to clean it up.
func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mysql.Handler) (*mysql.Listener, error) {
listener, err := mysql.NewListener("unix", address, authServer, handler, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, false)
listener, err := mysql.NewListener("unix", address, authServer, handler, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, false, false)
switch err := err.(type) {
case nil:
return listener, nil
Expand All @@ -503,7 +504,7 @@ func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mys
log.Errorf("Couldn't remove existent socket file: %s", address)
return nil, err
}
listener, listenerErr := mysql.NewListener("unix", address, authServer, handler, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, false)
listener, listenerErr := mysql.NewListener("unix", address, authServer, handler, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, false, false)
return listener, listenerErr
default:
return nil, err
Expand Down
16 changes: 9 additions & 7 deletions go/vt/vtgateproxy/vtgateproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ import (
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"

// we're blank importing this for _reasons_
_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
)

const (
Expand All @@ -58,7 +60,7 @@ var (

timings = stats.NewTimings("Timings", "proxy timings by operation", "operation")

vtGateProxy *VTGateProxy = &VTGateProxy{
vtGateProxy = &VTGateProxy{
targetConns: map[string]*vtgateconn.VTGateConn{},
mu: sync.RWMutex{},
}
Expand Down Expand Up @@ -111,7 +113,7 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt

func (proxy *VTGateProxy) NewSession(ctx context.Context, options *querypb.ExecuteOptions, connectionAttributes map[string]string) (*vtgateconn.VTGateSession, error) {

targetUrl := url.URL{
targetURL := url.URL{
Scheme: "vtgate",
Host: "pool",
}
Expand All @@ -134,9 +136,9 @@ func (proxy *VTGateProxy) NewSession(ctx context.Context, options *querypb.Execu
}
}

targetUrl.RawQuery = values.Encode()
targetURL.RawQuery = values.Encode()

conn, err := proxy.getConnection(ctx, targetUrl.String())
conn, err := proxy.getConnection(ctx, targetURL.String())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -194,7 +196,7 @@ func (proxy *VTGateProxy) StreamExecute(ctx context.Context, session *vtgateconn
if err != nil {
return err
}
callback(qr)
_ = callback(qr)
}

return nil
Expand All @@ -206,7 +208,7 @@ func Init() {
return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`)), nil
})

RegisterJSONGateResolver(
_, _ = RegisterJSONGateResolver(
*vtgateHostsFile,
*addressField,
*portField,
Expand Down

0 comments on commit 6b0586e

Please sign in to comment.