[STRMCMP-675] Enable status subresource in CRD (lyft#126)
This PR enables the status subresource for the FlinkApplication CRD. This makes it harder for users to accidentally overwrite status fields while updating their applications (an issue we've seen internally). For CRDs with the status subresource enabled, updates to the base resource ignore changes to the status subresource, and updates to the status subresource ignore changes to the spec and metadata.
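To illustrate that split, here is a minimal sketch assuming a controller-runtime style client; the CRD import path and the helper function are illustrative and not part of this PR:

```go
package example

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"

	// Assumed import path for the CRD types defined in this repository.
	"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
)

// updateApplication shows the effect of the status subresource: spec and
// status are written through separate API endpoints.
func updateApplication(ctx context.Context, c client.Client, app *v1beta1.FlinkApplication) error {
	// Update writes spec and metadata; with the status subresource enabled,
	// the API server ignores any local modifications to app.Status here.
	if err := c.Update(ctx, app); err != nil {
		return err
	}
	// Status().Update writes only the status (via the /status endpoint);
	// spec and metadata changes are ignored here.
	return c.Status().Update(ctx, app)
}
```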

The bulk of this PR is the set of changes needed to make that possible. Previously, the operator modified the SavepointInfo field of the spec during updates. That goes against the general principle that the user should own the spec, and it prevented a move to the status subresource, since the operator would have needed to update both the spec and the status.

Instead, we introduce a new field, spec.savepointPath, which contains the savepoint path to restore from on the initial job submission (the existing spec.savepointInfo.savepointLocation is retained for backwards compatibility, but will be removed in a future CRD version). We also introduce two new fields on the status: status.savepointPath (the path of the savepoint the operator has taken during the update process) and status.savepointTriggerId (the trigger ID of the savepoint the operator is currently taking). This cleanly separates user intent (which savepoint the job should initially start from) from the operator's internal state and processes.
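For illustration, a FlinkApplication using the new fields might look roughly like the sketch below; the apiVersion, image, and paths are placeholder assumptions, and the status block is written by the operator rather than the user:

```yaml
apiVersion: flink.k8s.io/v1beta1
kind: FlinkApplication
metadata:
  name: wordcount-app
spec:
  image: example/wordcount:latest
  flinkVersion: "1.8"
  jarName: wordcount.jar
  parallelism: 2
  # User intent: restore the initial submission of this job from this savepoint.
  savepointPath: s3://example-bucket/savepoints/savepoint-aaaa
status:
  phase: Running
  # Operator-managed: the savepoint (and its trigger) taken during an update.
  savepointTriggerId: "trigger-1234"
  savepointPath: s3://example-bucket/savepoints/savepoint-bbbb
```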

Note: this change is slightly incompatible with the existing operator. It should not be deployed to a cluster while any FlinkApplication updates are in progress; that is, all flinkapplications should be in a Running or DeployFailed state before the upgrade.
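One way to verify that precondition before rolling out the new operator, assuming kubectl access to the cluster, is to list the applications and check the Phase printer column defined in the CRD:

```sh
# Every FlinkApplication should report Running or DeployFailed in the
# PHASE column before the new operator version is deployed.
kubectl get flinkapplications --all-namespaces
```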
mwylde authored Nov 13, 2019
1 parent 5955172 commit f499e7f
Showing 12 changed files with 271 additions and 187 deletions.
README.md (4 changes: 2 additions & 2 deletions)
@@ -17,11 +17,11 @@ FlinkK8sOperator is a [Kubernetes operator](https://coreos.com/operators/) that

*Beta*

The operator is in use for some less-crtical jobs at Lyft. At this point the focus is on testing and stability While in
The operator is in use for some less-critical jobs at Lyft. At this point the focus is on testing and stability While in
Beta, we will attempt to limit the number of backwards-incompatible changes, but they may still occur as necessary.

## Prerequisites
* Version >= 1.9 of Kubernetes.
* Version >= 1.10 of Kubernetes (versions < 1.13 require `--feature-gates=CustomResourceSubresources=true`)
* Version >= 1.7 of Apache Flink.

## Overview
deploy/crd.yaml (5 changes: 4 additions & 1 deletion)
@@ -89,6 +89,8 @@ spec:
properties:
savepointLocation:
type: string
savepointPath:
type: string
jobManagerConfig:
type: object
properties:
@@ -377,7 +379,8 @@ spec:
- jarName
- parallelism
- entryClass

subresources:
status: {}
additionalPrinterColumns:
- name: Phase
type: string
docs/crd.md (8 changes: 4 additions & 4 deletions)
@@ -74,13 +74,13 @@ Below is the list of fields in the custom resource and their description

* **programArgs** `type:string`
External configuration parameters to be passed as arguments to the job like input and output sources, etc


* **savepointPath** `type:string`
If specified, the application state will be restored from this savepoint

* **allowNonRestoredState** `type:boolean`
Skips savepoint operator state that cannot be mapped to the new program version

* **savepointInfo** `type:SavepointInfo`
Optional Savepoint info that can be passed in to indicate that the Flink job must resume from the corresponding savepoint.

* **flinkVersion** `type:string required=true`
The version of Flink to be managed. This version must match the version in the image.

integ/simple_test.go (2 changes: 1 addition & 1 deletion)
@@ -218,7 +218,7 @@ func (s *IntegSuite) TestSimple(c *C) {
time.Sleep(100 * time.Millisecond)
}

c.Assert(app.Spec.SavepointInfo.SavepointLocation, NotNil)
c.Assert(app.Status.SavepointPath, NotNil)
job := func() map[string]interface{} {
jobs, _ := s.Util.FlinkAPIGet(app, "/jobs")
jobMap := jobs.(map[string]interface{})
pkg/apis/app/v1beta1/types.go (77 changes: 40 additions & 37 deletions)
@@ -28,31 +28,33 @@ type FlinkApplication struct {
}

type FlinkApplicationSpec struct {
Image string `json:"image,omitempty" protobuf:"bytes,2,opt,name=image"`
ImagePullPolicy apiv1.PullPolicy `json:"imagePullPolicy,omitempty" protobuf:"bytes,14,opt,name=imagePullPolicy,casttype=PullPolicy"`
ImagePullSecrets []apiv1.LocalObjectReference `json:"imagePullSecrets,omitempty" patchStrategy:"merge" patchMergeKey:"name" protobuf:"bytes,15,rep,name=imagePullSecrets"`
ServiceAccountName string `json:"serviceAccountName,omitempty"`
FlinkConfig FlinkConfig `json:"flinkConfig"`
FlinkVersion string `json:"flinkVersion"`
TaskManagerConfig TaskManagerConfig `json:"taskManagerConfig,omitempty"`
JobManagerConfig JobManagerConfig `json:"jobManagerConfig,omitempty"`
JarName string `json:"jarName"`
Parallelism int32 `json:"parallelism"`
EntryClass string `json:"entryClass,omitempty"`
ProgramArgs string `json:"programArgs,omitempty"`
SavepointInfo SavepointInfo `json:"savepointInfo,omitempty"`
DeploymentMode DeploymentMode `json:"deploymentMode,omitempty"`
RPCPort *int32 `json:"rpcPort,omitempty"`
BlobPort *int32 `json:"blobPort,omitempty"`
QueryPort *int32 `json:"queryPort,omitempty"`
UIPort *int32 `json:"uiPort,omitempty"`
MetricsQueryPort *int32 `json:"metricsQueryPort,omitempty"`
Volumes []apiv1.Volume `json:"volumes,omitempty"`
VolumeMounts []apiv1.VolumeMount `json:"volumeMounts,omitempty"`
RestartNonce string `json:"restartNonce"`
DeleteMode DeleteMode `json:"deleteMode,omitempty"`
AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"`
ForceRollback bool `json:"forceRollback"`
Image string `json:"image,omitempty" protobuf:"bytes,2,opt,name=image"`
ImagePullPolicy apiv1.PullPolicy `json:"imagePullPolicy,omitempty" protobuf:"bytes,14,opt,name=imagePullPolicy,casttype=PullPolicy"`
ImagePullSecrets []apiv1.LocalObjectReference `json:"imagePullSecrets,omitempty" patchStrategy:"merge" patchMergeKey:"name" protobuf:"bytes,15,rep,name=imagePullSecrets"`
ServiceAccountName string `json:"serviceAccountName,omitempty"`
FlinkConfig FlinkConfig `json:"flinkConfig"`
FlinkVersion string `json:"flinkVersion"`
TaskManagerConfig TaskManagerConfig `json:"taskManagerConfig,omitempty"`
JobManagerConfig JobManagerConfig `json:"jobManagerConfig,omitempty"`
JarName string `json:"jarName"`
Parallelism int32 `json:"parallelism"`
EntryClass string `json:"entryClass,omitempty"`
ProgramArgs string `json:"programArgs,omitempty"`
// Deprecated: use SavepointPath instead
SavepointInfo SavepointInfo `json:"savepointInfo,omitempty"`
SavepointPath string `json:"savepointPath,omitempty"`
DeploymentMode DeploymentMode `json:"deploymentMode,omitempty"`
RPCPort *int32 `json:"rpcPort,omitempty"`
BlobPort *int32 `json:"blobPort,omitempty"`
QueryPort *int32 `json:"queryPort,omitempty"`
UIPort *int32 `json:"uiPort,omitempty"`
MetricsQueryPort *int32 `json:"metricsQueryPort,omitempty"`
Volumes []apiv1.Volume `json:"volumes,omitempty"`
VolumeMounts []apiv1.VolumeMount `json:"volumeMounts,omitempty"`
RestartNonce string `json:"restartNonce"`
DeleteMode DeleteMode `json:"deleteMode,omitempty"`
AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"`
ForceRollback bool `json:"forceRollback"`
}

type FlinkConfig map[string]interface{}
@@ -122,7 +124,6 @@ type EnvironmentConfig struct {

type SavepointInfo struct {
SavepointLocation string `json:"savepointLocation,omitempty"`
TriggerID string `json:"triggerId,omitempty"`
}

type FlinkClusterStatus struct {
@@ -157,17 +158,19 @@ type FlinkJobStatus struct {
}

type FlinkApplicationStatus struct {
Phase FlinkApplicationPhase `json:"phase"`
StartedAt *metav1.Time `json:"startedAt,omitempty"`
LastUpdatedAt *metav1.Time `json:"lastUpdatedAt,omitempty"`
Reason string `json:"reason,omitempty"`
ClusterStatus FlinkClusterStatus `json:"clusterStatus,omitempty"`
JobStatus FlinkJobStatus `json:"jobStatus"`
FailedDeployHash string `json:"failedDeployHash,omitEmpty"`
RollbackHash string `json:"rollbackHash,omitEmpty"`
DeployHash string `json:"deployHash"`
RetryCount int32 `json:"retryCount,omitEmpty"`
LastSeenError *FlinkApplicationError `json:"lastSeenError,omitEmpty"`
Phase FlinkApplicationPhase `json:"phase"`
StartedAt *metav1.Time `json:"startedAt,omitempty"`
LastUpdatedAt *metav1.Time `json:"lastUpdatedAt,omitempty"`
Reason string `json:"reason,omitempty"`
ClusterStatus FlinkClusterStatus `json:"clusterStatus,omitempty"`
JobStatus FlinkJobStatus `json:"jobStatus"`
FailedDeployHash string `json:"failedDeployHash,omitEmpty"`
RollbackHash string `json:"rollbackHash,omitEmpty"`
DeployHash string `json:"deployHash"`
SavepointTriggerID string `json:"savepointTriggerId,omitempty"`
SavepointPath string `json:"savepointPath,omitempty"`
RetryCount int32 `json:"retryCount,omitEmpty"`
LastSeenError *FlinkApplicationError `json:"lastSeenError,omitEmpty"`
}

func (in *FlinkApplicationStatus) GetPhase() FlinkApplicationPhase {
pkg/controller/flink/flink.go (10 changes: 6 additions & 4 deletions)
@@ -55,7 +55,8 @@ type ControllerInterface interface {

// Starts the Job in the Flink Cluster
StartFlinkJob(ctx context.Context, application *v1beta1.FlinkApplication, hash string,
jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error)
jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool,
savepointPath string) (string, error)

// Savepoint creation is asynchronous.
// Polls the status of the Savepoint, using the triggerID
@@ -271,14 +272,15 @@ func (f *Controller) CreateCluster(ctx context.Context, application *v1beta1.Fli
}

func (f *Controller) StartFlinkJob(ctx context.Context, application *v1beta1.FlinkApplication, hash string,
jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) {
jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool,
savepointPath string) (string, error) {
response, err := f.flinkClient.SubmitJob(
ctx,
getURLFromApp(application, hash),
jarName,
client.SubmitJobRequest{
Parallelism: parallelism,
SavepointPath: application.Spec.SavepointInfo.SavepointLocation,
SavepointPath: savepointPath,
EntryClass: entryClass,
ProgramArgs: programArgs,
AllowNonRestoredState: allowNonRestoredState,
@@ -298,7 +300,7 @@ func (f *Controller) GetSavepointStatus(ctx context.Context, application *v1beta
if err != nil {
return nil, err
}
return f.flinkClient.CheckSavepointStatus(ctx, getURLFromApp(application, hash), jobID, application.Spec.SavepointInfo.TriggerID)
return f.flinkClient.CheckSavepointStatus(ctx, getURLFromApp(application, hash), jobID, application.Status.SavepointTriggerID)
}

func (f *Controller) IsClusterReady(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) {
pkg/controller/flink/flink_test.go (16 changes: 10 additions & 6 deletions)
@@ -232,7 +232,7 @@ func TestFlinkIsServiceReadyErr(t *testing.T) {
func TestFlinkGetSavepointStatus(t *testing.T) {
flinkControllerForTest := getTestFlinkController()
flinkApp := getFlinkTestApp()
flinkApp.Spec.SavepointInfo.TriggerID = "t1"
flinkApp.Status.SavepointTriggerID = "t1"

mockJmClient := flinkControllerForTest.flinkClient.(*clientMock.JobManagerClient)
mockJmClient.CheckSavepointStatusFunc = func(ctx context.Context, url string, jobID, triggerID string) (*client.SavepointResponse, error) {
@@ -428,7 +428,7 @@ func TestStartFlinkJob(t *testing.T) {
flinkApp.Spec.ProgramArgs = "args"
flinkApp.Spec.EntryClass = "class"
flinkApp.Spec.JarName = "jar-name"
flinkApp.Spec.SavepointInfo.SavepointLocation = "location//"
flinkApp.Spec.SavepointPath = "location//"
flinkApp.Spec.FlinkVersion = "1.7"

mockJmClient := flinkControllerForTest.flinkClient.(*clientMock.JobManagerClient)
@@ -446,7 +446,8 @@ }, nil
}, nil
}
jobID, err := flinkControllerForTest.StartFlinkJob(context.Background(), &flinkApp, "hash",
flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, flinkApp.Spec.AllowNonRestoredState)
flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs,
flinkApp.Spec.AllowNonRestoredState, "location//")
assert.Nil(t, err)
assert.Equal(t, jobID, testJobID)
}
@@ -465,7 +466,8 @@ func TestStartFlinkJobAllowNonRestoredState(t *testing.T) {
}, nil
}
jobID, err := flinkControllerForTest.StartFlinkJob(context.Background(), &flinkApp, "hash",
flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, flinkApp.Spec.AllowNonRestoredState)
flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs,
flinkApp.Spec.AllowNonRestoredState, "")
assert.Nil(t, err)
assert.Equal(t, jobID, testJobID)
}
@@ -480,7 +482,8 @@ func TestStartFlinkJobEmptyJobID(t *testing.T) {
return &client.SubmitJobResponse{}, nil
}
jobID, err := flinkControllerForTest.StartFlinkJob(context.Background(), &flinkApp, "hash",
flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, flinkApp.Spec.AllowNonRestoredState)
flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs,
flinkApp.Spec.AllowNonRestoredState, "")
assert.EqualError(t, err, "unable to submit job: invalid job id")
assert.Empty(t, jobID)
}
@@ -494,7 +497,8 @@ func TestStartFlinkJobErr(t *testing.T) {
return nil, errors.New("submit error")
}
jobID, err := flinkControllerForTest.StartFlinkJob(context.Background(), &flinkApp, "hash",
flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs, flinkApp.Spec.AllowNonRestoredState)
flinkApp.Spec.JarName, flinkApp.Spec.Parallelism, flinkApp.Spec.EntryClass, flinkApp.Spec.ProgramArgs,
flinkApp.Spec.AllowNonRestoredState, "")
assert.EqualError(t, err, "submit error")
assert.Empty(t, jobID)
}
pkg/controller/flink/mock/mock_flink.go (6 changes: 3 additions & 3 deletions)
@@ -14,7 +14,7 @@ type DeleteOldResourcesForApp func(ctx context.Context, application *v1beta1.Fli
type CancelWithSavepointFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (string, error)
type ForceCancelFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error
type StartFlinkJobFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string,
jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error)
jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (string, error)
type GetSavepointStatusFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.SavepointResponse, error)
type IsClusterReadyFunc func(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error)
type IsServiceReadyFunc func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error)
@@ -79,9 +79,9 @@ func (m *FlinkController) ForceCancel(ctx context.Context, application *v1beta1.
}

func (m *FlinkController) StartFlinkJob(ctx context.Context, application *v1beta1.FlinkApplication, hash string,
jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool) (string, error) {
jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (string, error) {
if m.StartFlinkJobFunc != nil {
return m.StartFlinkJobFunc(ctx, application, hash, jarName, parallelism, entryClass, programArgs, allowNonRestoredState)
return m.StartFlinkJobFunc(ctx, application, hash, jarName, parallelism, entryClass, programArgs, allowNonRestoredState, savepointPath)
}
return "", nil
}