diff --git a/xds/internal/xdsclient/transport/ads/ads_stream.go b/xds/internal/xdsclient/transport/ads/ads_stream.go index 457bb3a171a2..e5f6cefe6159 100644 --- a/xds/internal/xdsclient/transport/ads/ads_stream.go +++ b/xds/internal/xdsclient/transport/ads/ads_stream.go @@ -102,6 +102,7 @@ type resourceTypeState struct { nonce string // Last received nonce. Should be reset when the stream breaks. bufferedRequests chan struct{} // Channel to buffer requests when writing is blocked. subscribedResources map[string]*ResourceWatchState // Map of subscribed resource names to their state. + pendingWrite bool // True if there is a pending write for this resource type. } // StreamImpl provides the functionality associated with an ADS (Aggregated @@ -203,6 +204,7 @@ func (s *StreamImpl) Subscribe(typ xdsresource.Type, name string) { // Create state for the newly subscribed resource. The watch timer will // be started when a request for this resource is actually sent out. state.subscribedResources[name] = &ResourceWatchState{State: ResourceWatchStateStarted} + state.pendingWrite = true // Send a request for the resource type with updated subscriptions. s.requestCh.Put(typ) @@ -233,6 +235,7 @@ func (s *StreamImpl) Unsubscribe(typ xdsresource.Type, name string) { rs.ExpiryTimer.Stop() } delete(state.subscribedResources, name) + state.pendingWrite = true // Send a request for the resource type with updated subscriptions. s.requestCh.Put(typ) @@ -346,17 +349,7 @@ func (s *StreamImpl) sendNew(stream transport.StreamingCall, typ xdsresource.Typ return nil } - names := resourceNames(state.subscribedResources) - if err := s.sendMessageLocked(stream, names, typ.TypeURL(), state.version, state.nonce, nil); err != nil { - return err - - } - select { - case <-state.bufferedRequests: - default: - } - s.startWatchTimersLocked(typ, names) - return nil + return s.sendMessageIfWritePendingLocked(stream, typ, state) } // sendExisting sends out discovery requests for existing resources when @@ -385,18 +378,10 @@ func (s *StreamImpl) sendExisting(stream transport.StreamingCall) error { continue } - names := resourceNames(state.subscribedResources) - if s.logger.V(2) { - s.logger.Infof("Re-requesting resources %v of type %q, as the stream has been recreated", names, typ.TypeURL()) - } - if err := s.sendMessageLocked(stream, names, typ.TypeURL(), state.version, state.nonce, nil); err != nil { + state.pendingWrite = true + if err := s.sendMessageIfWritePendingLocked(stream, typ, state); err != nil { return err } - select { - case <-state.bufferedRequests: - default: - } - s.startWatchTimersLocked(typ, names) } return nil } @@ -413,11 +398,9 @@ func (s *StreamImpl) sendBuffered(stream transport.StreamingCall) error { for typ, state := range s.resourceTypeState { select { case <-state.bufferedRequests: - names := resourceNames(state.subscribedResources) - if err := s.sendMessageLocked(stream, names, typ.TypeURL(), state.version, state.nonce, nil); err != nil { + if err := s.sendMessageIfWritePendingLocked(stream, typ, state); err != nil { return err } - s.startWatchTimersLocked(typ, names) default: // No buffered request. continue @@ -426,6 +409,38 @@ func (s *StreamImpl) sendBuffered(stream transport.StreamingCall) error { return nil } +// sendMessageIfWritePendingLocked attempts to sends a discovery request to the +// server, if there is a pending write for the given resource type. +// +// If the request is successfully sent, the pending write field is cleared and +// watch timers are started for the resources in the request. +// +// Caller needs to hold c.mu. +func (s *StreamImpl) sendMessageIfWritePendingLocked(stream transport.StreamingCall, typ xdsresource.Type, state *resourceTypeState) error { + if !state.pendingWrite { + if s.logger.V(2) { + s.logger.Infof("Skipping sending request for type %q, because all subscribed resources were already sent", typ.TypeURL()) + } + return nil + } + + names := resourceNames(state.subscribedResources) + if err := s.sendMessageLocked(stream, names, typ.TypeURL(), state.version, state.nonce, nil); err != nil { + return err + } + state.pendingWrite = false + + // Drain the buffered requests channel because we just sent a request for this + // resource type. + select { + case <-state.bufferedRequests: + default: + } + + s.startWatchTimersLocked(typ, names) + return nil +} + // sendMessageLocked sends a discovery request to the server, populating the // different fields of the message with the given parameters. Returns a non-nil // error if the request could not be sent.