Skip to content

Commit

Permalink
wait for the first DAG build before starting the XDS server
Browse files Browse the repository at this point in the history
  • Loading branch information
therealak12 committed Aug 15, 2023
1 parent 44d7293 commit 8684e5c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 9 deletions.
28 changes: 19 additions & 9 deletions cmd/contour/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
networking_v1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -63,6 +64,10 @@ import (
"github.com/projectcontour/contour/pkg/config"
)

const (
initialDagBuildPollPeriod = 100 * time.Millisecond
)

// registerServe registers the serve subcommand and flags
// with the Application provided.
func registerServe(app *kingpin.Application) (*kingpin.CmdClause, *serveContext) {
Expand Down Expand Up @@ -658,12 +663,12 @@ func (s *Server) doServe() error {
}

xdsServer := &xdsServer{
log: s.log,
registry: s.registry,
config: *contourConfiguration.XDSServer,
snapshotHandler: snapshotHandler,
resources: resources,
handlerCacheSyncs: s.handlerCacheSyncs,
log: s.log,
registry: s.registry,
config: *contourConfiguration.XDSServer,
snapshotHandler: snapshotHandler,
resources: resources,
initialDagBuilt: contourHandler.HasBuiltInitialDag,
}
if err := s.mgr.Add(xdsServer); err != nil {
return err
Expand Down Expand Up @@ -837,6 +842,7 @@ type xdsServer struct {
snapshotHandler *xdscache.SnapshotHandler
resources []xdscache.ResourceCache
handlerCacheSyncs []cache.InformerSynced

Check failure on line 844 in cmd/contour/serve.go

View workflow job for this annotation

GitHub Actions / lint

field `handlerCacheSyncs` is unused (unused)
initialDagBuilt func() bool
}

func (x *xdsServer) NeedLeaderElection() bool {
Expand All @@ -846,9 +852,13 @@ func (x *xdsServer) NeedLeaderElection() bool {
func (x *xdsServer) Start(ctx context.Context) error {
log := x.log.WithField("context", "xds")

log.Printf("waiting for the initial list to be delivered to handlers")
cache.WaitForCacheSync(ctx.Done(), x.handlerCacheSyncs...)
log.Printf("the initial list delivered to handlers")
log.Printf("waiting for the initial dag to be built")
if err := wait.PollImmediateUntil(initialDagBuildPollPeriod, func() (bool, error) {

Check failure on line 856 in cmd/contour/serve.go

View workflow job for this annotation

GitHub Actions / lint

SA1019: wait.PollImmediateUntil is deprecated: This method does not return errors from context, use PollWithContextCancel. Note that the new method will no longer return ErrWaitTimeout and instead return errors defined by the context package. Will be removed in a future release. (staticcheck)
return x.initialDagBuilt(), nil
}, ctx.Done()); err != nil {
return fmt.Errorf("failed to wait for initial dag build, %w", err)
}
log.Printf("the initial dag is built")

grpcServer := xds.NewServer(x.registry, grpcOptions(log, x.config.TLS)...)

Expand Down
8 changes: 8 additions & 0 deletions internal/contour/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type EventHandler struct {
seq int

syncTracker *synctrack.SingleFileTracker

initialDagBuilt bool
}

func NewEventHandler(config EventHandlerConfig, upstreamHasSynced cache.InformerSynced) *EventHandler {
Expand Down Expand Up @@ -91,6 +93,7 @@ type opDelete struct {

func (e *EventHandler) OnAdd(obj any, isInInitialList bool) {
if isInInitialList {
time.Sleep(time.Second * 3)
e.syncTracker.Start()
}
e.update <- opAdd{obj: obj, isInInitialList: isInInitialList}
Expand All @@ -109,6 +112,10 @@ func (e *EventHandler) NeedLeaderElection() bool {
return false
}

func (e *EventHandler) HasBuiltInitialDag() bool {
return e.initialDagBuilt
}

// Implements leadership.NeedLeaderElectionNotification
func (e *EventHandler) OnElectedLeader() {
// Trigger an update when we are elected leader to ensure resource
Expand Down Expand Up @@ -196,6 +203,7 @@ func (e *EventHandler) Start(ctx context.Context) error {
e.rebuildDAG()
e.incSequence()
lastDAGRebuild = time.Now()
e.initialDagBuilt = true
case <-ctx.Done():
// shutdown
return nil
Expand Down

0 comments on commit 8684e5c

Please sign in to comment.