diff --git a/README.md b/README.md index 7f6e1fd3..c6ffe282 100644 --- a/README.md +++ b/README.md @@ -17,11 +17,11 @@ FlinkK8sOperator is a [Kubernetes operator](https://coreos.com/operators/) that *Beta* -The operator is in use for some less-crtical jobs at Lyft. At this point the focus is on testing and stability While in +The operator is in use for some less-critical jobs at Lyft. At this point the focus is on testing and stability. While in Beta, we will attempt to limit the number of backwards-incompatible changes, but they may still occur as necessary. ## Prerequisites -* Version >= 1.9 of Kubernetes. +* Version >= 1.10 of Kubernetes (versions < 1.13 require `--feature-gates=CustomResourceSubresources=true`) * Version >= 1.7 of Apache Flink. ## Overview diff --git a/deploy/crd.yaml b/deploy/crd.yaml index e83dcc13..3bc6cb18 100644 --- a/deploy/crd.yaml +++ b/deploy/crd.yaml @@ -89,6 +89,8 @@ spec: properties: savepointLocation: type: string + savepointPath: + type: string jobManagerConfig: type: object properties: @@ -377,7 +379,8 @@ spec: - jarName - parallelism - entryClass - + subresources: + status: {} additionalPrinterColumns: - name: Phase type: string diff --git a/docs/crd.md b/docs/crd.md index b863ce17..81d19985 100644 --- a/docs/crd.md +++ b/docs/crd.md @@ -74,13 +74,13 @@ Below is the list of fields in the custom resource and their description * **programArgs** `type:string` External configuration parameters to be passed as arguments to the job like input and output sources, etc - + + * **savepointPath** `type:string` + If specified, the application state will be restored from this savepoint + * **allowNonRestoredState** `type:boolean` Skips savepoint operator state that cannot be mapped to the new program version - * **savepointInfo** `type:SavepointInfo` - Optional Savepoint info that can be passed in to indicate that the Flink job must resume from the corresponding savepoint. - * **flinkVersion** `type:string required=true` The version of Flink to be managed. This version must match the version in the image.
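A note on the two savepoint fields this patch leaves in play: `spec.savepointPath` (and the deprecated `spec.savepointInfo`) only seed the first deploy, while every later deploy restores from the operator-managed `status.savepointPath`. The sketch below condenses that precedence logic from `handleSubmittingJob` in `pkg/controller/flinkapplication/flink_state_machine.go` further down in this patch; the helper name, standalone packaging, and import path (inferred from the repo layout) are illustrative, not part of the change.

```go
package main // illustrative sketch, not part of the patch

import "github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"

// restoreSavepointPath condenses the restore-path precedence introduced by
// this patch (see handleSubmittingJob below).
func restoreSavepointPath(app *v1beta1.FlinkApplication) string {
	if app.Status.DeployHash == "" {
		// First deploy: use the user-provided spec.savepointPath, falling
		// back to the deprecated savepointInfo for backwards compatibility.
		if app.Spec.SavepointPath != "" {
			return app.Spec.SavepointPath
		}
		return app.Spec.SavepointInfo.SavepointLocation
	}
	// Subsequent deploys: restore from the savepoint the operator took
	// during the Savepointing phase, now tracked in the status subresource.
	return app.Status.SavepointPath
}
```

On a successful submit the state machine clears both `status.savepointPath` and `status.savepointTriggerId`, so a stale savepoint cannot leak into the next deploy.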
diff --git a/integ/simple_test.go b/integ/simple_test.go index 3dd402cd..ca6ccda7 100644 --- a/integ/simple_test.go +++ b/integ/simple_test.go @@ -218,7 +218,7 @@ func (s *IntegSuite) TestSimple(c *C) { time.Sleep(100 * time.Millisecond) } - c.Assert(app.Spec.SavepointInfo.SavepointLocation, NotNil) + c.Assert(app.Status.SavepointPath, NotNil) job := func() map[string]interface{} { jobs, _ := s.Util.FlinkAPIGet(app, "/jobs") jobMap := jobs.(map[string]interface{}) diff --git a/pkg/apis/app/v1beta1/types.go b/pkg/apis/app/v1beta1/types.go index f842969f..7f2705e4 100644 --- a/pkg/apis/app/v1beta1/types.go +++ b/pkg/apis/app/v1beta1/types.go @@ -28,31 +28,33 @@ type FlinkApplication struct { } type FlinkApplicationSpec struct { - Image string `json:"image,omitempty" protobuf:"bytes,2,opt,name=image"` - ImagePullPolicy apiv1.PullPolicy `json:"imagePullPolicy,omitempty" protobuf:"bytes,14,opt,name=imagePullPolicy,casttype=PullPolicy"` - ImagePullSecrets []apiv1.LocalObjectReference `json:"imagePullSecrets,omitempty" patchStrategy:"merge" patchMergeKey:"name" protobuf:"bytes,15,rep,name=imagePullSecrets"` - ServiceAccountName string `json:"serviceAccountName,omitempty"` - FlinkConfig FlinkConfig `json:"flinkConfig"` - FlinkVersion string `json:"flinkVersion"` - TaskManagerConfig TaskManagerConfig `json:"taskManagerConfig,omitempty"` - JobManagerConfig JobManagerConfig `json:"jobManagerConfig,omitempty"` - JarName string `json:"jarName"` - Parallelism int32 `json:"parallelism"` - EntryClass string `json:"entryClass,omitempty"` - ProgramArgs string `json:"programArgs,omitempty"` - SavepointInfo SavepointInfo `json:"savepointInfo,omitempty"` - DeploymentMode DeploymentMode `json:"deploymentMode,omitempty"` - RPCPort *int32 `json:"rpcPort,omitempty"` - BlobPort *int32 `json:"blobPort,omitempty"` - QueryPort *int32 `json:"queryPort,omitempty"` - UIPort *int32 `json:"uiPort,omitempty"` - MetricsQueryPort *int32 `json:"metricsQueryPort,omitempty"` - Volumes []apiv1.Volume `json:"volumes,omitempty"` - VolumeMounts []apiv1.VolumeMount `json:"volumeMounts,omitempty"` - RestartNonce string `json:"restartNonce"` - DeleteMode DeleteMode `json:"deleteMode,omitempty"` - AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"` - ForceRollback bool `json:"forceRollback"` + Image string `json:"image,omitempty" protobuf:"bytes,2,opt,name=image"` + ImagePullPolicy apiv1.PullPolicy `json:"imagePullPolicy,omitempty" protobuf:"bytes,14,opt,name=imagePullPolicy,casttype=PullPolicy"` + ImagePullSecrets []apiv1.LocalObjectReference `json:"imagePullSecrets,omitempty" patchStrategy:"merge" patchMergeKey:"name" protobuf:"bytes,15,rep,name=imagePullSecrets"` + ServiceAccountName string `json:"serviceAccountName,omitempty"` + FlinkConfig FlinkConfig `json:"flinkConfig"` + FlinkVersion string `json:"flinkVersion"` + TaskManagerConfig TaskManagerConfig `json:"taskManagerConfig,omitempty"` + JobManagerConfig JobManagerConfig `json:"jobManagerConfig,omitempty"` + JarName string `json:"jarName"` + Parallelism int32 `json:"parallelism"` + EntryClass string `json:"entryClass,omitempty"` + ProgramArgs string `json:"programArgs,omitempty"` + // Deprecated: use SavepointPath instead + SavepointInfo SavepointInfo `json:"savepointInfo,omitempty"` + SavepointPath string `json:"savepointPath,omitempty"` + DeploymentMode DeploymentMode `json:"deploymentMode,omitempty"` + RPCPort *int32 `json:"rpcPort,omitempty"` + BlobPort *int32 `json:"blobPort,omitempty"` + QueryPort *int32 `json:"queryPort,omitempty"` + UIPort *int32 
`json:"uiPort,omitempty"` + MetricsQueryPort *int32 `json:"metricsQueryPort,omitempty"` + Volumes []apiv1.Volume `json:"volumes,omitempty"` + VolumeMounts []apiv1.VolumeMount `json:"volumeMounts,omitempty"` + RestartNonce string `json:"restartNonce"` + DeleteMode DeleteMode `json:"deleteMode,omitempty"` + AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"` + ForceRollback bool `json:"forceRollback"` } type FlinkConfig map[string]interface{} @@ -122,7 +124,6 @@ type EnvironmentConfig struct { type SavepointInfo struct { SavepointLocation string `json:"savepointLocation,omitempty"` - TriggerID string `json:"triggerId,omitempty"` } type FlinkClusterStatus struct { @@ -157,17 +158,19 @@ type FlinkJobStatus struct { } type FlinkApplicationStatus struct { - Phase FlinkApplicationPhase `json:"phase"` - StartedAt *metav1.Time `json:"startedAt,omitempty"` - LastUpdatedAt *metav1.Time `json:"lastUpdatedAt,omitempty"` - Reason string `json:"reason,omitempty"` - ClusterStatus FlinkClusterStatus `json:"clusterStatus,omitempty"` - JobStatus FlinkJobStatus `json:"jobStatus"` - FailedDeployHash string `json:"failedDeployHash,omitEmpty"` - RollbackHash string `json:"rollbackHash,omitEmpty"` - DeployHash string `json:"deployHash"` - RetryCount int32 `json:"retryCount,omitEmpty"` - LastSeenError *FlinkApplicationError `json:"lastSeenError,omitEmpty"` + Phase FlinkApplicationPhase `json:"phase"` + StartedAt *metav1.Time `json:"startedAt,omitempty"` + LastUpdatedAt *metav1.Time `json:"lastUpdatedAt,omitempty"` + Reason string `json:"reason,omitempty"` + ClusterStatus FlinkClusterStatus `json:"clusterStatus,omitempty"` + JobStatus FlinkJobStatus `json:"jobStatus"` + FailedDeployHash string `json:"failedDeployHash,omitempty"` + RollbackHash string `json:"rollbackHash,omitempty"` + DeployHash string `json:"deployHash"` + SavepointTriggerID string `json:"savepointTriggerId,omitempty"` + SavepointPath string `json:"savepointPath,omitempty"` + RetryCount int32 `json:"retryCount,omitempty"` + LastSeenError *FlinkApplicationError `json:"lastSeenError,omitempty"` } func (in *FlinkApplicationStatus) GetPhase() FlinkApplicationPhase { diff --git a/pkg/controller/flink/flink.go b/pkg/controller/flink/flink.go index 5bad4986..c9e20765 100644 --- a/pkg/controller/flink/flink.go +++ b/pkg/controller/flink/flink.go @@ -55,7 +55,8 @@ type ControllerInterface interface { // Starts the Job in the Flink Cluster StartFlinkJob(ctx context.Context, application *v1beta1.FlinkApplication, hash string, - jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) + jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, + savepointPath string) (string, error) // Savepoint creation is asynchronous.
// Polls the status of the Savepoint, using the triggerID @@ -271,14 +272,15 @@ func (f *Controller) CreateCluster(ctx context.Context, application *v1beta1.Fli } func (f *Controller) StartFlinkJob(ctx context.Context, application *v1beta1.FlinkApplication, hash string, - jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) { + jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, + savepointPath string) (string, error) { response, err := f.flinkClient.SubmitJob( ctx, getURLFromApp(application, hash), jarName, client.SubmitJobRequest{ Parallelism: parallelism, - SavepointPath: application.Spec.SavepointInfo.SavepointLocation, + SavepointPath: savepointPath, EntryClass: entryClass, ProgramArgs: programArgs, AllowNonRestoredState: allowNonRestoredState, @@ -298,7 +300,7 @@ func (f *Controller) GetSavepointStatus(ctx context.Context, application *v1beta if err != nil { return nil, err } - return f.flinkClient.CheckSavepointStatus(ctx, getURLFromApp(application, hash), jobID, application.Spec.SavepointInfo.TriggerID) + return f.flinkClient.CheckSavepointStatus(ctx, getURLFromApp(application, hash), jobID, application.Status.SavepointTriggerID) } func (f *Controller) IsClusterReady(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) { diff --git a/pkg/controller/flink/flink_test.go b/pkg/controller/flink/flink_test.go index e151883c..f2c5f847 100644 --- a/pkg/controller/flink/flink_test.go +++ b/pkg/controller/flink/flink_test.go @@ -232,7 +232,7 @@ func TestFlinkIsServiceReadyErr(t *testing.T) { func TestFlinkGetSavepointStatus(t *testing.T) { flinkControllerForTest := getTestFlinkController() flinkApp := getFlinkTestApp() - flinkApp.Spec.SavepointInfo.TriggerID = "t1" + flinkApp.Status.SavepointTriggerID = "t1" mockJmClient := flinkControllerForTest.flinkClient.(*clientMock.JobManagerClient) mockJmClient.CheckSavepointStatusFunc = func(ctx context.Context, url string, jobID, triggerID string) (*client.SavepointResponse, error) { @@ -428,7 +428,7 @@ func TestStartFlinkJob(t *testing.T) { flinkApp.Spec.ProgramArgs = "args" flinkApp.Spec.EntryClass = "class" flinkApp.Spec.JarName = "jar-name" - flinkApp.Spec.SavepointInfo.SavepointLocation = "location//" + flinkApp.Spec.SavepointPath = "location//" flinkApp.Spec.FlinkVersion = "1.7" mockJmClient := flinkControllerForTest.flinkClient.(*clientMock.JobManagerClient) @@ -446,7 +446,8 @@ func TestStartFlinkJob(t *testing.T) { }, nil } jobID, err := flinkControllerForTest.StartFlinkJob(context.Background(), &flinkApp, "hash", - flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, flinkApp.Spec.AllowNonRestoredState) + flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, + flinkApp.Spec.AllowNonRestoredState, "location//") assert.Nil(t, err) assert.Equal(t, jobID, testJobID) } @@ -465,7 +466,8 @@ func TestStartFlinkJobAllowNonRestoredState(t *testing.T) { }, nil } jobID, err := flinkControllerForTest.StartFlinkJob(context.Background(), &flinkApp, "hash", - flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, flinkApp.Spec.AllowNonRestoredState) + flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, + flinkApp.Spec.AllowNonRestoredState, "") assert.Nil(t, err) assert.Equal(t, jobID, testJobID) } @@ -480,7 +482,8 @@ func 
TestStartFlinkJobEmptyJobID(t *testing.T) { return &client.SubmitJobResponse{}, nil } jobID, err := flinkControllerForTest.StartFlinkJob(context.Background(), &flinkApp, "hash", - flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, flinkApp.Spec.AllowNonRestoredState) + flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, + flinkApp.Spec.AllowNonRestoredState, "") assert.EqualError(t, err, "unable to submit job: invalid job id") assert.Empty(t, jobID) } @@ -494,7 +497,8 @@ func TestStartFlinkJobErr(t *testing.T) { return nil, errors.New("submit error") } jobID, err := flinkControllerForTest.StartFlinkJob(context.Background(), &flinkApp, "hash", - flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, flinkApp.Spec.AllowNonRestoredState) + flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, + flinkApp.Spec.AllowNonRestoredState, "") assert.EqualError(t, err, "submit error") assert.Empty(t, jobID) } diff --git a/pkg/controller/flink/mock/mock_flink.go b/pkg/controller/flink/mock/mock_flink.go index f2f7fc18..fe05c07f 100644 --- a/pkg/controller/flink/mock/mock_flink.go +++ b/pkg/controller/flink/mock/mock_flink.go @@ -14,7 +14,7 @@ type DeleteOldResourcesForApp func(ctx context.Context, application *v1beta1.Fli type CancelWithSavepointFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error) type ForceCancelFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error type StartFlinkJobFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, - jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) + jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (string, error) type GetSavepointStatusFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) type IsClusterReadyFunc func(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) type IsServiceReadyFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error) @@ -79,9 +79,9 @@ func (m *FlinkController) ForceCancel(ctx context.Context, application *v1beta1. 
} func (m *FlinkController) StartFlinkJob(ctx context.Context, application *v1beta1.FlinkApplication, hash string, - jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) { + jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (string, error) { if m.StartFlinkJobFunc != nil { - return m.StartFlinkJobFunc(ctx, application, hash, jarName, parallelism, entryClass, programArgs, allowNonRestoredState) + return m.StartFlinkJobFunc(ctx, application, hash, jarName, parallelism, entryClass, programArgs, allowNonRestoredState, savepointPath) } return "", nil } diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index e092a940..2482e906 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -25,9 +25,9 @@ import ( ) const ( - jobFinalizer = "job.finalizers.flink.k8s.io" - applicationChanged = true - applicationUnchanged = false + jobFinalizer = "job.finalizers.flink.k8s.io" + statusChanged = true + statusUnchanged = false ) // The core state machine that manages Flink clusters and jobs. See docs/state_machine.md for a description of the @@ -128,13 +128,13 @@ func (s *FlinkStateMachine) Handle(ctx context.Context, application *v1beta1.Fli successTimer := s.metrics.stateMachineHandleSuccessPhaseMap[currentPhase].Start(ctx) defer timer.Stop() - updateApplication, err := s.handle(ctx, application) + updateStatus, err := s.handle(ctx, application) // Update k8s object - if updateApplication { + if updateStatus { now := v1.NewTime(s.clock.Now()) application.Status.LastUpdatedAt = &now - updateAppErr := s.k8Cluster.UpdateK8Object(ctx, application) + updateAppErr := s.k8Cluster.UpdateStatus(ctx, application) if updateAppErr != nil { s.metrics.errorCounterPhaseMap[currentPhase].Inc(ctx) return updateAppErr @@ -157,7 +157,7 @@ func (s *FlinkStateMachine) handle(ctx context.Context, application *v1beta1.Fli if !application.ObjectMeta.DeletionTimestamp.IsZero() && appPhase != v1beta1.FlinkApplicationDeleting { s.updateApplicationPhase(application, v1beta1.FlinkApplicationDeleting) // Always perform a single application update per callback - return applicationChanged, nil + return statusChanged, nil } if s.IsTimeToHandlePhase(application) { @@ -229,11 +229,11 @@ func (s *FlinkStateMachine) handleNewOrUpdating(ctx context.Context, application err := s.flinkController.CreateCluster(ctx, application) if err != nil { logger.Errorf(ctx, "Cluster creation failed with error: %v", err) - return applicationUnchanged, err + return statusUnchanged, err } s.updateApplicationPhase(application, v1beta1.FlinkApplicationClusterStarting) - return applicationChanged, nil + return statusChanged, nil } func (s *FlinkStateMachine) deployFailed(ctx context.Context, app *v1beta1.FlinkApplication) (bool, error) { @@ -248,7 +248,7 @@ func (s *FlinkStateMachine) deployFailed(ctx context.Context, app *v1beta1.Flink app.Status.RetryCount = 0 s.updateApplicationPhase(app, v1beta1.FlinkApplicationDeployFailed) - return applicationChanged, nil + return statusChanged, nil } // Create the underlying Kubernetes objects for the new cluster @@ -265,29 +265,29 @@ func (s *FlinkStateMachine) handleClusterStarting(ctx context.Context, applicati // Wait for all to be running ready, err := s.flinkController.IsClusterReady(ctx, application) if err != nil { - return 
applicationUnchanged, err + return statusUnchanged, err } if !ready { - return applicationUnchanged, nil + return statusUnchanged, nil } logger.Infof(ctx, "Flink cluster has started successfully") // TODO: in single mode move to submitting job s.updateApplicationPhase(application, v1beta1.FlinkApplicationSavepointing) - return applicationChanged, nil + return statusChanged, nil } func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) { // we've already savepointed (or this is our first deploy), continue on - if application.Spec.SavepointInfo.SavepointLocation != "" || application.Status.DeployHash == "" { + if application.Status.SavepointPath != "" || application.Status.DeployHash == "" { application.Status.JobStatus.JobID = "" s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob) - return applicationChanged, nil + return statusChanged, nil } // we haven't started savepointing yet; do so now // TODO: figure out the idempotence of this - if application.Spec.SavepointInfo.TriggerID == "" { + if application.Status.SavepointTriggerID == "" { if rollback, reason := s.shouldRollback(ctx, application); rollback { // we were unable to start savepointing for our failure period, so roll back // TODO: we should think about how to handle the case where the cluster has started savepointing, but does @@ -300,20 +300,20 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a triggerID, err := s.flinkController.CancelWithSavepoint(ctx, application, application.Status.DeployHash) if err != nil { - return applicationUnchanged, err + return statusUnchanged, err } s.flinkController.LogEvent(ctx, application, corev1.EventTypeNormal, "CancellingJob", fmt.Sprintf("Cancelling job %s with a final savepoint", application.Status.JobStatus.JobID)) - application.Spec.SavepointInfo.TriggerID = triggerID - return applicationChanged, nil + application.Status.SavepointTriggerID = triggerID + return statusChanged, nil } // check the savepoints in progress savepointStatusResponse, err := s.flinkController.GetSavepointStatus(ctx, application, application.Status.DeployHash) if err != nil { - return applicationUnchanged, err + return statusUnchanged, err } var restorePath string @@ -348,17 +348,18 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a } if restorePath != "" { - application.Spec.SavepointInfo.SavepointLocation = restorePath + application.Status.SavepointPath = restorePath application.Status.JobStatus.JobID = "" s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob) - return applicationChanged, nil + return statusChanged, nil } - return applicationUnchanged, nil + return statusUnchanged, nil } func (s *FlinkStateMachine) submitJobIfNeeded(ctx context.Context, app *v1beta1.FlinkApplication, hash string, - jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) { + jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, + savepointPath string) (string, error) { isReady, _ := s.flinkController.IsServiceReady(ctx, app, hash) // Ignore errors if !isReady { @@ -387,8 +388,9 @@ func (s *FlinkStateMachine) submitJobIfNeeded(ctx context.Context, app *v1beta1. 
case 0: // no active jobs, we need to submit a new one logger.Infof(ctx, "No job found for the application") + jobID, err := s.flinkController.StartFlinkJob(ctx, app, hash, - jarName, parallelism, entryClass, programArgs, allowNonRestoredState) + jarName, parallelism, entryClass, programArgs, allowNonRestoredState, savepointPath) if err != nil { s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobSubmissionFailed", fmt.Sprintf("Failed to submit job to cluster for deploy %s: %v", hash, err)) @@ -438,14 +440,14 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "JobsSubmissionFailed", fmt.Sprintf("Failed to submit job: %s", reason)) s.updateApplicationPhase(app, v1beta1.FlinkApplicationRollingBackJob) - return applicationChanged, nil + return statusChanged, nil } // switch the service to point to the new jobmanager hash := flink.HashForApplication(app) err := s.updateGenericService(ctx, app, hash) if err != nil { - return applicationUnchanged, err + return statusUnchanged, err } // Update status of the cluster @@ -455,32 +457,46 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta } if app.Status.JobStatus.JobID == "" { + savepointPath := "" + if app.Status.DeployHash == "" { + // this is the first deploy, use the user-provided savepoint + savepointPath = app.Spec.SavepointPath + if savepointPath == "" { + //nolint // fall back to the old config for backwards-compatibility + savepointPath = app.Spec.SavepointInfo.SavepointLocation + } + } else { + // otherwise use the savepoint created by the operator + savepointPath = app.Status.SavepointPath + } + appJobID, err := s.submitJobIfNeeded(ctx, app, hash, - app.Spec.JarName, app.Spec.Parallelism, app.Spec.EntryClass, app.Spec.ProgramArgs, app.Spec.AllowNonRestoredState) + app.Spec.JarName, app.Spec.Parallelism, app.Spec.EntryClass, app.Spec.ProgramArgs, + app.Spec.AllowNonRestoredState, savepointPath) if err != nil { - return applicationUnchanged, err + return statusUnchanged, err } if appJobID != "" { app.Status.JobStatus.JobID = appJobID - return applicationChanged, nil + return statusChanged, nil } // we weren't ready to submit yet - return applicationUnchanged, nil + return statusUnchanged, nil } // get the state of the current application job, err := s.flinkController.GetJobForApplication(ctx, app, hash) if err != nil { - return applicationUnchanged, err + return statusUnchanged, err } if job.State == client.Running { - // Clear the savepoint info - app.Spec.SavepointInfo = v1beta1.SavepointInfo{} // Update the application status with the running job info app.Status.DeployHash = hash + app.Status.SavepointPath = "" + app.Status.SavepointTriggerID = "" app.Status.JobStatus.JarName = app.Spec.JarName app.Status.JobStatus.Parallelism = app.Spec.Parallelism app.Status.JobStatus.EntryClass = app.Spec.EntryClass @@ -488,10 +504,10 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1beta app.Status.JobStatus.AllowNonRestoredState = app.Spec.AllowNonRestoredState s.updateApplicationPhase(app, v1beta1.FlinkApplicationRunning) - return applicationChanged, nil + return statusChanged, nil } - return applicationUnchanged, nil + return statusUnchanged, nil } // Something has gone wrong during the update, post job-cancellation (and cluster tear-down in single mode). We need @@ -516,36 +532,38 @@ func (s *FlinkStateMachine) handleRollingBack(ctx context.Context, app *v1beta1. 
// update the service to point back to the old deployment if needed err := s.updateGenericService(ctx, app, app.Status.DeployHash) if err != nil { - return applicationUnchanged, err + return statusUnchanged, err } // wait until the service is ready isReady, _ := s.flinkController.IsServiceReady(ctx, app, app.Status.DeployHash) // Ignore errors if !isReady { - return applicationUnchanged, nil + return statusUnchanged, nil } // submit the old job jobID, err := s.submitJobIfNeeded(ctx, app, app.Status.DeployHash, app.Status.JobStatus.JarName, app.Status.JobStatus.Parallelism, app.Status.JobStatus.EntryClass, app.Status.JobStatus.ProgramArgs, - app.Status.JobStatus.AllowNonRestoredState) + app.Status.JobStatus.AllowNonRestoredState, + app.Status.SavepointPath) // set rollbackHash app.Status.RollbackHash = app.Status.DeployHash if err != nil { - return applicationUnchanged, err + return statusUnchanged, err } if jobID != "" { app.Status.JobStatus.JobID = jobID - app.Spec.SavepointInfo = v1beta1.SavepointInfo{} + app.Status.SavepointPath = "" + app.Status.SavepointTriggerID = "" // move to the deploy failed state return s.deployFailed(ctx, app) } - return applicationUnchanged, nil + return statusUnchanged, nil } // Check if the application is Running. @@ -554,7 +572,7 @@ func (s *FlinkStateMachine) handleApplicationRunning(ctx context.Context, applic job, err := s.flinkController.GetJobForApplication(ctx, application, application.Status.DeployHash) if err != nil { // TODO: think more about this case - return applicationUnchanged, err + return statusUnchanged, err } if job == nil { @@ -565,7 +583,7 @@ func (s *FlinkStateMachine) handleApplicationRunning(ctx context.Context, applic cur, err := s.flinkController.GetCurrentDeploymentsForApp(ctx, application) if err != nil { - return applicationUnchanged, err + return statusUnchanged, err } // If the application has changed (i.e., there are no current deployments), and we haven't already failed trying to @@ -574,7 +592,7 @@ func (s *FlinkStateMachine) handleApplicationRunning(ctx context.Context, applic logger.Infof(ctx, "Application resource has changed. 
Moving to Updating") // TODO: handle single mode s.updateApplicationPhase(application, v1beta1.FlinkApplicationUpdating) - return applicationChanged, nil + return statusChanged, nil } // If there are old resources left-over from a previous version, clean them up @@ -597,10 +615,10 @@ func (s *FlinkStateMachine) handleApplicationRunning(ctx context.Context, applic // Update k8s object if either job or cluster status has changed if hasJobStatusChanged || hasClusterStatusChanged { - return applicationChanged, nil + return statusChanged, nil } - return applicationUnchanged, nil + return statusUnchanged, nil } func (s *FlinkStateMachine) addFinalizerIfMissing(ctx context.Context, application *v1beta1.FlinkApplication, finalizer string) error { @@ -626,9 +644,9 @@ func removeString(list []string, target string) []string { return ret } -func (s *FlinkStateMachine) clearFinalizers(app *v1beta1.FlinkApplication) (bool, error) { +func (s *FlinkStateMachine) clearFinalizers(ctx context.Context, app *v1beta1.FlinkApplication) (bool, error) { app.Finalizers = removeString(app.Finalizers, jobFinalizer) - return applicationChanged, nil + return statusUnchanged, s.k8Cluster.UpdateK8Object(ctx, app) } func jobFinished(job *client.FlinkJobOverview) bool { @@ -647,43 +665,50 @@ func (s *FlinkStateMachine) handleApplicationDeleting(ctx context.Context, app * // If the delete mode is none or there's no deployhash set (which means we failed to submit the job on the // first deploy) just delete the finalizer so the cluster can be torn down if app.Spec.DeleteMode == v1beta1.DeleteModeNone || app.Status.DeployHash == "" { - return s.clearFinalizers(app) + return s.clearFinalizers(ctx, app) } job, err := s.flinkController.GetJobForApplication(ctx, app, app.Status.DeployHash) if err != nil { - return applicationUnchanged, err - } - - if jobFinished(job) { - // there are no running jobs for this application, we can just tear down - return s.clearFinalizers(app) + return statusUnchanged, err } switch app.Spec.DeleteMode { case v1beta1.DeleteModeForceCancel: - logger.Infof(ctx, "Force cancelling job as part of cleanup") - return applicationUnchanged, s.flinkController.ForceCancel(ctx, app, app.Status.DeployHash) + if job.State == client.Cancelling { + // we've already cancelled the job, waiting for it to finish + return statusUnchanged, nil + } else if jobFinished(job) { + // job was successfully cancelled + return s.clearFinalizers(ctx, app) + } + + logger.Infof(ctx, "Force-cancelling job without a savepoint") + return statusUnchanged, s.flinkController.ForceCancel(ctx, app, app.Status.DeployHash) case v1beta1.DeleteModeSavepoint, "": - if app.Spec.SavepointInfo.SavepointLocation != "" { + if app.Status.SavepointPath != "" { // we've already created the savepoint, now just waiting for the job to be cancelled - return applicationUnchanged, nil + if jobFinished(job) { + return s.clearFinalizers(ctx, app) + } + + return statusUnchanged, nil } - if app.Spec.SavepointInfo.TriggerID == "" { + if app.Status.SavepointTriggerID == "" { // delete with savepoint triggerID, err := s.flinkController.CancelWithSavepoint(ctx, app, app.Status.DeployHash) if err != nil { - return applicationUnchanged, err + return statusUnchanged, err } s.flinkController.LogEvent(ctx, app, corev1.EventTypeNormal, "CancellingJob", fmt.Sprintf("Cancelling job with savepoint %v", triggerID)) - app.Spec.SavepointInfo.TriggerID = triggerID + app.Status.SavepointTriggerID = triggerID } else { // we've already started savepointing; check the status status, 
err := s.flinkController.GetSavepointStatus(ctx, app, app.Status.DeployHash) if err != nil { - return applicationUnchanged, err + return statusUnchanged, err } if status.Operation.Location == "" && status.SavepointStatus.Status != client.SavePointInProgress { @@ -691,31 +716,31 @@ func (s *FlinkStateMachine) handleApplicationDeleting(ctx context.Context, app * s.flinkController.LogEvent(ctx, app, corev1.EventTypeWarning, "SavepointFailed", fmt.Sprintf("Failed to take savepoint %v", status.Operation.FailureCause)) // clear the trigger id so that we can try again - app.Spec.SavepointInfo.TriggerID = "" + app.Status.SavepointTriggerID = "" return true, client.GetRetryableError(errors.New("failed to take savepoint"), v1beta1.CancelJobWithSavepoint, "500", math.MaxInt32) } else if status.SavepointStatus.Status == client.SavePointCompleted { // we're done, clean up s.flinkController.LogEvent(ctx, app, corev1.EventTypeNormal, "CanceledJob", fmt.Sprintf("Cancelled job with savepoint '%s'", status.Operation.Location)) - app.Spec.SavepointInfo.SavepointLocation = status.Operation.Location - app.Spec.SavepointInfo.TriggerID = "" + app.Status.SavepointPath = status.Operation.Location + app.Status.SavepointTriggerID = "" } } - return applicationChanged, nil + return statusChanged, nil default: logger.Errorf(ctx, "Unsupported DeleteMode %s", app.Spec.DeleteMode) } - return applicationUnchanged, nil + return statusUnchanged, nil } func (s *FlinkStateMachine) compareAndUpdateError(application *v1beta1.FlinkApplication, err error) bool { oldErr := application.Status.LastSeenError if err == nil && oldErr == nil { - return applicationUnchanged + return statusUnchanged } if err == nil { @@ -732,7 +757,7 @@ func (s *FlinkStateMachine) compareAndUpdateError(application *v1beta1.FlinkAppl application.Status.LastSeenError.LastErrorUpdateTime = &now } - return applicationChanged + return statusChanged } diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go index 770f1a8e..113360e2 100644 --- a/pkg/controller/flinkapplication/flink_state_machine_test.go +++ b/pkg/controller/flinkapplication/flink_state_machine_test.go @@ -97,7 +97,7 @@ func TestHandleStartingDual(t *testing.T) { } mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) - mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, v1beta1.FlinkApplicationSavepointing, application.Status.Phase) updateInvoked = true @@ -125,7 +125,7 @@ func TestHandleApplicationSavepointingInitialDeploy(t *testing.T) { } mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) - mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, v1beta1.FlinkApplicationSubmittingJob, application.Status.Phase) updateInvoked = true @@ -174,12 +174,12 @@ func TestHandleApplicationSavepointingDual(t *testing.T) { mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) updateCount := 0 - mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { application := 
object.(*v1beta1.FlinkApplication) if updateCount == 0 { - assert.Equal(t, "trigger", application.Spec.SavepointInfo.TriggerID) + assert.Equal(t, "trigger", application.Status.SavepointTriggerID) } else { - assert.Equal(t, testSavepointLocation, application.Spec.SavepointInfo.SavepointLocation) + assert.Equal(t, testSavepointLocation, application.Status.SavepointPath) assert.Equal(t, v1beta1.FlinkApplicationSubmittingJob, application.Status.Phase) } @@ -211,22 +211,19 @@ func TestHandleApplicationSavepointingFailed(t *testing.T) { } app := v1beta1.FlinkApplication{ - Spec: v1beta1.FlinkApplicationSpec{ - SavepointInfo: v1beta1.SavepointInfo{ - TriggerID: "trigger", - }, - }, + Spec: v1beta1.FlinkApplicationSpec{}, Status: v1beta1.FlinkApplicationStatus{ - Phase: v1beta1.FlinkApplicationSavepointing, - DeployHash: "blah", + Phase: v1beta1.FlinkApplicationSavepointing, + DeployHash: "blah", + SavepointTriggerID: "trigger", }, } hash := flink.HashForApplication(&app) mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) - mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { application := object.(*v1beta1.FlinkApplication) - assert.Empty(t, application.Spec.SavepointInfo.SavepointLocation) + assert.Empty(t, application.Status.SavepointPath) assert.Equal(t, hash, application.Status.FailedDeployHash) assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase) updateInvoked = true @@ -241,14 +238,11 @@ func TestRestoreFromExternalizedCheckpoint(t *testing.T) { updateInvoked := false app := v1beta1.FlinkApplication{ - Spec: v1beta1.FlinkApplicationSpec{ - SavepointInfo: v1beta1.SavepointInfo{ - TriggerID: "trigger", - }, - }, + Spec: v1beta1.FlinkApplicationSpec{}, Status: v1beta1.FlinkApplicationStatus{ - Phase: v1beta1.FlinkApplicationSavepointing, - DeployHash: "blah", + Phase: v1beta1.FlinkApplicationSavepointing, + DeployHash: "blah", + SavepointTriggerID: "trigger", }, } @@ -267,9 +261,9 @@ func TestRestoreFromExternalizedCheckpoint(t *testing.T) { } mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) - mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { application := object.(*v1beta1.FlinkApplication) - assert.Equal(t, "/tmp/checkpoint", application.Spec.SavepointInfo.SavepointLocation) + assert.Equal(t, "/tmp/checkpoint", application.Status.SavepointPath) assert.Equal(t, v1beta1.FlinkApplicationSubmittingJob, application.Status.Phase) updateInvoked = true return nil @@ -316,7 +310,7 @@ func TestSubmittingToRunning(t *testing.T) { startCount := 0 mockFlinkController.StartFlinkJobFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, - jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) { + jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (string, error) { assert.Equal(t, appHash, hash) assert.Equal(t, app.Spec.JarName, jarName) @@ -324,6 +318,7 @@ func TestSubmittingToRunning(t *testing.T) { assert.Equal(t, app.Spec.EntryClass, entryClass) assert.Equal(t, app.Spec.ProgramArgs, programArgs) assert.Equal(t, app.Spec.AllowNonRestoredState, allowNonRestoredState) + assert.Equal(t, app.Spec.SavepointPath, savepointPath) 
startCount++ return jobID, nil @@ -364,10 +359,18 @@ func TestSubmittingToRunning(t *testing.T) { } else if updateCount == 1 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, jobFinalizer, application.Finalizers[0]) - } else if updateCount == 2 { + } + + updateCount++ + return nil + } + + statusUpdateCount := 0 + mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { + if statusUpdateCount == 0 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, jobID, application.Status.JobStatus.JobID) - } else if updateCount == 3 { + } else if statusUpdateCount == 1 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, appHash, application.Status.DeployHash) assert.Equal(t, app.Spec.JarName, app.Status.JobStatus.JarName) @@ -376,8 +379,7 @@ func TestSubmittingToRunning(t *testing.T) { assert.Equal(t, app.Spec.ProgramArgs, app.Status.JobStatus.ProgramArgs) assert.Equal(t, v1beta1.FlinkApplicationRunning, application.Status.Phase) } - - updateCount++ + statusUpdateCount++ return nil } @@ -387,7 +389,8 @@ func TestSubmittingToRunning(t *testing.T) { assert.Nil(t, err) assert.Equal(t, 1, startCount) - assert.Equal(t, 4, updateCount) + assert.Equal(t, 2, updateCount) + assert.Equal(t, 2, statusUpdateCount) } func TestHandleApplicationNotReady(t *testing.T) { @@ -401,7 +404,7 @@ func TestHandleApplicationNotReady(t *testing.T) { return nil, nil } mockFlinkController.StartFlinkJobFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, - jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) { + jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (string, error) { assert.False(t, true) return "", nil } @@ -465,7 +468,7 @@ func TestRunningToClusterStarting(t *testing.T) { } mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) - mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, v1beta1.FlinkApplicationUpdating, application.Status.Phase) updateInvoked = true @@ -495,8 +498,9 @@ func TestRollingBack(t *testing.T) { ProgramArgs: "--test", }, Status: v1beta1.FlinkApplicationStatus{ - Phase: v1beta1.FlinkApplicationRollingBackJob, - DeployHash: "old-hash", + Phase: v1beta1.FlinkApplicationRollingBackJob, + DeployHash: "old-hash", + SavepointPath: "file:///savepoint", JobStatus: v1beta1.FlinkJobStatus{ JarName: "old-job.jar", Parallelism: 10, @@ -516,7 +520,7 @@ func TestRollingBack(t *testing.T) { startCalled := false mockFlinkController.StartFlinkJobFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, - jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) { + jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (string, error) { startCalled = true assert.Equal(t, "old-hash", hash) @@ -525,6 +529,7 @@ func TestRollingBack(t *testing.T) { assert.Equal(t, app.Status.JobStatus.EntryClass, entryClass) assert.Equal(t, app.Status.JobStatus.ProgramArgs, programArgs) assert.Equal(t, app.Status.JobStatus.AllowNonRestoredState, allowNonRestoredState) + assert.Equal(t, app.Status.SavepointPath, savepointPath) 
return jobID, nil } @@ -578,13 +583,20 @@ func TestRollingBack(t *testing.T) { } else if updateCount == 1 { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, jobFinalizer, application.Finalizers[0]) - } else if updateCount == 2 { + } + + updateCount++ + return nil + } + + statusUpdated := false + mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { + if !statusUpdated { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, appHash, application.Status.FailedDeployHash) assert.Equal(t, v1beta1.FlinkApplicationDeployFailed, application.Status.Phase) + statusUpdated = true } - - updateCount++ return nil } @@ -594,7 +606,8 @@ func TestRollingBack(t *testing.T) { assert.Nil(t, err) assert.True(t, startCalled) - assert.Equal(t, 4, updateCount) + assert.True(t, statusUpdated) + assert.Equal(t, 2, updateCount) } func TestIsApplicationStuck(t *testing.T) { @@ -688,28 +701,32 @@ func TestDeleteWithSavepoint(t *testing.T) { } mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) - updateCount := 1 - mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + updateStatusCount := 0 + mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { application := object.(*v1beta1.FlinkApplication) assert.Equal(t, v1beta1.FlinkApplicationDeleting, application.Status.Phase) - if updateCount == 1 { - assert.Equal(t, triggerID, application.Spec.SavepointInfo.TriggerID) - } else if updateCount == 2 { + if updateStatusCount == 0 { + assert.Equal(t, triggerID, application.Status.SavepointTriggerID) + } else if updateStatusCount == 1 { assert.NotNil(t, application.Status.LastSeenError) - } else if updateCount == 3 { - assert.Equal(t, savepointPath, application.Spec.SavepointInfo.SavepointLocation) - } else if updateCount == 4 { - assert.Equal(t, 0, len(app.Finalizers)) + } else if updateStatusCount == 2 { + assert.Equal(t, savepointPath, application.Status.SavepointPath) } - updateCount++ + updateStatusCount++ + return nil + } + + updated := false + mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + assert.Equal(t, 0, len(app.Finalizers)) + updated = true return nil } err := stateMachineForTest.Handle(context.Background(), &app) assert.NoError(t, err) - assert.Equal(t, 2, updateCount) savepointStatusCount := 0 mockFlinkController.GetSavepointStatusFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error) { @@ -745,7 +762,7 @@ func TestDeleteWithSavepoint(t *testing.T) { err = stateMachineForTest.Handle(context.Background(), &app) assert.NoError(t, err) - assert.Equal(t, 4, updateCount) + assert.Equal(t, 3, updateStatusCount) mockFlinkController.GetJobForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (jobs *client.FlinkJobOverview, err error) { return &client.FlinkJobOverview{ @@ -757,7 +774,7 @@ func TestDeleteWithSavepoint(t *testing.T) { err = stateMachineForTest.Handle(context.Background(), &app) assert.NoError(t, err) - assert.Equal(t, 5, updateCount) + assert.True(t, updated) } @@ -771,8 +788,9 @@ func TestDeleteWithSavepointAndFinishedJob(t *testing.T) { DeletionTimestamp: &metav1.Time{Time: time.Now()}, }, Status: v1beta1.FlinkApplicationStatus{ - Phase: v1beta1.FlinkApplicationDeleting, - DeployHash: "deployhash", + Phase: v1beta1.FlinkApplicationDeleting, + DeployHash: "deployhash", + SavepointPath: 
"file:///savepoint", JobStatus: v1beta1.FlinkJobStatus{ JobID: jobID, }, @@ -990,7 +1008,7 @@ func TestRollbackWithRetryableError(t *testing.T) { mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) updateErrCount := 0 - mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { + mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error { updateErrCount++ return nil } @@ -1056,7 +1074,7 @@ func TestRollbackWithFailFastError(t *testing.T) { } failFastError := client.GetNonRetryableError(errors.New("blah"), "SubmitJob", "400BadRequest") mockFlinkController.StartFlinkJobFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string, - jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) { + jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (string, error) { return "", failFastError } diff --git a/pkg/controller/k8/cluster.go b/pkg/controller/k8/cluster.go index eb20d388..ff4c0eed 100644 --- a/pkg/controller/k8/cluster.go +++ b/pkg/controller/k8/cluster.go @@ -38,6 +38,8 @@ type ClusterInterface interface { CreateK8Object(ctx context.Context, object runtime.Object) error UpdateK8Object(ctx context.Context, object runtime.Object) error DeleteK8Object(ctx context.Context, object runtime.Object) error + + UpdateStatus(ctx context.Context, object runtime.Object) error } func NewK8Cluster(mgr manager.Manager, cfg config.RuntimeConfig) ClusterInterface { @@ -170,7 +172,7 @@ func (k *Cluster) CreateK8Object(ctx context.Context, object runtime.Object) err objCreate := object.DeepCopyObject() err := k.client.Create(ctx, objCreate) if err != nil { - logger.Errorf(ctx, "K8 object creation failed %v", err) + logger.Errorf(ctx, "K8s object creation failed %v", err) k.metrics.createFailure.Inc(ctx) return err } @@ -186,7 +188,25 @@ func (k *Cluster) UpdateK8Object(ctx context.Context, object runtime.Object) err logger.Warnf(ctx, "Conflict while updating object") k.metrics.updateConflicts.Inc(ctx) } else { - logger.Errorf(ctx, "K8 object update failed %v", err) + logger.Errorf(ctx, "K8s object update failed %v", err) + k.metrics.updateFailure.Inc(ctx) + } + return err + } + k.metrics.updateSuccess.Inc(ctx) + return nil +} + +func (k *Cluster) UpdateStatus(ctx context.Context, object runtime.Object) error { + objectCopy := object.DeepCopyObject() + + err := k.client.Status().Update(ctx, objectCopy) + if err != nil { + if errors.IsConflict(err) { + logger.Warnf(ctx, "Conflict while updating status") + k.metrics.updateConflicts.Inc(ctx) + } else { + logger.Errorf(ctx, "K8s object update failed %v", err) k.metrics.updateFailure.Inc(ctx) } return err @@ -199,7 +219,7 @@ func (k *Cluster) DeleteK8Object(ctx context.Context, object runtime.Object) err objDelete := object.DeepCopyObject() err := k.client.Delete(ctx, objDelete) if err != nil { - logger.Errorf(ctx, "K8 object delete failed %v", err) + logger.Errorf(ctx, "K8s object delete failed %v", err) k.metrics.deleteFailure.Inc(ctx) return err } diff --git a/pkg/controller/k8/mock/mock_k8.go b/pkg/controller/k8/mock/mock_k8.go index 66196d7b..9e3c79c7 100644 --- a/pkg/controller/k8/mock/mock_k8.go +++ b/pkg/controller/k8/mock/mock_k8.go @@ -13,6 +13,7 @@ type CreateK8ObjectFunc func(ctx context.Context, object runtime.Object) error type GetServiceFunc func(ctx context.Context, namespace string, name string) (*corev1.Service, 
error) type GetServiceWithLabelFunc func(ctx context.Context, namespace string, labelMap map[string]string) (*corev1.ServiceList, error) type UpdateK8ObjectFunc func(ctx context.Context, object runtime.Object) error +type UpdateStatusFunc func(ctx context.Context, object runtime.Object) error type DeleteK8ObjectFunc func(ctx context.Context, object runtime.Object) error type K8Cluster struct { @@ -21,6 +22,7 @@ type K8Cluster struct { GetServicesWithLabelFunc GetServiceWithLabelFunc CreateK8ObjectFunc CreateK8ObjectFunc UpdateK8ObjectFunc UpdateK8ObjectFunc + UpdateStatusFunc UpdateStatusFunc DeleteK8ObjectFunc DeleteK8ObjectFunc } @@ -59,6 +61,13 @@ func (m *K8Cluster) UpdateK8Object(ctx context.Context, object runtime.Object) e return nil } +func (m *K8Cluster) UpdateStatus(ctx context.Context, object runtime.Object) error { + if m.UpdateStatusFunc != nil { + return m.UpdateStatusFunc(ctx, object) + } + return nil +} + func (m *K8Cluster) DeleteK8Object(ctx context.Context, object runtime.Object) error { if m.DeleteK8ObjectFunc != nil { return m.DeleteK8ObjectFunc(ctx, object)
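Why the split between `UpdateK8Object` and the new `UpdateStatus` matters: once the CRD declares `subresources: status: {}`, the API server treats `/status` as a separate endpoint, so a regular update drops in-memory status edits and a status update drops everything else. Below is a minimal sketch of the resulting write pattern, assuming a controller-runtime client of the vintage this patch targets (where `Update` and `Status().Update` take a `runtime.Object`); the `persist` helper and import path are illustrative, not part of the change.

```go
package main // illustrative sketch, not part of the patch

import (
	"context"

	"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// persist writes spec and status through their separate endpoints. With the
// status subresource enabled, Update ignores .status changes and
// Status().Update ignores everything except .status.
func persist(ctx context.Context, c client.Client, app *v1beta1.FlinkApplication) error {
	// Writes metadata + spec; any in-memory .status edits are discarded
	// by the API server.
	if err := c.Update(ctx, app.DeepCopyObject()); err != nil {
		return err
	}
	// Writes .status only, via the /status subresource.
	return c.Status().Update(ctx, app.DeepCopyObject())
}
```

This is also why `clearFinalizers` now goes through `UpdateK8Object`: finalizers live in object metadata, which the status endpoint will not persist.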