wait for cache sync and DAG build before starting xDS server (#5672)
Prevents starting the xDS server and building the DAG until the cache is synced with the initial list of Kubernetes objects and those events have been processed by the event handler.

Signed-off-by: Ahmad Karimi <[email protected]>
therealak12 authored Oct 10, 2023
1 parent 51c0021 commit 2765c72
Showing 5 changed files with 147 additions and 77 deletions.
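
The change hinges on the registrations returned by `AddEventHandler`: each registration exposes a `HasSynced` method reporting whether the informer has delivered its initial list to that handler, and `serve.go` folds those functions into a single check that it hands to the event handler as its upstream sync state. Below is a minimal sketch of that pattern, assuming recent controller-runtime/client-go APIs; `informOn` and `allHandlersSynced` are illustrative names, not Contour's.

```go
package sketch

import (
	"context"

	toolscache "k8s.io/client-go/tools/cache"
	ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// informOn registers handler on the controller-runtime cache's informer for obj
// and returns the registration's HasSynced, which reports whether the informer's
// initial list has been delivered to this handler.
func informOn(ctx context.Context, c ctrlcache.Cache, obj client.Object, handler toolscache.ResourceEventHandler) (toolscache.InformerSynced, error) {
	inf, err := c.GetInformer(ctx, obj)
	if err != nil {
		return nil, err
	}
	reg, err := inf.AddEventHandler(handler)
	if err != nil {
		return nil, err
	}
	return reg.HasSynced, nil
}

// allHandlersSynced folds the per-handler checks into one cache.InformerSynced.
func allHandlersSynced(syncs ...toolscache.InformerSynced) toolscache.InformerSynced {
	return func() bool {
		for _, hasSynced := range syncs {
			if !hasSynced() {
				return false
			}
		}
		return true
	}
}
```

In the diff, the aggregated function becomes the `UpstreamHasSynced` of the event handler's `synctrack.SingleFileTracker`, so "synced" additionally requires the handler to have worked through those initial events.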
5 changes: 5 additions & 0 deletions changelogs/unreleased/5672-therealak12-minor.md
@@ -0,0 +1,5 @@
## Contour now waits for the cache sync before starting the DAG rebuild and XDS server

Previously, Contour only waited for the informer caches to sync; it did not wait for the cached events to be delivered to subscribed handlers.
Now Contour waits for the initial list of Kubernetes objects to be cached and processed by the handlers (using the returned `HasSynced` methods),
and only then starts building its DAG and serving XDS.
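
As context for the `cmd/contour/serve.go` changes that follow: the xDS runnable no longer waits only for informer caches; it polls an "initial DAG built" condition exposed by the event handler. A hedged sketch of that gate using `wait.PollUntilContextCancel` from k8s.io/apimachinery (the same helper the diff uses); `pollPeriod` and `initialDagBuilt` stand in for `initialDagBuildPollPeriod` and `contourHandler.HasBuiltInitialDag`.

```go
package sketch

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// pollPeriod mirrors the diff's initialDagBuildPollPeriod.
const pollPeriod = 100 * time.Millisecond

// waitForInitialDAG blocks until initialDagBuilt reports true, or returns an
// error if ctx is cancelled first. An xDS server would call this before it
// starts serving snapshots.
func waitForInitialDAG(ctx context.Context, initialDagBuilt func() bool) error {
	if err := wait.PollUntilContextCancel(ctx, pollPeriod, true, func(context.Context) (bool, error) {
		return initialDagBuilt(), nil
	}); err != nil {
		return fmt.Errorf("failed to wait for initial DAG build: %w", err)
	}
	return nil
}
```

Polling keeps the xDS runnable decoupled from the handler's internals: the handler only has to flip an atomic flag after its first successful DAG build.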
109 changes: 67 additions & 42 deletions cmd/contour/serve.go
@@ -15,7 +15,6 @@ package main

import (
"context"
"errors"
"fmt"
"net"
"net/http"
@@ -25,6 +24,25 @@ import (

"github.com/alecthomas/kingpin/v2"
envoy_server_v3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
networking_v1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
ctrl_cache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
controller_runtime_metrics "sigs.k8s.io/controller-runtime/pkg/metrics"
gatewayapi_v1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

controller_runtime_metrics_server "sigs.k8s.io/controller-runtime/pkg/metrics/server"

contour_api_v1 "github.com/projectcontour/contour/apis/projectcontour/v1"
contour_api_v1alpha1 "github.com/projectcontour/contour/apis/projectcontour/v1alpha1"
"github.com/projectcontour/contour/internal/annotation"
@@ -46,22 +64,10 @@ import (
"github.com/projectcontour/contour/internal/xdscache"
xdscache_v3 "github.com/projectcontour/contour/internal/xdscache/v3"
"github.com/projectcontour/contour/pkg/config"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
networking_v1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
ctrl_cache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
controller_runtime_metrics "sigs.k8s.io/controller-runtime/pkg/metrics"
controller_runtime_metrics_server "sigs.k8s.io/controller-runtime/pkg/metrics/server"
gatewayapi_v1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
)

const (
initialDagBuildPollPeriod = 100 * time.Millisecond
)

// registerServe registers the serve subcommand and flags
@@ -176,11 +182,12 @@ func registerServe(app *kingpin.Application) (*kingpin.CmdClause, *serveContext)
}

type Server struct {
log logrus.FieldLogger
ctx *serveContext
coreClient *kubernetes.Clientset
mgr manager.Manager
registry *prometheus.Registry
log logrus.FieldLogger
ctx *serveContext
coreClient *kubernetes.Clientset
mgr manager.Manager
registry *prometheus.Registry
handlerCacheSyncs []cache.InformerSynced
}

// NewServer returns a Server object which contains the initial configuration
@@ -537,14 +544,24 @@ func (s *Server) doServe() error {
contourMetrics,
dag.ComposeObservers(append(xdscache.ObserversOf(resources), snapshotHandler)...),
)

hasSynced := func() bool {
for _, syncFunc := range s.handlerCacheSyncs {
if !syncFunc() {
return false
}
}
return true
}

contourHandler := contour.NewEventHandler(contour.EventHandlerConfig{
Logger: s.log.WithField("context", "contourEventHandler"),
HoldoffDelay: 100 * time.Millisecond,
HoldoffMaxDelay: 500 * time.Millisecond,
Observer: observer,
StatusUpdater: sh.Writer(),
Builder: builder,
})
}, hasSynced)

// Wrap contourHandler in an EventRecorder which tracks API server events.
eventHandler := &contour.EventRecorder{
@@ -568,7 +585,7 @@ func (s *Server) doServe() error {

// Inform on the remaining resources.
for name, r := range informerResources {
if err := informOnResource(r, eventHandler, s.mgr.GetCache()); err != nil {
if err := s.informOnResource(r, eventHandler); err != nil {
s.log.WithError(err).WithField("resource", name).Fatal("failed to create informer")
}
}
Expand All @@ -584,15 +601,15 @@ func (s *Server) doServe() error {
handler = k8s.NewNamespaceFilter(sets.List(secretNamespaces), eventHandler)
}

if err := informOnResource(&corev1.Secret{}, handler, s.mgr.GetCache()); err != nil {
if err := s.informOnResource(&corev1.Secret{}, handler); err != nil {
s.log.WithError(err).WithField("resource", "secrets").Fatal("failed to create informer")
}

// Inform on endpoints.
if err := informOnResource(&corev1.Endpoints{}, &contour.EventRecorder{
if err := s.informOnResource(&corev1.Endpoints{}, &contour.EventRecorder{
Next: endpointHandler,
Counter: contourMetrics.EventHandlerOperations,
}, s.mgr.GetCache()); err != nil {
}); err != nil {
s.log.WithError(err).WithField("resource", "endpoints").Fatal("failed to create informer")
}

@@ -646,7 +663,7 @@ func (s *Server) doServe() error {
handler = k8s.NewNamespaceFilter([]string{contourConfiguration.Envoy.Service.Namespace}, handler)
}

if err := informOnResource(&corev1.Service{}, handler, s.mgr.GetCache()); err != nil {
if err := s.informOnResource(&corev1.Service{}, handler); err != nil {
s.log.WithError(err).WithField("resource", "services").Fatal("failed to create informer")
}

@@ -657,11 +674,11 @@ func (s *Server) doServe() error {

xdsServer := &xdsServer{
log: s.log,
mgr: s.mgr,
registry: s.registry,
config: *contourConfiguration.XDSServer,
snapshotHandler: snapshotHandler,
resources: resources,
initialDagBuilt: contourHandler.HasBuiltInitialDag,
}
if err := s.mgr.Add(xdsServer); err != nil {
return err
@@ -830,11 +847,11 @@ func (s *Server) setupDebugService(debugConfig contour_api_v1alpha1.DebugConfig,

type xdsServer struct {
log logrus.FieldLogger
mgr manager.Manager
registry *prometheus.Registry
config contour_api_v1alpha1.XDSServerConfig
snapshotHandler *xdscache.SnapshotHandler
resources []xdscache.ResourceCache
initialDagBuilt func() bool
}

func (x *xdsServer) NeedLeaderElection() bool {
@@ -844,11 +861,13 @@ func (x *xdsServer) NeedLeaderElection() bool {
func (x *xdsServer) Start(ctx context.Context) error {
log := x.log.WithField("context", "xds")

log.Printf("waiting for informer caches to sync")
if !x.mgr.GetCache().WaitForCacheSync(ctx) {
return errors.New("informer cache failed to sync")
log.Printf("waiting for the initial dag to be built")
if err := wait.PollUntilContextCancel(ctx, initialDagBuildPollPeriod, true, func(ctx context.Context) (done bool, err error) {
return x.initialDagBuilt(), nil
}); err != nil {
return fmt.Errorf("failed to wait for initial dag build, %w", err)
}
log.Printf("informer caches synced")
log.Printf("the initial dag is built")

grpcServer := xds.NewServer(x.registry, grpcOptions(log, x.config.TLS)...)

@@ -953,12 +972,12 @@ func (s *Server) setupGatewayAPI(contourConfiguration contour_api_v1alpha1.Conto
// to process, we just need informers to get events.
case contourConfiguration.Gateway.GatewayRef != nil:
// Inform on GatewayClasses.
if err := informOnResource(&gatewayapi_v1beta1.GatewayClass{}, eventHandler, mgr.GetCache()); err != nil {
if err := s.informOnResource(&gatewayapi_v1beta1.GatewayClass{}, eventHandler); err != nil {
s.log.WithError(err).WithField("resource", "gatewayclasses").Fatal("failed to create informer")
}

// Inform on Gateways.
if err := informOnResource(&gatewayapi_v1beta1.Gateway{}, eventHandler, mgr.GetCache()); err != nil {
if err := s.informOnResource(&gatewayapi_v1beta1.Gateway{}, eventHandler); err != nil {
s.log.WithError(err).WithField("resource", "gateways").Fatal("failed to create informer")
}
// Otherwise, run the GatewayClass and Gateway controllers to determine
@@ -1029,12 +1048,12 @@ func (s *Server) setupGatewayAPI(contourConfiguration contour_api_v1alpha1.Conto
}

// Inform on ReferenceGrants.
if err := informOnResource(&gatewayapi_v1beta1.ReferenceGrant{}, eventHandler, mgr.GetCache()); err != nil {
if err := s.informOnResource(&gatewayapi_v1beta1.ReferenceGrant{}, eventHandler); err != nil {
s.log.WithError(err).WithField("resource", "referencegrants").Fatal("failed to create informer")
}

// Inform on Namespaces.
if err := informOnResource(&corev1.Namespace{}, eventHandler, mgr.GetCache()); err != nil {
if err := s.informOnResource(&corev1.Namespace{}, eventHandler); err != nil {
s.log.WithError(err).WithField("resource", "namespaces").Fatal("failed to create informer")
}
}
@@ -1197,12 +1216,18 @@ func (s *Server) getDAGBuilder(dbc dagBuilderConfig) *dag.Builder {
return builder
}

func informOnResource(obj client.Object, handler cache.ResourceEventHandler, cache ctrl_cache.Cache) error {
inf, err := cache.GetInformer(context.Background(), obj)
func (s *Server) informOnResource(obj client.Object, handler cache.ResourceEventHandler) error {
inf, err := s.mgr.GetCache().GetInformer(context.Background(), obj)
if err != nil {
return err
}

_, err = inf.AddEventHandler(handler)
return err
registration, err := inf.AddEventHandler(handler)

if err != nil {
return err
}

s.handlerCacheSyncs = append(s.handlerCacheSyncs, registration.HasSynced)
return nil
}
72 changes: 55 additions & 17 deletions internal/contour/handler.go
@@ -19,12 +19,16 @@ package contour
import (
"context"
"reflect"
"sync/atomic"
"time"

"github.com/projectcontour/contour/internal/dag"
"github.com/projectcontour/contour/internal/k8s"
"github.com/sirupsen/logrus"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/cache/synctrack"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/projectcontour/contour/internal/dag"
"github.com/projectcontour/contour/internal/k8s"
)

type EventHandlerConfig struct {
@@ -55,9 +59,16 @@ type EventHandler struct {
// seq is the sequence counter of the number of times
// an event has been received.
seq int

// syncTracker is used to update/query the status of the cache sync.
// Uses an internal counter: incremented at item start, decremented at end.
// HasSynced returns true if its UpstreamHasSynced returns true and the counter is non-positive.
syncTracker *synctrack.SingleFileTracker

initialDagBuilt atomic.Bool
}

func NewEventHandler(config EventHandlerConfig) *EventHandler {
func NewEventHandler(config EventHandlerConfig, upstreamHasSynced cache.InformerSynced) *EventHandler {
return &EventHandler{
FieldLogger: config.Logger,
builder: config.Builder,
@@ -67,11 +78,13 @@ func NewEventHandler(config EventHandlerConfig) *EventHandler {
statusUpdater: config.StatusUpdater,
update: make(chan any),
sequence: make(chan int, 1),
syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: upstreamHasSynced},
}
}

type opAdd struct {
obj any
obj any
isInInitialList bool
}

type opUpdate struct {
@@ -83,7 +96,10 @@ type opDelete struct {
}

func (e *EventHandler) OnAdd(obj any, isInInitialList bool) {
e.update <- opAdd{obj: obj}
if isInInitialList {
e.syncTracker.Start()
}
e.update <- opAdd{obj: obj, isInInitialList: isInInitialList}
}

func (e *EventHandler) OnUpdate(oldObj, newObj any) {
@@ -94,10 +110,15 @@ func (e *EventHandler) OnDelete(obj any) {
e.update <- opDelete{obj: obj}
}

// NeedLeaderElection is included to implement manager.LeaderElectionRunnable
func (e *EventHandler) NeedLeaderElection() bool {
return false
}

func (e *EventHandler) HasBuiltInitialDag() bool {
return e.initialDagBuilt.Load()
}

// Implements leadership.NeedLeaderElectionNotification
func (e *EventHandler) OnElectedLeader() {
// Trigger an update when we are elected leader to ensure resource
@@ -164,9 +185,37 @@ func (e *EventHandler) Start(ctx context.Context) error {
// not to process it.
e.incSequence()
}

// We're done processing this event
if updateOpAdd, ok := op.(opAdd); ok {
if updateOpAdd.isInInitialList {
e.syncTracker.Finished()
}
}
case <-pending:
// Ensure informer caches are synced.
// Schedule a retry for dag rebuild if cache is not synced yet.
// Note that we can't block and wait for the cache sync as it depends on progress of this loop.
if !e.syncTracker.HasSynced() {
e.Info("skipping dag rebuild as cache is not synced")
timer.Reset(e.holdoffDelay)
break
}

e.WithField("last_update", time.Since(lastDAGRebuild)).WithField("outstanding", reset()).Info("performing delayed update")
e.rebuildDAG()

// Build a new DAG and send it to the Observer.
latestDAG := e.builder.Build()
e.observer.OnChange(latestDAG)

// Allow XDS server to start (if it hasn't already).
e.initialDagBuilt.Store(true)

// Update the status on objects.
for _, upd := range latestDAG.StatusCache.GetStatusUpdates() {
e.statusUpdater.Send(upd)
}

e.incSequence()
lastDAGRebuild = time.Now()
case <-ctx.Done():
@@ -237,14 +286,3 @@ func (e *EventHandler) incSequence() {
default:
}
}

// rebuildDAG builds a new DAG and sends it to the Observer,
// the updates the status on objects, and updates the metrics.
func (e *EventHandler) rebuildDAG() {
latestDAG := e.builder.Build()
e.observer.OnChange(latestDAG)

for _, upd := range latestDAG.StatusCache.GetStatusUpdates() {
e.statusUpdater.Send(upd)
}
}
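
The `internal/contour/handler.go` changes above lean on client-go's `synctrack.SingleFileTracker`: `Start()` is called for each object delivered as part of the informer's initial list, `Finished()` once that object has actually been processed, and `HasSynced()` reports true only when the upstream caches are synced and the start/finish counter has drained. A compact sketch of that accounting with a synchronous handler; `trackingHandler` is illustrative, not Contour's `EventHandler`.

```go
package sketch

import (
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/cache/synctrack"
)

// trackingHandler processes events inline and tracks how much of the
// informer's initial list it has finished.
type trackingHandler struct {
	tracker *synctrack.SingleFileTracker
	process func(obj any)
}

func newTrackingHandler(upstreamHasSynced cache.InformerSynced, process func(obj any)) *trackingHandler {
	return &trackingHandler{
		tracker: &synctrack.SingleFileTracker{UpstreamHasSynced: upstreamHasSynced},
		process: process,
	}
}

func (h *trackingHandler) OnAdd(obj any, isInInitialList bool) {
	if isInInitialList {
		h.tracker.Start() // one more outstanding initial-list item
	}
	h.process(obj)
	if isInInitialList {
		h.tracker.Finished() // processed; the counter drains back toward zero
	}
}

func (h *trackingHandler) OnUpdate(oldObj, newObj any) { h.process(newObj) }
func (h *trackingHandler) OnDelete(obj any)            { h.process(obj) }

// HasSynced is true once the upstream caches have synced and every
// initial-list add has been both started and finished.
func (h *trackingHandler) HasSynced() bool { return h.tracker.HasSynced() }
```

Contour's handler defers processing to its run loop, so it calls `Start()` in `OnAdd` before enqueueing and `Finished()` only after the queued add has been handled, which is exactly what the diff above does.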
