Skip to content

Commit

Permalink
Merge pull request #101 from rgdoliveira/sync_main
Browse files Browse the repository at this point in the history
Sync main branch with Apache main branch
  • Loading branch information
rgdoliveira authored Nov 18, 2024
2 parents 7497479 + fa85d0a commit 437c198
Show file tree
Hide file tree
Showing 15 changed files with 232 additions and 38 deletions.
2 changes: 2 additions & 0 deletions api/v1alpha08/sonataflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ type SonataFlowStatus struct {
// Triggers list of triggers created for the SonataFlow
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="triggers"
Triggers []SonataFlowTriggerRef `json:"triggers,omitempty"`
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="flowRevision"
FlowCRC uint32 `json:"flowCRC,omitempty"`
}

// SonataFlowTriggerRef defines a trigger created for the SonataFlow.
Expand Down
3 changes: 3 additions & 0 deletions bundle/manifests/sonataflow.org_sonataflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10050,6 +10050,9 @@ spec:
endpoint:
description: Endpoint is an externally accessible URL of the workflow
type: string
flowCRC:
format: int32
type: integer
lastTimeRecoverAttempt:
format: date-time
type: string
Expand Down
3 changes: 3 additions & 0 deletions config/crd/bases/sonataflow.org_sonataflows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10050,6 +10050,9 @@ spec:
endpoint:
description: Endpoint is an externally accessible URL of the workflow
type: string
flowCRC:
format: int32
type: integer
lastTimeRecoverAttempt:
format: date-time
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ spec:
- description: Endpoint is an externally accessible URL of the workflow
displayName: endpoint
path: endpoint
- displayName: flowRevision
path: flowCRC
- displayName: lastTimeRecoverAttempt
path: lastTimeRecoverAttempt
- description: Platform displays which platform is being used by this workflow
Expand Down
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ rules:
- list
- update
- watch
- apiGroups:
- serving.knative.dev
resources:
- revisions
verbs:
- delete
- list
- watch
- apiGroups:
- sonataflow.org
resources:
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/profiles/common/knative_eventing.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ package common
import (
"context"

operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"k8s.io/klog/v2"

operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down
6 changes: 6 additions & 0 deletions internal/controller/profiles/common/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"context"
"fmt"

"github.com/apache/incubator-kie-kogito-serverless-operator/utils"

"k8s.io/client-go/rest"

"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/discovery"
Expand Down Expand Up @@ -56,6 +58,10 @@ func (s *StateSupport) PerformStatusUpdate(ctx context.Context, workflow *operat
return false, err
}
workflow.Status.ObservedGeneration = workflow.Generation
workflow.Status.FlowCRC, err = utils.Crc32Checksum(workflow.Spec.Flow)
if err != nil {
return false, err
}
services.SetServiceUrlsInWorkflowStatus(pl, workflow)
if workflow.Status.Platform == nil {
workflow.Status.Platform = &operatorapi.SonataFlowPlatformRef{}
Expand Down
13 changes: 6 additions & 7 deletions internal/controller/profiles/preview/profile_preview_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func Test_deployWorkflowReconciliationHandler_handleObjects(t *testing.T) {
assert.Equal(t, serviceMonitor.Spec.Endpoints[0].Path, "/q/metrics")
}

func Test_GenerationAnnotationCheck(t *testing.T) {
func Test_WorkflowChangedCheck(t *testing.T) {
// we load a workflow with metadata.generation to 0
workflow := test.GetBaseSonataFlow(t.Name())
platform := test.GetBasePlatformInReadyPhase(t.Name())
Expand All @@ -199,15 +199,14 @@ func Test_GenerationAnnotationCheck(t *testing.T) {
assert.NotNil(t, result)
assert.Len(t, objects, 3)

// then we load a workflow with metadata.generation set to 1
// then we load the current workflow
workflowChanged := &operatorapi.SonataFlow{}
err = client.Get(context.TODO(), clientruntime.ObjectKeyFromObject(workflow), workflowChanged)
assert.NoError(t, err)
//we set the generation to 1
workflowChanged.Generation = int64(1)
err = client.Update(context.TODO(), workflowChanged)
assert.NoError(t, err)
// reconcile
//we change something within the flow
workflowChanged.Spec.Flow.AutoRetries = true

// reconcile -> the one in the k8s DB is different, so there's a change.
handler = &deployWithBuildWorkflowState{
StateSupport: fakeReconcilerSupport(client),
ensurers: NewObjectEnsurers(&common.StateSupport{C: client}),
Expand Down
107 changes: 99 additions & 8 deletions internal/controller/profiles/preview/states_preview.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,31 @@ package preview
import (
"context"
"fmt"
"sort"

"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"
servingv1 "knative.dev/serving/pkg/apis/serving/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/apache/incubator-kie-kogito-serverless-operator/api"
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/builder"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/platform"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common/constants"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes"
"github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj"
)

const (
kSink = "K_SINK"
workflowContainer = "workflow"
)

type newBuilderState struct {
Expand Down Expand Up @@ -199,7 +209,11 @@ func (h *deployWithBuildWorkflowState) Do(ctx context.Context, workflow *operato
return ctrl.Result{}, nil, err
}

if h.isWorkflowChanged(workflow) { // Let's check that the 2 resWorkflowDef definition are different
hasChanged, err := h.isWorkflowChanged(workflow)
if err != nil {
return ctrl.Result{}, nil, err
}
if hasChanged { // Let's check that the 2 resWorkflowDef definition are different
if err = buildManager.MarkToRestart(build); err != nil {
return ctrl.Result{}, nil, err
}
Expand All @@ -221,15 +235,92 @@ func (h *deployWithBuildWorkflowState) Do(ctx context.Context, workflow *operato
}

func (h *deployWithBuildWorkflowState) PostReconcile(ctx context.Context, workflow *operatorapi.SonataFlow) error {
//By default, we don't want to perform anything after the reconciliation, and so we will simply return no error
// Clean up the outdated Knative revisions, if any
return h.cleanupOutdatedRevisions(ctx, workflow)
}

// isWorkflowChanged checks whether the contents of .spec.flow of the given workflow has changed.
func (h *deployWithBuildWorkflowState) isWorkflowChanged(workflow *operatorapi.SonataFlow) (bool, error) {
// Added this guard for backward compatibility for workflows deployed with a previous operator version, so we won't kick thousands of builds on users' cluster.
// After this reconciliation cycle, the CRC should be updated
if workflow.Status.FlowCRC == 0 {
return false, nil
}
actualCRC, err := utils.Crc32Checksum(workflow.Spec.Flow)
if err != nil {
return false, err
}
return actualCRC != workflow.Status.FlowCRC, nil
}

func (h *deployWithBuildWorkflowState) cleanupOutdatedRevisions(ctx context.Context, workflow *operatorapi.SonataFlow) error {
if !workflow.IsKnativeDeployment() {
return nil
}
avail, err := knative.GetKnativeAvailability(h.Cfg)
if err != nil {
return err
}
if !avail.Serving || !avail.Eventing {
return nil
}
injected, err := knative.CheckKSinkInjected(workflow.Name, workflow.Namespace)
if err != nil {
return err
}
if !injected {
return fmt.Errorf("waiting for Sinkbinding K_SINK injection to complete")
}
opts := &client.ListOptions{
LabelSelector: labels.SelectorFromSet(
map[string]string{
workflowproj.LabelWorkflow: workflow.Name,
workflowproj.LabelWorkflowNamespace: workflow.Namespace,
},
),
Namespace: workflow.Namespace,
}
revisionList := &servingv1.RevisionList{}
if err := h.C.List(ctx, revisionList, opts); err != nil {
return err
}
// Sort the revisions based on creation timestamp
sortRevisions(revisionList.Items)
// Clean up previous revisions that do not have K_SINK injected
for i := 0; i < len(revisionList.Items)-1; i++ {
revision := &revisionList.Items[i]
if !containsKSink(revision) {
klog.V(log.I).InfoS("Revision %s does not have K_SINK injected and can be cleaned up.", revision.Name)
if err := h.C.Delete(ctx, revision, &client.DeleteOptions{}); err != nil {
return err
}
}
}
return nil
}

// isWorkflowChanged marks the workflow status as unknown to require a new build reconciliation
func (h *deployWithBuildWorkflowState) isWorkflowChanged(workflow *operatorapi.SonataFlow) bool {
generation := kubeutil.GetLastGeneration(workflow.Namespace, workflow.Name, h.C, context.TODO())
if generation > workflow.Status.ObservedGeneration {
return true
func containsKSink(revision *servingv1.Revision) bool {
for _, container := range revision.Spec.PodSpec.Containers {
if container.Name == workflowContainer {
for _, env := range container.Env {
if env.Name == kSink {
return true
}
}
break
}
}
return false
}

type CreationTimestamp []servingv1.Revision

func (a CreationTimestamp) Len() int { return len(a) }
func (a CreationTimestamp) Less(i, j int) bool {
return a[i].CreationTimestamp.Before(&a[j].CreationTimestamp)
}
func (a CreationTimestamp) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

func sortRevisions(revisions []servingv1.Revision) {
sort.Sort(CreationTimestamp(revisions))
}
54 changes: 54 additions & 0 deletions internal/controller/profiles/preview/states_preview_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package preview

import (
"testing"

"github.com/apache/incubator-kie-kogito-serverless-operator/utils"

"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common"
"github.com/apache/incubator-kie-kogito-serverless-operator/test"
"github.com/serverlessworkflow/sdk-go/v2/model"
"github.com/stretchr/testify/assert"
)

func Test_deployWithBuildWorkflowState_isWorkflowChanged(t *testing.T) {
workflow1 := test.GetBaseSonataFlow(t.Name())
workflow2 := test.GetBaseSonataFlow(t.Name())
workflow1.Status.FlowCRC, _ = utils.Crc32Checksum(workflow1.Spec.Flow)
workflow2.Status.FlowCRC, _ = utils.Crc32Checksum(workflow2.Spec.Flow)
deployWithBuildWorkflowState := &deployWithBuildWorkflowState{
StateSupport: &common.StateSupport{C: test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow1).Build()},
}

hasChanged, err := deployWithBuildWorkflowState.isWorkflowChanged(workflow2)
assert.NoError(t, err)
assert.False(t, hasChanged)

// change workflow2
workflow2.Spec.Flow.Metadata = model.Metadata{
"string": model.Object{
StringValue: "test",
},
}

hasChanged, err = deployWithBuildWorkflowState.isWorkflowChanged(workflow2)
assert.NoError(t, err)
assert.True(t, hasChanged)
}
1 change: 1 addition & 0 deletions internal/controller/sonataflow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type SonataFlowReconciler struct {
//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflows/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflows/finalizers,verbs=update
//+kubebuilder:rbac:groups="monitoring.coreos.com",resources=servicemonitors,verbs=get;list;watch;create;update;delete
//+kubebuilder:rbac:groups="serving.knative.dev",resources=revisions,verbs=list;watch;delete

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
Expand Down
11 changes: 11 additions & 0 deletions operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27477,6 +27477,9 @@ spec:
endpoint:
description: Endpoint is an externally accessible URL of the workflow
type: string
flowCRC:
format: int32
type: integer
lastTimeRecoverAttempt:
format: date-time
type: string
Expand Down Expand Up @@ -27746,6 +27749,14 @@ rules:
- list
- update
- watch
- apiGroups:
- serving.knative.dev
resources:
- revisions
verbs:
- delete
- list
- watch
- apiGroups:
- sonataflow.org
resources:
Expand Down
3 changes: 3 additions & 0 deletions test/yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"runtime"
"strings"

"github.com/apache/incubator-kie-kogito-serverless-operator/utils"

"github.com/apache/incubator-kie-kogito-serverless-operator/api"
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
Expand Down Expand Up @@ -71,6 +73,7 @@ func GetSonataFlow(testFile, namespace string) *operatorapi.SonataFlow {
GetKubernetesResource(testFile, ksw)
klog.V(log.D).InfoS("Successfully read KSW", "ksw", spew.Sprint(ksw))
ksw.Namespace = namespace
ksw.Status.FlowCRC, _ = utils.Crc32Checksum(ksw.Spec.Flow)
return ksw
}

Expand Down
33 changes: 33 additions & 0 deletions utils/crc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package utils

import (
"bytes"
"encoding/gob"
"hash/crc32"
)

func Crc32Checksum(v interface{}) (uint32, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
if err := enc.Encode(v); err != nil {
return 0, err
}
return crc32.ChecksumIEEE(buf.Bytes()), nil
}
Loading

0 comments on commit 437c198

Please sign in to comment.