Skip to content

Commit

Permalink
[Backward Incompatible] Upgrade controller runtime and Multi Namespac…
Browse files Browse the repository at this point in the history
…e support for Flink operator (lyft#39)

* Upgrade controller runtime and Multi Namespace support
* Fix lint and also event recorder
* Upgrade to correct right stdlib version

Note: This change will cause a restart/rollover of existing flink applications when the operator is deployed.
  • Loading branch information
anandswaminathan authored Jul 3, 2019
1 parent 9aaa992 commit f920664
Show file tree
Hide file tree
Showing 17 changed files with 249 additions and 306 deletions.
405 changes: 161 additions & 244 deletions Gopkg.lock

Large diffs are not rendered by default.

27 changes: 15 additions & 12 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,37 @@ required = [
"k8s.io/code-generator/cmd/client-gen",
"k8s.io/code-generator/cmd/lister-gen",
"k8s.io/code-generator/cmd/informer-gen",
"k8s.io/code-generator/cmd/openapi-gen",
"k8s.io/gengo/args",
]

[[constraint]]
name = "sigs.k8s.io/controller-runtime"
version = "^0.1.0"

[[override]]
name = "k8s.io/code-generator"
version = "kubernetes-1.11.2"
version = "kubernetes-1.14.1"

[[constraint]]
name = "sigs.k8s.io/controller-runtime"
version = "v0.2.0-beta.3"

[[override]]
name = "k8s.io/api"
version = "kubernetes-1.11.2"
version = "kubernetes-1.14.1"

[[override]]
name = "k8s.io/apimachinery"
version = "kubernetes-1.11.2"
version = "kubernetes-1.14.1"

[[constraint]]
[[override]]
name = "k8s.io/client-go"
version = "kubernetes-1.11.2"
version = "kubernetes-1.14.1"

[[override]]
name = "k8s.io/apiextensions-apiserver"
version = "kubernetes-1.11.2"
version = "kubernetes-1.14.1"

[[override]]
name = "gopkg.in/fsnotify.v1"
source = "https://github.com/fsnotify/fsnotify.git"

[[constraint]]
name = "github.com/lyft/flytestdlib"
version = "0.2.8"
version = "0.2.10"
23 changes: 18 additions & 5 deletions cmd/flinkk8soperator/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"flag"
"fmt"
"os"
"strings"

"sigs.k8s.io/controller-runtime/pkg/cache"

"github.com/lyft/flytestdlib/config/viper"
"github.com/lyft/flytestdlib/version"
Expand Down Expand Up @@ -145,11 +148,21 @@ func operatorEntryPoint(ctx context.Context, metricsScope promutils.Scope,
return nil, err
}

// Create a new Cmd to provide shared dependencies and start components
mgr, err := manager.New(cfg, manager.Options{
Namespace: controllerCfg.LimitNamespace,
SyncPeriod: &controllerCfg.ResyncPeriod.Duration,
})
limitNameSpace := strings.TrimSpace(controllerCfg.LimitNamespace)
var mgr manager.Manager

if limitNameSpace == "" {
mgr, err = manager.New(cfg, manager.Options{
SyncPeriod: &controllerCfg.ResyncPeriod.Duration,
})
} else {
namespaceList := strings.Split(limitNameSpace, ",")
mgr, err = manager.New(cfg, manager.Options{
NewCache: cache.MultiNamespacedCacheBuilder(namespaceList),
SyncPeriod: &controllerCfg.ResyncPeriod.Duration,
})
}

if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion local_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ logger:
show-source: true
level: 5
formatter:
type: text
type: text
8 changes: 0 additions & 8 deletions pkg/client/clientset/versioned/clientset.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 0 additions & 5 deletions pkg/client/clientset/versioned/fake/clientset_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 9 additions & 7 deletions pkg/client/clientset/versioned/fake/register.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 9 additions & 7 deletions pkg/client/clientset/versioned/scheme/register.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 3 additions & 5 deletions pkg/controller/flink/flink.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ import (
"fmt"
"time"

"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/manager"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"

"github.com/lyft/flinkk8soperator/pkg/controller/common"

Expand Down Expand Up @@ -95,15 +93,15 @@ type ControllerInterface interface {
CompareAndUpdateJobStatus(ctx context.Context, app *v1alpha1.FlinkApplication, hash string) (bool, error)
}

func NewController(k8sCluster k8.ClusterInterface, mgr manager.Manager, config controllerConfig.RuntimeConfig) ControllerInterface {
func NewController(k8sCluster k8.ClusterInterface, eventRecorder record.EventRecorder, config controllerConfig.RuntimeConfig) ControllerInterface {
metrics := newControllerMetrics(config.MetricsScope)
return &Controller{
k8Cluster: k8sCluster,
jobManager: NewJobManagerController(k8sCluster, config),
taskManager: NewTaskManagerController(k8sCluster, config),
flinkClient: client.NewFlinkJobManagerClient(config),
metrics: metrics,
eventRecorder: mgr.GetRecorder(controllerConfig.AppName),
eventRecorder: eventRecorder,
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/flink/flink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
const testImage = "123.xyz.com/xx:11ae1218924428faabd9b64423fa0c332efba6b2"

// Note: if you find yourself changing this to fix a test, that should be treated as a breaking API change
const testAppHash = "718222d3"
const testAppHash = "cb56c9a1"
const testAppName = "app-name"
const testNamespace = "ns"
const testJobID = "j1"
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/flink/job_manager_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestJobManagerCreateSuccess(t *testing.T) {
"flink-job-properties": "jarName: test.jar\nparallelism: 8\nentryClass:com.test.MainClass\nprogramArgs:\"--test\"",
}
app.Annotations = annotations
hash := "922eff1b"
hash := "334c7c5d"
expectedLabels := map[string]string{
"flink-app": "app-name",
"flink-app-hash": hash,
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/flink/task_manager_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestTaskManagerCreateSuccess(t *testing.T) {
"flink-job-properties": "jarName: test.jar\nparallelism: 8\nentryClass:com.test.MainClass\nprogramArgs:\"--test\"",
}

hash := "922eff1b"
hash := "334c7c5d"

app.Annotations = annotations
expectedLabels := map[string]string{
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/flinkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ func (r *ReconcileFlinkApplication) Reconcile(request reconcile.Request) (reconc
// and Start it when the Manager is Started.
func Add(ctx context.Context, mgr manager.Manager, cfg config.RuntimeConfig) error {
k8sCluster := k8.NewK8Cluster(mgr)
flinkStateMachine := NewFlinkStateMachine(k8sCluster, mgr, cfg)
eventRecorder := mgr.GetEventRecorderFor(config.AppName)
flinkStateMachine := NewFlinkStateMachine(k8sCluster, eventRecorder, cfg)

metrics := newReconcilerMetrics(cfg.MetricsScope)
reconciler := ReconcileFlinkApplication{
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/flinkapplication/flink_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"time"

"sigs.k8s.io/controller-runtime/pkg/manager"
"k8s.io/client-go/tools/record"

"github.com/pkg/errors"

Expand Down Expand Up @@ -589,12 +589,12 @@ func (s *FlinkStateMachine) handleApplicationDeleting(ctx context.Context, app *
return nil
}

func NewFlinkStateMachine(k8sCluster k8.ClusterInterface, mgr manager.Manager, config config.RuntimeConfig) FlinkHandlerInterface {
func NewFlinkStateMachine(k8sCluster k8.ClusterInterface, eventRecorder record.EventRecorder, config config.RuntimeConfig) FlinkHandlerInterface {

metrics := newStateMachineMetrics(config.MetricsScope)
return &FlinkStateMachine{
k8Cluster: k8sCluster,
flinkController: flink.NewController(k8sCluster, mgr, config),
flinkController: flink.NewController(k8sCluster, eventRecorder, config),
clock: clock.RealClock{},
metrics: metrics,
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/controller/k8/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ func (k *Cluster) GetDeploymentsWithLabel(ctx context.Context, namespace string,
options := &client.ListOptions{
LabelSelector: labelSelector,
}
err := k.cache.List(ctx, options, deploymentList)
listOptionsFunc := client.UseListOptions(options)
err := k.cache.List(ctx, deploymentList, listOptionsFunc)
if err != nil {
if IsK8sObjectDoesNotExist(err) {
err := k.client.List(ctx, options, deploymentList)
err := k.client.List(ctx, deploymentList, listOptionsFunc)
if err != nil {
logger.Warnf(ctx, "Failed to list deployments %v", err)
return nil, err
Expand All @@ -111,10 +112,12 @@ func (k *Cluster) GetServicesWithLabel(ctx context.Context, namespace string, la
options := &client.ListOptions{
LabelSelector: labelSelector,
}
err := k.cache.List(ctx, options, serviceList)
listOptionsFunc := client.UseListOptions(options)

err := k.cache.List(ctx, serviceList, listOptionsFunc)
if err != nil {
if IsK8sObjectDoesNotExist(err) {
err := k.client.List(ctx, options, serviceList)
err := k.client.List(ctx, serviceList, listOptionsFunc)
if err != nil {
logger.Warnf(ctx, "Failed to list services %v", err)
return nil, err
Expand Down

0 comments on commit f920664

Please sign in to comment.