wait for cache sync and DAG build before starting xDS server #5672

Merged · 21 commits · Oct 10, 2023
5 changes: 5 additions & 0 deletions changelogs/unreleased/5672-therealak12-minor.md
@@ -0,0 +1,5 @@
## Contour now waits for the cache to sync before starting the DAG rebuild and the xDS server

Previously, Contour only waited for the informer caches to sync, but did not wait for the events to be delivered to the subscribed handlers.
Now Contour waits for the initial list of Kubernetes objects to be cached and processed by the handlers (using the returned `HasSynced` methods),
and only then starts building its DAG and serving xDS.
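
As a minimal, self-contained sketch of the aggregation pattern the serve.go diff below introduces — each informer handler registration exposes a `HasSynced` function, and they are folded into one gate. `allSynced` is a hypothetical helper for illustration, not part of the PR:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// allSynced combines per-handler HasSynced functions into a single gate,
// mirroring how serve.go folds s.handlerCacheSyncs into one closure below.
func allSynced(syncs []cache.InformerSynced) func() bool {
	return func() bool {
		for _, syncFunc := range syncs {
			if !syncFunc() {
				return false
			}
		}
		return true
	}
}

func main() {
	syncs := []cache.InformerSynced{
		func() bool { return true },
		func() bool { return true },
	}
	fmt.Println(allSynced(syncs)()) // true only once every handler reports synced
}
```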
109 changes: 67 additions & 42 deletions cmd/contour/serve.go
@@ -15,7 +15,6 @@

import (
"context"
"errors"
"fmt"
"net"
"net/http"
@@ -25,6 +24,25 @@

"github.com/alecthomas/kingpin/v2"
envoy_server_v3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
networking_v1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
ctrl_cache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
controller_runtime_metrics "sigs.k8s.io/controller-runtime/pkg/metrics"
gatewayapi_v1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

controller_runtime_metrics_server "sigs.k8s.io/controller-runtime/pkg/metrics/server"

contour_api_v1 "github.com/projectcontour/contour/apis/projectcontour/v1"
contour_api_v1alpha1 "github.com/projectcontour/contour/apis/projectcontour/v1alpha1"
"github.com/projectcontour/contour/internal/annotation"
@@ -46,22 +64,10 @@
"github.com/projectcontour/contour/internal/xdscache"
xdscache_v3 "github.com/projectcontour/contour/internal/xdscache/v3"
"github.com/projectcontour/contour/pkg/config"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
networking_v1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
ctrl_cache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
controller_runtime_metrics "sigs.k8s.io/controller-runtime/pkg/metrics"
controller_runtime_metrics_server "sigs.k8s.io/controller-runtime/pkg/metrics/server"
gatewayapi_v1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
)

const (
initialDagBuildPollPeriod = 100 * time.Millisecond
)

// registerServe registers the serve subcommand and flags
@@ -176,11 +182,12 @@
}

type Server struct {
log logrus.FieldLogger
ctx *serveContext
coreClient *kubernetes.Clientset
mgr manager.Manager
registry *prometheus.Registry
log logrus.FieldLogger
ctx *serveContext
coreClient *kubernetes.Clientset
mgr manager.Manager
registry *prometheus.Registry
handlerCacheSyncs []cache.InformerSynced
}

// NewServer returns a Server object which contains the initial configuration
@@ -537,14 +544,24 @@
contourMetrics,
dag.ComposeObservers(append(xdscache.ObserversOf(resources), snapshotHandler)...),
)

hasSynced := func() bool {
	for _, syncFunc := range s.handlerCacheSyncs {
		if !syncFunc() {
			return false
		}
	}
	return true
}

contourHandler := contour.NewEventHandler(contour.EventHandlerConfig{
Logger: s.log.WithField("context", "contourEventHandler"),
HoldoffDelay: 100 * time.Millisecond,
HoldoffMaxDelay: 500 * time.Millisecond,
Observer: observer,
StatusUpdater: sh.Writer(),
Builder: builder,
})
}, hasSynced)

// Wrap contourHandler in an EventRecorder which tracks API server events.
eventHandler := &contour.EventRecorder{
@@ -568,7 +585,7 @@

// Inform on the remaining resources.
for name, r := range informerResources {
if err := informOnResource(r, eventHandler, s.mgr.GetCache()); err != nil {
if err := s.informOnResource(r, eventHandler); err != nil {
s.log.WithError(err).WithField("resource", name).Fatal("failed to create informer")
}
}
@@ -584,15 +601,15 @@
handler = k8s.NewNamespaceFilter(sets.List(secretNamespaces), eventHandler)
}

if err := informOnResource(&corev1.Secret{}, handler, s.mgr.GetCache()); err != nil {
if err := s.informOnResource(&corev1.Secret{}, handler); err != nil {
s.log.WithError(err).WithField("resource", "secrets").Fatal("failed to create informer")
}

// Inform on endpoints.
if err := informOnResource(&corev1.Endpoints{}, &contour.EventRecorder{
if err := s.informOnResource(&corev1.Endpoints{}, &contour.EventRecorder{
Next: endpointHandler,
Counter: contourMetrics.EventHandlerOperations,
}, s.mgr.GetCache()); err != nil {
}); err != nil {
s.log.WithError(err).WithField("resource", "endpoints").Fatal("failed to create informer")
}

@@ -646,7 +663,7 @@
handler = k8s.NewNamespaceFilter([]string{contourConfiguration.Envoy.Service.Namespace}, handler)
}

if err := informOnResource(&corev1.Service{}, handler, s.mgr.GetCache()); err != nil {
if err := s.informOnResource(&corev1.Service{}, handler); err != nil {
s.log.WithError(err).WithField("resource", "services").Fatal("failed to create informer")
}

@@ -657,11 +674,11 @@

xdsServer := &xdsServer{
log: s.log,
mgr: s.mgr,
registry: s.registry,
config: *contourConfiguration.XDSServer,
snapshotHandler: snapshotHandler,
resources: resources,
initialDagBuilt: contourHandler.HasBuiltInitialDag,
}
if err := s.mgr.Add(xdsServer); err != nil {
return err
@@ -830,11 +847,11 @@

type xdsServer struct {
log logrus.FieldLogger
mgr manager.Manager
registry *prometheus.Registry
config contour_api_v1alpha1.XDSServerConfig
snapshotHandler *xdscache.SnapshotHandler
resources []xdscache.ResourceCache
initialDagBuilt func() bool
}

func (x *xdsServer) NeedLeaderElection() bool {
@@ -844,11 +861,13 @@
func (x *xdsServer) Start(ctx context.Context) error {
log := x.log.WithField("context", "xds")

log.Printf("waiting for informer caches to sync")
if !x.mgr.GetCache().WaitForCacheSync(ctx) {
return errors.New("informer cache failed to sync")
log.Printf("waiting for the initial dag to be built")
if err := wait.PollUntilContextCancel(ctx, initialDagBuildPollPeriod, true, func(ctx context.Context) (done bool, err error) {
return x.initialDagBuilt(), nil
}); err != nil {
return fmt.Errorf("failed to wait for initial dag build, %w", err)
}
log.Printf("informer caches synced")
log.Printf("the initial dag is built")

Check warning on line 870 in cmd/contour/serve.go

View check run for this annotation

Codecov / codecov/patch

cmd/contour/serve.go#L870

Added line #L870 was not covered by tests

grpcServer := xds.NewServer(x.registry, grpcOptions(log, x.config.TLS)...)

@@ -953,12 +972,12 @@
// to process, we just need informers to get events.
case contourConfiguration.Gateway.GatewayRef != nil:
// Inform on GatewayClasses.
if err := informOnResource(&gatewayapi_v1beta1.GatewayClass{}, eventHandler, mgr.GetCache()); err != nil {
if err := s.informOnResource(&gatewayapi_v1beta1.GatewayClass{}, eventHandler); err != nil {
s.log.WithError(err).WithField("resource", "gatewayclasses").Fatal("failed to create informer")
}

// Inform on Gateways.
if err := informOnResource(&gatewayapi_v1beta1.Gateway{}, eventHandler, mgr.GetCache()); err != nil {
if err := s.informOnResource(&gatewayapi_v1beta1.Gateway{}, eventHandler); err != nil {
s.log.WithError(err).WithField("resource", "gateways").Fatal("failed to create informer")
}
// Otherwise, run the GatewayClass and Gateway controllers to determine
@@ -1029,12 +1048,12 @@
}

// Inform on ReferenceGrants.
if err := informOnResource(&gatewayapi_v1beta1.ReferenceGrant{}, eventHandler, mgr.GetCache()); err != nil {
if err := s.informOnResource(&gatewayapi_v1beta1.ReferenceGrant{}, eventHandler); err != nil {
s.log.WithError(err).WithField("resource", "referencegrants").Fatal("failed to create informer")
}

// Inform on Namespaces.
if err := informOnResource(&corev1.Namespace{}, eventHandler, mgr.GetCache()); err != nil {
if err := s.informOnResource(&corev1.Namespace{}, eventHandler); err != nil {
s.log.WithError(err).WithField("resource", "namespaces").Fatal("failed to create informer")
}
}
@@ -1197,12 +1216,18 @@
return builder
}

func informOnResource(obj client.Object, handler cache.ResourceEventHandler, cache ctrl_cache.Cache) error {
inf, err := cache.GetInformer(context.Background(), obj)
func (s *Server) informOnResource(obj client.Object, handler cache.ResourceEventHandler) error {
inf, err := s.mgr.GetCache().GetInformer(context.Background(), obj)
if err != nil {
return err
}

_, err = inf.AddEventHandler(handler)
return err
registration, err := inf.AddEventHandler(handler)
if err != nil {
	return err
}

s.handlerCacheSyncs = append(s.handlerCacheSyncs, registration.HasSynced)
return nil
}
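
For reference, the readiness gate in `xdsServer.Start` above uses the standard apimachinery polling helper. A rough standalone sketch of that pattern, where the `ready` flag stands in for `initialDagBuilt` and the 100ms interval mirrors `initialDagBuildPollPeriod`:

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	var ready atomic.Bool // stand-in for EventHandler.initialDagBuilt

	// Simulate the first DAG build completing a little later.
	go func() {
		time.Sleep(300 * time.Millisecond)
		ready.Store(true)
	}()

	// Poll every 100ms; immediate=true checks the condition once before
	// sleeping, and cancelling ctx aborts the wait with an error.
	err := wait.PollUntilContextCancel(context.Background(), 100*time.Millisecond, true,
		func(ctx context.Context) (bool, error) {
			return ready.Load(), nil
		})
	fmt.Println("initial dag built, err:", err) // err is nil once ready flips
}
```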
72 changes: 55 additions & 17 deletions internal/contour/handler.go
@@ -19,12 +19,16 @@
import (
"context"
"reflect"
"sync/atomic"
"time"

"github.com/projectcontour/contour/internal/dag"
"github.com/projectcontour/contour/internal/k8s"
"github.com/sirupsen/logrus"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/cache/synctrack"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/projectcontour/contour/internal/dag"
"github.com/projectcontour/contour/internal/k8s"
)

type EventHandlerConfig struct {
@@ -55,9 +59,16 @@
// seq is the sequence counter of the number of times
// an event has been received.
seq int

// syncTracker is used to update/query the status of the cache sync.
// Uses an internal counter: incremented at item start, decremented at end.
// HasSynced returns true if its UpstreamHasSynced returns true and the counter is non-positive.
syncTracker *synctrack.SingleFileTracker

initialDagBuilt atomic.Bool
}

func NewEventHandler(config EventHandlerConfig) *EventHandler {
func NewEventHandler(config EventHandlerConfig, upstreamHasSynced cache.InformerSynced) *EventHandler {
return &EventHandler{
FieldLogger: config.Logger,
builder: config.Builder,
@@ -67,11 +78,13 @@
statusUpdater: config.StatusUpdater,
update: make(chan any),
sequence: make(chan int, 1),
syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: upstreamHasSynced},
}
}

type opAdd struct {
obj any
obj any
isInInitialList bool
}

type opUpdate struct {
Expand All @@ -83,7 +96,10 @@
}

func (e *EventHandler) OnAdd(obj any, isInInitialList bool) {
e.update <- opAdd{obj: obj}
if isInInitialList {
e.syncTracker.Start()
}
e.update <- opAdd{obj: obj, isInInitialList: isInInitialList}
}

func (e *EventHandler) OnUpdate(oldObj, newObj any) {
Expand All @@ -94,10 +110,15 @@
e.update <- opDelete{obj: obj}
}

// NeedLeaderElection is included to implement manager.LeaderElectionRunnable
func (e *EventHandler) NeedLeaderElection() bool {
return false
}

func (e *EventHandler) HasBuiltInitialDag() bool {
return e.initialDagBuilt.Load()
}

// Implements leadership.NeedLeaderElectionNotification
func (e *EventHandler) OnElectedLeader() {
// Trigger an update when we are elected leader to ensure resource
@@ -164,9 +185,37 @@
// not to process it.
e.incSequence()
}

// We're done processing this event
if updateOpAdd, ok := op.(opAdd); ok {
if updateOpAdd.isInInitialList {
e.syncTracker.Finished()
}
}
case <-pending:
// Ensure informer caches are synced.
// Schedule a retry for dag rebuild if cache is not synced yet.
// Note that we can't block and wait for the cache sync as it depends on progress of this loop.
if !e.syncTracker.HasSynced() {
e.Info("skipping dag rebuild as cache is not synced")
timer.Reset(e.holdoffDelay)
break
}

e.WithField("last_update", time.Since(lastDAGRebuild)).WithField("outstanding", reset()).Info("performing delayed update")
e.rebuildDAG()

// Build a new DAG and send it to the Observer.
latestDAG := e.builder.Build()
e.observer.OnChange(latestDAG)

// Allow XDS server to start (if it hasn't already).
e.initialDagBuilt.Store(true)

// Update the status on objects.
for _, upd := range latestDAG.StatusCache.GetStatusUpdates() {
e.statusUpdater.Send(upd)
}

e.incSequence()
lastDAGRebuild = time.Now()
case <-ctx.Done():
@@ -237,14 +286,3 @@
default:
}
}

// rebuildDAG builds a new DAG and sends it to the Observer,
// the updates the status on objects, and updates the metrics.
func (e *EventHandler) rebuildDAG() {
latestDAG := e.builder.Build()
e.observer.OnChange(latestDAG)

for _, upd := range latestDAG.StatusCache.GetStatusUpdates() {
e.statusUpdater.Send(upd)
}
}
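
To illustrate the `synctrack.SingleFileTracker` semantics described in the handler.go comments above — Start/Finished bracket the processing of each initial-list item, and HasSynced gates on both the upstream sync and the in-flight counter — here is a rough sketch under the assumption that the upstream informer cache is already synced:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache/synctrack"
)

func main() {
	tracker := &synctrack.SingleFileTracker{
		// Treat the upstream informer cache as already synced.
		UpstreamHasSynced: func() bool { return true },
	}

	tracker.Start()                  // an initial-list item was handed to OnAdd
	fmt.Println(tracker.HasSynced()) // false: that item is still being processed

	tracker.Finished()               // the event loop finished processing it
	fmt.Println(tracker.HasSynced()) // true: upstream synced, no items in flight
}
```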