Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resolver: State: add Endpoints and deprecate Addresses #6471

Merged
merged 2 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ type SubConn interface {
//
// This will trigger a state transition for the SubConn.
//
// Deprecated: This method is now part of the ClientConn interface and will
// eventually be removed from here.
// Deprecated: this method will be removed. Create new SubConns for new
easwars marked this conversation as resolved.
Show resolved Hide resolved
// addresses instead.
UpdateAddresses([]resolver.Address)
// Connect starts the connecting for this SubConn.
Connect()
Expand Down Expand Up @@ -150,6 +150,9 @@ type ClientConn interface {
// NewSubConn is called by balancer to create a new SubConn.
// It doesn't block and wait for the connections to be established.
// Behaviors of the SubConn can be controlled by options.
//
// Deprecated: please be aware that in a future version, SubConns will only
// support one address per SubConn.
NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
// RemoveSubConn removes the SubConn from ClientConn.
// The SubConn will be shutdown.
Expand All @@ -159,7 +162,10 @@ type ClientConn interface {
// If so, the connection will be kept. Else, the connection will be
// gracefully closed, and a new connection will be created.
//
// This will trigger a state transition for the SubConn.
// This may trigger a state transition for the SubConn.
//
// Deprecated: this method will be removed. Create new SubConns for new
// addresses instead.
UpdateAddresses(SubConn, []resolver.Address)

// UpdateState notifies gRPC that the balancer's internal state has
Expand Down
4 changes: 4 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ var (

// ORCAAllowAnyMinReportingInterval is for examples/orca use ONLY.
ORCAAllowAnyMinReportingInterval interface{} // func(so *orca.ServiceOptions)

// GRPCResolverSchemeExtraMetadata determines when gRPC will add extra
// metadata to RPCs.
GRPCResolverSchemeExtraMetadata string = "xds"
Comment on lines +168 to +170
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this supposed to be only for the xds scheme or do we plan to add this for the xdstp scheme as well?

Also, could you help me figure out how this is used. The code in stream.go adds the metadata, but I don't see how or where this is used. Thanks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we supporting xdstp for the target string? If so, this needs to be extended.

This is read here when doing metadata matching for routes:

if extraMD, ok := grpcutil.ExtraMetadata(info.Context); ok {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we supporting xdstp for the target string?

Hmm ... A47 section on target-uri syntax says we will continue to support the xds scheme for convenience. It does not say that we will start supporting the xdstp scheme. Nor does it say we will not support the xdstp scheme and instead the translations will be handled by the bootstrap configuration.

Do you think we should improve the language in this gRFC section or is it clear to you?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not super air-tight, but by not saying "we're supporting xdstp as a gRPC name resolver scheme and here's how", then I wouldn't assume we are adding it. We'd need a spec for how to handle it as a channel-level target. The text that follows does describe how we'll support this new stuff in the existing xds resolver instead of adding a top-level resolver scheme for xdstp:

we want the deployment to be able to decide to use new-style resource names without requiring users to explicitly specify them. To that end, we will add support for an optional authority in the xds URI. The procedure for handling an xds URI is as follows:

)

// HealthChecker defines the signature of the client-side LB channel health checking function.
Expand Down
36 changes: 29 additions & 7 deletions resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ type Address struct {
// BalancerAttributes contains arbitrary data about this address intended
// for consumption by the LB policy. These attributes do not affect SubConn
// creation, connection establishment, handshaking, etc.
//
// Deprecated: when an Address is inside an Endpoint, this field should not
// be used, and it will eventually be removed entirely.
BalancerAttributes *attributes.Attributes

// Metadata is the information associated with Addr, which may be used
Expand Down Expand Up @@ -167,11 +170,37 @@ type BuildOptions struct {
Dialer func(context.Context, string) (net.Conn, error)
}

// An Endpoint is one network endpoint, or server, which may have multiple
// addresses with which it can be accessed.
type Endpoint struct {
// Addresses contains a list of addresses used to access this endpoint.
Addresses []Address

// Attributes contains arbitrary data about this endpoint intended for
// consumption by the LB policy.
Attributes *attributes.Attributes
easwars marked this conversation as resolved.
Show resolved Hide resolved
}

// State contains the current Resolver state relevant to the ClientConn.
type State struct {
// Addresses is the latest set of resolved addresses for the target.
//
// If a resolver sets Addresses but does not set Endpoints, one Endpoint
// will be created for each Address before the State is passed to the LB
// policy. The BalancerAttributes of each entry in Addresses will be set
// in Endpoints.Attributes, and be cleared in the Endpoint's Address's
// BalancerAttributes.
//
// Soon, Addresses will be deprecated and replaced fully by Endpoints.
Addresses []Address

// Endpoints is the latest set of resolved endpoints for the target.
//
// If a resolver produces a State containing Endpoints but not Addresses,
// it must take care to ensure the LB policies it selects will support
// Endpoints.
Endpoints []Endpoint

// ServiceConfig contains the result from parsing the latest service
// config. If it is nil, it indicates no service config is present or the
// resolver does not provide service configs.
Expand Down Expand Up @@ -294,10 +323,3 @@ type Resolver interface {
// Close closes the resolver.
Close()
}

// UnregisterForTesting removes the resolver builder with the given scheme from the
// resolver map.
// This function is for testing only.
func UnregisterForTesting(scheme string) {
delete(m, scheme)
}
8 changes: 8 additions & 0 deletions resolver_conn_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,14 @@ func (ccr *ccResolverWrapper) serializerScheduleLocked(f func(context.Context))
// which includes addresses and service config.
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
errCh := make(chan error, 1)
if s.Endpoints == nil {
s.Endpoints = make([]resolver.Endpoint, 0, len(s.Addresses))
for _, a := range s.Addresses {
ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes}
ep.Addresses[0].BalancerAttributes = nil
s.Endpoints = append(s.Endpoints, ep)
}
}
ok := ccr.serializer.Schedule(func(context.Context) {
ccr.addChannelzTraceEvent(s)
ccr.curState = s
Expand Down
51 changes: 51 additions & 0 deletions resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ import (
"net"
"testing"

"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
)

type wrapResolverBuilder struct {
Expand Down Expand Up @@ -91,3 +96,49 @@ func (s) TestResolverCaseSensitivity(t *testing.T) {
}
cc.Close()
}

// TestResolverAddressesToEndpoints ensures one Endpoint is created for each
// entry in resolver.State.Addresses automatically.
func (s) TestResolverAddressesToEndpoints(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

const scheme = "testresolveraddressestoendpoints"
r := manual.NewBuilderWithScheme(scheme)

stateCh := make(chan balancer.ClientConnState, 1)
bf := stub.BalancerFuncs{
UpdateClientConnState: func(_ *stub.BalancerData, ccs balancer.ClientConnState) error {
stateCh <- ccs
return nil
},
}
balancerName := "stub-balancer-" + scheme
stub.Register(balancerName, bf)

a1 := attributes.New("x", "y")
a2 := attributes.New("a", "b")
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: "addr1", BalancerAttributes: a1}, {Addr: "addr2", BalancerAttributes: a2}}})

cc, err := Dial(r.Scheme()+":///",
WithTransportCredentials(insecure.NewCredentials()),
WithResolvers(r),
WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, balancerName)))
if err != nil {
t.Fatalf("Unexpected error dialing: %v", err)
}
defer cc.Close()

select {
case got := <-stateCh:
want := []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: "addr1"}}, Attributes: a1},
{Addresses: []resolver.Address{{Addr: "addr2"}}, Attributes: a2},
}
if diff := cmp.Diff(got.ResolverState.Endpoints, want); diff != "" {
t.Errorf("Did not receive expected endpoints. Diff (-got +want):\n%v", diff)
}
case <-ctx.Done():
t.Fatalf("timed out waiting for endpoints")
}
}
3 changes: 2 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancerload"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/channelz"
Expand Down Expand Up @@ -433,7 +434,7 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
ctx = trace.NewContext(ctx, trInfo.tr)
}

if cs.cc.parsedTarget.URL.Scheme == "xds" {
if cs.cc.parsedTarget.URL.Scheme == internal.GRPCResolverSchemeExtraMetadata {
// Add extra metadata (metadata that will be added by transport) to context
// so the balancer can see them.
ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
Expand Down
13 changes: 5 additions & 8 deletions test/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/balancerload"
"google.golang.org/grpc/internal/grpcsync"
Expand Down Expand Up @@ -196,14 +197,10 @@ func testPickExtraMetadata(t *testing.T, e env) {
te.startServer(&testServer{security: e.security})
defer te.tearDown()

// Set resolver to xds to trigger the extra metadata code path.
r := manual.NewBuilderWithScheme("xds")
resolver.Register(r)
defer func() {
resolver.UnregisterForTesting("xds")
}()
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
te.resolverScheme = "xds"
// Trigger the extra-metadata-adding code path.
defer func(old string) { internal.GRPCResolverSchemeExtraMetadata = old }(internal.GRPCResolverSchemeExtraMetadata)
internal.GRPCResolverSchemeExtraMetadata = "passthrough"

cc := te.clientConn()
tc := testgrpc.NewTestServiceClient(cc)

Expand Down
22 changes: 2 additions & 20 deletions xds/googledirectpath/googlec2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,31 +60,13 @@ var (
)

func replaceResolvers() func() {
var registerForTesting bool
if resolver.Get(c2pScheme) == nil {
// If env var to enable c2p is not set, the resolver isn't registered.
// Need to register and unregister in defer.
registerForTesting = true
resolver.Register(&c2pResolverBuilder{})
}
oldDNS := resolver.Get("dns")
resolver.Register(testDNSResolver)
oldXDS := resolver.Get("xds")
resolver.Register(testXDSResolver)
return func() {
if oldDNS != nil {
resolver.Register(oldDNS)
} else {
resolver.UnregisterForTesting("dns")
}
if oldXDS != nil {
resolver.Register(oldXDS)
} else {
resolver.UnregisterForTesting("xds")
}
if registerForTesting {
resolver.UnregisterForTesting(c2pScheme)
}
resolver.Register(oldDNS)
resolver.Register(oldXDS)
}
}

Expand Down
7 changes: 7 additions & 0 deletions xds/internal/balancer/clusterresolver/clusterresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,15 @@ func (b *clusterResolverBalancer) updateChildConfig() {
}
b.logger.Infof("Built child policy config: %v", pretty.ToJSON(childCfg))

endpoints := make([]resolver.Endpoint, len(addrs))
for i, a := range addrs {
endpoints[i].Attributes = a.BalancerAttributes
endpoints[i].Addresses = []resolver.Address{a}
endpoints[i].Addresses[0].BalancerAttributes = nil
}
if err := b.child.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Endpoints: endpoints,
Addresses: addrs,
ServiceConfig: b.configRaw,
Attributes: b.attrsWithClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ func setupDNS() (chan resolver.Target, chan struct{}, chan resolver.ResolveNowOp
mr.ResolveNowCallback = func(opts resolver.ResolveNowOptions) { resolveNowCh <- opts }

dnsResolverBuilder := resolver.Get("dns")
resolver.UnregisterForTesting("dns")
resolver.Register(mr)

return targetCh, closeCh, resolveNowCh, mr, func() { resolver.Register(dnsResolverBuilder) }
Expand Down
18 changes: 15 additions & 3 deletions xds/internal/balancer/clusterresolver/resource_resolver_dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,21 @@ func (dr *dnsDiscoveryMechanism) UpdateState(state resolver.State) error {
}

dr.mu.Lock()
addrs := make([]string, len(state.Addresses))
for i, a := range state.Addresses {
addrs[i] = a.Addr
var addrs []string
if len(state.Endpoints) > 0 {
// Assume 1 address per endpoint, which is how DNS is expected to
// behave. The slice will grow as needed, however.
addrs = make([]string, 0, len(state.Endpoints))
for _, e := range state.Endpoints {
for _, a := range e.Addresses {
addrs = append(addrs, a.Addr)
}
}
} else {
addrs = make([]string, len(state.Addresses))
for i, a := range state.Addresses {
addrs[i] = a.Addr
}
}
dr.addrs = addrs
dr.updateReceived = true
Expand Down