Skip to content

Commit

Permalink
xdsclient: make sending requests more deterministic (#7774)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Oct 25, 2024
1 parent 94e1b29 commit 67b9ebf
Showing 1 changed file with 39 additions and 24 deletions.
63 changes: 39 additions & 24 deletions xds/internal/xdsclient/transport/ads/ads_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type resourceTypeState struct {
nonce string // Last received nonce. Should be reset when the stream breaks.
bufferedRequests chan struct{} // Channel to buffer requests when writing is blocked.
subscribedResources map[string]*ResourceWatchState // Map of subscribed resource names to their state.
pendingWrite bool // True if there is a pending write for this resource type.
}

// StreamImpl provides the functionality associated with an ADS (Aggregated
Expand Down Expand Up @@ -203,6 +204,7 @@ func (s *StreamImpl) Subscribe(typ xdsresource.Type, name string) {
// Create state for the newly subscribed resource. The watch timer will
// be started when a request for this resource is actually sent out.
state.subscribedResources[name] = &ResourceWatchState{State: ResourceWatchStateStarted}
state.pendingWrite = true

// Send a request for the resource type with updated subscriptions.
s.requestCh.Put(typ)
Expand Down Expand Up @@ -233,6 +235,7 @@ func (s *StreamImpl) Unsubscribe(typ xdsresource.Type, name string) {
rs.ExpiryTimer.Stop()
}
delete(state.subscribedResources, name)
state.pendingWrite = true

// Send a request for the resource type with updated subscriptions.
s.requestCh.Put(typ)
Expand Down Expand Up @@ -346,17 +349,7 @@ func (s *StreamImpl) sendNew(stream transport.StreamingCall, typ xdsresource.Typ
return nil
}

names := resourceNames(state.subscribedResources)
if err := s.sendMessageLocked(stream, names, typ.TypeURL(), state.version, state.nonce, nil); err != nil {
return err

}
select {
case <-state.bufferedRequests:
default:
}
s.startWatchTimersLocked(typ, names)
return nil
return s.sendMessageIfWritePendingLocked(stream, typ, state)
}

// sendExisting sends out discovery requests for existing resources when
Expand Down Expand Up @@ -385,18 +378,10 @@ func (s *StreamImpl) sendExisting(stream transport.StreamingCall) error {
continue
}

names := resourceNames(state.subscribedResources)
if s.logger.V(2) {
s.logger.Infof("Re-requesting resources %v of type %q, as the stream has been recreated", names, typ.TypeURL())
}
if err := s.sendMessageLocked(stream, names, typ.TypeURL(), state.version, state.nonce, nil); err != nil {
state.pendingWrite = true
if err := s.sendMessageIfWritePendingLocked(stream, typ, state); err != nil {
return err
}
select {
case <-state.bufferedRequests:
default:
}
s.startWatchTimersLocked(typ, names)
}
return nil
}
Expand All @@ -413,11 +398,9 @@ func (s *StreamImpl) sendBuffered(stream transport.StreamingCall) error {
for typ, state := range s.resourceTypeState {
select {
case <-state.bufferedRequests:
names := resourceNames(state.subscribedResources)
if err := s.sendMessageLocked(stream, names, typ.TypeURL(), state.version, state.nonce, nil); err != nil {
if err := s.sendMessageIfWritePendingLocked(stream, typ, state); err != nil {
return err
}
s.startWatchTimersLocked(typ, names)
default:
// No buffered request.
continue
Expand All @@ -426,6 +409,38 @@ func (s *StreamImpl) sendBuffered(stream transport.StreamingCall) error {
return nil
}

// sendMessageIfWritePendingLocked attempts to sends a discovery request to the
// server, if there is a pending write for the given resource type.
//
// If the request is successfully sent, the pending write field is cleared and
// watch timers are started for the resources in the request.
//
// Caller needs to hold c.mu.
func (s *StreamImpl) sendMessageIfWritePendingLocked(stream transport.StreamingCall, typ xdsresource.Type, state *resourceTypeState) error {
if !state.pendingWrite {
if s.logger.V(2) {
s.logger.Infof("Skipping sending request for type %q, because all subscribed resources were already sent", typ.TypeURL())
}
return nil
}

names := resourceNames(state.subscribedResources)
if err := s.sendMessageLocked(stream, names, typ.TypeURL(), state.version, state.nonce, nil); err != nil {
return err
}
state.pendingWrite = false

// Drain the buffered requests channel because we just sent a request for this
// resource type.
select {
case <-state.bufferedRequests:
default:
}

s.startWatchTimersLocked(typ, names)
return nil
}

// sendMessageLocked sends a discovery request to the server, populating the
// different fields of the message with the given parameters. Returns a non-nil
// error if the request could not be sent.
Expand Down

0 comments on commit 67b9ebf

Please sign in to comment.