Skip to content

Commit

Permalink
xds client ads stream without e2e test/example
Browse files Browse the repository at this point in the history
  • Loading branch information
purnesh42H committed Dec 9, 2024
1 parent 1fea0aa commit 9f93856
Show file tree
Hide file tree
Showing 16 changed files with 2,010 additions and 348 deletions.
60 changes: 59 additions & 1 deletion xds/clients/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,20 @@

package clients

import (
"fmt"
"slices"
"strings"

v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
)

// ServerConfig contains the configuration to connect to a server.
type ServerConfig struct {
ServerURI string
ServerURI string
IgnoreResourceDeletion bool

Extensions any
}
Expand All @@ -41,6 +52,8 @@ type Node struct {
Metadata any
UserAgentName string
UserAgentVersion string

clientFeatures []string
}

// Locality is the representation of the locality field within a node.
Expand All @@ -49,3 +62,48 @@ type Locality struct {
Zone string
SubZone string
}

// Equal reports whether sc and other are considered equal.
func (sc *ServerConfig) Equal(other *ServerConfig) bool {
switch {
case sc == nil && other == nil:
return true
case sc.ServerURI != other.ServerURI:
return false

Check warning on line 72 in xds/clients/config.go

View check run for this annotation

Codecov / codecov/patch

xds/clients/config.go#L67-L72

Added lines #L67 - L72 were not covered by tests
}
return true

Check warning on line 74 in xds/clients/config.go

View check run for this annotation

Codecov / codecov/patch

xds/clients/config.go#L74

Added line #L74 was not covered by tests
}

// String returns the string representation of the ServerConfig.
func (sc *ServerConfig) String() string {
return strings.Join([]string{sc.ServerURI, fmt.Sprintf("%v", sc.IgnoreResourceDeletion)}, "-")

Check warning on line 79 in xds/clients/config.go

View check run for this annotation

Codecov / codecov/patch

xds/clients/config.go#L78-L79

Added lines #L78 - L79 were not covered by tests
}

func (n Node) ToProto() *v3corepb.Node {
return &v3corepb.Node{
Id: n.ID,
Cluster: n.Cluster,
Locality: func() *v3corepb.Locality {
if n.Locality.IsEmpty() {
return nil
}
return &v3corepb.Locality{
Region: n.Locality.Region,
Zone: n.Locality.Zone,
SubZone: n.Locality.SubZone,
}

Check warning on line 94 in xds/clients/config.go

View check run for this annotation

Codecov / codecov/patch

xds/clients/config.go#L82-L94

Added lines #L82 - L94 were not covered by tests
}(),
Metadata: proto.Clone(n.Metadata.(*structpb.Struct)).(*structpb.Struct),
UserAgentName: n.UserAgentName,
UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: n.UserAgentVersion},
ClientFeatures: slices.Clone(n.clientFeatures),
}
}

func (l Locality) IsEmpty() bool {
return l.Equal(Locality{})

Check warning on line 104 in xds/clients/config.go

View check run for this annotation

Codecov / codecov/patch

xds/clients/config.go#L103-L104

Added lines #L103 - L104 were not covered by tests
}

func (l Locality) Equal(other Locality) bool {
return l.Region == other.Region && l.Zone == other.Zone && l.SubZone == other.SubZone

Check warning on line 108 in xds/clients/config.go

View check run for this annotation

Codecov / codecov/patch

xds/clients/config.go#L107-L108

Added lines #L107 - L108 were not covered by tests
}
46 changes: 35 additions & 11 deletions xds/clients/grpctransport/grpc_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,13 @@ type grpcTransport struct {
}

func (g *grpcTransport) NewStream(ctx context.Context, method string) (clients.Stream[clients.StreamRequest, any], error) {
return nil, nil
switch method {
case v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResources_FullMethodName:
return g.newADSStream(ctx)
case v3lrsgrpc.LoadReportingService_StreamLoadStats_FullMethodName:
return g.newLRSStream(ctx)

Check warning on line 104 in xds/clients/grpctransport/grpc_transport.go

View check run for this annotation

Codecov / codecov/patch

xds/clients/grpctransport/grpc_transport.go#L99-L104

Added lines #L99 - L104 were not covered by tests
}
return nil, fmt.Errorf("unsupported method: %v", method)

Check warning on line 106 in xds/clients/grpctransport/grpc_transport.go

View check run for this annotation

Codecov / codecov/patch

xds/clients/grpctransport/grpc_transport.go#L106

Added line #L106 was not covered by tests
}

func (g *grpcTransport) Close() error {
Expand All @@ -108,46 +114,64 @@ type adsStream[Req clients.StreamRequest, Res any] struct {
stream v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient
}

func (a *adsStream[Req, Res]) Send(msg *Req) error {
func (a *adsStream[Req, Res]) Send(msg Req) error {
protoReq, ok := any(msg).(proto.Message)
if !ok {
return fmt.Errorf("msg %v is not a valid Protobuf message", msg)
}
return a.stream.Send(protoReq.(*v3adspb.DiscoveryRequest))

Check warning on line 122 in xds/clients/grpctransport/grpc_transport.go

View check run for this annotation

Codecov / codecov/patch

xds/clients/grpctransport/grpc_transport.go#L117-L122

Added lines #L117 - L122 were not covered by tests
}

func (a *adsStream[Req, Res]) Recv() (*Res, error) {
func (a *adsStream[Req, Res]) Recv() (Res, error) {
var typedRes Res
res, err := a.stream.Recv()
if err != nil {
return nil, err
return typedRes, err
}
typedRes, ok := any(res).(Res)
if !ok {
return nil, fmt.Errorf("response type mismatch")
return typedRes, fmt.Errorf("response type mismatch")
}
return &typedRes, nil
return typedRes, nil

Check warning on line 135 in xds/clients/grpctransport/grpc_transport.go

View check run for this annotation

Codecov / codecov/patch

xds/clients/grpctransport/grpc_transport.go#L125-L135

Added lines #L125 - L135 were not covered by tests
}

type lrsStream[Req clients.StreamRequest, Res any] struct {
stream v3lrsgrpc.LoadReportingService_StreamLoadStatsClient
}

func (l *lrsStream[Req, Res]) Send(msg *Req) error {
func (l *lrsStream[Req, Res]) Send(msg Req) error {
protoReq, ok := any(msg).(proto.Message)
if !ok {
return fmt.Errorf("msg %v is not a valid Protobuf message", msg)
}
return l.stream.Send(protoReq.(*v3lrspb.LoadStatsRequest))

Check warning on line 147 in xds/clients/grpctransport/grpc_transport.go

View check run for this annotation

Codecov / codecov/patch

xds/clients/grpctransport/grpc_transport.go#L142-L147

Added lines #L142 - L147 were not covered by tests
}

func (l *lrsStream[Req, Res]) Recv() (*Res, error) {
func (l *lrsStream[Req, Res]) Recv() (Res, error) {
var typedRes Res
res, err := l.stream.Recv()
if err != nil {
return nil, err
return typedRes, err
}
typedRes, ok := any(res).(Res)
if !ok {
return nil, fmt.Errorf("response type mismatch")
return typedRes, fmt.Errorf("response type mismatch")
}
return typedRes, nil

Check warning on line 160 in xds/clients/grpctransport/grpc_transport.go

View check run for this annotation

Codecov / codecov/patch

xds/clients/grpctransport/grpc_transport.go#L150-L160

Added lines #L150 - L160 were not covered by tests
}

func (g *grpcTransport) newADSStream(ctx context.Context) (clients.Stream[clients.StreamRequest, any], error) {
stream, err := v3adsgrpc.NewAggregatedDiscoveryServiceClient(g.cc).StreamAggregatedResources(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create an ADS stream: %v", err)
}
return &adsStream[clients.StreamRequest, any]{stream: stream}, nil

Check warning on line 168 in xds/clients/grpctransport/grpc_transport.go

View check run for this annotation

Codecov / codecov/patch

xds/clients/grpctransport/grpc_transport.go#L163-L168

Added lines #L163 - L168 were not covered by tests
}

func (g *grpcTransport) newLRSStream(ctx context.Context) (clients.Stream[clients.StreamRequest, any], error) {
stream, err := v3lrsgrpc.NewLoadReportingServiceClient(g.cc).StreamLoadStats(ctx)
if err != nil {
return nil, err
}
return &typedRes, nil
return &lrsStream[clients.StreamRequest, any]{stream: stream}, nil

Check warning on line 176 in xds/clients/grpctransport/grpc_transport.go

View check run for this annotation

Codecov / codecov/patch

xds/clients/grpctransport/grpc_transport.go#L171-L176

Added lines #L171 - L176 were not covered by tests
}
Loading

0 comments on commit 9f93856

Please sign in to comment.