From f499e7f2ff5c2f7b2e84e458d08ffdb1df2d22b9 Mon Sep 17 00:00:00 2001
From: Micah Wylde
Date: Wed, 13 Nov 2019 14:41:48 -0800
Subject: [PATCH] [STRMCMP-675] Enable status subresource in CRD (#126)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This PR enables the status subresource for the FlinkApplication CRD. This
makes it harder for users to accidentally overwrite status fields while
trying to update their applications (an issue we've seen internally). For
CRDs with the status subresource enabled, updates to the base resource will
ignore changes to the status subresource, and updates to the status
subresource will ignore changes to the spec and metadata.

The bulk of this PR is the set of changes needed to enable that.
Previously, the operator would modify the SavepointInfo field of the spec
during updates. This goes against the general principle that the user
should own the spec, and it prevented a move to the status subresource, as
the operator would need to update both the spec and the status.

Instead, we introduce a new field spec.savepointPath that contains the
savepoint that will be restored on the initial job submission (the existing
spec.savepointInfo.savepointLocation is retained for backwards
compatibility, but will be removed in a future CRD version). We then
introduce two new fields on the status: status.savepointPath (the path of
the savepoint that the operator has created during the update process) and
status.savepointTriggerId (the trigger ID for the savepoint the operator is
creating). This allows us to cleanly separate user intent (which savepoint
to start from initially) from the operator's internal state and processes.

Note: this change is slightly incompatible with the existing operator. This
update should not be deployed to a cluster where there are active
flinkapplication updates occurring; i.e., all flinkapplications should be
in a Running or DeployFailed state.
---
 README.md                                   |   4 +-
 deploy/crd.yaml                             |   5 +-
 docs/crd.md                                 |   8 +-
 integ/simple_test.go                        |   2 +-
 pkg/apis/app/v1beta1/types.go               |  77 ++++----
 pkg/controller/flink/flink.go               |  10 +-
 pkg/controller/flink/flink_test.go          |  16 +-
 pkg/controller/flink/mock/mock_flink.go     |   6 +-
 .../flinkapplication/flink_state_machine.go | 167 ++++++++++--------
 .../flink_state_machine_test.go             | 128 ++++++++------
 pkg/controller/k8/cluster.go                |  26 ++-
 pkg/controller/k8/mock/mock_k8.go           |   9 +
 12 files changed, 271 insertions(+), 187 deletions(-)

diff --git a/README.md b/README.md
index 7f6e1fd3..c6ffe282 100644
--- a/README.md
+++ b/README.md
@@ -17,11 +17,11 @@ FlinkK8sOperator is a [Kubernetes operator](https://coreos.com/operators/) that
 *Beta*
-The operator is in use for some less-crtical jobs at Lyft. At this point the focus is on testing and stability While in
+The operator is in use for some less-critical jobs at Lyft. At this point the focus is on testing and stability. While in
 Beta, we will attempt to limit the number of backwards-incompatible changes, but they may still occur as necessary.
 ## Prerequisites
-* Version >= 1.9 of Kubernetes.
+* Version >= 1.10 of Kubernetes (versions < 1.13 require `--feature-gates=CustomResourceSubresources=true`)
 * Version >= 1.7 of Apache Flink.
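To make the savepoint-selection rule described above concrete, here is a
minimal sketch of the logic this patch adds to handleSubmittingJob.
savepointToRestore is an illustrative helper, not a function in the patch,
and the import path assumes this repo's github.com/lyft/flinkk8soperator
module layout:

    package main

    import (
    	"fmt"

    	"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
    )

    // savepointToRestore mirrors the rule in the commit message: on the
    // first deploy (Status.DeployHash is empty) the user-provided
    // spec.savepointPath wins, falling back to the deprecated
    // spec.savepointInfo.savepointLocation; on subsequent deploys the
    // operator-created status.savepointPath is used.
    func savepointToRestore(app *v1beta1.FlinkApplication) string {
    	if app.Status.DeployHash == "" {
    		if app.Spec.SavepointPath != "" {
    			return app.Spec.SavepointPath
    		}
    		return app.Spec.SavepointInfo.SavepointLocation
    	}
    	return app.Status.SavepointPath
    }

    func main() {
    	app := &v1beta1.FlinkApplication{}
    	app.Spec.SavepointPath = "s3://example-bucket/savepoints/sp-1" // hypothetical path
    	fmt.Println(savepointToRestore(app)) // first deploy: prints the spec path
    }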
## Overview diff --git a/deploy/crd.yaml b/deploy/crd.yaml index e83dcc13..3bc6cb18 100644 --- a/deploy/crd.yaml +++ b/deploy/crd.yaml @@ -89,6 +89,8 @@ spec: properties: savepointLocation: type: string + savepointPath: + type: string jobManagerConfig: type: object properties: @@ -377,7 +379,8 @@ spec: - jarName - parallelism - entryClass - + subresources: + status: {} additionalPrinterColumns: - name: Phase type: string diff --git a/docs/crd.md b/docs/crd.md index b863ce17..81d19985 100644 --- a/docs/crd.md +++ b/docs/crd.md @@ -74,13 +74,13 @@ Below is the list of fields in the custom resource and their description * **programArgs** `type:string` External configuration parameters to be passed as arguments to the job like input and output sources, etc - + + * **savepointPath** `type:string` + If specified, the application state will be restored from this savepoint + * **allowNonRestoredState** `type:boolean` Skips savepoint operator state that cannot be mapped to the new program version - * **savepointInfo** `type:SavepointInfo` - Optional Savepoint info that can be passed in to indicate that the Flink job must resume from the corresponding savepoint. - * **flinkVersion** `type:string required=true` The version of Flink to be managed. This version must match the version in the image. diff --git a/integ/simple_test.go b/integ/simple_test.go index 3dd402cd..ca6ccda7 100644 --- a/integ/simple_test.go +++ b/integ/simple_test.go @@ -218,7 +218,7 @@ func (s *IntegSuite) TestSimple(c *C) { time.Sleep(100 * time.Millisecond) } - c.Assert(app.Spec.SavepointInfo.SavepointLocation, NotNil) + c.Assert(app.Status.SavepointPath, NotNil) job := func() map[string]interface{} { jobs, _ := s.Util.FlinkAPIGet(app, "/jobs") jobMap := jobs.(map[string]interface{}) diff --git a/pkg/apis/app/v1beta1/types.go b/pkg/apis/app/v1beta1/types.go index f842969f..7f2705e4 100644 --- a/pkg/apis/app/v1beta1/types.go +++ b/pkg/apis/app/v1beta1/types.go @@ -28,31 +28,33 @@ type FlinkApplication struct { } type FlinkApplicationSpec struct { - Image string `json:"image,omitempty" protobuf:"bytes,2,opt,name=image"` - ImagePullPolicy apiv1.PullPolicy `json:"imagePullPolicy,omitempty" protobuf:"bytes,14,opt,name=imagePullPolicy,casttype=PullPolicy"` - ImagePullSecrets []apiv1.LocalObjectReference `json:"imagePullSecrets,omitempty" patchStrategy:"merge" patchMergeKey:"name" protobuf:"bytes,15,rep,name=imagePullSecrets"` - ServiceAccountName string `json:"serviceAccountName,omitempty"` - FlinkConfig FlinkConfig `json:"flinkConfig"` - FlinkVersion string `json:"flinkVersion"` - TaskManagerConfig TaskManagerConfig `json:"taskManagerConfig,omitempty"` - JobManagerConfig JobManagerConfig `json:"jobManagerConfig,omitempty"` - JarName string `json:"jarName"` - Parallelism int32 `json:"parallelism"` - EntryClass string `json:"entryClass,omitempty"` - ProgramArgs string `json:"programArgs,omitempty"` - SavepointInfo SavepointInfo `json:"savepointInfo,omitempty"` - DeploymentMode DeploymentMode `json:"deploymentMode,omitempty"` - RPCPort *int32 `json:"rpcPort,omitempty"` - BlobPort *int32 `json:"blobPort,omitempty"` - QueryPort *int32 `json:"queryPort,omitempty"` - UIPort *int32 `json:"uiPort,omitempty"` - MetricsQueryPort *int32 `json:"metricsQueryPort,omitempty"` - Volumes []apiv1.Volume `json:"volumes,omitempty"` - VolumeMounts []apiv1.VolumeMount `json:"volumeMounts,omitempty"` - RestartNonce string `json:"restartNonce"` - DeleteMode DeleteMode `json:"deleteMode,omitempty"` - AllowNonRestoredState bool 
`json:"allowNonRestoredState,omitempty"` - ForceRollback bool `json:"forceRollback"` + Image string `json:"image,omitempty" protobuf:"bytes,2,opt,name=image"` + ImagePullPolicy apiv1.PullPolicy `json:"imagePullPolicy,omitempty" protobuf:"bytes,14,opt,name=imagePullPolicy,casttype=PullPolicy"` + ImagePullSecrets []apiv1.LocalObjectReference `json:"imagePullSecrets,omitempty" patchStrategy:"merge" patchMergeKey:"name" protobuf:"bytes,15,rep,name=imagePullSecrets"` + ServiceAccountName string `json:"serviceAccountName,omitempty"` + FlinkConfig FlinkConfig `json:"flinkConfig"` + FlinkVersion string `json:"flinkVersion"` + TaskManagerConfig TaskManagerConfig `json:"taskManagerConfig,omitempty"` + JobManagerConfig JobManagerConfig `json:"jobManagerConfig,omitempty"` + JarName string `json:"jarName"` + Parallelism int32 `json:"parallelism"` + EntryClass string `json:"entryClass,omitempty"` + ProgramArgs string `json:"programArgs,omitempty"` + // Deprecated: use SavepointPath instead + SavepointInfo SavepointInfo `json:"savepointInfo,omitempty"` + SavepointPath string `json:"savepointPath,omitempty"` + DeploymentMode DeploymentMode `json:"deploymentMode,omitempty"` + RPCPort *int32 `json:"rpcPort,omitempty"` + BlobPort *int32 `json:"blobPort,omitempty"` + QueryPort *int32 `json:"queryPort,omitempty"` + UIPort *int32 `json:"uiPort,omitempty"` + MetricsQueryPort *int32 `json:"metricsQueryPort,omitempty"` + Volumes []apiv1.Volume `json:"volumes,omitempty"` + VolumeMounts []apiv1.VolumeMount `json:"volumeMounts,omitempty"` + RestartNonce string `json:"restartNonce"` + DeleteMode DeleteMode `json:"deleteMode,omitempty"` + AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"` + ForceRollback bool `json:"forceRollback"` } type FlinkConfig map[string]interface{} @@ -122,7 +124,6 @@ type EnvironmentConfig struct { type SavepointInfo struct { SavepointLocation string `json:"savepointLocation,omitempty"` - TriggerID string `json:"triggerId,omitempty"` } type FlinkClusterStatus struct { @@ -157,17 +158,19 @@ type FlinkJobStatus struct { } type FlinkApplicationStatus struct { - Phase FlinkApplicationPhase `json:"phase"` - StartedAt *metav1.Time `json:"startedAt,omitempty"` - LastUpdatedAt *metav1.Time `json:"lastUpdatedAt,omitempty"` - Reason string `json:"reason,omitempty"` - ClusterStatus FlinkClusterStatus `json:"clusterStatus,omitempty"` - JobStatus FlinkJobStatus `json:"jobStatus"` - FailedDeployHash string `json:"failedDeployHash,omitEmpty"` - RollbackHash string `json:"rollbackHash,omitEmpty"` - DeployHash string `json:"deployHash"` - RetryCount int32 `json:"retryCount,omitEmpty"` - LastSeenError *FlinkApplicationError `json:"lastSeenError,omitEmpty"` + Phase FlinkApplicationPhase `json:"phase"` + StartedAt *metav1.Time `json:"startedAt,omitempty"` + LastUpdatedAt *metav1.Time `json:"lastUpdatedAt,omitempty"` + Reason string `json:"reason,omitempty"` + ClusterStatus FlinkClusterStatus `json:"clusterStatus,omitempty"` + JobStatus FlinkJobStatus `json:"jobStatus"` + FailedDeployHash string `json:"failedDeployHash,omitEmpty"` + RollbackHash string `json:"rollbackHash,omitEmpty"` + DeployHash string `json:"deployHash"` + SavepointTriggerID string `json:"savepointTriggerId,omitempty"` + SavepointPath string `json:"savepointPath,omitempty"` + RetryCount int32 `json:"retryCount,omitEmpty"` + LastSeenError *FlinkApplicationError `json:"lastSeenError,omitEmpty"` } func (in *FlinkApplicationStatus) GetPhase() FlinkApplicationPhase { diff --git a/pkg/controller/flink/flink.go 
b/pkg/controller/flink/flink.go index 5bad4986..c9e20765 100644 --- a/pkg/controller/flink/flink.go +++ b/pkg/controller/flink/flink.go @@ -55,7 +55,8 @@ type ControllerInterface interface { // Starts the Job in the Flink Cluster StartFlinkJob(ctx context.Context, application *v1beta1.FlinkApplication, hash string, - jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) + jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, + savepointPath string) (string, error) // Savepoint creation is asynchronous. // Polls the status of the Savepoint, using the triggerID @@ -271,14 +272,15 @@ func (f *Controller) CreateCluster(ctx context.Context, application *v1beta1.Fli } func (f *Controller) StartFlinkJob(ctx context.Context, application *v1beta1.FlinkApplication, hash string, - jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) { + jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, + savepointPath string) (string, error) { response, err := f.flinkClient.SubmitJob( ctx, getURLFromApp(application, hash), jarName, client.SubmitJobRequest{ Parallelism: parallelism, - SavepointPath: application.Spec.SavepointInfo.SavepointLocation, + SavepointPath: savepointPath, EntryClass: entryClass, ProgramArgs: programArgs, AllowNonRestoredState: allowNonRestoredState, @@ -298,7 +300,7 @@ func (f *Controller) GetSavepointStatus(ctx context.Context, application *v1beta if err != nil { return nil, err } - return f.flinkClient.CheckSavepointStatus(ctx, getURLFromApp(application, hash), jobID, application.Spec.SavepointInfo.TriggerID) + return f.flinkClient.CheckSavepointStatus(ctx, getURLFromApp(application, hash), jobID, application.Status.SavepointTriggerID) } func (f *Controller) IsClusterReady(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) { diff --git a/pkg/controller/flink/flink_test.go b/pkg/controller/flink/flink_test.go index e151883c..f2c5f847 100644 --- a/pkg/controller/flink/flink_test.go +++ b/pkg/controller/flink/flink_test.go @@ -232,7 +232,7 @@ func TestFlinkIsServiceReadyErr(t *testing.T) { func TestFlinkGetSavepointStatus(t *testing.T) { flinkControllerForTest := getTestFlinkController() flinkApp := getFlinkTestApp() - flinkApp.Spec.SavepointInfo.TriggerID = "t1" + flinkApp.Status.SavepointTriggerID = "t1" mockJmClient := flinkControllerForTest.flinkClient.(*clientMock.JobManagerClient) mockJmClient.CheckSavepointStatusFunc = func(ctx context.Context, url string, jobID, triggerID string) (*client.SavepointResponse, error) { @@ -428,7 +428,7 @@ func TestStartFlinkJob(t *testing.T) { flinkApp.Spec.ProgramArgs = "args" flinkApp.Spec.EntryClass = "class" flinkApp.Spec.JarName = "jar-name" - flinkApp.Spec.SavepointInfo.SavepointLocation = "location//" + flinkApp.Spec.SavepointPath = "location//" flinkApp.Spec.FlinkVersion = "1.7" mockJmClient := flinkControllerForTest.flinkClient.(*clientMock.JobManagerClient) @@ -446,7 +446,8 @@ func TestStartFlinkJob(t *testing.T) { }, nil } jobID, err := flinkControllerForTest.StartFlinkJob(context.Background(), &flinkApp, "hash", - flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, flinkApp.Spec.AllowNonRestoredState) + flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, + flinkApp.Spec.AllowNonRestoredState, 
"location//") assert.Nil(t, err) assert.Equal(t, jobID, testJobID) } @@ -465,7 +466,8 @@ func TestStartFlinkJobAllowNonRestoredState(t *testing.T) { }, nil } jobID, err := flinkControllerForTest.StartFlinkJob(context.Background(), &flinkApp, "hash", - flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, flinkApp.Spec.AllowNonRestoredState) + flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, + flinkApp.Spec.AllowNonRestoredState, "") assert.Nil(t, err) assert.Equal(t, jobID, testJobID) } @@ -480,7 +482,8 @@ func TestStartFlinkJobEmptyJobID(t *testing.T) { return &client.SubmitJobResponse{}, nil } jobID, err := flinkControllerForTest.StartFlinkJob(context.Background(), &flinkApp, "hash", - flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, flinkApp.Spec.AllowNonRestoredState) + flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, + flinkApp.Spec.AllowNonRestoredState, "") assert.EqualError(t, err, "unable to submit job: invalid job id") assert.Empty(t, jobID) } @@ -494,7 +497,8 @@ func TestStartFlinkJobErr(t *testing.T) { return nil, errors.New("submit error") } jobID, err := flinkControllerForTest.StartFlinkJob(context.Background(), &flinkApp, "hash", - flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, flinkApp.Spec.AllowNonRestoredState) + flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, + flinkApp.Spec.AllowNonRestoredState, "") assert.EqualError(t, err, "submit error") assert.Empty(t, jobID) } diff --git a/pkg/controller/flink/mock/mock_flink.go b/pkg/controller/flink/mock/mock_flink.go index f2f7fc18..fe05c07f 100644 --- a/pkg/controller/flink/mock/mock_flink.go +++ b/pkg/controller/flink/mock/mock_flink.go @@ -14,7 +14,7 @@ type DeleteOldResourcesForApp func(ctx context.Context, application *v1beta1.Fli type CancelWithSavepointFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) type ForceCancelFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error type StartFlinkJobFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, - jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) + jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (string, error) type GetSavepointStatusFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) type IsClusterReadyFunc func(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) type IsServiceReadyFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error) @@ -79,9 +79,9 @@ func (m *FlinkController) ForceCancel(ctx context.Context, application *v1beta1. 
} func (m *FlinkController) StartFlinkJob(ctx context.Context, application *v1beta1.FlinkApplication, hash string, - jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) { + jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (string, error) { if m.StartFlinkJobFunc != nil { - return m.StartFlinkJobFunc(ctx, application, hash, jarName, parallelism, entryClass, programArgs, allowNonRestoredState) + return m.StartFlinkJobFunc(ctx, application, hash, jarName, parallelism, entryClass, programArgs, allowNonRestoredState, savepointPath) } return "", nil } diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index e092a940..2482e906 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -25,9 +25,9 @@ import ( ) const ( - jobFinalizer = "job.finalizers.flink.k8s.io" - applicationChanged = true - applicationUnchanged = false + jobFinalizer = "job.finalizers.flink.k8s.io" + statusChanged = true + statusUnchanged = false ) // The core state machine that manages Flink clusters and jobs. See docs/state_machine.md for a description of the @@ -128,13 +128,13 @@ func (s *FlinkStateMachine) Handle(ctx context.Context, application *v1beta1.Fli successTimer := s.metrics.stateMachineHandleSuccessPhaseMap[currentPhase].Start(ctx) defer timer.Stop() - updateApplication, err := s.handle(ctx, application) + updateStatus, err := s.handle(ctx, application) // Update k8s object - if updateApplication { + if updateStatus { now := v1.NewTime(s.clock.Now()) application.Status.LastUpdatedAt = &now - updateAppErr := s.k8Cluster.UpdateK8Object(ctx, application) + updateAppErr := s.k8Cluster.UpdateStatus(ctx, application) if updateAppErr != nil { s.metrics.errorCounterPhaseMap[currentPhase].Inc(ctx) return updateAppErr @@ -157,7 +157,7 @@ func (s *FlinkStateMachine) handle(ctx context.Context, application *v1beta1.Fli if !application.ObjectMeta.DeletionTimestamp.IsZero() && appPhase != v1beta1.FlinkApplicationDeleting { s.updateApplicationPhase(application, v1beta1.FlinkApplicationDeleting) // Always perform a single application update per callback - return applicationChanged, nil + return statusChanged, nil } if s.IsTimeToHandlePhase(application) { @@ -229,11 +229,11 @@ func (s *FlinkStateMachine) handleNewOrUpdating(ctx context.Context, application err := s.flinkController.CreateCluster(ctx, application) if err != nil { logger.Errorf(ctx, "Cluster creation failed with error: %v", err) - return applicationUnchanged, err + return statusUnchanged, err } s.updateApplicationPhase(application, v1beta1.FlinkApplicationClusterStarting) - return applicationChanged, nil + return statusChanged, nil } func (s *FlinkStateMachine) deployFailed(ctx context.Context, app *v1beta1.FlinkApplication) (bool, error) { @@ -248,7 +248,7 @@ func (s *FlinkStateMachine) deployFailed(ctx context.Context, app *v1beta1.Flink app.Status.RetryCount = 0 s.updateApplicationPhase(app, v1beta1.FlinkApplicationDeployFailed) - return applicationChanged, nil + return statusChanged, nil } // Create the underlying Kubernetes objects for the new cluster @@ -265,29 +265,29 @@ func (s *FlinkStateMachine) handleClusterStarting(ctx context.Context, applicati // Wait for all to be running ready, err := s.flinkController.IsClusterReady(ctx, application) if err != nil { - return 
applicationUnchanged, err + return statusUnchanged, err } if !ready { - return applicationUnchanged, nil + return statusUnchanged, nil } logger.Infof(ctx, "Flink cluster has started successfully") // TODO: in single mode move to submitting job s.updateApplicationPhase(application, v1beta1.FlinkApplicationSavepointing) - return applicationChanged, nil + return statusChanged, nil } func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) { // we've already savepointed (or this is our first deploy), continue on - if application.Spec.SavepointInfo.SavepointLocation != "" || application.Status.DeployHash == "" { + if application.Status.SavepointPath != "" || application.Status.DeployHash == "" { application.Status.JobStatus.JobID = "" s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob) - return applicationChanged, nil + return statusChanged, nil } // we haven't started savepointing yet; do so now // TODO: figure out the idempotence of this - if application.Spec.SavepointInfo.TriggerID == "" { + if application.Status.SavepointTriggerID == "" { if rollback, reason := s.shouldRollback(ctx, application); rollback { // we were unable to start savepointing for our failure period, so roll back // TODO: we should think about how to handle the case where the cluster has started savepointing, but does @@ -300,20 +300,20 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a triggerID, err := s.flinkController.CancelWithSavepoint(ctx, application, application.Status.DeployHash) if err != nil { - return applicationUnchanged, err + return statusUnchanged, err } s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "CancellingJob", fmt.Sprintf("Cancelling job %s with a final savepoint", application.Status.JobStatus.JobID)) - application.Spec.SavepointInfo.TriggerID = triggerID - return applicationChanged, nil + application.Status.SavepointTriggerID = triggerID + return statusChanged, nil } // check the savepoints in progress savepointStatusResponse, err := s.flinkController.GetSavepointStatus(ctx, application, application.Status.DeployHash) if err != nil { - return applicationUnchanged, err + return statusUnchanged, err } var restorePath string @@ -348,17 +348,18 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a } if restorePath != "" { - application.Spec.SavepointInfo.SavepointLocation = restorePath + application.Status.SavepointPath = restorePath application.Status.JobStatus.JobID = "" s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob) - return applicationChanged, nil + return statusChanged, nil } - return applicationUnchanged, nil + return statusUnchanged, nil } func (s *FlinkStateMachine) submitJobIfNeeded(ctx context.Context, app *v1beta1.FlinkApplication, hash string, - jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) { + jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, + savepointPath string) (string, error) { isReady, _ := s.flinkController.IsServiceReady(ctx, app, hash) // Ignore errors if !isReady { @@ -387,8 +388,9 @@ func (s *FlinkStateMachine) submitJobIfNeeded(ctx context.Context, app *v1beta1. 
case 0: // no active jobs, we need to submit a new one logger.Infof(ctx, "No job found for the application") + jobID, err := s.flinkController.StartFlinkJob(ctx, app, hash, - jarName, parallelism, entryClass, programArgs, allowNonRestoredState) + jarName, parallelism, entryClass, programArgs, allowNonRestoredState, savepointPath) if err != nil { s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobSubmissionFailed", fmt.Sprintf("Failed to submit job to cluster for deploy %s: %v", hash, err)) @@ -438,14 +440,14 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobsSubmissionFailed", fmt.Sprintf("Failed to submit job: %s", reason)) s.updateApplicationPhase(app, v1beta1.FlinkApplicationRollingBackJob) - return applicationChanged, nil + return statusChanged, nil } // switch the service to point to the new jobmanager hash := flink.HashForApplication(app) err := s.updateGenericService(ctx, app, hash) if err != nil { - return applicationUnchanged, err + return statusUnchanged, err } // Update status of the cluster @@ -455,32 +457,46 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta } if app.Status.JobStatus.JobID == "" { + savepointPath := "" + if app.Status.DeployHash == "" { + // this is the first deploy, use the user-provided savepoint + savepointPath = app.Spec.SavepointPath + if savepointPath == "" { + //nolint // fall back to the old config for backwards-compatibility + savepointPath = app.Spec.SavepointInfo.SavepointLocation + } + } else { + // otherwise use the savepoint created by the operator + savepointPath = app.Status.SavepointPath + } + appJobID, err := s.submitJobIfNeeded(ctx, app, hash, - app.Spec.JarName, app.Spec.Parallelism, app.Spec.EntryClass, app.Spec.ProgramArgs, app.Spec.AllowNonRestoredState) + app.Spec.JarName, app.Spec.Parallelism, app.Spec.EntryClass, app.Spec.ProgramArgs, + app.Spec.AllowNonRestoredState, savepointPath) if err != nil { - return applicationUnchanged, err + return statusUnchanged, err } if appJobID != "" { app.Status.JobStatus.JobID = appJobID - return applicationChanged, nil + return statusChanged, nil } // we weren't ready to submit yet - return applicationUnchanged, nil + return statusUnchanged, nil } // get the state of the current application job, err := s.flinkController.GetJobForApplication(ctx, app, hash) if err != nil { - return applicationUnchanged, err + return statusUnchanged, err } if job.State == client.Running { - // Clear the savepoint info - app.Spec.SavepointInfo = v1beta1.SavepointInfo{} // Update the application status with the running job info app.Status.DeployHash = hash + app.Status.SavepointPath = "" + app.Status.SavepointTriggerID = "" app.Status.JobStatus.JarName = app.Spec.JarName app.Status.JobStatus.Parallelism = app.Spec.Parallelism app.Status.JobStatus.EntryClass = app.Spec.EntryClass @@ -488,10 +504,10 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta app.Status.JobStatus.AllowNonRestoredState = app.Spec.AllowNonRestoredState s.updateApplicationPhase(app, v1beta1.FlinkApplicationRunning) - return applicationChanged, nil + return statusChanged, nil } - return applicationUnchanged, nil + return statusUnchanged, nil } // Something has gone wrong during the update, post job-cancellation (and cluster tear-down in single mode). We need @@ -516,36 +532,38 @@ func (s *FlinkStateMachine) handleRollingBack(ctx context.Context, app *v1beta1. 
// update the service to point back to the old deployment if needed err := s.updateGenericService(ctx, app, app.Status.DeployHash) if err != nil { - return applicationUnchanged, err + return statusUnchanged, err } // wait until the service is ready isReady, _ := s.flinkController.IsServiceReady(ctx, app, app.Status.DeployHash) // Ignore errors if !isReady { - return applicationUnchanged, nil + return statusUnchanged, nil } // submit the old job jobID, err := s.submitJobIfNeeded(ctx, app, app.Status.DeployHash, app.Status.JobStatus.JarName, app.Status.JobStatus.Parallelism, app.Status.JobStatus.EntryClass, app.Status.JobStatus.ProgramArgs, - app.Status.JobStatus.AllowNonRestoredState) + app.Status.JobStatus.AllowNonRestoredState, + app.Status.SavepointPath) // set rollbackHash app.Status.RollbackHash = app.Status.DeployHash if err != nil { - return applicationUnchanged, err + return statusUnchanged, err } if jobID != "" { app.Status.JobStatus.JobID = jobID - app.Spec.SavepointInfo = v1beta1.SavepointInfo{} + app.Status.SavepointPath = "" + app.Status.SavepointTriggerID = "" // move to the deploy failed state return s.deployFailed(ctx, app) } - return applicationUnchanged, nil + return statusUnchanged, nil } // Check if the application is Running. @@ -554,7 +572,7 @@ func (s *FlinkStateMachine) handleApplicationRunning(ctx context.Context, applic job, err := s.flinkController.GetJobForApplication(ctx, application, application.Status.DeployHash) if err != nil { // TODO: think more about this case - return applicationUnchanged, err + return statusUnchanged, err } if job == nil { @@ -565,7 +583,7 @@ func (s *FlinkStateMachine) handleApplicationRunning(ctx context.Context, applic cur, err := s.flinkController.GetCurrentDeploymentsForApp(ctx, application) if err != nil { - return applicationUnchanged, err + return statusUnchanged, err } // If the application has changed (i.e., there are no current deployments), and we haven't already failed trying to @@ -574,7 +592,7 @@ func (s *FlinkStateMachine) handleApplicationRunning(ctx context.Context, applic logger.Infof(ctx, "Application resource has changed. 
Moving to Updating") // TODO: handle single mode s.updateApplicationPhase(application, v1beta1.FlinkApplicationUpdating) - return applicationChanged, nil + return statusChanged, nil } // If there are old resources left-over from a previous version, clean them up @@ -597,10 +615,10 @@ func (s *FlinkStateMachine) handleApplicationRunning(ctx context.Context, applic // Update k8s object if either job or cluster status has changed if hasJobStatusChanged || hasClusterStatusChanged { - return applicationChanged, nil + return statusChanged, nil } - return applicationUnchanged, nil + return statusUnchanged, nil } func (s *FlinkStateMachine) addFinalizerIfMissing(ctx context.Context, application *v1beta1.FlinkApplication, finalizer string) error { @@ -626,9 +644,9 @@ func removeString(list []string, target string) []string { return ret } -func (s *FlinkStateMachine) clearFinalizers(app *v1beta1.FlinkApplication) (bool, error) { +func (s *FlinkStateMachine) clearFinalizers(ctx context.Context, app *v1beta1.FlinkApplication) (bool, error) { app.Finalizers = removeString(app.Finalizers, jobFinalizer) - return applicationChanged, nil + return statusUnchanged, s.k8Cluster.UpdateK8Object(ctx, app) } func jobFinished(job *client.FlinkJobOverview) bool { @@ -647,43 +665,50 @@ func (s *FlinkStateMachine) handleApplicationDeleting(ctx context.Context, app * // If the delete mode is none or there's no deployhash set (which means we failed to submit the job on the // first deploy) just delete the finalizer so the cluster can be torn down if app.Spec.DeleteMode == v1beta1.DeleteModeNone || app.Status.DeployHash == "" { - return s.clearFinalizers(app) + return s.clearFinalizers(ctx, app) } job, err := s.flinkController.GetJobForApplication(ctx, app, app.Status.DeployHash) if err != nil { - return applicationUnchanged, err - } - - if jobFinished(job) { - // there are no running jobs for this application, we can just tear down - return s.clearFinalizers(app) + return statusUnchanged, err } switch app.Spec.DeleteMode { case v1beta1.DeleteModeForceCancel: - logger.Infof(ctx, "Force cancelling job as part of cleanup") - return applicationUnchanged, s.flinkController.ForceCancel(ctx, app, app.Status.DeployHash) + if job.State == client.Cancelling { + // we've already cancelled the job, waiting for it to finish + return statusUnchanged, nil + } else if jobFinished(job) { + // job was successfully cancelled + return s.clearFinalizers(ctx, app) + } + + logger.Infof(ctx, "Force-cancelling job without a savepoint") + return statusUnchanged, s.flinkController.ForceCancel(ctx, app, app.Status.DeployHash) case v1beta1.DeleteModeSavepoint, "": - if app.Spec.SavepointInfo.SavepointLocation != "" { + if app.Status.SavepointPath != "" { // we've already created the savepoint, now just waiting for the job to be cancelled - return applicationUnchanged, nil + if jobFinished(job) { + return s.clearFinalizers(ctx, app) + } + + return statusUnchanged, nil } - if app.Spec.SavepointInfo.TriggerID == "" { + if app.Status.SavepointTriggerID == "" { // delete with savepoint triggerID, err := s.flinkController.CancelWithSavepoint(ctx, app, app.Status.DeployHash) if err != nil { - return applicationUnchanged, err + return statusUnchanged, err } s.flinkController.LogEvent(ctx, app, corev1.EventTypeNormal, "CancellingJob", fmt.Sprintf("Cancelling job with savepoint %v", triggerID)) - app.Spec.SavepointInfo.TriggerID = triggerID + app.Status.SavepointTriggerID = triggerID } else { // we've already started savepointing; check the status status, 
err := s.flinkController.GetSavepointStatus(ctx, app, app.Status.DeployHash) if err != nil { - return applicationUnchanged, err + return statusUnchanged, err } if status.Operation.Location == "" && status.SavepointStatus.Status != client.SavePointInProgress { @@ -691,31 +716,31 @@ func (s *FlinkStateMachine) handleApplicationDeleting(ctx context.Context, app * s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "SavepointFailed", fmt.Sprintf("Failed to take savepoint %v", status.Operation.FailureCause)) // clear the trigger id so that we can try again - app.Spec.SavepointInfo.TriggerID = "" + app.Status.SavepointTriggerID = "" return true, client.GetRetryableError(errors.New("failed to take savepoint"), v1beta1.CancelJobWithSavepoint, "500", math.MaxInt32) } else if status.SavepointStatus.Status == client.SavePointCompleted { // we're done, clean up s.flinkController.LogEvent(ctx, app, corev1.EventTypeNormal, "CanceledJob", fmt.Sprintf("Cancelled job with savepoint '%s'", status.Operation.Location)) - app.Spec.SavepointInfo.SavepointLocation = status.Operation.Location - app.Spec.SavepointInfo.TriggerID = "" + app.Status.SavepointPath = status.Operation.Location + app.Status.SavepointTriggerID = "" } } - return applicationChanged, nil + return statusChanged, nil default: logger.Errorf(ctx, "Unsupported DeleteMode %s", app.Spec.DeleteMode) } - return applicationUnchanged, nil + return statusUnchanged, nil } func (s *FlinkStateMachine) compareAndUpdateError(application *v1beta1.FlinkApplication, err error) bool { oldErr := application.Status.LastSeenError if err == nil && oldErr == nil { - return applicationUnchanged + return statusUnchanged } if err == nil { @@ -732,7 +757,7 @@ func (s *FlinkStateMachine) compareAndUpdateError(application *v1beta1.FlinkAppl application.Status.LastSeenError.LastErrorUpdateTime = &now } - return applicationChanged + return statusChanged } diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go index 770f1a8e..113360e2 100644 --- a/pkg/controller/flinkapplication/flink_state_machine_test.go +++ b/pkg/controller/flinkapplication/flink_state_machine_test.go @@ -97,7 +97,7 @@ func TestHandleStartingDual(t *testing.T) { } mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) - mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, v1beta1.FlinkApplicationSavepointing, application.Status.Phase) updateInvoked = true @@ -125,7 +125,7 @@ func TestHandleApplicationSavepointingInitialDeploy(t *testing.T) { } mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) - mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, v1beta1.FlinkApplicationSubmittingJob, application.Status.Phase) updateInvoked = true @@ -174,12 +174,12 @@ func TestHandleApplicationSavepointingDual(t *testing.T) { mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) updateCount := 0 - mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { application := 
object.(*v1beta1.FlinkApplication) if updateCount == 0 { - assert.Equal(t, "trigger", application.Spec.SavepointInfo.TriggerID) + assert.Equal(t, "trigger", application.Status.SavepointTriggerID) } else { - assert.Equal(t, testSavepointLocation, application.Spec.SavepointInfo.SavepointLocation) + assert.Equal(t, testSavepointLocation, application.Status.SavepointPath) assert.Equal(t, v1beta1.FlinkApplicationSubmittingJob, application.Status.Phase) } @@ -211,22 +211,19 @@ func TestHandleApplicationSavepointingFailed(t *testing.T) { } app := v1beta1.FlinkApplication{ - Spec: v1beta1.FlinkApplicationSpec{ - SavepointInfo: v1beta1.SavepointInfo{ - TriggerID: "trigger", - }, - }, + Spec: v1beta1.FlinkApplicationSpec{}, Status: v1beta1.FlinkApplicationStatus{ - Phase: v1beta1.FlinkApplicationSavepointing, - DeployHash: "blah", + Phase: v1beta1.FlinkApplicationSavepointing, + DeployHash: "blah", + SavepointTriggerID: "trigger", }, } hash := flink.HashForApplication(&app) mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) - mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { application := object.(*v1beta1.FlinkApplication) - assert.Empty(t, application.Spec.SavepointInfo.SavepointLocation) + assert.Empty(t, application.Status.SavepointPath) assert.Equal(t, hash, application.Status.FailedDeployHash) assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase) updateInvoked = true @@ -241,14 +238,11 @@ func TestRestoreFromExternalizedCheckpoint(t *testing.T) { updateInvoked := false app := v1beta1.FlinkApplication{ - Spec: v1beta1.FlinkApplicationSpec{ - SavepointInfo: v1beta1.SavepointInfo{ - TriggerID: "trigger", - }, - }, + Spec: v1beta1.FlinkApplicationSpec{}, Status: v1beta1.FlinkApplicationStatus{ - Phase: v1beta1.FlinkApplicationSavepointing, - DeployHash: "blah", + Phase: v1beta1.FlinkApplicationSavepointing, + DeployHash: "blah", + SavepointTriggerID: "trigger", }, } @@ -267,9 +261,9 @@ func TestRestoreFromExternalizedCheckpoint(t *testing.T) { } mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) - mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { application := object.(*v1beta1.FlinkApplication) - assert.Equal(t, "/tmp/checkpoint", application.Spec.SavepointInfo.SavepointLocation) + assert.Equal(t, "/tmp/checkpoint", application.Status.SavepointPath) assert.Equal(t, v1beta1.FlinkApplicationSubmittingJob, application.Status.Phase) updateInvoked = true return nil @@ -316,7 +310,7 @@ func TestSubmittingToRunning(t *testing.T) { startCount := 0 mockFlinkController.StartFlinkJobFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, - jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) { + jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (string, error) { assert.Equal(t, appHash, hash) assert.Equal(t, app.Spec.JarName, jarName) @@ -324,6 +318,7 @@ func TestSubmittingToRunning(t *testing.T) { assert.Equal(t, app.Spec.EntryClass, entryClass) assert.Equal(t, app.Spec.ProgramArgs, programArgs) assert.Equal(t, app.Spec.AllowNonRestoredState, allowNonRestoredState) + assert.Equal(t, app.Spec.SavepointPath, savepointPath) 
startCount++ return jobID, nil @@ -364,10 +359,18 @@ func TestSubmittingToRunning(t *testing.T) { } else if updateCount == 1 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, jobFinalizer, application.Finalizers[0]) - } else if updateCount == 2 { + } + + updateCount++ + return nil + } + + statusUpdateCount := 0 + mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { + if statusUpdateCount == 0 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, jobID, application.Status.JobStatus.JobID) - } else if updateCount == 3 { + } else if statusUpdateCount == 1 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, appHash, application.Status.DeployHash) assert.Equal(t, app.Spec.JarName, app.Status.JobStatus.JarName) @@ -376,8 +379,7 @@ func TestSubmittingToRunning(t *testing.T) { assert.Equal(t, app.Spec.ProgramArgs, app.Status.JobStatus.ProgramArgs) assert.Equal(t, v1beta1.FlinkApplicationRunning, application.Status.Phase) } - - updateCount++ + statusUpdateCount++ return nil } @@ -387,7 +389,8 @@ func TestSubmittingToRunning(t *testing.T) { assert.Nil(t, err) assert.Equal(t, 1, startCount) - assert.Equal(t, 4, updateCount) + assert.Equal(t, 2, updateCount) + assert.Equal(t, 2, statusUpdateCount) } func TestHandleApplicationNotReady(t *testing.T) { @@ -401,7 +404,7 @@ func TestHandleApplicationNotReady(t *testing.T) { return nil, nil } mockFlinkController.StartFlinkJobFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, - jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) { + jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (string, error) { assert.False(t, true) return "", nil } @@ -465,7 +468,7 @@ func TestRunningToClusterStarting(t *testing.T) { } mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) - mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, v1beta1.FlinkApplicationUpdating, application.Status.Phase) updateInvoked = true @@ -495,8 +498,9 @@ func TestRollingBack(t *testing.T) { ProgramArgs: "--test", }, Status: v1beta1.FlinkApplicationStatus{ - Phase: v1beta1.FlinkApplicationRollingBackJob, - DeployHash: "old-hash", + Phase: v1beta1.FlinkApplicationRollingBackJob, + DeployHash: "old-hash", + SavepointPath: "file:///savepoint", JobStatus: v1beta1.FlinkJobStatus{ JarName: "old-job.jar", Parallelism: 10, @@ -516,7 +520,7 @@ func TestRollingBack(t *testing.T) { startCalled := false mockFlinkController.StartFlinkJobFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, - jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) { + jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (string, error) { startCalled = true assert.Equal(t, "old-hash", hash) @@ -525,6 +529,7 @@ func TestRollingBack(t *testing.T) { assert.Equal(t, app.Status.JobStatus.EntryClass, entryClass) assert.Equal(t, app.Status.JobStatus.ProgramArgs, programArgs) assert.Equal(t, app.Status.JobStatus.AllowNonRestoredState, allowNonRestoredState) + assert.Equal(t, app.Status.SavepointPath, savepointPath) 
return jobID, nil } @@ -578,13 +583,20 @@ func TestRollingBack(t *testing.T) { } else if updateCount == 1 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, jobFinalizer, application.Finalizers[0]) - } else if updateCount == 2 { + } + + updateCount++ + return nil + } + + statusUpdated := false + mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { + if !statusUpdated { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, appHash, application.Status.FailedDeployHash) assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase) + statusUpdated = true } - - updateCount++ return nil } @@ -594,7 +606,8 @@ func TestRollingBack(t *testing.T) { assert.Nil(t, err) assert.True(t, startCalled) - assert.Equal(t, 4, updateCount) + assert.True(t, statusUpdated) + assert.Equal(t, 2, updateCount) } func TestIsApplicationStuck(t *testing.T) { @@ -688,28 +701,32 @@ func TestDeleteWithSavepoint(t *testing.T) { } mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) - updateCount := 1 - mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + updateStatusCount := 0 + mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, v1beta1.FlinkApplicationDeleting, application.Status.Phase) - if updateCount == 1 { - assert.Equal(t, triggerID, application.Spec.SavepointInfo.TriggerID) - } else if updateCount == 2 { + if updateStatusCount == 0 { + assert.Equal(t, triggerID, application.Status.SavepointTriggerID) + } else if updateStatusCount == 1 { assert.NotNil(t, application.Status.LastSeenError) - } else if updateCount == 3 { - assert.Equal(t, savepointPath, application.Spec.SavepointInfo.SavepointLocation) - } else if updateCount == 4 { - assert.Equal(t, 0, len(app.Finalizers)) + } else if updateStatusCount == 2 { + assert.Equal(t, savepointPath, application.Status.SavepointPath) } - updateCount++ + updateStatusCount++ + return nil + } + + updated := false + mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + assert.Equal(t, 0, len(app.Finalizers)) + updated = true return nil } err := stateMachineForTest.Handle(context.Background(), &app) assert.NoError(t, err) - assert.Equal(t, 2, updateCount) savepointStatusCount := 0 mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) { @@ -745,7 +762,7 @@ func TestDeleteWithSavepoint(t *testing.T) { err = stateMachineForTest.Handle(context.Background(), &app) assert.NoError(t, err) - assert.Equal(t, 4, updateCount) + assert.Equal(t, 3, updateStatusCount) mockFlinkController.GetJobForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (jobs *client.FlinkJobOverview, err error) { return &client.FlinkJobOverview{ @@ -757,7 +774,7 @@ func TestDeleteWithSavepoint(t *testing.T) { err = stateMachineForTest.Handle(context.Background(), &app) assert.NoError(t, err) - assert.Equal(t, 5, updateCount) + assert.True(t, updated) } @@ -771,8 +788,9 @@ func TestDeleteWithSavepointAndFinishedJob(t *testing.T) { DeletionTimestamp: &metav1.Time{Time: time.Now()}, }, Status: v1beta1.FlinkApplicationStatus{ - Phase: v1beta1.FlinkApplicationDeleting, - DeployHash: "deployhash", + Phase: v1beta1.FlinkApplicationDeleting, + DeployHash: "deployhash", + SavepointPath: 
"file:///savepoint", JobStatus: v1beta1.FlinkJobStatus{ JobID: jobID, }, @@ -990,7 +1008,7 @@ func TestRollbackWithRetryableError(t *testing.T) { mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) updateErrCount := 0 - mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { updateErrCount++ return nil } @@ -1056,7 +1074,7 @@ func TestRollbackWithFailFastError(t *testing.T) { } failFastError := client.GetNonRetryableError(errors.New("blah"), "SubmitJob", "400BadRequest") mockFlinkController.StartFlinkJobFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, - jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) { + jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (string, error) { return "", failFastError } diff --git a/pkg/controller/k8/cluster.go b/pkg/controller/k8/cluster.go index eb20d388..ff4c0eed 100644 --- a/pkg/controller/k8/cluster.go +++ b/pkg/controller/k8/cluster.go @@ -38,6 +38,8 @@ type ClusterInterface interface { CreateK8Object(ctx context.Context, object runtime.Object) error UpdateK8Object(ctx context.Context, object runtime.Object) error DeleteK8Object(ctx context.Context, object runtime.Object) error + + UpdateStatus(ctx context.Context, object runtime.Object) error } func NewK8Cluster(mgr manager.Manager, cfg config.RuntimeConfig) ClusterInterface { @@ -170,7 +172,7 @@ func (k *Cluster) CreateK8Object(ctx context.Context, object runtime.Object) err objCreate := object.DeepCopyObject() err := k.client.Create(ctx, objCreate) if err != nil { - logger.Errorf(ctx, "K8 object creation failed %v", err) + logger.Errorf(ctx, "K8s object creation failed %v", err) k.metrics.createFailure.Inc(ctx) return err } @@ -186,7 +188,25 @@ func (k *Cluster) UpdateK8Object(ctx context.Context, object runtime.Object) err logger.Warnf(ctx, "Conflict while updating object") k.metrics.updateConflicts.Inc(ctx) } else { - logger.Errorf(ctx, "K8 object update failed %v", err) + logger.Errorf(ctx, "K8s object update failed %v", err) + k.metrics.updateFailure.Inc(ctx) + } + return err + } + k.metrics.updateSuccess.Inc(ctx) + return nil +} + +func (k *Cluster) UpdateStatus(ctx context.Context, object runtime.Object) error { + objectCopy := object.DeepCopyObject() + + err := k.client.Status().Update(ctx, objectCopy) + if err != nil { + if errors.IsConflict(err) { + logger.Warnf(ctx, "Conflict while updating status") + k.metrics.updateConflicts.Inc(ctx) + } else { + logger.Errorf(ctx, "K8s object update failed %v", err) k.metrics.updateFailure.Inc(ctx) } return err @@ -199,7 +219,7 @@ func (k *Cluster) DeleteK8Object(ctx context.Context, object runtime.Object) err objDelete := object.DeepCopyObject() err := k.client.Delete(ctx, objDelete) if err != nil { - logger.Errorf(ctx, "K8 object delete failed %v", err) + logger.Errorf(ctx, "K8s object delete failed %v", err) k.metrics.deleteFailure.Inc(ctx) return err } diff --git a/pkg/controller/k8/mock/mock_k8.go b/pkg/controller/k8/mock/mock_k8.go index 66196d7b..9e3c79c7 100644 --- a/pkg/controller/k8/mock/mock_k8.go +++ b/pkg/controller/k8/mock/mock_k8.go @@ -13,6 +13,7 @@ type CreateK8ObjectFunc func(ctx context.Context, object runtime.Object) error type GetServiceFunc func(ctx context.Context, namespace string, name string) (*corev1.Service, 
error) type GetServiceWithLabelFunc func(ctx context.Context, namespace string, labelMap map[string]string) (*corev1.ServiceList, error) type UpdateK8ObjectFunc func(ctx context.Context, object runtime.Object) error +type UpdateStatusFunc func(ctx context.Context, object runtime.Object) error type DeleteK8ObjectFunc func(ctx context.Context, object runtime.Object) error type K8Cluster struct { @@ -21,6 +22,7 @@ type K8Cluster struct { GetServicesWithLabelFunc GetServiceWithLabelFunc CreateK8ObjectFunc CreateK8ObjectFunc UpdateK8ObjectFunc UpdateK8ObjectFunc + UpdateStatusFunc UpdateStatusFunc DeleteK8ObjectFunc DeleteK8ObjectFunc } @@ -59,6 +61,13 @@ func (m *K8Cluster) UpdateK8Object(ctx context.Context, object runtime.Object) e return nil } +func (m *K8Cluster) UpdateStatus(ctx context.Context, object runtime.Object) error { + if m.UpdateStatusFunc != nil { + return m.UpdateStatusFunc(ctx, object) + } + return nil +} + func (m *K8Cluster) DeleteK8Object(ctx context.Context, object runtime.Object) error { if m.DeleteK8ObjectFunc != nil { return m.DeleteK8ObjectFunc(ctx, object)
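
A final note on the new UpdateStatus method in pkg/controller/k8/cluster.go:
with the subresource enabled there are two distinct write paths, and that
split is the point of this change. The sketch below is illustrative only
(updateSpec and updateStatus are hypothetical helpers, assuming the same
controller-runtime client that cluster.go already uses); it is not code
from this patch:

    package sketch

    import (
    	"context"

    	"k8s.io/apimachinery/pkg/runtime"
    	"sigs.k8s.io/controller-runtime/pkg/client"
    )

    // updateSpec writes through the main resource endpoint. With the
    // status subresource enabled, any changes made to .status on obj are
    // ignored by this call.
    func updateSpec(ctx context.Context, c client.Client, obj runtime.Object) error {
    	return c.Update(ctx, obj.DeepCopyObject())
    }

    // updateStatus writes through the /status subresource endpoint.
    // Changes made to .spec and .metadata on obj are ignored by this call.
    func updateStatus(ctx context.Context, c client.Client, obj runtime.Object) error {
    	return c.Status().Update(ctx, obj.DeepCopyObject())
    }

Because the operator now only ever writes status through the second path, a
user updating the spec can no longer clobber the operator's savepoint
bookkeeping mid-deploy.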