Skip to content

Commit

Permalink
[STRMCMP-590] Update CRD version to v1beta1 (lyft#67)
Browse files Browse the repository at this point in the history
Update CRD version to v1beta1
  • Loading branch information
Micah Wylde authored Aug 6, 2019
1 parent 500fe6b commit f687fe5
Show file tree
Hide file tree
Showing 46 changed files with 1,489 additions and 458 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ FlinkK8sOperator is a [Kubernetes operator](https://coreos.com/operators/) that

## Project Status

*Alpha*
*Beta*

The FlinkK8sOperator is still under active development and has not been extensively tested in production environment. Backward compatibility of the APIs is not guaranteed for alpha releases.
The operator is in use for some less-crtical jobs at Lyft. At this point the focus is on testing and stability While in
Beta, we will attempt to limit the number of backwards-incompatible changes, but they may still occur as necessary.

## Prerequisites
* Version >= 1.9 of Kubernetes.
Expand Down
2 changes: 1 addition & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
apiVersion: flink.k8s.io/v1alpha1
apiVersion: flink.k8s.io/v1beta1
kind: FlinkApplication
projectName: flinkk8soperator
2 changes: 1 addition & 1 deletion deploy/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ spec:
shortNames:
- flinkapp
scope: Namespaced
version: v1alpha1
version: v1beta1
validation:
# openAPIV3Schema is the schema for validating custom objects.
openAPIV3Schema:
Expand Down
4 changes: 2 additions & 2 deletions deploy/flinkk8soperator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ spec:
metadata:
labels:
app: flinkoperator
app.kubernetes.io/version: 0.1.3
app.kubernetes.io/version: 0.2.0
spec:
serviceAccountName: flinkoperator
volumes:
Expand All @@ -26,7 +26,7 @@ spec:
path: config.yaml
containers:
- name: flinkoperator-gojson
image: docker.io/lyft/flinkk8soperator:v0.1.3
image: docker.io/lyft/flinkk8soperator:v0.2.0
command:
- flinkoperator
args:
Expand Down
4 changes: 2 additions & 2 deletions docs/crd.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ The [flinkapplication](https://github.com/lyft/flinkk8soperator/blob/master/depl

[FlinkApplication Custom Resource Example](https://github.com/lyft/flinkk8soperator/blob/master/examples/wordcount/flink-operator-custom-resource.yaml)

The type information is available here [FlinkApplication Type](https://github.com/lyft/flinkk8soperator/blob/master/pkg/apis/app/v1alpha1/types.go#L25)
The type information is available here [FlinkApplication Type](https://github.com/lyft/flinkk8soperator/blob/master/pkg/apis/app/v1beta1/types.go#L25)

Below is the list of fields in the custom resource and their description

Expand Down Expand Up @@ -111,4 +111,4 @@ Below is the list of fields in the custom resource and their description
* **ForceRollback** `type:bool`
Can be set to true to force rollback a deploy/update. The rollback is **not** performed when the application is in a **RUNNING** phase.
If an application is successfully rolled back, it is moved to a *DeployFailed* phase. Un-setting or setting `ForceRollback` to `False` will allow updates to progress normally.


4 changes: 2 additions & 2 deletions docs/quick-start-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ $ kubectl get flinkapplication.flink.k8s.io -n flink-operator wordcount-operator

The output should be something like this
```yaml
apiVersion: flink.k8s.io/v1alpha1
apiVersion: v1beta1
kind: FlinkApplication
metadata:
clusterName: ""
Expand All @@ -98,7 +98,7 @@ metadata:
name: wordcount-operator-example
namespace: flink-operator
resourceVersion: "1025774"
selfLink: /apis/flink.k8s.io/v1alpha1/namespaces/flink-operator/flinkapplications/wordcount-operator-example
selfLink: v1beta1
uid: a2855178-b29c-11e9-9a3b-025000000001
spec:
entryClass: org.apache.flink.WordCount
Expand Down
2 changes: 1 addition & 1 deletion examples/wordcount/flink-operator-custom-resource.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
apiVersion: flink.k8s.io/v1alpha1
apiVersion: flink.k8s.io/v1beta1
kind: FlinkApplication
metadata:
name: wordcount-operator-example
Expand Down
6 changes: 3 additions & 3 deletions integ/checkpoint_failure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"os"
"time"

"github.com/lyft/flinkk8soperator/pkg/apis/app/v1alpha1"
"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
"github.com/prometheus/common/log"
. "gopkg.in/check.v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -27,7 +27,7 @@ func failingJobTest(s *IntegSuite, c *C, testName string, causeFailure func()) {
// Cause it to fail
causeFailure()

c.Assert(s.Util.WaitForPhase(config.Name, v1alpha1.FlinkApplicationRunning, v1alpha1.FlinkApplicationDeployFailed), IsNil)
c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)

// wait a bit for it to start failing
time.Sleep(5 * time.Second)
Expand All @@ -40,7 +40,7 @@ func failingJobTest(s *IntegSuite, c *C, testName string, causeFailure func()) {
c.Assert(err, IsNil)

// because the checkpoint will fail, the app should move to deploy failed
c.Assert(s.Util.WaitForPhase(config.Name, v1alpha1.FlinkApplicationDeployFailed), IsNil)
c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationDeployFailed), IsNil)

// And the job should not have been updated
newApp, err := s.Util.GetFlinkApplication(config.Name)
Expand Down
38 changes: 19 additions & 19 deletions integ/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"os"
"time"

"github.com/lyft/flinkk8soperator/pkg/apis/app/v1alpha1"
"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
"github.com/lyft/flinkk8soperator/pkg/controller/flink/client"
"github.com/prometheus/common/log"
. "gopkg.in/check.v1"
Expand All @@ -19,7 +19,7 @@ import (

const NewImage = "lyft/operator-test-app:b1b3cb8e8f98bd41f44f9c89f8462ce255e0d13f.2"

func updateAndValidate(c *C, s *IntegSuite, name string, updateFn func(app *v1alpha1.FlinkApplication), failurePhase v1alpha1.FlinkApplicationPhase) *v1alpha1.FlinkApplication {
func updateAndValidate(c *C, s *IntegSuite, name string, updateFn func(app *v1beta1.FlinkApplication), failurePhase v1beta1.FlinkApplicationPhase) *v1beta1.FlinkApplication {
app, err := s.Util.GetFlinkApplication(name)
c.Assert(err, IsNil)

Expand All @@ -29,8 +29,8 @@ func updateAndValidate(c *C, s *IntegSuite, name string, updateFn func(app *v1al
_, err = s.Util.FlinkApps().Update(app)
c.Assert(err, IsNil)

c.Assert(s.Util.WaitForPhase(name, v1alpha1.FlinkApplicationSavepointing, failurePhase), IsNil)
c.Assert(s.Util.WaitForPhase(name, v1alpha1.FlinkApplicationRunning, failurePhase), IsNil)
c.Assert(s.Util.WaitForPhase(name, v1beta1.FlinkApplicationSavepointing, failurePhase), IsNil)
c.Assert(s.Util.WaitForPhase(name, v1beta1.FlinkApplicationRunning, failurePhase), IsNil)
c.Assert(s.Util.WaitForAllTasksInState(name, "RUNNING"), IsNil)

// check that it really updated
Expand Down Expand Up @@ -80,7 +80,7 @@ func (s *IntegSuite) TestSimple(c *C) {
c.Assert(s.Util.CreateFlinkApplication(config), IsNil,
Commentf("Failed to create flink application"))

c.Assert(s.Util.WaitForPhase(config.Name, v1alpha1.FlinkApplicationRunning, v1alpha1.FlinkApplicationDeployFailed), IsNil)
c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)
c.Assert(s.Util.WaitForAllTasksInState(config.Name, "RUNNING"), IsNil)

pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
Expand All @@ -94,9 +94,9 @@ func (s *IntegSuite) TestSimple(c *C) {
log.Info("Application started successfully")

// test updating the app with a new image
newApp := updateAndValidate(c, s, config.Name, func(app *v1alpha1.FlinkApplication) {
newApp := updateAndValidate(c, s, config.Name, func(app *v1beta1.FlinkApplication) {
app.Spec.Image = NewImage
}, v1alpha1.FlinkApplicationDeployFailed)
}, v1beta1.FlinkApplicationDeployFailed)
// check that the pods have the new image
c.Assert(newApp.Spec.Image, Equals, NewImage)
pods, err = s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
Expand All @@ -108,9 +108,9 @@ func (s *IntegSuite) TestSimple(c *C) {
}

// test updating the app with a config change
newApp = updateAndValidate(c, s, config.Name, func(app *v1alpha1.FlinkApplication) {
newApp = updateAndValidate(c, s, config.Name, func(app *v1beta1.FlinkApplication) {
app.Spec.FlinkConfig["akka.client.timeout"] = "23 s"
}, v1alpha1.FlinkApplicationDeployFailed)
}, v1beta1.FlinkApplicationDeployFailed)
// validate the config has been applied
res, err := s.Util.FlinkAPIGet(newApp, "/jobmanager/config")
c.Assert(err, IsNil)
Expand All @@ -137,9 +137,9 @@ func (s *IntegSuite) TestSimple(c *C) {
_, err = s.Util.FlinkApps().Update(newApp)
c.Assert(err, IsNil)

c.Assert(s.Util.WaitForPhase(newApp.Name, v1alpha1.FlinkApplicationSavepointing, ""), IsNil)
c.Assert(s.Util.WaitForPhase(newApp.Name, v1beta1.FlinkApplicationSavepointing, ""), IsNil)
// we should end up in the DeployFailed phase
c.Assert(s.Util.WaitForPhase(newApp.Name, v1alpha1.FlinkApplicationDeployFailed, ""), IsNil)
c.Assert(s.Util.WaitForPhase(newApp.Name, v1beta1.FlinkApplicationDeployFailed, ""), IsNil)

log.Info("Job is in deploy failed, waiting for tasks to start")

Expand All @@ -166,7 +166,7 @@ func (s *IntegSuite) TestSimple(c *C) {
log.Info("Attempting to roll forward")

// and we should be able to roll forward by resubmitting with a fixed config
updateAndValidate(c, s, config.Name, func(app *v1alpha1.FlinkApplication) {
updateAndValidate(c, s, config.Name, func(app *v1beta1.FlinkApplication) {
app.Spec.JarName = config.Spec.JarName
app.Spec.RestartNonce = "rollback2"
}, "")
Expand All @@ -191,20 +191,20 @@ func (s *IntegSuite) TestSimple(c *C) {
newApp.Spec.TaskManagerConfig.Resources = &TaskManagerDefaultResources

_, _ = s.Util.FlinkApps().Update(newApp)
c.Assert(s.Util.WaitForPhase(newApp.Name, v1alpha1.FlinkApplicationClusterStarting, ""), IsNil)
c.Assert(s.Util.WaitForPhase(newApp.Name, v1beta1.FlinkApplicationClusterStarting, ""), IsNil)

// User realizes error and cancels the deploy
log.Infof("Cancelling deploy...")
newApp.Spec.ForceRollback = true
_, _ = s.Util.FlinkApps().Update(newApp)

// we should end up in the DeployFailed phase
c.Assert(s.Util.WaitForPhase(newApp.Name, v1alpha1.FlinkApplicationDeployFailed, ""), IsNil)
c.Assert(s.Util.WaitForPhase(newApp.Name, v1beta1.FlinkApplicationDeployFailed, ""), IsNil)
c.Assert(newApp.Spec.ForceRollback, Equals, true)
log.Info("User cancelled deploy. Job is in deploy failed, waiting for tasks to start")

// but the job should still be running
c.Assert(newApp.Status.JobStatus.State, Equals, v1alpha1.Running)
c.Assert(newApp.Status.JobStatus.State, Equals, v1beta1.Running)
log.Info("Attempting to roll forward with fix")

// Fixing update
Expand All @@ -219,7 +219,7 @@ func (s *IntegSuite) TestSimple(c *C) {
},
}
// and we should be able to roll forward by resubmitting with a fixed config
updateAndValidate(c, s, config.Name, func(app *v1alpha1.FlinkApplication) {
updateAndValidate(c, s, config.Name, func(app *v1beta1.FlinkApplication) {
app.Spec.TaskManagerConfig.Resources = &TaskManagerFixedResources
app.Spec.ForceRollback = false
}, "")
Expand All @@ -229,7 +229,7 @@ func (s *IntegSuite) TestSimple(c *C) {
c.Assert(s.Util.FlinkApps().Delete(config.Name, &v1.DeleteOptions{}), IsNil)

// validate that a savepoint was taken and the job was cancelled
var app *v1alpha1.FlinkApplication
var app *v1beta1.FlinkApplication
for {
app, err = s.Util.GetFlinkApplication(config.Name)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -297,7 +297,7 @@ func (s *IntegSuite) TestRecovery(c *C) {
c.Log("Application Created")

// wait for it to be running
c.Assert(s.Util.WaitForPhase(config.Name, v1alpha1.FlinkApplicationRunning, v1alpha1.FlinkApplicationDeployFailed), IsNil)
c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)
c.Assert(s.Util.WaitForAllTasksInState(config.Name, "RUNNING"), IsNil)

c.Log("Application running")
Expand Down Expand Up @@ -347,7 +347,7 @@ func (s *IntegSuite) TestRecovery(c *C) {
}

c.Assert(err, IsNil)
c.Assert(s.Util.WaitForPhase(config.Name, v1alpha1.FlinkApplicationRunning, v1alpha1.FlinkApplicationDeployFailed), IsNil)
c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)

// stop it from failing
c.Assert(os.Remove(s.Util.CheckpointDir+"/fail"), IsNil)
Expand Down
2 changes: 1 addition & 1 deletion integ/test_app.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
apiVersion: flink.k8s.io/v1alpha1
apiVersion: flink.k8s.io/v1beta1
kind: FlinkApplication
metadata:
name: operator-test-app
Expand Down
20 changes: 10 additions & 10 deletions integ/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"time"

"github.com/go-resty/resty"
"github.com/lyft/flinkk8soperator/pkg/apis/app/v1alpha1"
v1alpha12 "github.com/lyft/flinkk8soperator/pkg/client/clientset/versioned/typed/app/v1alpha1"
flinkapp "github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
client "github.com/lyft/flinkk8soperator/pkg/client/clientset/versioned/typed/app/v1beta1"
"github.com/prometheus/common/log"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -327,13 +327,13 @@ func (f *TestUtil) TailOperatorLogs() error {
return nil
}

func (f *TestUtil) ReadFlinkApplication(path string) (*v1alpha1.FlinkApplication, error) {
func (f *TestUtil) ReadFlinkApplication(path string) (*flinkapp.FlinkApplication, error) {
file, err := getFile(path)
if err != nil {
return nil, err
}

app := v1alpha1.FlinkApplication{}
app := flinkapp.FlinkApplication{}
err = yaml.NewYAMLOrJSONDecoder(file, 2048).Decode(&app)
if err != nil {
return nil, err
Expand All @@ -344,20 +344,20 @@ func (f *TestUtil) ReadFlinkApplication(path string) (*v1alpha1.FlinkApplication
return &app, nil
}

func (f *TestUtil) FlinkApps() v1alpha12.FlinkApplicationInterface {
return f.FlinkApplicationClient.FlinkV1alpha1().FlinkApplications(f.Namespace.Name)
func (f *TestUtil) FlinkApps() client.FlinkApplicationInterface {
return f.FlinkApplicationClient.FlinkV1beta1().FlinkApplications(f.Namespace.Name)
}

func (f *TestUtil) CreateFlinkApplication(application *v1alpha1.FlinkApplication) error {
func (f *TestUtil) CreateFlinkApplication(application *flinkapp.FlinkApplication) error {
_, err := f.FlinkApps().Create(application)
return err
}

func (f *TestUtil) GetFlinkApplication(name string) (*v1alpha1.FlinkApplication, error) {
func (f *TestUtil) GetFlinkApplication(name string) (*flinkapp.FlinkApplication, error) {
return f.FlinkApps().Get(name, metav1.GetOptions{})
}

func (f *TestUtil) WaitForPhase(name string, phase v1alpha1.FlinkApplicationPhase, failurePhases ...v1alpha1.FlinkApplicationPhase) error {
func (f *TestUtil) WaitForPhase(name string, phase flinkapp.FlinkApplicationPhase, failurePhases ...flinkapp.FlinkApplicationPhase) error {
for {
app, err := f.FlinkApps().Get(name, metav1.GetOptions{})

Expand All @@ -379,7 +379,7 @@ func (f *TestUtil) WaitForPhase(name string, phase v1alpha1.FlinkApplicationPhas
}
}

func (f *TestUtil) FlinkAPIGet(app *v1alpha1.FlinkApplication, endpoint string) (interface{}, error) {
func (f *TestUtil) FlinkAPIGet(app *flinkapp.FlinkApplication, endpoint string) (interface{}, error) {
url := fmt.Sprintf("http://localhost:8001/api/v1/namespaces/%s/"+
"services/%s:8081/proxy/%s",
f.Namespace.Name, app.Name, endpoint)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
package apis

import (
"github.com/lyft/flinkk8soperator/pkg/apis/app/v1alpha1"
"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
)

func init() {
// Register the types with the Scheme so the components can map objects to GroupVersionKinds and back
AddToSchemes = append(AddToSchemes, v1alpha1.SchemeBuilder.AddToScheme)
AddToSchemes = append(AddToSchemes, v1beta1.SchemeBuilder.AddToScheme)
}
3 changes: 3 additions & 0 deletions pkg/apis/app/v1beta1/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// +k8s:deepcopy-gen=package
// +groupName=flink.k8s.io
package v1beta1
42 changes: 42 additions & 0 deletions pkg/apis/app/v1beta1/register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package v1beta1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)

const (
version = "v1beta1"
groupName = "flink.k8s.io"

FlinkApplicationKind = "FlinkApplication"
)

var (
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
AddToScheme = SchemeBuilder.AddToScheme
// SchemeGroupVersion is the group version used to register these objects.
SchemeGroupVersion = schema.GroupVersion{Group: groupName, Version: version}
)

// GetKind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
return SchemeGroupVersion.WithKind(kind).GroupKind()
}

// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
return SchemeGroupVersion.WithResource(resource).GroupResource()
}

// addKnownTypes adds the set of types defined in this package to the supplied scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&FlinkApplication{},
&FlinkApplicationList{},
)

metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}
Loading

0 comments on commit f687fe5

Please sign in to comment.