Skip to content

Commit

Permalink
balancer/endpointsharding: Ignore empty endpoints (#7674)
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq authored Oct 28, 2024
1 parent 4084b14 commit e7435d6
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 14 deletions.
19 changes: 5 additions & 14 deletions balancer/endpointsharding/endpointsharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ package endpointsharding
import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -80,20 +79,9 @@ type endpointSharding struct {
// for endpoints that are no longer present. It also updates all the children,
// and sends a single synchronous update of the childrens' aggregated state at
// the end of the UpdateClientConnState operation. If any endpoint has no
// addresses, returns error without forwarding any updates. Otherwise returns
// first error found from a child, but fully processes the new update.
// addresses it will ignore that endpoint. Otherwise, returns first error found
// from a child, but fully processes the new update.
func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState) error {
if len(state.ResolverState.Endpoints) == 0 {
return errors.New("endpoints list is empty")
}
// Check/return early if any endpoints have no addresses.
// TODO: make this configurable if needed.
for i, endpoint := range state.ResolverState.Endpoints {
if len(endpoint.Addresses) == 0 {
return fmt.Errorf("endpoint %d has empty addresses", i)
}
}

es.inhibitChildUpdates.Store(true)
defer func() {
es.inhibitChildUpdates.Store(false)
Expand All @@ -106,6 +94,9 @@ func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState

// Update/Create new children.
for _, endpoint := range state.ResolverState.Endpoints {
if len(endpoint.Addresses) == 0 {
continue
}
if _, ok := newChildren.Get(endpoint); ok {
// Endpoint child was already created, continue to avoid duplicate
// update.
Expand Down
18 changes: 18 additions & 0 deletions resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package resolver

import (
"context"
"errors"
"fmt"
"net"
"net/url"
Expand Down Expand Up @@ -330,3 +331,20 @@ type AuthorityOverrider interface {
// typically in line, and must keep it unchanged.
OverrideAuthority(Target) string
}

// ValidateEndpoints validates endpoints from a petiole policy's perspective.
// Petiole policies should call this before calling into their children. See
// [gRPC A61](https://github.com/grpc/proposal/blob/master/A61-IPv4-IPv6-dualstack-backends.md)
// for details.
func ValidateEndpoints(endpoints []Endpoint) error {
if len(endpoints) == 0 {
return errors.New("endpoints list is empty")
}

for _, endpoint := range endpoints {
for range endpoint.Addresses {
return nil
}
}
return errors.New("endpoints list contains no addresses")
}
72 changes: 72 additions & 0 deletions resolver/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,75 @@ type s struct {
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// TestValidateEndpoints tests different scenarios of resolver addresses being
// validated by the ValidateEndpoint helper.
func (s) TestValidateEndpoints(t *testing.T) {
addr1 := Address{Addr: "addr1"}
addr2 := Address{Addr: "addr2"}
addr3 := Address{Addr: "addr3"}
addr4 := Address{Addr: "addr4"}
tests := []struct {
name string
endpoints []Endpoint
wantErr bool
}{
{
name: "duplicate-address-across-endpoints",
endpoints: []Endpoint{
{Addresses: []Address{addr1}},
{Addresses: []Address{addr1}},
},
wantErr: false,
},
{
name: "duplicate-address-same-endpoint",
endpoints: []Endpoint{
{Addresses: []Address{addr1, addr1}},
},
wantErr: false,
},
{
name: "duplicate-address-across-endpoints-plural-addresses",
endpoints: []Endpoint{
{Addresses: []Address{addr1, addr2, addr3}},
{Addresses: []Address{addr3, addr4}},
},
wantErr: false,
},
{
name: "no-shared-addresses",
endpoints: []Endpoint{
{Addresses: []Address{addr1, addr2}},
{Addresses: []Address{addr3, addr4}},
},
wantErr: false,
},
{
name: "endpoint-with-no-addresses",
endpoints: []Endpoint{
{Addresses: []Address{addr1, addr2}},
{Addresses: []Address{}},
},
wantErr: false,
},
{
name: "empty-endpoints-list",
endpoints: []Endpoint{},
wantErr: true,
},
{
name: "endpoint-list-with-no-addresses",
endpoints: []Endpoint{{}, {}},
wantErr: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
err := ValidateEndpoints(test.endpoints)
if (err != nil) != test.wantErr {
t.Fatalf("ValidateEndpoints() wantErr: %v, got: %v", test.wantErr, err)
}
})
}
}

0 comments on commit e7435d6

Please sign in to comment.