Skip to content

Commit

Permalink
Add API extensions clients to client factory
Browse files Browse the repository at this point in the history
Signed-off-by: Tom Wieczorek <[email protected]>
  • Loading branch information
twz123 committed Oct 2, 2024
1 parent 05b277a commit afd9a51
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 37 deletions.
27 changes: 19 additions & 8 deletions internal/testutil/kube_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
"github.com/k0sproject/k0s/pkg/constant"
kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"

apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apiextensionsfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
apiextensionsscheme "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/scheme"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -51,6 +54,7 @@ func NewFakeClientFactory(objects ...runtime.Object) *FakeClientFactory {
// Create a scheme containing all the kinds and types that k0s knows about.
scheme := runtime.NewScheme()
utilruntime.Must(kubernetesscheme.AddToScheme(scheme))
utilruntime.Must(apiextensionsscheme.AddToScheme(scheme))
utilruntime.Must(k0sscheme.AddToScheme(scheme))

// Create a dynamic fake client that can deal with all that.
Expand All @@ -66,21 +70,24 @@ func NewFakeClientFactory(objects ...runtime.Object) *FakeClientFactory {
// transform between typed and unstructured objects.
tracker := fakeclient.TypedObjectTrackerFrom(scheme, fakeDynamic)
kubeClients := fakeclient.NewClientset[kubernetesfake.Clientset](fakeDiscovery, tracker)
apiExtensionsClients := fakeclient.NewClientset[apiextensionsfake.Clientset](fakeDiscovery, tracker)
k0sClients := fakeclient.NewClientset[k0sfake.Clientset](fakeDiscovery, tracker)

return &FakeClientFactory{
DynamicClient: fakeDynamic,
Client: kubeClients,
DiscoveryClient: memory.NewMemCacheClient(fakeDiscovery),
K0sClient: k0sClients,
DynamicClient: fakeDynamic,
Client: kubeClients,
DiscoveryClient: memory.NewMemCacheClient(fakeDiscovery),
APIExtensionsClient: apiExtensionsClients,
K0sClient: k0sClients,
}
}

type FakeClientFactory struct {
DynamicClient *dynamicfake.FakeDynamicClient
Client *kubernetesfake.Clientset
DiscoveryClient discovery.CachedDiscoveryInterface
K0sClient *k0sfake.Clientset
DynamicClient *dynamicfake.FakeDynamicClient
Client kubernetes.Interface
DiscoveryClient discovery.CachedDiscoveryInterface
APIExtensionsClient *apiextensionsfake.Clientset
K0sClient *k0sfake.Clientset
}

func (f *FakeClientFactory) GetClient() (kubernetes.Interface, error) {
Expand All @@ -95,6 +102,10 @@ func (f *FakeClientFactory) GetDiscoveryClient() (discovery.CachedDiscoveryInter
return f.DiscoveryClient, nil
}

func (f *FakeClientFactory) GetAPIExtensionsClient() (apiextensionsclientset.Interface, error) {
return f.APIExtensionsClient, nil
}

func (f *FakeClientFactory) GetK0sClient() (k0sclientset.Interface, error) {
return f.K0sClient, nil
}
Expand Down
19 changes: 7 additions & 12 deletions pkg/applier/stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ import (
"github.com/k0sproject/k0s/pkg/kubernetes"
"github.com/k0sproject/k0s/pkg/kubernetes/watch"

extensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
extensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -155,25 +154,21 @@ func (s *Stack) Apply(ctx context.Context, prune bool) error {

// waitForCRD waits 5 seconds for a CRD to become established on a best-effort basis.
func (s *Stack) waitForCRD(ctx context.Context, crdName string) {
config, err := s.Clients.GetRESTConfig()
if err != nil {
return
}
client, err := extensionsclient.NewForConfig(config)
client, err := s.Clients.GetAPIExtensionsClient()
if err != nil {
return
}

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

_ = watch.CRDs(client.CustomResourceDefinitions()).
_ = watch.CRDs(client.ApiextensionsV1().CustomResourceDefinitions()).
WithObjectName(crdName).
WithErrorCallback(watch.IsRetryable).
Until(ctx, func(item *extensionsv1.CustomResourceDefinition) (bool, error) {
Until(ctx, func(item *apiextensionsv1.CustomResourceDefinition) (bool, error) {
for _, cond := range item.Status.Conditions {
if cond.Type == extensionsv1.Established {
return cond.Status == extensionsv1.ConditionTrue, nil
if cond.Type == apiextensionsv1.Established {
return cond.Status == apiextensionsv1.ConditionTrue, nil
}
}
return false, nil
Expand Down Expand Up @@ -450,7 +445,7 @@ func (s *Stack) prepareResource(resource *unstructured.Unstructured) {

func isCRD(resource *unstructured.Unstructured) bool {
gvk := resource.GroupVersionKind()
return gvk.Group == extensionsv1.GroupName && gvk.Kind == "CustomResourceDefinition"
return gvk.Group == apiextensionsv1.GroupName && gvk.Kind == "CustomResourceDefinition"
}

func generateResourceID(resource unstructured.Unstructured) string {
Expand Down
18 changes: 6 additions & 12 deletions pkg/component/controller/etcd_member_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@ import (
kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"
"github.com/k0sproject/k0s/pkg/kubernetes/watch"
"github.com/sirupsen/logrus"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

extensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
extclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
)

var _ manager.Component = (*EtcdMemberReconciler)(nil)
Expand Down Expand Up @@ -126,18 +124,14 @@ func (e *EtcdMemberReconciler) Stop() error {
}

func (e *EtcdMemberReconciler) waitForCRD(ctx context.Context) error {
rc, err := e.clientFactory.GetRESTConfig()
if err != nil {
return err
}
ec, err := extclient.NewForConfig(rc)
client, err := e.clientFactory.GetAPIExtensionsClient()
if err != nil {
return err
}
var lastObservedVersion string
log := logrus.WithField("component", "etcdMemberReconciler")
log.Info("waiting to see EtcdMember CRD ready")
return watch.CRDs(ec.CustomResourceDefinitions()).
return watch.CRDs(client.ApiextensionsV1().CustomResourceDefinitions()).
WithObjectName(fmt.Sprintf("%s.%s", "etcdmembers", "etcd.k0sproject.io")).
WithErrorCallback(func(err error) (time.Duration, error) {
if retryAfter, e := watch.IsRetryable(err); e == nil {
Expand All @@ -159,12 +153,12 @@ func (e *EtcdMemberReconciler) waitForCRD(ctx context.Context) error {
)
return retryAfter, nil
}).
Until(ctx, func(item *extensionsv1.CustomResourceDefinition) (bool, error) {
Until(ctx, func(item *apiextensionsv1.CustomResourceDefinition) (bool, error) {
lastObservedVersion = item.ResourceVersion
for _, cond := range item.Status.Conditions {
if cond.Type == extensionsv1.Established {
if cond.Type == apiextensionsv1.Established {
log.Infof("EtcdMember CRD status: %s", cond.Status)
return cond.Status == extensionsv1.ConditionTrue, nil
return cond.Status == apiextensionsv1.ConditionTrue, nil
}
}

Expand Down
36 changes: 31 additions & 5 deletions pkg/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
cfgClient "github.com/k0sproject/k0s/pkg/client/clientset/typed/k0s/v1beta1"
"github.com/k0sproject/k0s/pkg/constant"

apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
Expand All @@ -37,6 +38,7 @@ type ClientFactoryInterface interface {
GetClient() (kubernetes.Interface, error)
GetDynamicClient() (dynamic.Interface, error)
GetDiscoveryClient() (discovery.CachedDiscoveryInterface, error)
GetAPIExtensionsClient() (apiextensionsclientset.Interface, error)
GetK0sClient() (k0sclientset.Interface, error)
GetConfigClient() (cfgClient.ClusterConfigInterface, error) // Deprecated: Use [ClientFactoryInterface.GetK0sClient] instead.
GetRESTConfig() (*rest.Config, error)
Expand All @@ -49,11 +51,12 @@ type ClientFactoryInterface interface {
type ClientFactory struct {
LoadRESTConfig func() (*rest.Config, error)

client kubernetes.Interface
dynamicClient dynamic.Interface
discoveryClient discovery.CachedDiscoveryInterface
k0sClient k0sclientset.Interface
restConfig *rest.Config
client kubernetes.Interface
dynamicClient dynamic.Interface
discoveryClient discovery.CachedDiscoveryInterface
apiExtensionsClient apiextensionsclientset.Interface
k0sClient k0sclientset.Interface
restConfig *rest.Config

mutex sync.Mutex
}
Expand Down Expand Up @@ -128,6 +131,29 @@ func (c *ClientFactory) GetDiscoveryClient() (discovery.CachedDiscoveryInterface
return cachedClient, nil
}

func (c *ClientFactory) GetAPIExtensionsClient() (apiextensionsclientset.Interface, error) {
c.mutex.Lock()
defer c.mutex.Unlock()

if c.apiExtensionsClient != nil {
return c.apiExtensionsClient, nil
}

config, err := c.getRESTConfig()
if err != nil {
return nil, err
}

client, err := apiextensionsclientset.NewForConfig(config)
if err != nil {
return nil, err
}

c.apiExtensionsClient = client

return client, nil
}

func (c *ClientFactory) GetK0sClient() (k0sclientset.Interface, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
Expand Down

0 comments on commit afd9a51

Please sign in to comment.