Skip to content

Commit

Permalink
Fix apache#486 - Set the controller config to the default platform (a…
Browse files Browse the repository at this point in the history
…pache#487)

Signed-off-by: Ricardo Zanini <[email protected]>
  • Loading branch information
ricardozanini authored and rgdoliveira committed Aug 6, 2024
1 parent ffc7382 commit 6fcaa8c
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 77 deletions.
22 changes: 11 additions & 11 deletions controllers/clusterplatform/clusterplatform.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,25 @@ import (
"github.com/apache/incubator-kie-kogito-serverless-operator/api/metadata"
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime/pkg/client"
)

const (
PlatformServices operatorapi.WorkFlowCapability = "services"
)

// GetActiveClusterPlatform returns the currently installed active cluster platform.
func GetActiveClusterPlatform(ctx context.Context, c ctrl.Client) (*operatorapi.SonataFlowClusterPlatform, error) {
return getClusterPlatform(ctx, c, true)
func GetActiveClusterPlatform(ctx context.Context) (*operatorapi.SonataFlowClusterPlatform, error) {
return getClusterPlatform(ctx, true)
}

// getClusterPlatform returns the currently active cluster platform or any cluster platform existing in the cluster.
func getClusterPlatform(ctx context.Context, c ctrl.Client, active bool) (*operatorapi.SonataFlowClusterPlatform, error) {
func getClusterPlatform(ctx context.Context, active bool) (*operatorapi.SonataFlowClusterPlatform, error) {
klog.V(log.D).InfoS("Finding available cluster platforms")

lst, err := listPrimaryClusterPlatforms(ctx, c)
lst, err := listPrimaryClusterPlatforms(ctx)
if err != nil {
return nil, err
}
Expand All @@ -66,8 +66,8 @@ func getClusterPlatform(ctx context.Context, c ctrl.Client, active bool) (*opera
}

// listPrimaryClusterPlatforms returns all non-secondary cluster platforms installed (only one will be active).
func listPrimaryClusterPlatforms(ctx context.Context, c ctrl.Reader) (*operatorapi.SonataFlowClusterPlatformList, error) {
lst, err := listAllClusterPlatforms(ctx, c)
func listPrimaryClusterPlatforms(ctx context.Context) (*operatorapi.SonataFlowClusterPlatformList, error) {
lst, err := listAllClusterPlatforms(ctx)
if err != nil {
return nil, err
}
Expand All @@ -83,8 +83,8 @@ func listPrimaryClusterPlatforms(ctx context.Context, c ctrl.Reader) (*operatora
}

// allDuplicatedClusterPlatforms returns true if every cluster platform has a "Duplicated" status set
func allDuplicatedClusterPlatforms(ctx context.Context, c ctrl.Reader) bool {
lst, err := listAllClusterPlatforms(ctx, c)
func allDuplicatedClusterPlatforms(ctx context.Context) bool {
lst, err := listAllClusterPlatforms(ctx)
if err != nil {
return false
}
Expand All @@ -99,9 +99,9 @@ func allDuplicatedClusterPlatforms(ctx context.Context, c ctrl.Reader) bool {
}

// listAllClusterPlatforms returns all clusterplatforms installed.
func listAllClusterPlatforms(ctx context.Context, c ctrl.Reader) (*operatorapi.SonataFlowClusterPlatformList, error) {
func listAllClusterPlatforms(ctx context.Context) (*operatorapi.SonataFlowClusterPlatformList, error) {
lst := operatorapi.NewSonataFlowClusterPlatformList()
if err := c.List(ctx, &lst); err != nil {
if err := utils.GetClient().List(ctx, &lst); err != nil {
return nil, err
}
return &lst, nil
Expand Down
4 changes: 2 additions & 2 deletions controllers/clusterplatform/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (action *initializeAction) Name() string {
}

func (action *initializeAction) CanHandle(ctx context.Context, cPlatform *operatorapi.SonataFlowClusterPlatform) bool {
return !cPlatform.Status.IsDuplicated() || allDuplicatedClusterPlatforms(ctx, action.client)
return !cPlatform.Status.IsDuplicated() || allDuplicatedClusterPlatforms(ctx)
}

func (action *initializeAction) Handle(ctx context.Context, cPlatform *operatorapi.SonataFlowClusterPlatform) error {
Expand Down Expand Up @@ -107,7 +107,7 @@ func (action *initializeAction) isPrimaryDuplicate(ctx context.Context, cPlatfor
// Always reconcile secondary cluster platforms
return false, nil
}
platforms, err := listPrimaryClusterPlatforms(ctx, action.client)
platforms, err := listPrimaryClusterPlatforms(ctx)
if err != nil {
return false, err
}
Expand Down
44 changes: 7 additions & 37 deletions controllers/platform/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@ import (
"context"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime/pkg/client"

"github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"

Expand All @@ -36,7 +34,7 @@ import (

const defaultSonataFlowPlatformName = "sonataflow-platform"

func ConfigureDefaults(ctx context.Context, c client.Client, p *operatorapi.SonataFlowPlatform, verbose bool) error {
func CreateOrUpdateWithDefaults(ctx context.Context, p *operatorapi.SonataFlowPlatform, verbose bool) error {
// update missing fields in the resource
if p.Status.Cluster == "" || utils.IsOpenShift() {
p.Status.Cluster = operatorapi.PlatformClusterOpenShift
Expand All @@ -52,7 +50,7 @@ func ConfigureDefaults(ctx context.Context, c client.Client, p *operatorapi.Sona
return err
}

err = configureRegistry(ctx, c, p, verbose)
err = configureRegistry(ctx, p, verbose)
if err != nil {
return err
}
Expand All @@ -61,53 +59,25 @@ func ConfigureDefaults(ctx context.Context, c client.Client, p *operatorapi.Sona
klog.V(log.I).InfoS("Maven Timeout set", "timeout", p.Spec.Build.Config.Timeout.Duration)
}

return createOrUpdatePlatform(ctx, c, p)
return createOrUpdatePlatform(ctx, p)
}

func createOrUpdatePlatform(ctx context.Context, c client.Client, p *operatorapi.SonataFlowPlatform) error {
func createOrUpdatePlatform(ctx context.Context, p *operatorapi.SonataFlowPlatform) error {
config := operatorapi.SonataFlowPlatform{}
err := c.Get(ctx, ctrl.ObjectKey{Namespace: p.Namespace, Name: p.Name}, &config)
err := utils.GetClient().Get(ctx, ctrl.ObjectKey{Namespace: p.Namespace, Name: p.Name}, &config)
if errors.IsNotFound(err) {
klog.V(log.D).ErrorS(err, "Platform not found, creating it")
return c.Create(ctx, p)
return utils.GetClient().Create(ctx, p)
} else if err != nil {
klog.V(log.E).ErrorS(err, "Error reading the Platform")
return err
}

config.Spec = p.Spec
config.Status.Cluster = p.Status.Cluster
err = c.Update(ctx, &config)
err = utils.GetClient().Update(ctx, &config)
if err != nil {
klog.V(log.E).ErrorS(err, "Error updating the BuildPlatform")
}
return err
}

func newDefaultSonataFlowPlatform(namespace string) *operatorapi.SonataFlowPlatform {
if utils.IsOpenShift() {
return &operatorapi.SonataFlowPlatform{
ObjectMeta: metav1.ObjectMeta{Name: defaultSonataFlowPlatformName, Namespace: namespace},
Spec: operatorapi.SonataFlowPlatformSpec{
Build: operatorapi.BuildPlatformSpec{
Config: operatorapi.BuildPlatformConfig{
BuildStrategy: operatorapi.PlatformBuildStrategy,
},
},
},
}
}

return &operatorapi.SonataFlowPlatform{
ObjectMeta: metav1.ObjectMeta{Name: defaultSonataFlowPlatformName, Namespace: namespace},
Spec: operatorapi.SonataFlowPlatformSpec{
Build: operatorapi.BuildPlatformSpec{
Config: operatorapi.BuildPlatformConfig{
BuildStrategyOptions: map[string]string{
kanikoBuildCacheEnabled: "true",
},
},
},
},
}
}
2 changes: 1 addition & 1 deletion controllers/platform/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (action *initializeAction) Handle(ctx context.Context, platform *operatorap
return nil, nil
}

if err = ConfigureDefaults(ctx, action.client, platform, true); err != nil {
if err = CreateOrUpdateWithDefaults(ctx, platform, true); err != nil {
return nil, err
}
// nolint: staticcheck
Expand Down
2 changes: 1 addition & 1 deletion controllers/platform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (action *serviceAction) CanHandle(platform *operatorapi.SonataFlowPlatform)

func (action *serviceAction) Handle(ctx context.Context, platform *operatorapi.SonataFlowPlatform) (*operatorapi.SonataFlowPlatform, error) {
// Refresh applied configuration
if err := ConfigureDefaults(ctx, action.client, platform, false); err != nil {
if err := CreateOrUpdateWithDefaults(ctx, platform, false); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion controllers/platform/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (action *monitorAction) Handle(ctx context.Context, platform *operatorapi.S
}

// Refresh applied configuration
if err := ConfigureDefaults(ctx, action.client, platform, false); err != nil {
if err := CreateOrUpdateWithDefaults(ctx, platform, false); err != nil {
return nil, err
}

Expand Down
33 changes: 31 additions & 2 deletions controllers/platform/platform.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"

coordination "k8s.io/api/coordination/v1"
Expand Down Expand Up @@ -147,13 +148,41 @@ func getLocalPlatform(ctx context.Context, c ctrl.Client, namespace string, acti
}
klog.V(log.I).InfoS("Not found a local build platform", "Namespace", namespace)
klog.V(log.I).InfoS("Creating a default SonataFlowPlatform", "Namespace", namespace)
sfp := newDefaultSonataFlowPlatform(namespace)
if err = c.Create(ctx, sfp); err != nil {
sfp := newEmptySonataFlowPlatform(namespace)
if err = CreateOrUpdateWithDefaults(ctx, sfp, false); err != nil {
return nil, err
}
return sfp, nil
}

func newEmptySonataFlowPlatform(namespace string) *operatorapi.SonataFlowPlatform {
if utils.IsOpenShift() {
return &operatorapi.SonataFlowPlatform{
ObjectMeta: metav1.ObjectMeta{Name: defaultSonataFlowPlatformName, Namespace: namespace},
Spec: operatorapi.SonataFlowPlatformSpec{
Build: operatorapi.BuildPlatformSpec{
Config: operatorapi.BuildPlatformConfig{
BuildStrategy: operatorapi.PlatformBuildStrategy,
},
},
},
}
}

return &operatorapi.SonataFlowPlatform{
ObjectMeta: metav1.ObjectMeta{Name: defaultSonataFlowPlatformName, Namespace: namespace},
Spec: operatorapi.SonataFlowPlatformSpec{
Build: operatorapi.BuildPlatformSpec{
Config: operatorapi.BuildPlatformConfig{
BuildStrategyOptions: map[string]string{
kanikoBuildCacheEnabled: "true",
},
},
},
},
}
}

// listPrimaryPlatforms returns all non-secondary platforms installed in a given namespace (only one will be active).
func listPrimaryPlatforms(ctx context.Context, c ctrl.Reader, namespace string) (*operatorapi.SonataFlowPlatformList, error) {
lst, err := listAllPlatforms(ctx, c, namespace)
Expand Down
10 changes: 5 additions & 5 deletions controllers/platform/platformutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strings"
"time"

"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -35,7 +36,6 @@ import (

"github.com/apache/incubator-kie-kogito-serverless-operator/controllers/workflowdef"

"github.com/apache/incubator-kie-kogito-serverless-operator/container-builder/client"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"

operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
Expand All @@ -46,7 +46,7 @@ var builderDockerfileFromRE = regexp.MustCompile(`FROM (.*) AS builder`)
// ResourceCustomizer can be used to inject code that changes the objects before they are created.
type ResourceCustomizer func(object ctrl.Object) ctrl.Object

func configureRegistry(ctx context.Context, c client.Client, p *operatorapi.SonataFlowPlatform, verbose bool) error {
func configureRegistry(ctx context.Context, p *operatorapi.SonataFlowPlatform, verbose bool) error {
if p.Spec.Build.Config.BuildStrategy == operatorapi.PlatformBuildStrategy && p.Status.Cluster == operatorapi.PlatformClusterOpenShift {
p.Spec.Build.Config.Registry = operatorapi.RegistrySpec{}
klog.V(log.D).InfoS("Platform registry not set and ignored on openshift cluster")
Expand All @@ -55,7 +55,7 @@ func configureRegistry(ctx context.Context, c client.Client, p *operatorapi.Sona

if p.Spec.Build.Config.Registry.Address == "" && p.Status.Cluster == operatorapi.PlatformClusterKubernetes {
// try KEP-1755
address, err := GetRegistryAddress(ctx, c)
address, err := GetRegistryAddress(ctx)
if err != nil && verbose {
klog.V(log.E).ErrorS(err, "Cannot find a registry where to push images via KEP-1755")
} else if err == nil && address != nil {
Expand Down Expand Up @@ -139,9 +139,9 @@ func setStatusAdditionalInfo(platform *operatorapi.SonataFlowPlatform) {

// GetRegistryAddress KEP-1755
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-cluster-lifecycle/generic/1755-communicating-a-local-registry
func GetRegistryAddress(ctx context.Context, c client.Client) (*string, error) {
func GetRegistryAddress(ctx context.Context) (*string, error) {
config := corev1.ConfigMap{}
err := c.Get(ctx, ctrl.ObjectKey{Namespace: "kube-public", Name: "local-registry-hosting"}, &config)
err := utils.GetClient().Get(ctx, ctrl.ObjectKey{Namespace: "kube-public", Name: "local-registry-hosting"}, &config)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil, nil
Expand Down
7 changes: 2 additions & 5 deletions controllers/profiles/common/properties/platform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/test"
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -64,8 +63,7 @@ func Test_resolvePlatformWorkflowProperties(t *testing.T) {
},
}

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(platform, secret, cm).WithStatusSubresource(platform).Build()
utils.SetClient(client)
_ = test.NewSonataFlowClientBuilder().WithRuntimeObjects(platform, secret, cm).WithStatusSubresource(platform).Build()

props, err := resolvePlatformWorkflowProperties(platform)
assert.NoError(t, err)
Expand Down Expand Up @@ -106,8 +104,7 @@ func Test_resolvePlatformWorkflowProperties_RefNotFound(t *testing.T) {
},
}

client := test.NewSonataFlowClientBuilder().WithRuntimeObjects(platform).WithStatusSubresource(platform).Build()
utils.SetClient(client)
_ = test.NewSonataFlowClientBuilder().WithRuntimeObjects(platform).WithStatusSubresource(platform).Build()

props, err := resolvePlatformWorkflowProperties(platform)
assert.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion controllers/sonataflowclusterplatform_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (r *SonataFlowClusterPlatformReconciler) SetupWithManager(mgr ctrlrun.Manag

// if actively referenced sonataflowplatform object is changed, reconcile the active SonataFlowClusterPlatform.
func (r *SonataFlowClusterPlatformReconciler) mapPlatformToClusterPlatformRequests(ctx context.Context, object client.Object) []reconcile.Request {
sfcPlatform, err := clusterplatform.GetActiveClusterPlatform(ctx, r.Client)
sfcPlatform, err := clusterplatform.GetActiveClusterPlatform(ctx)
if err != nil && !errors.IsNotFound(err) {
klog.V(log.E).ErrorS(err, "Failed to get active SonataFlowClusterPlatform")
return nil
Expand Down
10 changes: 5 additions & 5 deletions controllers/sonataflowplatform_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (r *SonataFlowPlatformReconciler) Reconcile(ctx context.Context, req reconc

target := instance.DeepCopy()

if err = r.SonataFlowPlatformUpdateStatus(ctx, req, target); err != nil {
if err = r.updateSonataFlowPlatformStatus(ctx, req, target); err != nil {
return reconcile.Result{}, err
}

Expand Down Expand Up @@ -170,10 +170,10 @@ func (r *SonataFlowPlatformReconciler) Reconcile(ctx context.Context, req reconc

}

// If an active cluster platform exists, update platform.Status accordingly
func (r *SonataFlowPlatformReconciler) SonataFlowPlatformUpdateStatus(ctx context.Context, req reconcile.Request, target *operatorapi.SonataFlowPlatform) error {
// sonataFlowPlatformUpdateStatus If an active cluster platform exists, update platform.Status accordingly
func (r *SonataFlowPlatformReconciler) updateSonataFlowPlatformStatus(ctx context.Context, req reconcile.Request, target *operatorapi.SonataFlowPlatform) error {
// Fetch the active SonataFlowClusterPlatform instance
sfcPlatform, err := clusterplatform.GetActiveClusterPlatform(ctx, r.Client)
sfcPlatform, err := clusterplatform.GetActiveClusterPlatform(ctx)
if err != nil && !errors.IsNotFound(err) {
klog.V(log.E).ErrorS(err, "Failed to get active SonataFlowClusterPlatform")
return err
Expand Down Expand Up @@ -241,7 +241,7 @@ func (r *SonataFlowPlatformReconciler) mapClusterPlatformToPlatformRequests(ctx
// if actively referenced sonataflowplatform is changed, reconcile other SonataFlowPlatforms in the cluster.
func (r *SonataFlowPlatformReconciler) mapPlatformToPlatformRequests(ctx context.Context, object client.Object) []reconcile.Request {
platform := object.(*operatorapi.SonataFlowPlatform)
sfcPlatform, err := clusterplatform.GetActiveClusterPlatform(ctx, r.Client)
sfcPlatform, err := clusterplatform.GetActiveClusterPlatform(ctx)
if err != nil && !errors.IsNotFound(err) {
klog.V(log.E).ErrorS(err, "Failed to get active SonataFlowClusterPlatform")
return nil
Expand Down
Loading

0 comments on commit 6fcaa8c

Please sign in to comment.