Skip to content

Commit

Permalink
monitor the bootstrap kubeconfig and restart immediately when changes
Browse files Browse the repository at this point in the history
Signed-off-by: haoqing0110 <[email protected]>
  • Loading branch information
haoqing0110 committed Sep 26, 2024
1 parent e683e8c commit bd7e9d7
Show file tree
Hide file tree
Showing 11 changed files with 286 additions and 241 deletions.
5 changes: 3 additions & 2 deletions pkg/cmd/spoke/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,16 @@ func NewKlusterletOperatorCmd() *cobra.Command {

// NewKlusterletAgentCmd is to start the singleton agent including registration/work
func NewKlusterletAgentCmd() *cobra.Command {
ctx, cancel := context.WithCancel(context.TODO())
commonOptions := commonoptions.NewAgentOptions()
workOptions := work.NewWorkloadAgentOptions()
registrationOption := registration.NewSpokeAgentOptions()
registrationOption := registration.NewSpokeAgentOptions(cancel)

agentConfig := singletonspoke.NewAgentConfig(commonOptions, registrationOption, workOptions)
cmdConfig := commonOptions.CommonOpts.
NewControllerCommandConfig("klusterlet", version.Get(), agentConfig.RunSpokeAgent).
WithHealthChecks(agentConfig.HealthCheckers()...)
cmd := cmdConfig.NewCommandWithContext(context.TODO())
cmd := cmdConfig.NewCommandWithContext(ctx)
cmd.Use = agentCmdName
cmd.Short = "Start the klusterlet agent"

Expand Down
5 changes: 3 additions & 2 deletions pkg/cmd/spoke/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ import (
)

func NewRegistrationAgent() *cobra.Command {
agentOptions := spoke.NewSpokeAgentOptions()
ctx, cancel := context.WithCancel(context.TODO())
agentOptions := spoke.NewSpokeAgentOptions(cancel)
commonOptions := commonoptions.NewAgentOptions()
cfg := spoke.NewSpokeAgentConfig(commonOptions, agentOptions)
cmdConfig := commonOptions.CommonOpts.
NewControllerCommandConfig("registration-agent", version.Get(), cfg.RunSpokeAgent).
WithHealthChecks(cfg.HealthCheckers()...)

cmd := cmdConfig.NewCommandWithContext(context.TODO())
cmd := cmdConfig.NewCommandWithContext(ctx)
cmd.Use = agentCmdName
cmd.Short = "Start the Cluster Registration Agent"

Expand Down
68 changes: 68 additions & 0 deletions pkg/registration/spoke/event_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package spoke

import (
"context"
"fmt"
"reflect"

corev1 "k8s.io/api/core/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)

type bootstrapKubeconfigEventHandler struct {
bootstrapKubeconfigSecretName *string
cancel context.CancelFunc
}

func (hc *bootstrapKubeconfigEventHandler) Name() string {
return "bootstrap-hub-kubeconfig"
}

// implement cache.ResourceEventHandler.OnAdd
func (hc *bootstrapKubeconfigEventHandler) OnAdd(obj interface{}, isInInitialList bool) {
}

// implement cache.ResourceEventHandler.OnUpdate
func (hc *bootstrapKubeconfigEventHandler) OnUpdate(oldObj, newObj interface{}) {
newSecret, ok := newObj.(*corev1.Secret)
if !ok {
utilruntime.HandleError(fmt.Errorf("invalid secret object: %v", newObj))
return
}

if newSecret.Name != *hc.bootstrapKubeconfigSecretName {
return
}

oldSecret, ok := oldObj.(*corev1.Secret)
if !ok {
utilruntime.HandleError(fmt.Errorf("invalid secret object: %v", oldObj))
return
}

if !reflect.DeepEqual(newSecret.Data, oldSecret.Data) {
// Restart immediately if the bootstrap kubeconfig changes. Otherwise, in the backup restore scenario,
// the work agent may resync a wrong bootstrap kubeconfig from the cache to overwrite the restored one.
klog.Info("the bootstrap kubeconfig changes and rebootstrap is required, cancel the context")
hc.cancel()
}
}

// implement cache.ResourceEventHandler.OnDelete
func (hc *bootstrapKubeconfigEventHandler) OnDelete(obj interface{}) {
switch t := obj.(type) {
case *corev1.Secret:
if t.Name == *hc.bootstrapKubeconfigSecretName {
klog.Info("the bootstrap kubeconfig deletes and rebootstrap is required, cancel the context")
hc.cancel()
}
case cache.DeletedFinalStateUnknown:
secret, ok := t.Obj.(*corev1.Secret)
if ok && secret.Name == *hc.bootstrapKubeconfigSecretName {
klog.Info("the bootstrap kubeconfig deletes and rebootstrap is required, cancel the context")
hc.cancel()
}
}
}
159 changes: 159 additions & 0 deletions pkg/registration/spoke/event_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package spoke

import (
"context"
"testing"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestBootstrapKubeconfigEventHandler(t *testing.T) {
//#nosec G101
defaultSecretName := "bootstrap-hub-kubeconfig"
cases := []struct {
name string
secretName string
originalBootstrapKubeconfigSecret interface{}
bootstrapKubeconfigSecret interface{}
add bool
update bool
delete bool
expectCancel bool
}{
{
name: "add",
bootstrapKubeconfigSecret: newBootstrapKubeconfigSecret(defaultSecretName, nil, map[string][]byte{
"kubeconfig": []byte("invalid-kubeconfig"),
}),
expectCancel: false,
},
{
name: "label changes",
originalBootstrapKubeconfigSecret: newBootstrapKubeconfigSecret(defaultSecretName, nil, map[string][]byte{
"kubeconfig": []byte("invalid-kubeconfig"),
}),
bootstrapKubeconfigSecret: newBootstrapKubeconfigSecret(defaultSecretName, map[string]string{
"test": "true",
}, map[string][]byte{
"kubeconfig": []byte("invalid-kubeconfig"),
}),
update: true,
expectCancel: false,
},
{
name: "spec changes",
originalBootstrapKubeconfigSecret: newBootstrapKubeconfigSecret(defaultSecretName, nil, map[string][]byte{
"kubeconfig": []byte("invalid-kubeconfig"),
}),
bootstrapKubeconfigSecret: newBootstrapKubeconfigSecret(defaultSecretName, nil, map[string][]byte{
"kubeconfig": []byte("another-invalid-kubeconfig"),
}),
update: true,
expectCancel: true,
},
{
name: "invalid new object type - update",
bootstrapKubeconfigSecret: struct{}{},
update: true,
expectCancel: false,
},
{
name: "invalid old object type - update",
originalBootstrapKubeconfigSecret: struct{}{},
bootstrapKubeconfigSecret: newBootstrapKubeconfigSecret(defaultSecretName, nil, map[string][]byte{
"kubeconfig": []byte("another-invalid-kubeconfig"),
}),
update: true,
expectCancel: false,
},
{
name: "delete",
bootstrapKubeconfigSecret: newBootstrapKubeconfigSecret(defaultSecretName, nil, nil),
delete: true,
expectCancel: true,
},
{
name: "delete other secret",
bootstrapKubeconfigSecret: newBootstrapKubeconfigSecret("other-secret", nil, nil),
delete: true,
expectCancel: false,
},
{
name: "custom secret name",
secretName: "other-secret",
bootstrapKubeconfigSecret: newBootstrapKubeconfigSecret("other-secret", nil, nil),
delete: true,
expectCancel: true,
},
{
name: "invalid type - delete",
secretName: "other-secret",
bootstrapKubeconfigSecret: struct{}{},
delete: true,
expectCancel: false,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
secretName := defaultSecretName
bootstrapKubeconfigSecretName := &secretName
ctx, cancel := context.WithCancel(context.Background())

hc := &bootstrapKubeconfigEventHandler{
bootstrapKubeconfigSecretName: bootstrapKubeconfigSecretName,
cancel: cancel,
}

if len(c.secretName) > 0 {
*bootstrapKubeconfigSecretName = c.secretName
}

if c.add {
hc.OnAdd(c.bootstrapKubeconfigSecret, false)
}

if c.update {
hc.OnUpdate(c.originalBootstrapKubeconfigSecret, c.bootstrapKubeconfigSecret)
select {
case <-ctx.Done():
// context should be cancelled
if c.expectCancel == false {
t.Errorf("expected context to be not cancelled, but it was not")
}
default:
if c.expectCancel == true {
t.Errorf("expected context to be cancelled, but it was not")
}
}
}

if c.delete {
hc.OnDelete(c.bootstrapKubeconfigSecret)
select {
case <-ctx.Done():
// context should be cancelled
if c.expectCancel == false {
t.Errorf("expected context to be not cancelled, but it was not")
}
default:
if c.expectCancel == true {
t.Errorf("expected context to be cancelled, but it was not")
}
}
}

})
}
}

func newBootstrapKubeconfigSecret(name string, labels map[string]string, data map[string][]byte, others ...string) *corev1.Secret {
return &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "open-cluster-management-agent",
Labels: labels,
},
Data: data,
}
}
64 changes: 0 additions & 64 deletions pkg/registration/spoke/health_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,11 @@ package spoke

import (
"context"
"fmt"
"net/http"
"reflect"
"sync"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
)

type hubKubeConfigHealthChecker struct {
Expand Down Expand Up @@ -47,62 +42,3 @@ func (hc *hubKubeConfigHealthChecker) setBootstrapped() {
defer hc.lock.Unlock()
hc.bootstrapped = true
}

type bootstrapKubeconfigHealthChecker struct {
bootstrapKubeconfigSecretName *string
changed bool
}

func (hc *bootstrapKubeconfigHealthChecker) Name() string {
return "bootstrap-hub-kubeconfig"
}

// bootstrapKubeconfigHealthChecker returns an error when the bootstrap kubeconfig changes.
func (hc *bootstrapKubeconfigHealthChecker) Check(_ *http.Request) error {
if hc.changed {
return errors.New("the bootstrap kubeconfig changes and rebootstrap is required.")
}
return nil
}

// implement cache.ResourceEventHandler.OnAdd
func (hc *bootstrapKubeconfigHealthChecker) OnAdd(obj interface{}, isInInitialList bool) {
}

// implement cache.ResourceEventHandler.OnUpdate
func (hc *bootstrapKubeconfigHealthChecker) OnUpdate(oldObj, newObj interface{}) {
newSecret, ok := newObj.(*corev1.Secret)
if !ok {
utilruntime.HandleError(fmt.Errorf("invalid secret object: %v", newObj))
return
}

if newSecret.Name != *hc.bootstrapKubeconfigSecretName {
return
}

oldSecret, ok := oldObj.(*corev1.Secret)
if !ok {
utilruntime.HandleError(fmt.Errorf("invalid secret object: %v", oldObj))
return
}

if !reflect.DeepEqual(newSecret.Data, oldSecret.Data) {
hc.changed = true
}
}

// implement cache.ResourceEventHandler.OnDelete
func (hc *bootstrapKubeconfigHealthChecker) OnDelete(obj interface{}) {
switch t := obj.(type) {
case *corev1.Secret:
if t.Name == *hc.bootstrapKubeconfigSecretName {
hc.changed = true
}
case cache.DeletedFinalStateUnknown:
secret, ok := t.Obj.(*corev1.Secret)
if ok && secret.Name == *hc.bootstrapKubeconfigSecretName {
hc.changed = true
}
}
}
Loading

0 comments on commit bd7e9d7

Please sign in to comment.