diff --git a/xds/internal/xdsclient/internal/internal.go b/xds/internal/xdsclient/internal/internal.go
index e12610744109..b66697206c08 100644
--- a/xds/internal/xdsclient/internal/internal.go
+++ b/xds/internal/xdsclient/internal/internal.go
@@ -20,6 +20,9 @@ package internal
 
 // The following vars can be overridden by tests.
 var (
-	// NewADSStream is a function that returns a new ADS stream.
+	// GRPCNewClient returns a new gRPC client (a *grpc.ClientConn) for a target.
+	GRPCNewClient any // func(string, ...grpc.DialOption) (*grpc.ClientConn, error)
+
+	// NewADSStream returns a new ADS stream on the provided *grpc.ClientConn.
 	NewADSStream any // func(context.Context, *grpc.ClientConn) (v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient, error)
 )
diff --git a/xds/internal/xdsclient/transport/grpctransport/grpctransport.go b/xds/internal/xdsclient/transport/grpctransport/grpctransport.go
new file mode 100644
index 000000000000..9bb8c737ffcd
--- /dev/null
+++ b/xds/internal/xdsclient/transport/grpctransport/grpctransport.go
@@ -0,0 +1,138 @@
+/*
+ *
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package grpctransport provides an implementation of the transport interface
+// using gRPC.
+package grpctransport
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/keepalive"
+	"google.golang.org/grpc/xds/internal/xdsclient/internal"
+	"google.golang.org/grpc/xds/internal/xdsclient/transport"
+
+	v3adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+	v3adspb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
+	v3lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3"
+	v3lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3"
+)
+
+// init installs the production implementations of the hooks declared in the
+// xdsclient internal package: grpc.NewClient for creating client connections,
+// and a function that opens an ADS stream on a given connection. Tests may
+// replace these hooks.
+func init() {
+	internal.GRPCNewClient = grpc.NewClient
+	internal.NewADSStream = func(ctx context.Context, cc *grpc.ClientConn) (v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient, error) {
+		return v3adsgrpc.NewAggregatedDiscoveryServiceClient(cc).StreamAggregatedResources(ctx)
+	}
+}
+
+// Builder provides a way to build a gRPC-based transport to an xDS server.
+type Builder struct{}
+
+// Build creates a new gRPC-based transport to an xDS server using the provided
+// options. This involves creating a grpc.ClientConn to the server identified by
+// the server URI in the provided options.
+//
+// It returns an error if opts.ServerConfig is nil, or if the client connection
+// cannot be created.
+func (b *Builder) Build(opts transport.BuildOptions) (transport.Interface, error) {
+	if opts.ServerConfig == nil {
+		return nil, fmt.Errorf("ServerConfig field in opts cannot be nil")
+	}
+
+	// NOTE: The bootstrap package ensures that the server_uri and credentials
+	// inside the server config are always populated. If we end up using a
+	// different type in BuildOptions to specify the server configuration, we
+	// must ensure that those fields are not empty before proceeding.
+
+	// Dial the xDS management server with dial options specified by the server
+	// configuration and a static keepalive configuration that is common across
+	// gRPC language implementations.
+	kpCfg := grpc.WithKeepaliveParams(keepalive.ClientParameters{
+		Time:    5 * time.Minute,
+		Timeout: 20 * time.Second,
+	})
+	// Copy the configured dial options into a fresh slice, so that appending
+	// the keepalive option can never write into a backing array that the slice
+	// returned by DialOptions() might share with the server config.
+	cfgOpts := opts.ServerConfig.DialOptions()
+	dopts := make([]grpc.DialOption, 0, len(cfgOpts)+1)
+	dopts = append(dopts, cfgOpts...)
+	dopts = append(dopts, kpCfg)
+	// The dialer is looked up through the internal hook so that tests can
+	// substitute their own.
+	dialer := internal.GRPCNewClient.(func(string, ...grpc.DialOption) (*grpc.ClientConn, error))
+	cc, err := dialer(opts.ServerConfig.ServerURI(), dopts...)
+	if err != nil {
+		// An error from a non-blocking dial indicates something serious.
+		return nil, fmt.Errorf("failed to create a grpc transport to the management server %q: %v", opts.ServerConfig.ServerURI(), err)
+	}
+	cc.Connect()
+
+	return &grpcTransport{cc: cc}, nil
+}
+
+// grpcTransport is the transport returned by Builder.Build. It creates ADS and
+// LRS streams on a single grpc.ClientConn to the xDS server.
+type grpcTransport struct {
+	cc *grpc.ClientConn
+}
+
+// CreateStreamingCall creates a streaming call for the given full method name.
+// Only the ADS StreamAggregatedResources and the LRS StreamLoadStats methods
+// are supported; any other method name results in an error.
+func (g *grpcTransport) CreateStreamingCall(ctx context.Context, method string) (transport.StreamingCall, error) {
+	switch method {
+	case v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResources_FullMethodName:
+		return g.newADSStreamingCall(ctx)
+	case v3lrsgrpc.LoadReportingService_StreamLoadStats_FullMethodName:
+		return g.newLRSStreamingCall(ctx)
+	default:
+		return nil, fmt.Errorf("unsupported method: %v", method)
+	}
+}
+
+// newADSStreamingCall creates an ADS stream on the underlying connection using
+// the internal.NewADSStream hook, which tests can override.
+func (g *grpcTransport) newADSStreamingCall(ctx context.Context) (transport.StreamingCall, error) {
+	newStream := internal.NewADSStream.(func(context.Context, *grpc.ClientConn) (v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient, error))
+	stream, err := newStream(ctx, g.cc)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create an ADS stream: %v", err)
+	}
+	return &adsStream{stream: stream}, nil
+}
+
+// newLRSStreamingCall creates an LRS stream on the underlying connection.
+func (g *grpcTransport) newLRSStreamingCall(ctx context.Context) (transport.StreamingCall, error) {
+	stream, err := v3lrsgrpc.NewLoadReportingServiceClient(g.cc).StreamLoadStats(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create an LRS stream: %v", err)
+	}
+	return &lrsStream{stream: stream}, nil
+}
+
+// Close closes the underlying grpc.ClientConn.
+func (g *grpcTransport) Close() error {
+	return g.cc.Close()
+}
+
+// adsStream adapts an ADS client stream to the transport.StreamingCall
+// interface.
+type adsStream struct {
+	stream v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient
+}
+
+// Send sends msg on the ADS stream. msg must be a *v3adspb.DiscoveryRequest;
+// any other type results in an error instead of a panic.
+func (a *adsStream) Send(msg any) error {
+	req, ok := msg.(*v3adspb.DiscoveryRequest)
+	if !ok {
+		return fmt.Errorf("unexpected message type %T sent on ADS stream, want *DiscoveryRequest", msg)
+	}
+	return a.stream.Send(req)
+}
+
+// Recv returns the next message received on the ADS stream.
+func (a *adsStream) Recv() (any, error) {
+	return a.stream.Recv()
+}
+
+// lrsStream adapts an LRS client stream to the transport.StreamingCall
+// interface.
+type lrsStream struct {
+	stream v3lrsgrpc.LoadReportingService_StreamLoadStatsClient
+}
+
+// Send sends msg on the LRS stream. msg must be a *v3lrspb.LoadStatsRequest;
+// any other type results in an error instead of a panic.
+func (l *lrsStream) Send(msg any) error {
+	req, ok := msg.(*v3lrspb.LoadStatsRequest)
+	if !ok {
+		return fmt.Errorf("unexpected message type %T sent on LRS stream, want *LoadStatsRequest", msg)
+	}
+	return l.stream.Send(req)
+}
+
+// Recv returns the next message received on the LRS stream.
+func (l *lrsStream) Recv() (any, error) {
+	return l.stream.Recv()
+}
diff --git a/xds/internal/xdsclient/transport/grpctransport/grpctransport_ext_test.go b/xds/internal/xdsclient/transport/grpctransport/grpctransport_ext_test.go
new file mode 100644
index 000000000000..8831fb8299a7
--- /dev/null
+++ b/xds/internal/xdsclient/transport/grpctransport/grpctransport_ext_test.go
@@ -0,0 +1,91 @@
+/*
+ *
+ * Copyright 2022 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package grpctransport_test
+
+import (
+	"testing"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/internal/grpctest"
+	internalbootstrap "google.golang.org/grpc/internal/xds/bootstrap"
+	"google.golang.org/grpc/xds/internal/xdsclient/internal"
+	"google.golang.org/grpc/xds/internal/xdsclient/transport"
+	"google.golang.org/grpc/xds/internal/xdsclient/transport/grpctransport"
+)
+
+// s is the receiver on which the tests in this file are declared; it embeds
+// grpctest.Tester and is handed to grpctest.RunSubTests.
+type s struct {
+	grpctest.Tester
+}
+
+// Test is the single `go test` entry point; it runs the tests declared on s.
+func Test(t *testing.T) {
+	grpctest.RunSubTests(t, s{})
+}
+
+// Tests that every call to Build() on a grpctransport.Builder goes through the
+// dialer installed in internal.GRPCNewClient. The dialer is replaced with a
+// wrapper that records its invocation, and two transports are built.
+func (s) TestBuild_CustomDialer(t *testing.T) {
+	// Install a recording dialer that delegates to grpc.NewClient, and restore
+	// the previous dialer when the test returns.
+	dialed := false
+	prevDialer := internal.GRPCNewClient
+	defer func() { internal.GRPCNewClient = prevDialer }()
+	internal.GRPCNewClient = func(target string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
+		dialed = true
+		return grpc.NewClient(target, dopts...)
+	}
+
+	serverCfg, err := internalbootstrap.ServerConfigForTesting(internalbootstrap.ServerConfigTestingOptions{URI: "server-address"})
+	if err != nil {
+		t.Fatalf("Failed to create server config for testing: %v", err)
+	}
+
+	opts := transport.BuildOptions{ServerConfig: serverCfg}
+	builder := &grpctransport.Builder{}
+
+	// Build two transports back to back; each build must hit the dialer.
+	for i := 0; i < 2; i++ {
+		dialed = false
+		tr, err := builder.Build(opts)
+		if err != nil {
+			t.Fatalf("Builder.Build(%+v) failed: %v", opts, err)
+		}
+		// Deferred on purpose: both transports are closed when the test
+		// function returns, not at the end of the iteration.
+		defer tr.Close()
+
+		if !dialed {
+			t.Fatalf("Builder.Build(%+v): custom dialer called = false, want true", opts)
+		}
+	}
+}
+
+// Tests that Build() returns an error when the BuildOptions passed to it carry
+// no ServerConfig.
+func (s) TestBuild_EmptyServerConfig(t *testing.T) {
+	opts := transport.BuildOptions{}
+	builder := &grpctransport.Builder{}
+
+	tr, err := builder.Build(opts)
+	if err != nil {
+		// Expected outcome.
+		return
+	}
+	tr.Close()
+	t.Fatalf("Builder.Build(%+v) succeeded when expected to fail", opts)
+}
diff --git a/xds/internal/xdsclient/transport/lrs/lrs_stream.go b/xds/internal/xdsclient/transport/lrs/lrs_stream.go
new file mode 100644
index 000000000000..36e70bc7170f
--- /dev/null
+++ b/xds/internal/xdsclient/transport/lrs/lrs_stream.go
@@ -0,0 +1,333 @@
+/*
+ *
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package lrs provides the implementation of an LRS (Load Reporting Service)
+// stream for the xDS client.
+package lrs
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"sync"
+	"time"
+
+	"google.golang.org/grpc/grpclog"
+	"google.golang.org/grpc/internal/backoff"
+	igrpclog "google.golang.org/grpc/internal/grpclog"
+	"google.golang.org/grpc/internal/grpcsync"
+	"google.golang.org/grpc/internal/pretty"
+	"google.golang.org/grpc/xds/internal"
+	"google.golang.org/grpc/xds/internal/xdsclient/load"
+	"google.golang.org/grpc/xds/internal/xdsclient/transport"
+	"google.golang.org/protobuf/proto"
+	"google.golang.org/protobuf/types/known/durationpb"
+
+	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
+	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
+	v3lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3"
+)
+
+// perRPCVerbosityLevel is the verbosity level at which per-RPC logs that print
+// complete request or response messages must be gated. Other per-RPC logs
+// with terse output should be at `INFO` and verbosity 2.
+const perRPCVerbosityLevel = 9
+
+// StreamImpl implements the client side of an LRS (Load Reporting Service)
+// stream. It owns the stream's lifecycle (start, stop, and retry with backoff
+// on failure), and hands out a load.Store for reporting load along with a
+// cleanup function to be called once load reporting is no longer needed.
+type StreamImpl struct {
+	// Apart from doneCh, the fields in this group are set when the StreamImpl
+	// is created and are read-only afterwards, and hence can be accessed
+	// without a mutex. doneCh is the exception: a new channel is created by
+	// ReportLoad every time the stream is started.
+	transport transport.Interface     // Transport to use for LRS stream.
+	backoff   func(int) time.Duration // Backoff for retries after stream failures.
+	nodeProto *v3corepb.Node          // Identifies the gRPC application.
+	doneCh    chan struct{}           // To notify exit of LRS goroutine.
+	logger    *igrpclog.PrefixLogger
+
+	// Guards access to the below fields.
+	mu           sync.Mutex
+	cancelStream context.CancelFunc // Cancel the stream. If nil, the stream is not active.
+	refCount     int                // Number of interested parties.
+	lrsStore     *load.Store        // Store returned to user for pushing loads.
+}
+
+// StreamOpts holds the options for creating a StreamImpl.
+type StreamOpts struct {
+	Transport transport.Interface     // xDS transport to create the stream on.
+	Backoff   func(int) time.Duration // Backoff for retries after stream failures.
+	NodeProto *v3corepb.Node          // Node proto to identify the gRPC application.
+	LogPrefix string                  // Prefix to be used for log messages.
+}
+
+// NewStreamImpl returns a StreamImpl configured from opts, with a fresh
+// load.Store and a logger whose prefix is opts.LogPrefix followed by a tag
+// containing the StreamImpl's address.
+//
+// No streaming RPC is started here. The RPC is initiated when the first call
+// to ReportLoad is made, and is terminated when the last call to ReportLoad is
+// canceled.
+func NewStreamImpl(opts StreamOpts) *StreamImpl {
+	s := &StreamImpl{lrsStore: load.NewStore()}
+	s.transport = opts.Transport
+	s.backoff = opts.Backoff
+	s.nodeProto = opts.NodeProto
+
+	// The prefix embeds the pointer value, so it can only be computed once the
+	// StreamImpl has been allocated.
+	prefix := opts.LogPrefix + fmt.Sprintf("[lrs-stream %p] ", s)
+	s.logger = igrpclog.NewPrefixLogger(grpclog.Component("xds"), prefix)
+	return s
+}
+
+// ReportLoad returns a load.Store that can be used to report load, and a
+// cleanup function that should be called when the load reporting is no longer
+// needed.
+//
+// The first call to ReportLoad sets the reference count to one, and starts the
+// LRS streaming call. Subsequent calls increment the reference count and return
+// the same load.Store.
+//
+// The cleanup function decrements the reference count and stops the LRS stream
+// when the last reference is removed.
+func (lrs *StreamImpl) ReportLoad() (*load.Store, func()) {
+	lrs.mu.Lock()
+	defer lrs.mu.Unlock()
+
+	// Every caller gets its own cleanup function. It is wrapped with
+	// grpcsync.OnceFunc so that a caller invoking it repeatedly is expected to
+	// release only the single reference it holds.
+	cleanup := grpcsync.OnceFunc(func() {
+		lrs.mu.Lock()
+		defer lrs.mu.Unlock()
+
+		if lrs.refCount == 0 {
+			lrs.logger.Errorf("Attempting to stop already stopped StreamImpl")
+			return
+		}
+		lrs.refCount--
+		if lrs.refCount != 0 {
+			return
+		}
+
+		if lrs.cancelStream == nil {
+			// Stop() cancels the stream and sets cancelStream to nil without
+			// touching the reference count. If it ran before the last
+			// reference was released, the stream is already stopped, and
+			// calling the nil cancel function here would panic.
+			return
+		}
+		lrs.cancelStream()
+		lrs.cancelStream = nil
+		lrs.logger.Infof("Stopping StreamImpl")
+	})
+
+	lrs.refCount++
+	if lrs.refCount != 1 {
+		// Not the first reference; nothing to start.
+		return lrs.lrsStore, cleanup
+	}
+
+	// First reference: start the goroutine that runs the LRS stream. doneCh is
+	// closed by that goroutine when it exits, which is what Stop() waits for.
+	ctx, cancel := context.WithCancel(context.Background())
+	lrs.cancelStream = cancel
+	lrs.doneCh = make(chan struct{})
+	go lrs.runner(ctx)
+	return lrs.lrsStore, cleanup
+}
+
+// runner is responsible for managing the lifetime of an LRS streaming call. It
+// creates the stream, sends the initial LoadStatsRequest, receives the first
+// LoadStatsResponse, and then periodically sends LoadStatsRequests until the
+// stream fails or ctx is canceled. The stream is recreated, subject to
+// backoff, if any of these steps fail.
+//
+// runner closes lrs.doneCh when it returns.
+func (lrs *StreamImpl) runner(ctx context.Context) {
+	defer close(lrs.doneCh)
+
+	// This feature indicates that the client supports the
+	// LoadStatsResponse.send_all_clusters field in the LRS response. The node
+	// proto is cloned so that the one stored in lrs is left unmodified.
+	node := proto.Clone(lrs.nodeProto).(*v3corepb.Node)
+	node.ClientFeatures = append(node.ClientFeatures, "envoy.lrs.supports_send_all_clusters")
+
+	// runLoadReportStream runs a single LRS stream to completion. It returns
+	// nil when the stream fails before a response is received, and
+	// backoff.ErrResetBackoff once at least one response has been received.
+	runLoadReportStream := func() error {
+		// streamCtx is created and canceled in case we terminate the stream
+		// early for any reason, to avoid gRPC-Go leaking the RPC's monitoring
+		// goroutine.
+		streamCtx, cancel := context.WithCancel(ctx)
+		defer cancel()
+
+		stream, err := lrs.transport.CreateStreamingCall(streamCtx, "/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats")
+		if err != nil {
+			lrs.logger.Warningf("Failed to create new LRS streaming RPC: %v", err)
+			return nil
+		}
+		if lrs.logger.V(2) {
+			lrs.logger.Infof("LRS stream created")
+		}
+
+		if err := lrs.sendFirstLoadStatsRequest(stream, node); err != nil {
+			lrs.logger.Warningf("Sending first LRS request failed: %v", err)
+			return nil
+		}
+
+		clusters, interval, err := lrs.recvFirstLoadStatsResponse(stream)
+		if err != nil {
+			lrs.logger.Warningf("Reading from LRS streaming RPC failed: %v", err)
+			return nil
+		}
+
+		// We reset backoff state when we successfully receive at least one
+		// message from the server.
+		lrs.sendLoads(streamCtx, stream, clusters, interval)
+		return backoff.ErrResetBackoff
+	}
+	backoff.RunF(ctx, runLoadReportStream, lrs.backoff)
+}
+
+// sendLoads is responsible for periodically sending load reports to the LRS
+// server at the specified interval for the specified clusters, until the passed
+// in context is canceled.
+func (lrs *StreamImpl) sendLoads(ctx context.Context, stream transport.StreamingCall, clusterNames []string, interval time.Duration) {
+	// NOTE: time.NewTicker panics on a non-positive interval, so callers must
+	// pass an interval greater than zero.
+	tick := time.NewTicker(interval)
+	defer tick.Stop()
+	for {
+		select {
+		case <-tick.C:
+		case <-ctx.Done():
+			return
+		}
+		// A failed write ends this stream; the caller decides whether to
+		// create a new one.
+		if err := lrs.sendLoadStatsRequest(stream, lrs.lrsStore.Stats(clusterNames)); err != nil {
+			lrs.logger.Warningf("Writing to LRS stream failed: %v", err)
+			return
+		}
+	}
+}
+
+// sendFirstLoadStatsRequest sends the initial LoadStatsRequest, which carries
+// only the node proto, on the stream. If the send fails with io.EOF, the error
+// returned is the one read from the stream by getStreamError.
+func (lrs *StreamImpl) sendFirstLoadStatsRequest(stream transport.StreamingCall, node *v3corepb.Node) error {
+	req := &v3lrspb.LoadStatsRequest{Node: node}
+	if lrs.logger.V(perRPCVerbosityLevel) {
+		lrs.logger.Infof("Sending initial LoadStatsRequest: %s", pretty.ToJSON(req))
+	}
+	err := stream.Send(req)
+	if err == io.EOF {
+		return getStreamError(stream)
+	}
+	return err
+}
+
+// recvFirstLoadStatsResponse receives the first LoadStatsResponse from the LRS
+// server. Returns the following:
+//   - a list of cluster names requested by the server or an empty slice if the
+//     server requested for load from all clusters
+//   - the load reporting interval, and
+//   - any error encountered
+//
+// An error is returned if the receive fails, if the received message is not a
+// LoadStatsResponse, or if the load reporting interval is missing, invalid or
+// not greater than zero.
+func (lrs *StreamImpl) recvFirstLoadStatsResponse(stream transport.StreamingCall) ([]string, time.Duration, error) {
+	r, err := stream.Recv()
+	if err != nil {
+		return nil, 0, fmt.Errorf("lrs: failed to receive first LoadStatsResponse: %v", err)
+	}
+	resp, ok := r.(*v3lrspb.LoadStatsResponse)
+	if !ok {
+		return nil, 0, fmt.Errorf("lrs: unexpected message type %T", r)
+	}
+	if lrs.logger.V(perRPCVerbosityLevel) {
+		lrs.logger.Infof("Received first LoadStatsResponse: %s", pretty.ToJSON(resp))
+	}
+
+	interval := resp.GetLoadReportingInterval()
+	if err := interval.CheckValid(); err != nil {
+		return nil, 0, fmt.Errorf("lrs: invalid load_reporting_interval: %v", err)
+	}
+	loadReportingInterval := interval.AsDuration()
+	if loadReportingInterval <= 0 {
+		// A zero or negative duration is a valid proto Duration, but the
+		// interval is used to create a time.Ticker, which panics on a
+		// non-positive value.
+		return nil, 0, fmt.Errorf("lrs: load_reporting_interval must be positive, got %v", loadReportingInterval)
+	}
+
+	clusters := resp.Clusters
+	if resp.SendAllClusters {
+		// Return an empty slice to send stats for all clusters.
+		clusters = []string{}
+	}
+
+	return clusters, loadReportingInterval, nil
+}
+
+// sendLoadStatsRequest converts the provided load data into a LoadStatsRequest
+// containing one ClusterStats entry per element of loads, and sends it on the
+// stream. An error is returned, and nothing is sent, if a locality in the load
+// data cannot be parsed. If the send fails with io.EOF, the error returned is
+// the one read from the stream by getStreamError.
+func (lrs *StreamImpl) sendLoadStatsRequest(stream transport.StreamingCall, loads []*load.Data) error {
+	clusterStats := make([]*v3endpointpb.ClusterStats, 0, len(loads))
+	for _, sd := range loads {
+		droppedReqs := make([]*v3endpointpb.ClusterStats_DroppedRequests, 0, len(sd.Drops))
+		for category, count := range sd.Drops {
+			droppedReqs = append(droppedReqs, &v3endpointpb.ClusterStats_DroppedRequests{
+				Category:     category,
+				DroppedCount: count,
+			})
+		}
+		localityStats := make([]*v3endpointpb.UpstreamLocalityStats, 0, len(sd.LocalityStats))
+		for l, localityData := range sd.LocalityStats {
+			// The map key is the string form of the locality ID.
+			lid, err := internal.LocalityIDFromString(l)
+			if err != nil {
+				return err
+			}
+			loadMetricStats := make([]*v3endpointpb.EndpointLoadMetricStats, 0, len(localityData.LoadStats))
+			for name, loadData := range localityData.LoadStats {
+				loadMetricStats = append(loadMetricStats, &v3endpointpb.EndpointLoadMetricStats{
+					MetricName:                    name,
+					NumRequestsFinishedWithMetric: loadData.Count,
+					TotalMetricValue:              loadData.Sum,
+				})
+			}
+			localityStats = append(localityStats, &v3endpointpb.UpstreamLocalityStats{
+				Locality: &v3corepb.Locality{
+					Region:  lid.Region,
+					Zone:    lid.Zone,
+					SubZone: lid.SubZone,
+				},
+				TotalSuccessfulRequests: localityData.RequestStats.Succeeded,
+				TotalRequestsInProgress: localityData.RequestStats.InProgress,
+				TotalErrorRequests:      localityData.RequestStats.Errored,
+				TotalIssuedRequests:     localityData.RequestStats.Issued,
+				LoadMetricStats:         loadMetricStats,
+				UpstreamEndpointStats:   nil, // TODO: populate for per endpoint loads.
+			})
+		}
+
+		clusterStats = append(clusterStats, &v3endpointpb.ClusterStats{
+			ClusterName:           sd.Cluster,
+			ClusterServiceName:    sd.Service,
+			UpstreamLocalityStats: localityStats,
+			TotalDroppedRequests:  sd.TotalDrops,
+			DroppedRequests:       droppedReqs,
+			LoadReportInterval:    durationpb.New(sd.ReportInterval),
+		})
+	}
+
+	req := &v3lrspb.LoadStatsRequest{ClusterStats: clusterStats}
+	if lrs.logger.V(perRPCVerbosityLevel) {
+		lrs.logger.Infof("Sending LRS loads: %s", pretty.ToJSON(req))
+	}
+	err := stream.Send(req)
+	if err == io.EOF {
+		return getStreamError(stream)
+	}
+	return err
+}
+
+// getStreamError reads from the stream, discarding any messages received,
+// until Recv returns an error, and returns that error. It is called when a
+// Send fails with io.EOF, which by itself does not say why the stream ended.
+func getStreamError(stream transport.StreamingCall) error {
+	for {
+		if _, err := stream.Recv(); err != nil {
+			return err
+		}
+	}
+}
+
+// Stop blocks until the stream is closed and all spawned goroutines exit. It
+// returns immediately if the stream is not active.
+//
+// Stop does not modify the reference count maintained by ReportLoad. The mutex
+// is held while waiting for the stream goroutine to exit.
+func (lrs *StreamImpl) Stop() {
+	lrs.mu.Lock()
+	defer lrs.mu.Unlock()
+
+	if lrs.cancelStream == nil {
+		return
+	}
+	lrs.cancelStream()
+	lrs.cancelStream = nil
+	lrs.logger.Infof("Stopping LRS stream")
+	<-lrs.doneCh
+}
diff --git a/xds/internal/xdsclient/transport/transport_interface.go b/xds/internal/xdsclient/transport/transport_interface.go
new file mode 100644
index 000000000000..db8a19931916
--- /dev/null
+++ b/xds/internal/xdsclient/transport/transport_interface.go
@@ -0,0 +1,67 @@
+/*
+ *
+ * Copyright 2024 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package transport defines the interfaces that describe the functionality
+// required to communicate with an xDS server using streaming calls.
+package transport
+
+import (
+	"context"
+
+	"google.golang.org/grpc/internal/xds/bootstrap"
+)
+
+// Builder is an interface for building a new xDS transport.
+type Builder interface {
+	// Build creates a new xDS transport with the provided options.
+	//
+	// NOTE(review): the declared return type is Transport, which is not
+	// defined in this file, while the gRPC-based Builder in the grpctransport
+	// package returns the Interface type declared below. Confirm which of the
+	// two is intended.
+	Build(opts BuildOptions) (Transport, error)
+}
+
+// BuildOptions contains the options for building a new xDS transport.
+type BuildOptions struct {
+	// ServerConfig contains the configuration that controls how the transport
+	// interacts with the xDS server. This includes the server URI and the
+	// credentials to use to connect to the server, among other things.
+	ServerConfig *bootstrap.ServerConfig
+}
+
+// Interface provides the functionality to communicate with an xDS server using
+// streaming calls.
+//
+// TODO(easwars): Rename this to Transport once the existing Transport type is
+// removed.
+type Interface interface {
+	// CreateStreamingCall creates a new streaming call to the xDS server for the
+	// specified method name. The returned StreamingCall interface can be used to
+	// send and receive messages on the stream.
+	CreateStreamingCall(context.Context, string) (StreamingCall, error)
+
+	// Close closes the underlying connection and cleans up any resources used by the
+	// Transport.
+	Close() error
+}
+
+// StreamingCall is an interface that provides a way to send and receive
+// messages on a stream. The methods accept or return values of type `any`
+// instead of concrete types to allow this interface to be used for both ADS
+// and LRS.
+type StreamingCall interface {
+	// Send sends the provided message on the stream.
+	Send(any) error
+
+	// Recv blocks until the next message is received on the stream.
+	Recv() (any, error)
+}