leaderelection: configure all timeouts via environment variables #305

Merged · 5 commits · Sep 5, 2023
116 changes: 81 additions & 35 deletions pkg/leader/leader.go
@@ -2,6 +2,7 @@ package leader

 import (
 	"context"
+	"fmt"
 	"os"
 	"time"

@@ -13,6 +14,18 @@ import (

 type Callback func(cb context.Context)

+const devModeEnvKey = "CATTLE_DEV_MODE"
+const leaseDurationEnvKey = "CATTLE_ELECTION_LEASE_DURATION"
+const renewDeadlineEnvKey = "CATTLE_ELECTION_RENEW_DEADLINE"
+const retryPeriodEnvKey = "CATTLE_ELECTION_RETRY_PERIOD"
+
+const defaultLeaseDuration = 45 * time.Second
+const defaultRenewDeadline = 30 * time.Second
+const defaultRetryPeriod = 2 * time.Second
+
+const developmentLeaseDuration = 45 * time.Hour
+const developmentRenewDeadline = 30 * time.Hour
+
 func RunOrDie(ctx context.Context, namespace, name string, client kubernetes.Interface, cb Callback) {
 	if namespace == "" {
 		namespace = "kube-system"
@@ -43,42 +56,75 @@ func run(ctx context.Context, namespace, name string, client kubernetes.Interface, cb Callback) error {
 		logrus.Fatalf("error creating leader lock for %s: %v", name, err)
 	}

-	t := time.Second
-	if dl := os.Getenv("CATTLE_DEV_MODE"); dl != "" {
-		t = time.Hour
-	}
-
-	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
-		Lock:          rl,
-		LeaseDuration: 45 * t,
-		RenewDeadline: 30 * t,
-		RetryPeriod:   2 * time.Second,
-		Callbacks: leaderelection.LeaderCallbacks{
-			OnStartedLeading: func(ctx context.Context) {
-				go cb(ctx)
-			},
-			OnStoppedLeading: func() {
-				select {
-				case <-ctx.Done():
-					// The context has been canceled or is otherwise complete.
-					// This is a request to terminate. Exit 0.
-					// Exiting cleanly is useful when the context is canceled
-					// so that Kubernetes doesn't record it exiting in error
-					// when the exit was requested. For example, the wrangler-cli
-					// package sets up a context that cancels when SIGTERM is
-					// sent in. If a node is shut down this is the type of signal
-					// sent. In that case you want the 0 exit code to mark it as
-					// complete so that everything comes back up correctly after
-					// a restart.
-					// The pattern found here can be found inside the kube-scheduler.
-					logrus.Info("requested to terminate, exiting")
-					os.Exit(0)
-				default:
-					logrus.Fatalf("leaderelection lost for %s", name)
-				}
-			},
-		},
-		ReleaseOnCancel: true,
-	})
+	cbs := leaderelection.LeaderCallbacks{
+		OnStartedLeading: func(ctx context.Context) {
+			go cb(ctx)
+		},
+		OnStoppedLeading: func() {
+			select {
+			case <-ctx.Done():
+				// The context has been canceled or is otherwise complete.
+				// This is a request to terminate. Exit 0.
+				// Exiting cleanly is useful when the context is canceled
+				// so that Kubernetes doesn't record it exiting in error
+				// when the exit was requested. For example, the wrangler-cli
+				// package sets up a context that cancels when SIGTERM is
+				// sent in. If a node is shut down this is the type of signal
+				// sent. In that case you want the 0 exit code to mark it as
+				// complete so that everything comes back up correctly after
+				// a restart.
+				// The pattern found here can be found inside the kube-scheduler.
+				logrus.Info("requested to terminate, exiting")
+				os.Exit(0)
+			default:
+				logrus.Fatalf("leaderelection lost for %s", name)
+			}
+		},
+	}
+
+	config, err := computeConfig(rl, cbs)
+	if err != nil {
+		return err
+	}
+
+	leaderelection.RunOrDie(ctx, *config)
 	panic("unreachable")
 }
+
+func computeConfig(rl resourcelock.Interface, cbs leaderelection.LeaderCallbacks) (*leaderelection.LeaderElectionConfig, error) {
+	leaseDuration := defaultLeaseDuration
+	renewDeadline := defaultRenewDeadline
+	retryPeriod := defaultRetryPeriod
+	var err error
+	if d := os.Getenv(devModeEnvKey); d != "" {
+		leaseDuration = developmentLeaseDuration
+		renewDeadline = developmentRenewDeadline
+	}
+	if d := os.Getenv(leaseDurationEnvKey); d != "" {
+		leaseDuration, err = time.ParseDuration(d)
+		if err != nil {
+			return nil, fmt.Errorf("%s value [%s] is not a valid duration: %w", leaseDurationEnvKey, d, err)
+		}
+	}
+	if d := os.Getenv(renewDeadlineEnvKey); d != "" {
+		renewDeadline, err = time.ParseDuration(d)
+		if err != nil {
+			return nil, fmt.Errorf("%s value [%s] is not a valid duration: %w", renewDeadlineEnvKey, d, err)
+		}
+	}
+	if d := os.Getenv(retryPeriodEnvKey); d != "" {
+		retryPeriod, err = time.ParseDuration(d)
+		if err != nil {
+			return nil, fmt.Errorf("%s value [%s] is not a valid duration: %w", retryPeriodEnvKey, d, err)
+		}
+	}
+
+	return &leaderelection.LeaderElectionConfig{
+		Lock:            rl,
+		LeaseDuration:   leaseDuration,
+		RenewDeadline:   renewDeadline,
+		RetryPeriod:     retryPeriod,
+		Callbacks:       cbs,
+		ReleaseOnCancel: true,
+	}, nil
+}
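
A usage sketch for reviewers (not part of this PR): with this change merged, a consumer tunes the election entirely through the environment. The snippet assumes this is rancher/wrangler's pkg/leader package and an in-cluster kubeconfig; the namespace and lock name are placeholders, and in a real deployment the variables would come from the pod spec rather than os.Setenv.

package main

import (
	"context"
	"os"

	"github.com/rancher/wrangler/pkg/leader"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	// Each value goes through time.ParseDuration, so "15s", "1m30s" and
	// "2h" are all accepted; unset variables keep the compiled-in defaults
	// (45s lease, 30s renew deadline, 2s retry period).
	os.Setenv("CATTLE_ELECTION_LEASE_DURATION", "15s")
	os.Setenv("CATTLE_ELECTION_RENEW_DEADLINE", "10s")
	os.Setenv("CATTLE_ELECTION_RETRY_PERIOD", "2s")

	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// The callback runs only once this replica has acquired the lease;
	// RunOrDie itself never returns.
	leader.RunOrDie(context.Background(), "kube-system", "example-lock", client, func(ctx context.Context) {
		<-ctx.Done()
	})
}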
156 changes: 156 additions & 0 deletions pkg/leader/leader_test.go
@@ -0,0 +1,156 @@
package leader

import (
	"os"
	"reflect"
	"testing"
	"time"

	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func Test_computeConfig(t *testing.T) {
	type args struct {
		rl  resourcelock.Interface
		cbs leaderelection.LeaderCallbacks
	}
	type env struct {
		key   string
		value string
	}
	tests := []struct {
		name    string
		args    args
		envs    []env
		want    *leaderelection.LeaderElectionConfig
		wantErr bool
	}{
		{
			name: "all defaults",
			args: args{
				rl:  nil,
				cbs: leaderelection.LeaderCallbacks{},
			},
			envs: []env{},
			want: &leaderelection.LeaderElectionConfig{
				Lock:            nil,
				LeaseDuration:   defaultLeaseDuration,
				RenewDeadline:   defaultRenewDeadline,
				RetryPeriod:     defaultRetryPeriod,
				Callbacks:       leaderelection.LeaderCallbacks{},
				ReleaseOnCancel: true,
			},
			wantErr: false,
		},
		{
			name: "dev mode",
			args: args{
				rl:  nil,
				cbs: leaderelection.LeaderCallbacks{},
			},
			envs: []env{
				{key: devModeEnvKey, value: "true"},
			},
			want: &leaderelection.LeaderElectionConfig{
				Lock:            nil,
				LeaseDuration:   developmentLeaseDuration,
				RenewDeadline:   developmentRenewDeadline,
				RetryPeriod:     defaultRetryPeriod,
				Callbacks:       leaderelection.LeaderCallbacks{},
				ReleaseOnCancel: true,
			},
			wantErr: false,
		},
		{
			name: "all overridden",
			args: args{
				rl:  nil,
				cbs: leaderelection.LeaderCallbacks{},
			},
			envs: []env{
				{key: devModeEnvKey, value: "true"},
				{key: leaseDurationEnvKey, value: "1s"},
				{key: renewDeadlineEnvKey, value: "2s"},
				{key: retryPeriodEnvKey, value: "3m"},
			},
			want: &leaderelection.LeaderElectionConfig{
				Lock:            nil,
				LeaseDuration:   time.Second,
				RenewDeadline:   2 * time.Second,
				RetryPeriod:     3 * time.Minute,
				Callbacks:       leaderelection.LeaderCallbacks{},
				ReleaseOnCancel: true,
			},
			wantErr: false,
		},
		{
			name: "unparseable lease duration",
			args: args{
				rl:  nil,
				cbs: leaderelection.LeaderCallbacks{},
			},
			envs: []env{
				{key: leaseDurationEnvKey, value: "bomb"},
				{key: renewDeadlineEnvKey, value: "2s"},
				{key: retryPeriodEnvKey, value: "3m"},
			},
			want:    nil,
			wantErr: true,
		},
		{
			name: "unparseable renew deadline",
			args: args{
				rl:  nil,
				cbs: leaderelection.LeaderCallbacks{},
			},
			envs: []env{
				{key: leaseDurationEnvKey, value: "1s"},
				{key: renewDeadlineEnvKey, value: "bomb"},
				{key: retryPeriodEnvKey, value: "3m"},
			},
			want:    nil,
			wantErr: true,
		},
		{
			name: "unparseable retry period",
			args: args{
				rl:  nil,
				cbs: leaderelection.LeaderCallbacks{},
			},
			envs: []env{
				{key: leaseDurationEnvKey, value: "1s"},
				{key: renewDeadlineEnvKey, value: "2s"},
				{key: retryPeriodEnvKey, value: "bomb"},
			},
			want:    nil,
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Clear all relevant variables first, so values from the ambient
			// environment or from earlier subtests cannot leak into this one.
			// devModeEnvKey is included: without it, a CATTLE_DEV_MODE set on
			// the developer's machine would break the "all defaults" case.
			for _, e := range []string{devModeEnvKey, leaseDurationEnvKey, renewDeadlineEnvKey, retryPeriodEnvKey} {
				err := os.Unsetenv(e)
				if err != nil {
					t.Errorf("could not Unsetenv: %v", err)
					return
				}
			}
			for _, e := range tt.envs {
				err := os.Setenv(e.key, e.value)
				if err != nil {
					t.Errorf("could not Setenv: %v", err)
					return
				}
			}
			got, err := computeConfig(tt.args.rl, tt.args.cbs)
			if (err != nil) != tt.wantErr {
				t.Errorf("computeConfig() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if !reflect.DeepEqual(got, tt.want) {
				t.Errorf("computeConfig() got = %v, want %v", got, tt.want)
			}
		})
	}
}
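
One behavioral note, with a hedged sketch: computeConfig only parses the values; it does not enforce the ordering that client-go checks when the election actually starts (LeaseDuration > RenewDeadline, and RenewDeadline > RetryPeriod scaled by leaderelection.JitterFactor). The "all overridden" case above builds a 1s lease with a 2s renew deadline, which NewLeaderElector would reject at runtime even though computeConfig accepts it. If fail-fast behavior were wanted, a hypothetical guard (not part of this PR, assumed to live in the same pkg/leader package so the env-key constants and the fmt, time, and leaderelection imports are in scope) could look like:

// validateConfig is a hypothetical helper, not part of this PR. It mirrors
// the invariants client-go enforces in NewLeaderElector, but reports them
// in terms of the environment variables the user would need to fix.
func validateConfig(c *leaderelection.LeaderElectionConfig) error {
	if c.LeaseDuration <= c.RenewDeadline {
		return fmt.Errorf("%s (%s) must be greater than %s (%s)",
			leaseDurationEnvKey, c.LeaseDuration, renewDeadlineEnvKey, c.RenewDeadline)
	}
	if c.RenewDeadline <= time.Duration(leaderelection.JitterFactor*float64(c.RetryPeriod)) {
		return fmt.Errorf("%s (%s) must be greater than %s (%s) scaled by the jitter factor",
			renewDeadlineEnvKey, c.RenewDeadline, retryPeriodEnvKey, c.RetryPeriod)
	}
	return nil
}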