diff --git a/changelogs/unreleased/5672-therealak12-minor.md b/changelogs/unreleased/5672-therealak12-minor.md
new file mode 100644
index 00000000000..2b3efc28bed
--- /dev/null
+++ b/changelogs/unreleased/5672-therealak12-minor.md
@@ -0,0 +1,5 @@
+## Contour now waits for the cache sync before starting the DAG rebuild and XDS server
+
+Previously, Contour only waited for the informer caches to sync, but not for the cached events to be delivered to subscribed handlers.
+Now Contour waits for the initial list of Kubernetes objects to be both cached and processed by the handlers (using the returned `HasSynced` methods),
+and only then starts building its DAG and serving XDS.
diff --git a/cmd/contour/serve.go b/cmd/contour/serve.go
index 1d9a41a713d..d7bd8387a4a 100644
--- a/cmd/contour/serve.go
+++ b/cmd/contour/serve.go
@@ -15,7 +15,6 @@ package main
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"net"
 	"net/http"
@@ -25,6 +24,25 @@ import (
 
 	"github.com/alecthomas/kingpin/v2"
 	envoy_server_v3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/sirupsen/logrus"
+	corev1 "k8s.io/api/core/v1"
+	networking_v1 "k8s.io/api/networking/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
+	ctrl_cache "sigs.k8s.io/controller-runtime/pkg/cache"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+	"sigs.k8s.io/controller-runtime/pkg/manager/signals"
+	controller_runtime_metrics "sigs.k8s.io/controller-runtime/pkg/metrics"
+	gatewayapi_v1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
+
+	controller_runtime_metrics_server "sigs.k8s.io/controller-runtime/pkg/metrics/server"
+
 	contour_api_v1 "github.com/projectcontour/contour/apis/projectcontour/v1"
 	contour_api_v1alpha1 "github.com/projectcontour/contour/apis/projectcontour/v1alpha1"
 	"github.com/projectcontour/contour/internal/annotation"
@@ -46,22 +64,10 @@ import (
 	"github.com/projectcontour/contour/internal/xdscache"
 	xdscache_v3 "github.com/projectcontour/contour/internal/xdscache/v3"
 	"github.com/projectcontour/contour/pkg/config"
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/sirupsen/logrus"
-	corev1 "k8s.io/api/core/v1"
-	networking_v1 "k8s.io/api/networking/v1"
-	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/apimachinery/pkg/util/sets"
-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/rest"
-	"k8s.io/client-go/tools/cache"
-	ctrl_cache "sigs.k8s.io/controller-runtime/pkg/cache"
-	"sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/manager"
-	"sigs.k8s.io/controller-runtime/pkg/manager/signals"
-	controller_runtime_metrics "sigs.k8s.io/controller-runtime/pkg/metrics"
-	controller_runtime_metrics_server "sigs.k8s.io/controller-runtime/pkg/metrics/server"
-	gatewayapi_v1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
-)
+)
+
+const (
+	initialDagBuildPollPeriod = 100 * time.Millisecond
+)
 
 // registerServe registers the serve subcommand and flags
@@ -176,11 +182,12 @@
 }
 
 type Server struct {
-	log        logrus.FieldLogger
-	ctx        *serveContext
-	coreClient *kubernetes.Clientset
-	mgr        manager.Manager
-	registry   *prometheus.Registry
+	log               logrus.FieldLogger
+	ctx               *serveContext
+	coreClient        *kubernetes.Clientset
+	mgr               manager.Manager
+	registry          *prometheus.Registry
+	handlerCacheSyncs []cache.InformerSynced
 }
 
 // NewServer returns a Server object which contains the initial configuration
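Note: the new handlerCacheSyncs field accumulates one cache.InformerSynced per registered event handler. A minimal, self-contained sketch of that aggregation pattern (not part of this PR; allSynced is a hypothetical stand-in for the hasSynced closure built later in doServe):

package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// allSynced reports the handler set as synced only when every registered
// handler's InformerSynced function returns true.
func allSynced(syncs []cache.InformerSynced) func() bool {
	return func() bool {
		for _, hasSynced := range syncs {
			if !hasSynced() {
				return false
			}
		}
		return true
	}
}

func main() {
	synced := allSynced([]cache.InformerSynced{
		func() bool { return true },
		func() bool { return false }, // one handler still delivering its initial list
	})
	fmt.Println(synced()) // false until every handler has caught up
}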
@@ -537,6 +544,16 @@ func (s *Server) doServe() error {
 		contourMetrics,
 		dag.ComposeObservers(append(xdscache.ObserversOf(resources), snapshotHandler)...),
 	)
+
+	hasSynced := func() bool {
+		for _, syncFunc := range s.handlerCacheSyncs {
+			if !syncFunc() {
+				return false
+			}
+		}
+		return true
+	}
+
 	contourHandler := contour.NewEventHandler(contour.EventHandlerConfig{
 		Logger:        s.log.WithField("context", "contourEventHandler"),
 		HoldoffDelay:  100 * time.Millisecond,
@@ -544,7 +561,7 @@
 		Observer:      observer,
 		StatusUpdater: sh.Writer(),
 		Builder:       builder,
-	})
+	}, hasSynced)
 
 	// Wrap contourHandler in an EventRecorder which tracks API server events.
 	eventHandler := &contour.EventRecorder{
@@ -568,7 +585,7 @@
 
 	// Inform on the remaining resources.
 	for name, r := range informerResources {
-		if err := informOnResource(r, eventHandler, s.mgr.GetCache()); err != nil {
+		if err := s.informOnResource(r, eventHandler); err != nil {
 			s.log.WithError(err).WithField("resource", name).Fatal("failed to create informer")
 		}
 	}
@@ -584,15 +601,15 @@
 		handler = k8s.NewNamespaceFilter(sets.List(secretNamespaces), eventHandler)
 	}
 
-	if err := informOnResource(&corev1.Secret{}, handler, s.mgr.GetCache()); err != nil {
+	if err := s.informOnResource(&corev1.Secret{}, handler); err != nil {
 		s.log.WithError(err).WithField("resource", "secrets").Fatal("failed to create informer")
 	}
 
 	// Inform on endpoints.
-	if err := informOnResource(&corev1.Endpoints{}, &contour.EventRecorder{
+	if err := s.informOnResource(&corev1.Endpoints{}, &contour.EventRecorder{
 		Next:    endpointHandler,
 		Counter: contourMetrics.EventHandlerOperations,
-	}, s.mgr.GetCache()); err != nil {
+	}); err != nil {
 		s.log.WithError(err).WithField("resource", "endpoints").Fatal("failed to create informer")
 	}
@@ -646,7 +663,7 @@
 		handler = k8s.NewNamespaceFilter([]string{contourConfiguration.Envoy.Service.Namespace}, handler)
 	}
 
-	if err := informOnResource(&corev1.Service{}, handler, s.mgr.GetCache()); err != nil {
+	if err := s.informOnResource(&corev1.Service{}, handler); err != nil {
 		s.log.WithError(err).WithField("resource", "services").Fatal("failed to create informer")
 	}
@@ -657,11 +674,11 @@
 	xdsServer := &xdsServer{
 		log:             s.log,
-		mgr:             s.mgr,
 		registry:        s.registry,
 		config:          *contourConfiguration.XDSServer,
 		snapshotHandler: snapshotHandler,
 		resources:       resources,
+		initialDagBuilt: contourHandler.HasBuiltInitialDag,
 	}
 	if err := s.mgr.Add(xdsServer); err != nil {
 		return err
 	}
@@ -830,11 +847,11 @@ func (s *Server) setupDebugService(debugConfig contour_api_v1alpha1.DebugConfig,
 
 type xdsServer struct {
 	log             logrus.FieldLogger
-	mgr             manager.Manager
 	registry        *prometheus.Registry
 	config          contour_api_v1alpha1.XDSServerConfig
 	snapshotHandler *xdscache.SnapshotHandler
 	resources       []xdscache.ResourceCache
+	initialDagBuilt func() bool
 }
 
 func (x *xdsServer) NeedLeaderElection() bool {
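Note: a self-contained sketch of the gating pattern the reworked xdsServer.Start relies on, using wait.PollUntilContextCancel; the atomic flag and 100ms period below are stand-ins for EventHandler.initialDagBuilt and initialDagBuildPollPeriod (not this PR's code):

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// Stand-in for the flag flipped by the first successful DAG build.
	var initialDagBuilt atomic.Bool
	time.AfterFunc(300*time.Millisecond, func() { initialDagBuilt.Store(true) })

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Poll the flag every 100ms until it is true or the context ends,
	// mirroring the wait at the top of xdsServer.Start.
	err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(context.Context) (bool, error) {
		return initialDagBuilt.Load(), nil
	})
	fmt.Println("initial DAG built:", initialDagBuilt.Load(), "err:", err)
}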
@@ -844,11 +861,13 @@
 func (x *xdsServer) Start(ctx context.Context) error {
 	log := x.log.WithField("context", "xds")
 
-	log.Printf("waiting for informer caches to sync")
-	if !x.mgr.GetCache().WaitForCacheSync(ctx) {
-		return errors.New("informer cache failed to sync")
+	log.Printf("waiting for the initial dag to be built")
+	if err := wait.PollUntilContextCancel(ctx, initialDagBuildPollPeriod, true, func(ctx context.Context) (done bool, err error) {
+		return x.initialDagBuilt(), nil
+	}); err != nil {
+		return fmt.Errorf("failed to wait for initial dag build, %w", err)
 	}
-	log.Printf("informer caches synced")
+	log.Printf("the initial dag is built")
 
 	grpcServer := xds.NewServer(x.registry, grpcOptions(log, x.config.TLS)...)
@@ -953,12 +972,12 @@ func (s *Server) setupGatewayAPI(contourConfiguration contour_api_v1alpha1.Conto
 	// to process, we just need informers to get events.
 	case contourConfiguration.Gateway.GatewayRef != nil:
 		// Inform on GatewayClasses.
-		if err := informOnResource(&gatewayapi_v1beta1.GatewayClass{}, eventHandler, mgr.GetCache()); err != nil {
+		if err := s.informOnResource(&gatewayapi_v1beta1.GatewayClass{}, eventHandler); err != nil {
 			s.log.WithError(err).WithField("resource", "gatewayclasses").Fatal("failed to create informer")
 		}
 
 		// Inform on Gateways.
-		if err := informOnResource(&gatewayapi_v1beta1.Gateway{}, eventHandler, mgr.GetCache()); err != nil {
+		if err := s.informOnResource(&gatewayapi_v1beta1.Gateway{}, eventHandler); err != nil {
 			s.log.WithError(err).WithField("resource", "gateways").Fatal("failed to create informer")
 		}
 	// Otherwise, run the GatewayClass and Gateway controllers to determine
@@ -1029,12 +1048,12 @@ func (s *Server) setupGatewayAPI(contourConfiguration contour_api_v1alpha1.Conto
 	}
 
 	// Inform on ReferenceGrants.
-	if err := informOnResource(&gatewayapi_v1beta1.ReferenceGrant{}, eventHandler, mgr.GetCache()); err != nil {
+	if err := s.informOnResource(&gatewayapi_v1beta1.ReferenceGrant{}, eventHandler); err != nil {
 		s.log.WithError(err).WithField("resource", "referencegrants").Fatal("failed to create informer")
 	}
 
 	// Inform on Namespaces.
-	if err := informOnResource(&corev1.Namespace{}, eventHandler, mgr.GetCache()); err != nil {
+	if err := s.informOnResource(&corev1.Namespace{}, eventHandler); err != nil {
 		s.log.WithError(err).WithField("resource", "namespaces").Fatal("failed to create informer")
 	}
 }
@@ -1197,12 +1216,17 @@ func (s *Server) getDAGBuilder(dbc dagBuilderConfig) *dag.Builder {
 	return builder
 }
 
-func informOnResource(obj client.Object, handler cache.ResourceEventHandler, cache ctrl_cache.Cache) error {
-	inf, err := cache.GetInformer(context.Background(), obj)
+func (s *Server) informOnResource(obj client.Object, handler cache.ResourceEventHandler) error {
+	inf, err := s.mgr.GetCache().GetInformer(context.Background(), obj)
 	if err != nil {
 		return err
 	}
 
-	_, err = inf.AddEventHandler(handler)
-	return err
+	registration, err := inf.AddEventHandler(handler)
+	if err != nil {
+		return err
+	}
+
+	s.handlerCacheSyncs = append(s.handlerCacheSyncs, registration.HasSynced)
+	return nil
 }
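Note: informOnResource now keeps the registration returned by AddEventHandler. A hedged, standalone illustration of that client-go behavior (v0.26+): the registration's HasSynced reports true only once this specific handler has been delivered the informer's initial list, not merely once the store is populated. The fake clientset and object names here are purely illustrative:

package main

import (
	"context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func main() {
	client := fake.NewSimpleClientset(&corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "example", Namespace: "default"},
	})
	factory := informers.NewSharedInformerFactory(client, 0)
	informer := factory.Core().V1().Pods().Informer()

	// AddEventHandler returns a ResourceEventHandlerRegistration whose
	// HasSynced tracks delivery of the initial list to this handler.
	registration, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj any) { fmt.Println("observed add") },
	})
	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	factory.Start(ctx.Done())

	if cache.WaitForCacheSync(ctx.Done(), registration.HasSynced) {
		fmt.Println("handler has processed the initial list")
	}
}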
diff --git a/internal/contour/handler.go b/internal/contour/handler.go
index 64bd51ef5d0..6e2767fecae 100644
--- a/internal/contour/handler.go
+++ b/internal/contour/handler.go
@@ -19,12 +19,16 @@ package contour
 import (
 	"context"
 	"reflect"
+	"sync/atomic"
 	"time"
 
-	"github.com/projectcontour/contour/internal/dag"
-	"github.com/projectcontour/contour/internal/k8s"
 	"github.com/sirupsen/logrus"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/cache/synctrack"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	"github.com/projectcontour/contour/internal/dag"
+	"github.com/projectcontour/contour/internal/k8s"
 )
 
 type EventHandlerConfig struct {
@@ -55,9 +59,16 @@ type EventHandler struct {
 	// seq is the sequence counter of the number of times
 	// an event has been received.
 	seq int
+
+	// syncTracker is used to update/query the status of the cache sync.
+	// Its internal counter is incremented when processing of an initial-list item starts and decremented when it finishes;
+	// HasSynced returns true once UpstreamHasSynced returns true and the counter is non-positive.
+	syncTracker *synctrack.SingleFileTracker
+
+	initialDagBuilt atomic.Bool
 }
 
-func NewEventHandler(config EventHandlerConfig) *EventHandler {
+func NewEventHandler(config EventHandlerConfig, upstreamHasSynced cache.InformerSynced) *EventHandler {
 	return &EventHandler{
 		FieldLogger: config.Logger,
 		builder:     config.Builder,
@@ -67,11 +78,13 @@
 		statusUpdater: config.StatusUpdater,
 		update:        make(chan any),
 		sequence:      make(chan int, 1),
+		syncTracker:   &synctrack.SingleFileTracker{UpstreamHasSynced: upstreamHasSynced},
 	}
 }
 
 type opAdd struct {
-	obj any
+	obj             any
+	isInInitialList bool
 }
 
 type opUpdate struct {
@@ -83,7 +96,10 @@
 }
 
 func (e *EventHandler) OnAdd(obj any, isInInitialList bool) {
-	e.update <- opAdd{obj: obj}
+	if isInInitialList {
+		e.syncTracker.Start()
+	}
+	e.update <- opAdd{obj: obj, isInInitialList: isInInitialList}
 }
 
 func (e *EventHandler) OnUpdate(oldObj, newObj any) {
@@ -94,10 +110,15 @@ func (e *EventHandler) OnDelete(obj any) {
 	e.update <- opDelete{obj: obj}
 }
 
+// NeedLeaderElection is included to implement manager.LeaderElectionRunnable.
 func (e *EventHandler) NeedLeaderElection() bool {
 	return false
 }
 
+func (e *EventHandler) HasBuiltInitialDag() bool {
+	return e.initialDagBuilt.Load()
+}
+
 // Implements leadership.NeedLeaderElectionNotification
 func (e *EventHandler) OnElectedLeader() {
 	// Trigger an update when we are elected leader to ensure resource
@@ -164,9 +185,37 @@ func (e *EventHandler) Start(ctx context.Context) error {
 				// not to process it.
 				e.incSequence()
 			}
+
+			// We're done processing this event.
+			if updateOpAdd, ok := op.(opAdd); ok {
+				if updateOpAdd.isInInitialList {
+					e.syncTracker.Finished()
+				}
+			}
 		case <-pending:
+			// Ensure the informer caches are synced.
+			// Schedule a retry for the DAG rebuild if the cache is not synced yet.
+			// Note that we can't block and wait for the cache sync, as it depends on progress of this loop.
+			if !e.syncTracker.HasSynced() {
+				e.Info("skipping dag rebuild as cache is not synced")
+				timer.Reset(e.holdoffDelay)
+				break
+			}
+
 			e.WithField("last_update", time.Since(lastDAGRebuild)).WithField("outstanding", reset()).Info("performing delayed update")
+
+			// Build a new DAG and send it to the Observer.
+			latestDAG := e.builder.Build()
+			e.observer.OnChange(latestDAG)
+
+			// Allow the XDS server to start (if it hasn't already).
+			e.initialDagBuilt.Store(true)
+
+			// Update the status on objects.
+			for _, upd := range latestDAG.StatusCache.GetStatusUpdates() {
+				e.statusUpdater.Send(upd)
+			}
+
 			e.incSequence()
 			lastDAGRebuild = time.Now()
 		case <-ctx.Done():
@@ -237,14 +286,3 @@ func (e *EventHandler) incSequence() {
 	default:
 	}
 }
-
-// rebuildDAG builds a new DAG and sends it to the Observer,
-// the updates the status on objects, and updates the metrics.
-func (e *EventHandler) rebuildDAG() {
-	latestDAG := e.builder.Build()
-	e.observer.OnChange(latestDAG)
-
-	for _, upd := range latestDAG.StatusCache.GetStatusUpdates() {
-		e.statusUpdater.Send(upd)
-	}
-}
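Note: a minimal sketch (not Contour code) of the synctrack.SingleFileTracker contract the EventHandler now relies on — OnAdd calls Start() for initial-list items, the event loop calls Finished() once an item is processed, and HasSynced flips to true only when the upstream check passes and no started item is outstanding:

package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache/synctrack"
)

func main() {
	tracker := &synctrack.SingleFileTracker{
		// Stand-in for the aggregated handler sync check passed into NewEventHandler.
		UpstreamHasSynced: func() bool { return true },
	}

	tracker.Start() // OnAdd received an item with isInInitialList=true
	fmt.Println(tracker.HasSynced()) // false: the item is still being processed

	tracker.Finished() // the event loop finished handling the opAdd
	fmt.Println(tracker.HasSynced()) // true: upstream synced, nothing outstanding
}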
diff --git a/internal/featuretests/v3/featuretests.go b/internal/featuretests/v3/featuretests.go
index 8cd2fbc8f95..30f0bd72183 100644
--- a/internal/featuretests/v3/featuretests.go
+++ b/internal/featuretests/v3/featuretests.go
@@ -32,6 +32,17 @@ import (
 	envoy_service_route_v3 "github.com/envoyproxy/go-control-plane/envoy/service/route/v3"
 	envoy_service_secret_v3 "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3"
 	resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/sirupsen/logrus"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/protobuf/proto"
+	"google.golang.org/protobuf/types/known/anypb"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/tools/cache"
+
 	contour_api_v1 "github.com/projectcontour/contour/apis/projectcontour/v1"
 	"github.com/projectcontour/contour/apis/projectcontour/v1alpha1"
 	"github.com/projectcontour/contour/internal/contour"
@@ -46,16 +57,6 @@ import (
 	contour_xds_v3 "github.com/projectcontour/contour/internal/xds/v3"
 	"github.com/projectcontour/contour/internal/xdscache"
 	xdscache_v3 "github.com/projectcontour/contour/internal/xdscache/v3"
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/sirupsen/logrus"
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-	"google.golang.org/grpc"
-	"google.golang.org/grpc/credentials/insecure"
-	"google.golang.org/protobuf/proto"
-	"google.golang.org/protobuf/types/known/anypb"
-	v1 "k8s.io/api/core/v1"
-	"k8s.io/client-go/tools/cache"
 )
 
 const (
@@ -152,7 +153,7 @@ func setup(t *testing.T, opts ...any) (ResourceEventHandlerWrapper, *Contour, fu
 			dag.ComposeObservers(xdscache.ObserversOf(resources)...),
 		),
 		Builder: builder,
-	})
+	}, func() bool { return true })
 
 	l, err := net.Listen("tcp", "127.0.0.1:0")
 	require.NoError(t, err)
"github.com/projectcontour/contour/internal/xdscache" ) func TestGRPC(t *testing.T) { @@ -214,7 +215,7 @@ func TestGRPC(t *testing.T) { Logger: log, Builder: new(dag.Builder), Observer: dag.ComposeObservers(xdscache.ObserversOf(resources)...), - }) + }, func() bool { return true }) srv := xds.NewServer(nil) contour_xds_v3.RegisterServer(contour_xds_v3.NewContourServer(log, xdscache.ResourcesOf(resources)...), srv)