Skip to content

Commit

Permalink
feat: leader election
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe authored and moshloop committed Oct 25, 2024
1 parent 21954a8 commit dbc85a6
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 18 deletions.
40 changes: 28 additions & 12 deletions cmd/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"github.com/flanksource/canary-checker/pkg/controllers"
"github.com/flanksource/canary-checker/pkg/labels"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/job"
dutyKubernetes "github.com/flanksource/duty/kubernetes"
"github.com/flanksource/duty/leader"
"github.com/flanksource/duty/shutdown"
"github.com/spf13/cobra"
"go.opentelemetry.io/otel"
Expand All @@ -37,10 +39,10 @@ var (
Use: "operator",
Short: "Start the kubernetes operator",
Run: func(cmd *cobra.Command, args []string) {
if err := run(cmd, args); err != nil {
if err := run(); err != nil {
shutdown.ShutdownAndExit(1, err.Error())
} else {
shutdown.ShutdownAndExit(0, err.Error())
shutdown.ShutdownAndExit(0, "")
}
},
}
Expand All @@ -56,15 +58,7 @@ func init() {
// +kubebuilder:scaffold:scheme
}

func run(cmd *cobra.Command, args []string) error {
logger := logger.GetLogger("operator")
logger.SetLogLevel(k8sLogLevel)

scheme := runtime.NewScheme()

_ = clientgoscheme.AddToScheme(scheme)
_ = canaryv1.AddToScheme(scheme)

func run() error {
ctx, err := InitContext()
if err != nil {
return err
Expand All @@ -78,12 +72,23 @@ func run(cmd *cobra.Command, args []string) error {
return errors.New("operator requires a kubernetes connection")
}

ctx.WithTracer(otel.GetTracerProvider().Tracer("canary-checker"))
ctx.WithTracer(otel.GetTracerProvider().Tracer(app))

apicontext.DefaultContext = ctx.WithNamespace(runner.WatchNamespace)

cache.PostgresCache = cache.NewPostgresCache(apicontext.DefaultContext)

if enableLeaderElection {
job.DisableCronStartOnSchedule()

go func() {
err := leader.Register(ctx, app, runner.WatchNamespace, nil, nil, nil)
if err != nil {
shutdown.ShutdownAndExit(1, fmt.Sprintf("leader election failed: %v", err))
}
}()
}

if runner.OperatorExecutor {
logger.Infof("Starting executors")

Expand All @@ -94,6 +99,16 @@ func run(cmd *cobra.Command, args []string) error {
}

go serve()
return startControllers()
}

func startControllers() error {
scheme := runtime.NewScheme()
_ = clientgoscheme.AddToScheme(scheme)
_ = canaryv1.AddToScheme(scheme)

logger := logger.GetLogger("operator")
logger.SetLogLevel(k8sLogLevel)

ctrl.SetLogger(logr.FromSlogHandler(logger.Handler()))
setupLog := ctrl.Log.WithName("setup")
Expand All @@ -102,6 +117,7 @@ func run(cmd *cobra.Command, args []string) error {
Scheme: scheme,
LeaderElection: enableLeaderElection,
LeaderElectionNamespace: runner.WatchNamespace,
LeaderElectionID: "fa62cd4d.flanksource.com",
Metrics: ctrlMetrics.Options{
BindAddress: ":0",
},
Expand Down
10 changes: 6 additions & 4 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,26 @@ import (
"go.opentelemetry.io/otel"
)

const app = "canary-checker"

func InitContext() (context.Context, error) {
ctx, closer, err := duty.Start("canary-checker", duty.SkipChangelogMigration, duty.SkipMigrationByDefaultMode)
ctx, closer, err := duty.Start(app, duty.SkipChangelogMigration, duty.SkipMigrationByDefaultMode)
if err != nil {
return ctx, fmt.Errorf("Failed to initialize db: %v", err.Error())
return ctx, fmt.Errorf("failed to initialize db: %v", err.Error())
}
shutdown.AddHook(closer)

if err := properties.LoadFile(propertiesFile); err != nil {
return ctx, fmt.Errorf("failed to load properties: %v", err)
}

ctx.WithTracer(otel.GetTracerProvider().Tracer("canary-checker"))
ctx.WithTracer(otel.GetTracerProvider().Tracer(app))

return ctx, nil
}

var Root = &cobra.Command{
Use: "canary-checker",
Use: app,
PersistentPreRun: func(cmd *cobra.Command, args []string) {
logger.UseSlog()
shutdown.WaitForSignal()
Expand Down
2 changes: 1 addition & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var Run = &cobra.Command{
log.Fatalln("Must specify at least one canary")
}

ctx, closer, err := duty.Start("canary-checker", duty.ClientOnly, duty.SkipMigrationByDefaultMode)
ctx, closer, err := duty.Start(app, duty.ClientOnly, duty.SkipMigrationByDefaultMode)
if err != nil {
logger.Fatalf("Failed to initialize db: %v", err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ var RunTopology = &cobra.Command{

defer shutdown.Shutdown()
var err error
apicontext.DefaultContext, _, err = duty.Start("canary-checker", duty.ClientOnly, duty.SkipMigrationByDefaultMode)
apicontext.DefaultContext, _, err = duty.Start(app, duty.ClientOnly, duty.SkipMigrationByDefaultMode)
if err != nil {
logger.Errorf(err.Error())
return
Expand Down

0 comments on commit dbc85a6

Please sign in to comment.