Let ClusterConfigInitializer apply the api-config stack
The k0s-managed stacks don't have to live on disk; they can be applied
directly from memory. Let the api-config stack be the first one. Include
its application in the Start method of the ClusterConfigInitializer.
Have the applier manager ignore this stack and write an ignore note in
the stack directory.

Signed-off-by: Tom Wieczorek <[email protected]>
twz123 committed Oct 2, 2024
1 parent 00c64db commit 16cfedb
Showing 5 changed files with 185 additions and 69 deletions.
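
The commit message above describes applying the api-config stack directly from memory rather than from the manifests directory. As orientation before the diff, here is a minimal, hypothetical sketch of that pattern, modeled on the applyAPIConfigStack function introduced in pkg/component/controller/clusterconfig.go below; the package and function names are illustrative and not part of the commit.

// Hypothetical sketch, not part of the diff: applying a k0s-managed stack
// from memory instead of from the manifests directory.
package example

import (
	"context"
	"io/fs"

	"github.com/k0sproject/k0s/pkg/applier"
	"github.com/k0sproject/k0s/pkg/kubernetes"
	"github.com/k0sproject/k0s/static"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"sigs.k8s.io/yaml"
)

// applyInMemoryStack decodes an embedded CRD manifest and applies it as a
// named stack without writing anything to disk.
func applyInMemoryStack(ctx context.Context, clients kubernetes.ClientFactoryInterface) error {
	// Read the ClusterConfig CRD from the assets embedded in the k0s binary.
	raw, err := fs.ReadFile(static.CRDs, "k0s/k0s.k0sproject.io_clusterconfigs.yaml")
	if err != nil {
		return err
	}

	var crd unstructured.Unstructured
	if err := yaml.Unmarshal(raw, &crd); err != nil {
		return err
	}

	// The boolean argument mirrors the Apply(ctx, true) call in the diff,
	// presumably enabling pruning of resources no longer in the stack.
	return (&applier.Stack{
		Name:      "api-config",
		Resources: []*unstructured.Unstructured{&crd},
		Clients:   clients,
	}).Apply(ctx, true)
}

Because the stack never touches the manifests directory, the applier manager no longer needs to own it; the diff therefore lists it in the manager's new IgnoredStacks map, and a leftover on-disk stack directory only receives an ignore note.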
24 changes: 9 additions & 15 deletions cmd/controller/controller.go
@@ -300,7 +300,10 @@ func (c *command) start(ctx context.Context) error {
nodeComponents.Add(ctx, &applier.Manager{
K0sVars: c.K0sVars,
KubeClientFactory: adminClientFactory,
LeaderElector: leaderElector,
IgnoredStacks: map[string]string{
controller.ClusterConfigStackName: "v1.32",
},
LeaderElector: leaderElector,
})
}

@@ -389,20 +392,11 @@ func (c *command) start(ctx context.Context) error {
var configSource clusterconfig.ConfigSource
// For backwards compatibility, use file as config source by default
if c.EnableDynamicConfig {
// The CRDs are only required if the config is stored in the cluster.
clusterComponents.Add(ctx, controller.NewCRD(
c.K0sVars.ManifestsDir, "v1beta1",
controller.WithStackName("api-config"),
controller.WithCRDAssetsDir("k0s")),
)

initializer, err := controller.NewClusterConfigInitializer(adminClientFactory, leaderElector, nodeConfig)
if err != nil {
return fmt.Errorf("failed to create cluster config initializer: %w", err)
}

clusterComponents.Add(ctx, initializer)

clusterComponents.Add(ctx, controller.NewClusterConfigInitializer(
adminClientFactory,
leaderElector,
nodeConfig,
))
configSource, err = clusterconfig.NewAPIConfigSource(adminClientFactory)
if err != nil {
return err
17 changes: 16 additions & 1 deletion pkg/applier/manager.go
@@ -20,9 +20,11 @@ import (
"context"
"fmt"
"path"
"path/filepath"
"time"

"github.com/k0sproject/k0s/internal/pkg/dir"
"github.com/k0sproject/k0s/internal/pkg/file"
"github.com/k0sproject/k0s/pkg/component/controller/leaderelector"
"github.com/k0sproject/k0s/pkg/component/manager"
"github.com/k0sproject/k0s/pkg/config"
@@ -36,6 +38,7 @@ import (
// Manager is the Component interface wrapper for Applier
type Manager struct {
K0sVars *config.CfgVars
IgnoredStacks map[string]string
KubeClientFactory kubeutil.ClientFactoryInterface

// client kubernetes.Interface
@@ -157,12 +160,24 @@ func (m *Manager) createStack(ctx context.Context, name string) {
return
}

log := m.log.WithField("stack", name)

stackName := filepath.Base(name)
if sinceVersion, ignored := m.IgnoredStacks[stackName]; ignored {
if err := file.AtomicWithTarget(filepath.Join(name, "ignored.txt")).WriteString(
"The " + stackName + " stack is handled internally since k0s " + sinceVersion + ".\n" +
"This directory is ignored and can be safely removed.\n",
); err != nil {
log.WithError(err).Warn("Failed to write ignore notice")
}
return
}

stackCtx, cancelStack := context.WithCancel(ctx)
stack := stack{cancelStack, NewStackApplier(name, m.KubeClientFactory)}
m.stacks[name] = stack

go func() {
log := m.log.WithField("stack", name)
for {
log.Info("Running stack")
if err := stack.Run(stackCtx); err != nil {
54 changes: 54 additions & 0 deletions pkg/applier/manager_test.go
@@ -20,7 +20,9 @@ import (
"context"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/k0sproject/k0s/internal/testutil"
"github.com/k0sproject/k0s/pkg/applier"
@@ -30,6 +32,7 @@ import (
"github.com/k0sproject/k0s/pkg/kubernetes/watch"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -96,3 +99,54 @@ data: {}
}),
)
}

func TestManager_IgnoresStacks(t *testing.T) {
k0sVars, err := config.NewCfgVars(nil, t.TempDir())
require.NoError(t, err)
leaderElector := leaderelector.Dummy{Leader: true}
clients := testutil.NewFakeClientFactory()

underTest := applier.Manager{
K0sVars: k0sVars,
IgnoredStacks: map[string]string{"ignored": "v99"},
KubeClientFactory: clients,
LeaderElector: &leaderElector,
}

ignored := filepath.Join(k0sVars.ManifestsDir, "ignored")
require.NoError(t, os.MkdirAll(ignored, constant.ManifestsDirMode))
require.NoError(t, os.WriteFile(filepath.Join(ignored, "ignored.yaml"), []byte(`
apiVersion: v1
kind: ConfigMap
metadata:
name: ignored
namespace: default
data: {}
`,
), constant.CertMode))

require.NoError(t, leaderElector.Init(context.TODO()))
require.NoError(t, underTest.Init(context.TODO()))
require.NoError(t, underTest.Start(context.TODO()))
t.Cleanup(func() { assert.NoError(t, underTest.Stop()) })
require.NoError(t, leaderElector.Start(context.TODO()))
t.Cleanup(func() { assert.NoError(t, leaderElector.Stop()) })

var content []byte
require.EventuallyWithT(t, func(t *assert.CollectT) {
content, err = os.ReadFile(filepath.Join(ignored, "ignored.txt"))
assert.NoError(t, err)
}, 10*time.Second, 100*time.Millisecond)

expectedContent := []string{
"The ignored stack is handled internally since k0s v99.",
"This directory is ignored and can be safely removed.",
"",
}
assert.Equal(t, strings.Join(expectedContent, "\n"), string(content))

configMaps, err := clients.Client.CoreV1().ConfigMaps("default").List(context.TODO(), metav1.ListOptions{})
if assert.NoError(t, err) {
assert.Empty(t, configMaps.Items)
}
}
115 changes: 76 additions & 39 deletions pkg/component/controller/clusterconfig.go
@@ -20,36 +20,43 @@ import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"time"

"github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
k0sclient "github.com/k0sproject/k0s/pkg/client/clientset/typed/k0s/v1beta1"
k0sv1beta1 "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
"github.com/k0sproject/k0s/pkg/applier"
k0sv1beta1client "github.com/k0sproject/k0s/pkg/client/clientset/typed/k0s/v1beta1"
"github.com/k0sproject/k0s/pkg/component/controller/clusterconfig"
"github.com/k0sproject/k0s/pkg/component/controller/leaderelector"
"github.com/k0sproject/k0s/pkg/component/manager"
"github.com/k0sproject/k0s/pkg/constant"
kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"
"github.com/k0sproject/k0s/pkg/kubernetes"
"github.com/k0sproject/k0s/static"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/sirupsen/logrus"
"sigs.k8s.io/yaml"
)

const ClusterConfigStackName = "api-config"

// ClusterConfigReconciler reconciles a ClusterConfig object
type ClusterConfigReconciler struct {
KubeClientFactory kubeutil.ClientFactoryInterface
KubeClientFactory kubernetes.ClientFactoryInterface

log *logrus.Entry
reconciler manager.Reconciler
configSource clusterconfig.ConfigSource
}

// NewClusterConfigReconciler creates a new clusterConfig reconciler
func NewClusterConfigReconciler(reconciler manager.Reconciler, kubeClientFactory kubeutil.ClientFactoryInterface, configSource clusterconfig.ConfigSource) *ClusterConfigReconciler {
func NewClusterConfigReconciler(reconciler manager.Reconciler, kubeClientFactory kubernetes.ClientFactoryInterface, configSource clusterconfig.ConfigSource) *ClusterConfigReconciler {
return &ClusterConfigReconciler{
KubeClientFactory: kubeClientFactory,
log: logrus.WithFields(logrus.Fields{"component": "clusterConfig-reconciler"}),
@@ -100,7 +107,7 @@ func (r *ClusterConfigReconciler) Stop() error {
return nil
}

func (r *ClusterConfigReconciler) reportStatus(ctx context.Context, config *v1beta1.ClusterConfig, reconcileError error) {
func (r *ClusterConfigReconciler) reportStatus(ctx context.Context, config *k0sv1beta1.ClusterConfig, reconcileError error) {
hostname, err := os.Hostname()
if err != nil {
r.log.Error("failed to get hostname:", err)
@@ -119,11 +126,11 @@ func (r *ClusterConfigReconciler) reportStatus(ctx context.Context, config *v1be
FirstTimestamp: metav1.Now(),
LastTimestamp: metav1.Now(),
InvolvedObject: corev1.ObjectReference{
Kind: v1beta1.ClusterConfigKind,
Kind: k0sv1beta1.ClusterConfigKind,
Namespace: config.Namespace,
Name: config.Name,
UID: config.UID,
APIVersion: v1beta1.ClusterConfigAPIVersion,
APIVersion: k0sv1beta1.ClusterConfigAPIVersion,
ResourceVersion: config.ResourceVersion,
},
Action: "ConfigReconciling",
@@ -147,55 +154,67 @@ func (r *ClusterConfigReconciler) reportStatus(ctx context.Context, config *v1be

type ClusterConfigInitializer struct {
log logrus.FieldLogger
configClient k0sclient.ClusterConfigInterface
clients kubernetes.ClientFactoryInterface
leaderElector leaderelector.Interface
initialConfig *v1beta1.ClusterConfig
initialConfig *k0sv1beta1.ClusterConfig
}

// Init implements manager.Component.
func (*ClusterConfigInitializer) Init(context.Context) error { return nil }
// Init implements [manager.Component].
func (i *ClusterConfigInitializer) Init(context.Context) error {
return nil
}

// Start implements manager.Component.
// Start implements [manager.Component].
func (i *ClusterConfigInitializer) Start(ctx context.Context) error {
if err := i.ensureClusterConfigExistence(ctx); err != nil {
return fmt.Errorf("failed to ensure the existence of the cluster configuration: %w", err)
}
return nil
}

// Stop implements manager.Component.
// Stop implements [manager.Component].
func (*ClusterConfigInitializer) Stop() error { return nil }

func NewClusterConfigInitializer(kubeClientFactory kubeutil.ClientFactoryInterface, leaderElector leaderelector.Interface, initialConfig *v1beta1.ClusterConfig) (*ClusterConfigInitializer, error) {
configClient, err := kubeClientFactory.GetConfigClient()
if err != nil {
return nil, err
}

func NewClusterConfigInitializer(clients kubernetes.ClientFactoryInterface, leaderElector leaderelector.Interface, initialConfig *k0sv1beta1.ClusterConfig) *ClusterConfigInitializer {
return &ClusterConfigInitializer{
log: logrus.WithField("component", "clusterConfigInitializer"),
configClient: configClient,
clients: clients,
leaderElector: leaderElector,
initialConfig: initialConfig,
}, nil
}
}

func (i *ClusterConfigInitializer) ensureClusterConfigExistence(ctx context.Context) (err error) {
clientset, err := i.clients.GetK0sClient()
if err != nil {
return err
}
client := clientset.K0sV1beta1().ClusterConfigs(constant.ClusterConfigNamespace)

// We need to wait until the cluster configuration exists or we succeed in creating it.
waitErr := wait.PollUntilContextTimeout(ctx, 1*time.Second, 20*time.Second, true, func(ctx context.Context) (bool, error) {
var stackApplied bool
pollErr := wait.PollUntilContextTimeout(ctx, 1*time.Second, 20*time.Second, true, func(ctx context.Context) (bool, error) {
if i.leaderElector.IsLeader() {
err = i.createClusterConfig(ctx)
if err == nil {
i.log.Debug("Cluster configuration created")
return true, nil
if stackApplied {
err = nil
} else {
err = i.applyAPIConfigStack(ctx)
}
if apierrors.IsAlreadyExists(err) {
// An already existing configuration is just fine.
i.log.Debug("Cluster configuration already exists")
return true, nil
if err == nil {
stackApplied = true
err = i.createClusterConfig(ctx, client)
if err == nil {
i.log.Debug("Cluster configuration created")
return true, nil
}
if apierrors.IsAlreadyExists(err) {
// An already existing configuration is just fine.
i.log.Debug("Cluster configuration already exists")
return true, nil
}
}
} else {
err = i.clusterConfigExists(ctx)
err = i.clusterConfigExists(ctx, client)
if err == nil {
i.log.Debug("Cluster configuration exists")
return true, nil
@@ -206,30 +225,48 @@ func (i *ClusterConfigInitializer) ensureClusterConfigExistence(ctx context.Cont
return false, nil
})

if waitErr != nil {
waitErr = context.Cause(ctx)
if pollErr != nil {
pollErr = context.Cause(ctx)
if err != nil {
return fmt.Errorf("%w (%w)", context.Cause(ctx), err)
}

return waitErr
return pollErr
}

return nil

}

func (i *ClusterConfigInitializer) clusterConfigExists(ctx context.Context) error {
func (i *ClusterConfigInitializer) applyAPIConfigStack(ctx context.Context) error {
rawCRD, err := fs.ReadFile(static.CRDs, "k0s/k0s.k0sproject.io_clusterconfigs.yaml")
if err != nil {
return err
}

var crd unstructured.Unstructured
if err := yaml.Unmarshal(rawCRD, &crd); err != nil {
return err
}

return (&applier.Stack{
Name: ClusterConfigStackName,
Resources: []*unstructured.Unstructured{&crd},
Clients: i.clients,
}).Apply(ctx, true)
}

func (i *ClusterConfigInitializer) clusterConfigExists(ctx context.Context, client k0sv1beta1client.ClusterConfigInterface) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
_, err := i.configClient.Get(ctx, constant.ClusterConfigObjectName, metav1.GetOptions{})
_, err := client.Get(ctx, constant.ClusterConfigObjectName, metav1.GetOptions{})
return err
}

func (i *ClusterConfigInitializer) createClusterConfig(ctx context.Context) error {
func (i *ClusterConfigInitializer) createClusterConfig(ctx context.Context, client k0sv1beta1client.ClusterConfigInterface) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
clusterWideConfig := i.initialConfig.GetClusterWideConfig().StripDefaults().CRValidator()
_, err := i.configClient.Create(ctx, clusterWideConfig, metav1.CreateOptions{})
_, err := client.Create(ctx, clusterWideConfig, metav1.CreateOptions{})
return err
}