Skip to content

Commit

Permalink
Merge pull request #829 from enj/enj/i/wait_shutdown
Browse files Browse the repository at this point in the history
Ensure concierge and supervisor gracefully exit
  • Loading branch information
cfryanr authored Aug 31, 2021
2 parents 883007a + 0d285ce commit b19af2e
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 67 deletions.
31 changes: 28 additions & 3 deletions internal/concierge/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package apiserver
import (
"context"
"fmt"
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -15,6 +16,7 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/pkg/version"

"go.pinniped.dev/internal/controllerinit"
"go.pinniped.dev/internal/issuer"
"go.pinniped.dev/internal/plog"
"go.pinniped.dev/internal/registry/credentialrequest"
Expand All @@ -29,7 +31,7 @@ type Config struct {
type ExtraConfig struct {
Authenticator credentialrequest.TokenCredentialRequestAuthenticator
Issuer issuer.ClientCertIssuer
StartControllersPostStartHook func(ctx context.Context)
BuildControllersPostStartHook controllerinit.RunnerBuilder
Scheme *runtime.Scheme
NegotiatedSerializer runtime.NegotiatedSerializer
LoginConciergeGroupVersion schema.GroupVersion
Expand Down Expand Up @@ -105,16 +107,39 @@ func (c completedConfig) New() (*PinnipedServer, error) {
return nil, fmt.Errorf("could not install API groups: %w", err)
}

shutdown := &sync.WaitGroup{}
s.GenericAPIServer.AddPostStartHookOrDie("start-controllers",
func(postStartContext genericapiserver.PostStartHookContext) error {
plog.Debug("start-controllers post start hook starting")

ctx, cancel := context.WithCancel(context.Background())
go func() {
defer cancel()

<-postStartContext.StopCh
cancel()
}()
c.ExtraConfig.StartControllersPostStartHook(ctx)

runControllers, err := c.ExtraConfig.BuildControllersPostStartHook(ctx)
if err != nil {
return fmt.Errorf("cannot create run controller func: %w", err)
}

shutdown.Add(1)
go func() {
defer shutdown.Done()

runControllers(ctx)
}()

return nil
},
)
s.GenericAPIServer.AddPreShutdownHookOrDie("stop-controllers",
func() error {
plog.Debug("stop-controllers pre shutdown hook starting")
defer plog.Debug("stop-controllers pre shutdown hook completed")

shutdown.Wait()

return nil
},
Expand Down
17 changes: 11 additions & 6 deletions internal/concierge/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
conciergescheme "go.pinniped.dev/internal/concierge/scheme"
"go.pinniped.dev/internal/config/concierge"
"go.pinniped.dev/internal/controller/authenticator/authncache"
"go.pinniped.dev/internal/controllerinit"
"go.pinniped.dev/internal/controllermanager"
"go.pinniped.dev/internal/downward"
"go.pinniped.dev/internal/dynamiccert"
Expand Down Expand Up @@ -135,7 +136,7 @@ func (a *App) runServer(ctx context.Context) error {

// Prepare to start the controllers, but defer actually starting them until the
// post start hook of the aggregated API server.
startControllersFunc, err := controllermanager.PrepareControllers(
buildControllers, err := controllermanager.PrepareControllers(
&controllermanager.Config{
ServerInstallationInfo: podInfo,
APIGroupSuffix: *cfg.APIGroupSuffix,
Expand Down Expand Up @@ -165,7 +166,7 @@ func (a *App) runServer(ctx context.Context) error {
dynamicServingCertProvider,
authenticators,
certIssuer,
startControllersFunc,
buildControllers,
*cfg.APIGroupSuffix,
scheme,
loginGV,
Expand All @@ -190,7 +191,7 @@ func getAggregatedAPIServerConfig(
dynamicCertProvider dynamiccert.Private,
authenticator credentialrequest.TokenCredentialRequestAuthenticator,
issuer issuer.ClientCertIssuer,
startControllersPostStartHook func(context.Context),
buildControllers controllerinit.RunnerBuilder,
apiGroupSuffix string,
scheme *runtime.Scheme,
loginConciergeGroupVersion, identityConciergeGroupVersion schema.GroupVersion,
Expand Down Expand Up @@ -227,7 +228,7 @@ func getAggregatedAPIServerConfig(
ExtraConfig: apiserver.ExtraConfig{
Authenticator: authenticator,
Issuer: issuer,
StartControllersPostStartHook: startControllersPostStartHook,
BuildControllersPostStartHook: buildControllers,
Scheme: scheme,
NegotiatedSerializer: codecs,
LoginConciergeGroupVersion: loginConciergeGroupVersion,
Expand All @@ -237,7 +238,7 @@ func getAggregatedAPIServerConfig(
return apiServerConfig, nil
}

func Main() {
func main() error { // return an error instead of klog.Fatal to allow defer statements to run
logs.InitLogs()
defer logs.FlushLogs()

Expand All @@ -250,7 +251,11 @@ func Main() {

ctx := genericapiserver.SetupSignalContext()

if err := New(ctx, os.Args[1:], os.Stdout, os.Stderr).Run(); err != nil {
return New(ctx, os.Args[1:], os.Stdout, os.Stderr).Run()
}

// Main runs main and, if main returns an error, reports it with klog.Fatal.
// Because klog.Fatal is only called after main has returned, the deferred
// statements inside main get a chance to run first.
func Main() {
	err := main()
	if err == nil {
		return
	}

	klog.Fatal(err)
}
87 changes: 87 additions & 0 deletions internal/controllerinit/controllerinit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2021 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package controllerinit

import (
"context"
"fmt"
"reflect"
"sort"
"time"
)

// Runner is a unit of long-running work, such as a series of controllers.
// A Runner blocks until the context it is given is canceled.
type Runner func(ctx context.Context)

// RunnerWrapper decorates the execution of a Runner with additional logic.
// It blocks until its context is canceled and owns the lifetime of the
// Runner that is handed to it.
type RunnerWrapper func(ctx context.Context, runner Runner)

// RunnerBuilder constructs a Runner. Because construction can fail, it is
// expected to be invoked from the main go routine so the error can be handled there.
type RunnerBuilder func(ctx context.Context) (Runner, error)

// informer captures the part of SharedInformerFactory that this package relies on:
// starting an informer cache and waiting for that cache to sync.
type informer interface {
	// Start starts the informer cache. Prepare passes the Done channel of its context as stop.
	Start(stop <-chan struct{})
	// WaitForCacheSync waits for the cache to sync and reports, keyed by type,
	// whether each informer synced (true) or not (false).
	WaitForCacheSync(stop <-chan struct{}) map[reflect.Type]bool
}

// Prepare returns a RunnerBuilder that, when called:
//  1. Starts each provided informer and waits for it to sync, in the order given.
//     It fails if an informer has not synced within one minute or before ctx is canceled.
//  2. Returns a Runner that runs controllers through controllersWrapper.
//
// The returned Runner hands its own context to controllersWrapper; the context passed
// to the RunnerBuilder is only used for starting and syncing the informers.
func Prepare(controllers Runner, controllersWrapper RunnerWrapper, informers ...informer) RunnerBuilder {
	return func(ctx context.Context) (Runner, error) {
		for _, inf := range informers {
			if err := startAndSyncInformer(ctx, inf); err != nil {
				return nil, err
			}
		}

		return func(controllerCtx context.Context) {
			controllersWrapper(controllerCtx, controllers)
		}, nil
	}
}

// startAndSyncInformer starts inf using ctx as its stop signal and waits, for at most
// one minute, until its caches are synced. It returns an error naming the unsynced
// informers if the wait ends before everything is synced.
//
// This lives in its own function so that the deferred cancel of the timeout context
// runs as soon as this informer is done, rather than piling up until every informer
// has been processed.
func startAndSyncInformer(ctx context.Context, inf informer) error {
	inf.Start(ctx.Done())

	// prevent us from blocking forever due to a broken informer
	waitCtx, waitCancel := context.WithTimeout(ctx, time.Minute)
	defer waitCancel()

	// wait until the caches are synced before returning
	status := inf.WaitForCacheSync(waitCtx.Done())

	if unsynced := unsyncedInformers(status); len(unsynced) > 0 {
		return fmt.Errorf("failed to sync informers of %s: %v", anyToFullname(inf), unsynced)
	}

	return nil
}

// unsyncedInformers returns the sorted full names of every type in status that is
// marked as not synced. An empty status is itself treated as a failure and is
// reported as the single entry "all:empty". When everything is synced the result
// is empty.
func unsyncedInformers(status map[reflect.Type]bool) []string {
	if len(status) == 0 {
		return []string{"all:empty"}
	}

	var pending []string

	for informerType, ok := range status {
		if ok {
			continue
		}

		pending = append(pending, typeToFullname(informerType))
	}

	// map iteration order is random, so sort to keep the output stable
	sort.Strings(pending)

	return pending
}

// anyToFullname returns the full name, as produced by typeToFullname, of the
// dynamic type of the given value.
func anyToFullname(any interface{}) string {
	return typeToFullname(reflect.TypeOf(any))
}

// typeToFullname formats typ as "<package path>.<type name>". A pointer type is
// reported using the type it points to; only one level of pointer is unwrapped.
func typeToFullname(typ reflect.Type) string {
	target := typ
	if typ.Kind() == reflect.Ptr {
		target = typ.Elem()
	}

	pkg, name := target.PkgPath(), target.Name()

	return pkg + "." + name
}
27 changes: 8 additions & 19 deletions internal/controllermanager/prepare_controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package controllermanager

import (
"context"
"fmt"
"time"

Expand All @@ -27,6 +26,7 @@ import (
"go.pinniped.dev/internal/controller/authenticator/webhookcachefiller"
"go.pinniped.dev/internal/controller/impersonatorconfig"
"go.pinniped.dev/internal/controller/kubecertagent"
"go.pinniped.dev/internal/controllerinit"
"go.pinniped.dev/internal/controllerlib"
"go.pinniped.dev/internal/deploymentref"
"go.pinniped.dev/internal/downward"
Expand Down Expand Up @@ -95,7 +95,7 @@ type Config struct {

// Prepare the controllers and their informers and return a function that will start them when called.
//nolint:funlen // Eh, fair, it is a really long function...but it is wiring the world...so...
func PrepareControllers(c *Config) (func(ctx context.Context), error) {
func PrepareControllers(c *Config) (controllerinit.RunnerBuilder, error) {
loginConciergeGroupData, identityConciergeGroupData := groupsuffix.ConciergeAggregatedGroups(c.APIGroupSuffix)

dref, deployment, err := deploymentref.New(c.ServerInstallationInfo)
Expand Down Expand Up @@ -303,11 +303,12 @@ func PrepareControllers(c *Config) (func(ctx context.Context), error) {
singletonWorker,
)

// Return a function which starts the informers and controllers.
return func(ctx context.Context) {
informers.startAndWaitForSync(ctx)
go leaderElector(ctx, controllerManager.Start)
}, nil
return controllerinit.Prepare(controllerManager.Start, leaderElector,
informers.kubePublicNamespaceK8s,
informers.kubeSystemNamespaceK8s,
informers.installationNamespaceK8s,
informers.pinniped,
), nil
}

type informers struct {
Expand Down Expand Up @@ -345,15 +346,3 @@ func createInformers(
),
}
}

// startAndWaitForSync starts each of the four informers held by i and then waits
// for each one's cache to sync, using ctx.Done() as the stop channel throughout.
// Any result from WaitForCacheSync is discarded, so this method does not report
// whether a cache actually synced, and the only bound on the wait is ctx itself.
func (i *informers) startAndWaitForSync(ctx context.Context) {
// Start everything before waiting on anything.
i.kubePublicNamespaceK8s.Start(ctx.Done())
i.kubeSystemNamespaceK8s.Start(ctx.Done())
i.installationNamespaceK8s.Start(ctx.Done())
i.pinniped.Start(ctx.Done())

// Wait on each informer in turn; no timeout is applied beyond ctx being canceled.
i.kubePublicNamespaceK8s.WaitForCacheSync(ctx.Done())
i.kubeSystemNamespaceK8s.WaitForCacheSync(ctx.Done())
i.installationNamespaceK8s.WaitForCacheSync(ctx.Done())
i.pinniped.WaitForCacheSync(ctx.Done())
}
8 changes: 6 additions & 2 deletions internal/leaderelection/leaderelection.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"k8s.io/client-go/tools/leaderelection/resourcelock"

"go.pinniped.dev/internal/constable"
"go.pinniped.dev/internal/controllerinit"
"go.pinniped.dev/internal/downward"
"go.pinniped.dev/internal/kubeclient"
"go.pinniped.dev/internal/plog"
Expand All @@ -36,7 +37,7 @@ const ErrNotLeader constable.Error = "write attempt rejected as client is not le
// logic and will coordinate lease release with the input controller starter function.
func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubeclient.Option) (
*kubeclient.Client,
func(context.Context, func(context.Context)),
controllerinit.RunnerWrapper,
error,
) {
internalClient, err := kubeclient.New(opts...)
Expand Down Expand Up @@ -89,7 +90,10 @@ func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubec
return nil, nil, fmt.Errorf("could not create leader election client: %w", err)
}

controllersWithLeaderElector := func(ctx context.Context, controllers func(context.Context)) {
controllersWithLeaderElector := func(ctx context.Context, controllers controllerinit.Runner) {
plog.Debug("leader election loop start", "identity", identity)
defer plog.Debug("leader election loop shutdown", "identity", identity)

leaderElectorCtx, leaderElectorCancel := context.WithCancel(context.Background()) // purposefully detached context

go func() {
Expand Down
Loading

0 comments on commit b19af2e

Please sign in to comment.