Skip to content

Commit

Permalink
This PR adds new functionality to be able to manage Etcd peers using …
Browse files Browse the repository at this point in the history
…CRDs.

One of the challenges for automation (Terraform, Cluster API, k0sctl) is that when removing a controller node, one also has to remove the corresponding Etcd peer entry from the Etcd cluster.

In this PR there are a few things going on:
- Each controller creates its own `EtcdMember` object with memberID and peer address fields
- Each controller watches the changes on `EtcdMember` objects
- When the leader controller sees `EtcdMember` object being deleted it will remove the corresponding peer from Etcd cluster.
- If the peer removal fails for some reason it will write an event into `kube-system` namespace

Very DRAFT still, need to refactor some of the code for better readability and error handling.

Signed-off-by: Jussi Nummelin <[email protected]>

Temp commit for state mgmt with conditions

Signed-off-by: Jussi Nummelin <[email protected]>

Fix timing issue in EtcdMember inttest

Signed-off-by: Jussi Nummelin <[email protected]>

Finetune EtcdMember.joinstatus to be validated as enum

Co-authored-by: Tom Wieczorek <[email protected]>
Signed-off-by: Jussi Nummelin <[email protected]>

Update pkg/component/controller/etcd_member_reconciler.go

Co-authored-by: Tom Wieczorek <[email protected]>
Signed-off-by: Jussi Nummelin <[email protected]>

Use net.JoinHostPort

Signed-off-by: Jussi Nummelin <[email protected]>

Fix JoinHostPort

Signed-off-by: Jussi Nummelin <[email protected]>
  • Loading branch information
jnummelin committed Apr 29, 2024
1 parent 61c9c7b commit 3392f6d
Show file tree
Hide file tree
Showing 27 changed files with 1,682 additions and 6 deletions.
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,12 @@ controllergen_targets += pkg/apis/autopilot/v1beta2/.controller-gen.stamp
pkg/apis/autopilot/v1beta2/.controller-gen.stamp: $(shell find pkg/apis/autopilot/v1beta2/ -maxdepth 1 -type f -name \*.go)
pkg/apis/autopilot/v1beta2/.controller-gen.stamp: gen_output_dir = autopilot

controllergen_targets += pkg/apis/etcd/v1beta1/.controller-gen.stamp
pkg/apis/etcd/v1beta1/.controller-gen.stamp: $(shell find pkg/apis/etcd/v1beta1/ -maxdepth 1 -type f -name \*.go)
pkg/apis/etcd/v1beta1/.controller-gen.stamp: gen_output_dir = etcd

codegen_targets += $(controllergen_targets)

pkg/apis/%/.controller-gen.stamp: .k0sbuild.docker-image.k0s hack/tools/boilerplate.go.txt hack/tools/Makefile.variables
rm -rf 'static/manifests/$(gen_output_dir)/CustomResourceDefinition'
mkdir -p 'static/manifests/$(gen_output_dir)'
Expand All @@ -137,7 +142,7 @@ pkg/apis/%/.controller-gen.stamp: .k0sbuild.docker-image.k0s hack/tools/boilerpl
&& mv -f -- "$$gendir"/zz_generated.deepcopy.go '$(dir $@).'
touch -- '$@'

clientset_input_dirs := pkg/apis/autopilot/v1beta2 pkg/apis/k0s/v1beta1 pkg/apis/helm/v1beta1
clientset_input_dirs := pkg/apis/autopilot/v1beta2 pkg/apis/k0s/v1beta1 pkg/apis/helm/v1beta1 pkg/apis/etcd/v1beta1
codegen_targets += pkg/client/clientset/.client-gen.stamp
pkg/client/clientset/.client-gen.stamp: $(shell find $(clientset_input_dirs) -type f -name \*.go -not -name \*_test.go -not -name zz_\*)
pkg/client/clientset/.client-gen.stamp: .k0sbuild.docker-image.k0s hack/tools/boilerplate.go.txt embedded-bins/Makefile.variables
Expand All @@ -163,6 +168,7 @@ static/zz_generated_assets.go: .k0sbuild.docker-image.k0s hack/tools/Makefile.va
static/manifests/helm/CustomResourceDefinition/... \
static/manifests/v1beta1/CustomResourceDefinition/... \
static/manifests/autopilot/CustomResourceDefinition/... \
static/manifests/etcd/CustomResourceDefinition/... \
static/manifests/calico/... \
static/manifests/windows/... \
static/misc/...
Expand Down
13 changes: 13 additions & 0 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,19 @@ func (c *command) start(ctx context.Context) error {
CertManager: worker.NewCertificateManager(ctx, c.K0sVars.KubeletAuthConfigPath),
})

if nodeConfig.Spec.Storage.Type == v1beta1.EtcdStorageType && nodeConfig.Spec.Storage.Etcd.ExternalCluster == nil {
etcdReconciler, err := controller.NewEtcdMemberReconciler(adminClientFactory, c.K0sVars, nodeConfig.Spec.Storage.Etcd, leaderElector)
if err != nil {
return err
}
etcdCRDSaver, err := controller.NewManifestsSaver("etcd-member", c.K0sVars.DataDir)
if err != nil {
return fmt.Errorf("failed to initialize etcd-member manifests saver: %w", err)
}
clusterComponents.Add(ctx, controller.NewCRD(etcdCRDSaver, []string{"etcd"}))
nodeComponents.Add(ctx, etcdReconciler)
}

perfTimer.Checkpoint("starting-certificates-init")
certs := &Certificates{
ClusterSpec: nodeConfig.Spec,
Expand Down
9 changes: 9 additions & 0 deletions internal/testutil/kube_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package testutil

import (
"fmt"

"k8s.io/client-go/rest"

"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -32,9 +33,13 @@ import (
restfake "k8s.io/client-go/rest/fake"
kubetesting "k8s.io/client-go/testing"

etcdMemberClient "github.com/k0sproject/k0s/pkg/client/clientset/typed/etcd/v1beta1"
cfgClient "github.com/k0sproject/k0s/pkg/client/clientset/typed/k0s/v1beta1"
kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"
)

var _ kubeutil.ClientFactoryInterface = (*FakeClientFactory)(nil)

// NewFakeClientFactory creates new client factory which uses internally only the kube fake client interface
func NewFakeClientFactory(objects ...runtime.Object) FakeClientFactory {
rawDiscovery := &discoveryfake.FakeDiscovery{Fake: &kubetesting.Fake{}}
Expand Down Expand Up @@ -89,3 +94,7 @@ func (f FakeClientFactory) GetRESTClient() (rest.Interface, error) {
// GetRESTConfig returns a freshly allocated, empty REST config.
func (f FakeClientFactory) GetRESTConfig() *rest.Config {
	cfg := new(rest.Config)
	return cfg
}

// GetEtcdMemberClient is not supported by the fake factory: it always returns
// a nil client together with a "NOT IMPLEMENTED" error.
func (f FakeClientFactory) GetEtcdMemberClient() (etcdMemberClient.EtcdMemberInterface, error) {
	var client etcdMemberClient.EtcdMemberInterface // stays nil; there is no fake implementation
	return client, fmt.Errorf("NOT IMPLEMENTED")
}
1 change: 1 addition & 0 deletions inttest/Makefile.variables
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,4 @@ smoketests := \
check-singlenode \
check-statussocket \
check-upgrade \
check-etcdmember \
23 changes: 23 additions & 0 deletions inttest/common/bootloosesuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (

"github.com/k0sproject/k0s/internal/pkg/file"
apclient "github.com/k0sproject/k0s/pkg/client/clientset"
etcdmemberclient "github.com/k0sproject/k0s/pkg/client/clientset/typed/etcd/v1beta1"
"github.com/k0sproject/k0s/pkg/constant"
"github.com/k0sproject/k0s/pkg/k0scontext"
"github.com/k0sproject/k0s/pkg/kubernetes/watch"
Expand Down Expand Up @@ -774,6 +775,18 @@ func (s *BootlooseSuite) StopController(name string) error {
return s.launchDelegate.StopController(s.Context(), ssh)
}

// RestartController stops the k0s controller on the named node through the
// launch delegate and then starts it again over the same SSH connection.
// Failing to open the SSH connection aborts the test; a failed stop is
// returned to the caller without attempting the start.
func (s *BootlooseSuite) RestartController(name string) error {
	conn, err := s.SSH(s.Context(), name)
	s.Require().NoError(err)
	defer conn.Disconnect()

	s.T().Log("killing k0s")
	if stopErr := s.launchDelegate.StopController(s.Context(), conn); stopErr != nil {
		return stopErr
	}

	return s.launchDelegate.StartController(s.Context(), conn)
}

func (s *BootlooseSuite) StartController(name string) error {
ssh, err := s.SSH(s.Context(), name)
s.Require().NoError(err)
Expand Down Expand Up @@ -896,6 +909,16 @@ func (s *BootlooseSuite) ExtensionsClient(node string, k0sKubeconfigArgs ...stri
return extclient.NewForConfig(cfg)
}

// EtcdMemberClient returns a client for accessing EtcdMember custom resources.
// The client is built from the kubeconfig fetched from the given node; any
// extra k0sKubeconfigArgs are forwarded to GetKubeConfig.
func (s *BootlooseSuite) EtcdMemberClient(node string, k0sKubeconfigArgs ...string) (*etcdmemberclient.EtcdV1beta1Client, error) {
	restConfig, cfgErr := s.GetKubeConfig(node, k0sKubeconfigArgs...)
	if cfgErr != nil {
		return nil, cfgErr
	}

	client, err := etcdmemberclient.NewForConfig(restConfig)
	return client, err
}

// WaitForNodeReady wait that we see the given node in "Ready" state in kubernetes API
func (s *BootlooseSuite) WaitForNodeReady(name string, kc kubernetes.Interface) error {
s.T().Logf("waiting to see %s ready in kube API", name)
Expand Down
10 changes: 10 additions & 0 deletions inttest/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,16 @@ func VerifyKubeletMetrics(ctx context.Context, kc *kubernetes.Clientset, node st
})
}

// ResetNode runs "<k0s binary> reset --debug" on the named node over SSH,
// using the suite's K0sFullPath as the binary. The command output is
// discarded; only the connection or execution error is returned.
func ResetNode(name string, suite *BootlooseSuite) error {
	conn, err := suite.SSH(suite.Context(), name)
	if err != nil {
		return err
	}
	defer conn.Disconnect()

	resetCmd := fmt.Sprintf("%s reset --debug", suite.K0sFullPath)
	if _, err := conn.ExecWithOutput(suite.Context(), resetCmd); err != nil {
		return err
	}
	return nil
}

// Retrieves the LogfFn stored in context, falling back to use testing.T's Logf
// if the context has a *testing.T or logrus's Infof as a last resort.
func logfFrom(ctx context.Context) LogfFn {
Expand Down
216 changes: 216 additions & 0 deletions inttest/etcdmember/etcdmember_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
Copyright 2024 k0s authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package hacontrolplane

import (
"context"
"encoding/json"
"fmt"
"sync"
"testing"
"time"

"github.com/k0sproject/k0s/inttest/common"
"github.com/stretchr/testify/suite"

etcdv1beta1 "github.com/k0sproject/k0s/pkg/apis/etcd/v1beta1"
"github.com/k0sproject/k0s/pkg/kubernetes/watch"
)

// EtcdMemberSuite is the integration test suite for EtcdMember resources.
// It embeds common.BootlooseSuite, so the suite's helper methods are
// available directly on it.
type EtcdMemberSuite struct {
common.BootlooseSuite
}

// getMembers returns the "members" map from the JSON printed by
// `k0s etcd member-list`, executed over SSH on the controller with the given
// index. SSH or command failures abort the test; a JSON decoding failure is
// recorded as a non-fatal assertion failure.
func (s *EtcdMemberSuite) getMembers(fromControllerIdx int) map[string]string {
	// Our etcd instances don't listen on a public IP, so the check is performed
	// by calling the CLI over SSH. That makes sense in general anyway, since it
	// tests the tooling as well.
	conn, err := s.SSH(s.Context(), s.ControllerNode(fromControllerIdx))
	s.Require().NoError(err)
	defer conn.Disconnect()

	const listCmd = "/usr/local/bin/k0s etcd member-list 2>/dev/null"
	out, err := conn.ExecWithOutput(s.Context(), listCmd)
	// Log before asserting so the output is visible even when the command failed.
	s.T().Logf("k0s etcd member-list output: %s", out)
	s.Require().NoError(err)

	var parsed struct {
		Members map[string]string `json:"members"`
	}
	s.NoError(json.Unmarshal([]byte(out), &parsed))

	return parsed.Members
}

// TestDeregistration exercises the EtcdMember lifecycle on a multi-controller
// cluster:
//
//  1. boot all controllers and verify that etcd sees every member,
//  2. verify that every controller has an EtcdMember object reporting the
//     Joined condition as true,
//  3. mark controller2 as leaving and verify it is removed from etcd and its
//     EtcdMember status is updated,
//  4. reset and re-join controller2 and verify it is a member again,
//  5. restart controller2 and verify its EtcdMember object is still intact.
func (s *EtcdMemberSuite) TestDeregistration() {
	var joinToken string
	for idx := 0; idx < s.BootlooseSuite.ControllerCount; idx++ {
		s.Require().NoError(s.WaitForSSH(s.ControllerNode(idx), 2*time.Minute, 1*time.Second))

		// Note that the token is intentionally empty for the first controller
		s.Require().NoError(s.InitController(idx, joinToken))
		s.Require().NoError(s.WaitJoinAPI(s.ControllerNode(idx)))
		s.Require().NoError(s.WaitForKubeAPI(s.ControllerNode(idx)))
		// With the primary controller running, create the join token for subsequent controllers.
		if idx == 0 {
			token, err := s.GetJoinToken("controller")
			s.Require().NoError(err)
			joinToken = token
		}
	}

	// Sanity check -- ensure all nodes see each other according to etcd
	for idx := 0; idx < s.BootlooseSuite.ControllerCount; idx++ {
		s.Require().Len(s.getMembers(idx), s.BootlooseSuite.ControllerCount)
	}

	etcdMemberClient, err := s.EtcdMemberClient(s.ControllerNode(0))
	s.Require().NoError(err)

	// Check each node is present in the etcd cluster and reports joined state
	expectedObjects := []string{"controller0", "controller1", "controller2"}
	var wg sync.WaitGroup
	for i, obj := range expectedObjects {
		ctrlName := obj
		// Resolve the expected address on the test goroutine.
		// NOTE(review): presumably GetControllerIPAddress may assert fatally
		// itself, which is only allowed on the test goroutine -- confirm.
		wantAddr := s.GetControllerIPAddress(i)

		wg.Add(1)
		go func() {
			defer wg.Done()
			s.T().Logf("verifying initial status of %s", ctrlName)
			em := &etcdv1beta1.EtcdMember{}
			ctx, cancel := context.WithTimeout(s.Context(), 30*time.Second)
			defer cancel()

			// Use a goroutine-local error: sharing the outer err between the
			// goroutines would be a data race.
			err := watch.EtcdMembers(etcdMemberClient.EtcdMembers()).
				WithObjectName(ctrlName).
				WithErrorCallback(common.RetryWatchErrors(s.T().Logf)).
				Until(ctx, func(item *etcdv1beta1.EtcdMember) (done bool, err error) {
					if item.Status.GetCondition(etcdv1beta1.ConditionTypeJoined) == nil {
						return false, nil
					}
					// We have the condition so we can bail out
					em = item
					return true, nil
				})

			// Only non-fatal assertions here: FailNow (used by Require) must
			// not be called from goroutines other than the one running the test.
			if !s.NoError(err, "waiting for Joined condition on %s", ctrlName) {
				return
			}
			s.Equal(wantAddr, em.PeerAddress)
			c := em.Status.GetCondition(etcdv1beta1.ConditionTypeJoined)
			if s.NotNil(c) {
				s.Equal(etcdv1beta1.ConditionTrue, c.Status)
			}
		}()
	}
	s.T().Log("waiting to see correct statuses on EtcdMembers")
	wg.Wait()
	// Turn any failure recorded by the goroutines into a fatal one now that
	// we are back on the test goroutine.
	s.Require().False(s.T().Failed(), "initial EtcdMember status verification failed")
	s.T().Log("All statuses found")

	// Make one of the nodes leave
	s.leaveNode("controller2")

	// Check that the node is gone from the etcd cluster according to etcd itself
	members := s.getMembers(0)
	s.Require().Len(members, s.BootlooseSuite.ControllerCount-1)
	s.Require().NotContains(members, "controller2")

	// Make sure the EtcdMember CR status is successfully updated
	em := s.getMember("controller2")
	s.Require().Equal("Success", em.Status.ReconcileStatus)
	joined := em.Status.GetCondition(etcdv1beta1.ConditionTypeJoined)
	s.Require().NotNil(joined)
	s.Require().Equal(etcdv1beta1.ConditionFalse, joined.Status)

	// Stop k0s and reset the node
	s.Require().NoError(s.StopController(s.ControllerNode(2)))
	s.Require().NoError(common.ResetNode(s.ControllerNode(2), &s.BootlooseSuite))

	// Make the node rejoin
	s.Require().NoError(s.InitController(2, joinToken))
	s.Require().NoError(s.WaitForKubeAPI(s.ControllerNode(2)))

	// Final sanity -- ensure all nodes see each other according to etcd
	members = s.getMembers(0)
	s.Require().Len(members, s.BootlooseSuite.ControllerCount)
	s.Require().Contains(members, "controller2")

	// Check the CR is present again
	em = s.getMember("controller2")
	s.Require().Equal(s.GetControllerIPAddress(2), em.PeerAddress)
	s.Require().Equal(false, em.Leave)
	joined = em.Status.GetCondition(etcdv1beta1.ConditionTypeJoined)
	s.Require().NotNil(joined)
	s.Require().Equal(etcdv1beta1.ConditionTrue, joined.Status)

	// Check that after restarting the controller, the member is still present
	s.Require().NoError(s.RestartController(s.ControllerNode(2)))
	em = s.getMember("controller2")
	s.Require().Equal(s.GetControllerIPAddress(2), em.PeerAddress)
}

// basePath is the fmt format string for the API path of a single EtcdMember
// resource; its one verb takes the resource name.
const basePath = "apis/etcd.k0sproject.io/v1beta1/etcdmembers/%s"

// leaveNode marks the EtcdMember with the given name as leaving by
// merge-patching its "leave" field to true, and then polls the resource for up
// to 10 seconds until its Joined condition reports false. Any failure aborts
// the test.
func (s *EtcdMemberSuite) leaveNode(name string) {
	kc, err := s.KubeClient(s.ControllerNode(0))
	s.Require().NoError(err)

	// Patch the EtcdMember CR to set the Leave flag
	path := fmt.Sprintf(basePath, name)
	patch := []byte(`{"leave":true}`)
	result := kc.RESTClient().Patch("application/merge-patch+json").AbsPath(path).Body(patch).Do(s.Context())

	s.Require().NoError(result.Error())
	s.T().Logf("marked %s for leaving, waiting to see the state updated", name)
	ctx, cancel := context.WithTimeout(s.Context(), 10*time.Second)
	defer cancel()

	err = common.Poll(ctx, func(ctx context.Context) (done bool, err error) {
		em := &etcdv1beta1.EtcdMember{}
		// Use the poll's context so that the timeout above also bounds each request.
		if err := kc.RESTClient().Get().AbsPath(path).Do(ctx).Into(em); err != nil {
			// Surface the error through Poll; it is asserted on below.
			return false, err
		}

		c := em.Status.GetCondition(etcdv1beta1.ConditionTypeJoined)
		if c == nil {
			return false, nil
		}
		s.T().Logf("JoinStatus = %s, waiting for %s", c.Status, etcdv1beta1.ConditionFalse)
		return c.Status == etcdv1beta1.ConditionFalse, nil
	})
	s.Require().NoError(err)
}

// getMember fetches the EtcdMember custom resource with the given name through
// the Kubernetes API of the first controller. Any failure aborts the test.
func (s *EtcdMemberSuite) getMember(name string) *etcdv1beta1.EtcdMember {
	kc, err := s.KubeClient(s.ControllerNode(0))
	s.Require().NoError(err)

	member := new(etcdv1beta1.EtcdMember)
	request := kc.RESTClient().Get().AbsPath(fmt.Sprintf(basePath, name))
	s.Require().NoError(request.Do(s.Context()).Into(member))

	return member
}

func TestEtcdMemberSuite(t *testing.T) {
s := EtcdMemberSuite{
common.BootlooseSuite{
ControllerCount: 3,
LaunchMode: common.LaunchModeOpenRC,
},
}

suite.Run(t, &s)

}
18 changes: 18 additions & 0 deletions pkg/apis/etcd/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2024 k0s authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package etcd contains API Schema definitions for the etcd.k0sproject.io API group.
package etcd

// GroupName is the name of the API group defined by this package.
const GroupName = "etcd.k0sproject.io"
Loading

0 comments on commit 3392f6d

Please sign in to comment.