Skip to content

Commit

Permalink
use kubeflow repo
Browse files Browse the repository at this point in the history
  • Loading branch information
sigmarkarl committed Sep 3, 2024
2 parents db55f5c + 5209a59 commit d91f157
Show file tree
Hide file tree
Showing 7 changed files with 295 additions and 3 deletions.
4 changes: 2 additions & 2 deletions docs/developer-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ $ docker build -t <image-tag> -f Dockerfile.rh .
If you'd like to build/test the spark-operator locally, follow the instructions below:

```bash
$ mkdir -p $GOPATH/src/github.com/GoogleCloudPlatform
$ cd $GOPATH/src/github.com/GoogleCloudPlatform
$ mkdir -p $GOPATH/src/github.com/kubeflow
$ cd $GOPATH/src/github.com/kubeflow
$ git clone [email protected]:kubeflow/spark-operator.git
$ cd spark-operator
```
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/golang/glog v1.2.1
github.com/google/go-cloud v0.1.1
github.com/google/uuid v1.6.0
github.com/jellydator/ttlcache/v3 v3.2.0
github.com/olekukonko/tablewriter v0.0.5
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.19.1
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jellydator/ttlcache/v3 v3.2.0 h1:6lqVJ8X3ZaUwvzENqPAobDsXNExfUJd61u++uW8a3LE=
github.com/jellydator/ttlcache/v3 v3.2.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
Expand Down Expand Up @@ -240,6 +242,8 @@ go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca h1:VdD38733bfYv5tUZwEIskMM93VanwNIi5bIKnDrJdEY=
go.starlark.net v0.0.0-20230525235612-a134d8f9ddca/go.mod h1:jxU+3+j+71eXOW14274+SmmuW82qJzl6iZSeqEtTGds=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down
14 changes: 13 additions & 1 deletion pkg/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type Controller struct {
enableUIService bool
disableExecutorReporting bool
executorsProcessingLimit int
submissionCache *SubmissionCache
}

// NewController creates a new Controller.
Expand Down Expand Up @@ -139,6 +140,7 @@ func newSparkApplicationController(
enableUIService: enableUIService,
disableExecutorReporting: disableExecutorReporting,
executorsProcessingLimit: executorsProcessingLimit,
submissionCache: NewSubmissionCache(5 * time.Minute),
}

if metricsConfig != nil {
Expand Down Expand Up @@ -566,7 +568,17 @@ func (c *Controller) syncSparkApplication(key string) error {
appCopy.Status.AppState.State = v1beta2.FailedState
appCopy.Status.AppState.ErrorMessage = err.Error()
} else {
appCopy = c.submitSparkApplication(appCopy)
// related to ofas, we received 2 times the CRD events with NewState state
// we need to check if the submission was already done
// and skip the submission if it was already done
if !c.submissionCache.Exist(key) {
appCopy = c.submitSparkApplication(appCopy)
// whatever the result of the submission, we don't want to retry the submission
c.submissionCache.Set(key)
} else {
glog.V(2).Infof("submission attempt already done for: %q, skip it", key)
return nil
}
}
case v1beta2.SucceedingState:
if !shouldRetry(appCopy) {
Expand Down
224 changes: 224 additions & 0 deletions pkg/controller/sparkapplication/controller_another_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
package sparkapplication

import (
"context"
"fmt"
"os"
"os/exec"
"testing"
"time"

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
kubeclientfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"

"github.com/kubeflow/spark-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
crdclientfake "github.com/kubeflow/spark-operator/pkg/client/clientset/versioned/fake"
crdinformers "github.com/kubeflow/spark-operator/pkg/client/informers/externalversions"
"github.com/kubeflow/spark-operator/pkg/util"
)

// separate this test with the original `controller_test.go` to simplify rebasing process in the future

// newAnotherFakeController is a copy of the function from the original controller_test
// except we don't enable the UIService (Behavior from ofas spark-operator)
// newAnotherFakeController builds a Controller wired to fake clients for
// testing. It mirrors newFakeController from the original controller_test.go
// except that the UI service is left disabled (the ofas spark-operator
// behavior). The optional app and pods are pre-loaded into the matching
// informer indexers so syncSparkApplication can find them.
func newAnotherFakeController(app *v1beta2.SparkApplication, pods ...*apiv1.Pod) (*Controller, *record.FakeRecorder) {
	crdclientfake.AddToScheme(scheme.Scheme)
	util.IngressCapabilities = map[string]bool{"networking.k8s.io/v1": true}

	crdClient := crdclientfake.NewSimpleClientset()
	kubeClient := kubeclientfake.NewSimpleClientset()
	eventRecorder := record.NewFakeRecorder(3)

	// Register a node with an external IP so node-address lookups succeed.
	node := &apiv1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "node1"},
		Status: apiv1.NodeStatus{
			Addresses: []apiv1.NodeAddress{
				{Type: apiv1.NodeExternalIP, Address: "12.34.56.78"},
			},
		},
	}
	kubeClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})

	crdFactory := crdinformers.NewSharedInformerFactory(crdClient, 0*time.Second)
	podFactory := informers.NewSharedInformerFactory(kubeClient, 0*time.Second)
	ctrl := newSparkApplicationController(crdClient, kubeClient, crdFactory, podFactory, eventRecorder,
		&util.MetricConfig{}, "", "", nil, false, false, util.RatelimitConfig{}, 5)

	crdIndexer := crdFactory.Sparkoperator().V1beta2().SparkApplications().Informer().GetIndexer()
	if app != nil {
		crdIndexer.Add(app)
	}

	podIndexer := podFactory.Core().V1().Pods().Informer().GetIndexer()
	for _, p := range pods {
		if p != nil {
			podIndexer.Add(p)
		}
	}
	return ctrl, eventRecorder
}

// TestSyncSparkApplication_When_Submission_Successes covers the normal path:
// the controller receives the NewState event twice for the same CRD (an ofas
// quirk), submits the application on the first event, and skips the second
// submission thanks to the submission cache. The app must end up in
// SubmittedState with exactly one counted submission.
func TestSyncSparkApplication_When_Submission_Successes(t *testing.T) {
	// t.Setenv restores (or unsets) each variable automatically when the test
	// ends, replacing the manual save/defer/restore dance — which also re-set
	// originally-unset variables to "" instead of unsetting them.
	t.Setenv(sparkHomeEnvVar, "/spark")
	t.Setenv(kubernetesServiceHostEnvVar, "localhost")
	t.Setenv(kubernetesServicePortEnvVar, "443")

	restartPolicyNever := v1beta2.RestartPolicy{
		Type: v1beta2.Never,
	}

	// A fresh SparkApplication in NewState, ready for its first submission.
	app := &v1beta2.SparkApplication{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "foo",
			Namespace: "default",
		},
		Spec: v1beta2.SparkApplicationSpec{
			RestartPolicy: restartPolicyNever,
		},
		Status: v1beta2.SparkApplicationStatus{
			AppState: v1beta2.ApplicationState{
				State: v1beta2.NewState,
			},
		},
	}

	ctrl, _ := newAnotherFakeController(app)
	_, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Create(context.TODO(), app, metav1.CreateOptions{})
	if err != nil {
		t.Fatal(err)
	}

	// Mock execCommand so spark-submit "succeeds". Restore the original on
	// exit so the mock does not leak into other tests in this package.
	originalExecCommand := execCommand
	defer func() { execCommand = originalExecCommand }()
	execCommand = func(command string, args ...string) *exec.Cmd {
		cs := []string{"-test.run=TestHelperProcessSuccess", "--", command}
		cs = append(cs, args...)
		cmd := exec.Command(os.Args[0], cs...)
		cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"}
		return cmd
	}

	key := fmt.Sprintf("%s/%s", app.Namespace, app.Name)

	// First NewState event: the submission should be performed.
	err = ctrl.syncSparkApplication(key)
	assert.Nil(t, err)
	updatedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
	assert.Nil(t, err)
	assert.Equal(t, v1beta2.SubmittedState, updatedApp.Status.AppState.State)
	assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))

	// Second NewState event: the cached key must short-circuit the submission.
	err = ctrl.syncSparkApplication(key)
	assert.Nil(t, err)
	updatedApp, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
	assert.Nil(t, err)
	// The state is unchanged and the submit counter did not move.
	assert.Equal(t, v1beta2.SubmittedState, updatedApp.Status.AppState.State)
	assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))
}

// TestSyncSparkApplication_When_Submission_Fails covers the failure path:
// the controller receives the NewState event twice for the same CRD, the
// first submission fails, and the second event must be skipped by the
// submission cache — even though a retry would now succeed. The app must
// end up (and stay) in FailedSubmissionState with exactly one counted
// failed submission.
func TestSyncSparkApplication_When_Submission_Fails(t *testing.T) {
	// t.Setenv restores (or unsets) each variable automatically when the test
	// ends, replacing the manual save/defer/restore dance — which also re-set
	// originally-unset variables to "" instead of unsetting them.
	t.Setenv(sparkHomeEnvVar, "/spark")
	t.Setenv(kubernetesServiceHostEnvVar, "localhost")
	t.Setenv(kubernetesServicePortEnvVar, "443")

	restartPolicyNever := v1beta2.RestartPolicy{
		Type: v1beta2.Never,
	}

	// A fresh SparkApplication in NewState, ready for its first submission.
	app := &v1beta2.SparkApplication{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "foo",
			Namespace: "default",
		},
		Spec: v1beta2.SparkApplicationSpec{
			RestartPolicy: restartPolicyNever,
		},
		Status: v1beta2.SparkApplicationStatus{
			AppState: v1beta2.ApplicationState{
				State: v1beta2.NewState,
			},
		},
	}

	ctrl, _ := newAnotherFakeController(app)
	_, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Create(context.TODO(), app, metav1.CreateOptions{})
	if err != nil {
		t.Fatal(err)
	}

	// Restore the original execCommand on exit so the mocks below do not
	// leak into other tests in this package.
	originalExecCommand := execCommand
	defer func() { execCommand = originalExecCommand }()

	// Mock execCommand with a command that cannot run, so the first
	// submission fails.
	execCommand = func(string, ...string) *exec.Cmd {
		return exec.Command("/bin/should-fail")
	}

	key := fmt.Sprintf("%s/%s", app.Namespace, app.Name)

	err = ctrl.syncSparkApplication(key)
	assert.Nil(t, err)
	updatedApp, err := ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
	assert.Nil(t, err)
	assert.Equal(t, v1beta2.FailedSubmissionState, updatedApp.Status.AppState.State)
	assert.Equal(t, float64(0), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))
	assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppFailedSubmissionCount, map[string]string{}))

	// Second NewState event: swap in a succeeding mock to prove the command
	// is never executed again — the cache must skip the submission outright.
	execCommand = func(command string, args ...string) *exec.Cmd {
		cs := []string{"-test.run=TestHelperProcessSuccess", "--", command}
		cs = append(cs, args...)
		cmd := exec.Command(os.Args[0], cs...)
		cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"}
		return cmd
	}
	err = ctrl.syncSparkApplication(key)
	assert.Nil(t, err)
	updatedApp, err = ctrl.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(context.TODO(), app.Name, metav1.GetOptions{})
	assert.Nil(t, err)
	// The CR is still in FailedSubmissionState and no counter moved.
	assert.Equal(t, v1beta2.FailedSubmissionState, updatedApp.Status.AppState.State)
	assert.Equal(t, float64(0), fetchCounterValue(ctrl.metrics.sparkAppSubmitCount, map[string]string{}))
	assert.Equal(t, float64(1), fetchCounterValue(ctrl.metrics.sparkAppFailedSubmissionCount, map[string]string{}))
}
34 changes: 34 additions & 0 deletions pkg/controller/sparkapplication/submission_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package sparkapplication

/*
A simple cache implementation that stores keys with a time-to-live (TTL) value.
*/

import (
"github.com/jellydator/ttlcache/v3"
"time"
)

type SubmissionCache struct {
cache *ttlcache.Cache[string, any] // value is not used
}

func NewSubmissionCache(ttl time.Duration) *SubmissionCache {
cache := ttlcache.New[string, any](
ttlcache.WithTTL[string, any](ttl),
)

c := &SubmissionCache{
cache: cache,
}
go cache.Start() // start the cache cleanup goroutine
return c
}

func (c *SubmissionCache) Set(key string) {
c.cache.Set(key, nil, ttlcache.DefaultTTL)
}

func (c *SubmissionCache) Exist(key string) bool {
return c.cache.Has(key)
}
17 changes: 17 additions & 0 deletions pkg/controller/sparkapplication/submission_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package sparkapplication

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// TestSubmissionCacheExist verifies that a key becomes visible after Set and
// disappears again once its TTL elapses.
func TestSubmissionCacheExist(t *testing.T) {
	// A short TTL keeps the test fast; the original 1s TTL + 2s sleep made
	// this unit test take two wall-clock seconds.
	const ttl = 100 * time.Millisecond
	cache := NewSubmissionCache(ttl)
	assert.False(t, cache.Exist("key1"))
	cache.Set("key1")
	assert.True(t, cache.Exist("key1"))
	// Wait comfortably past the TTL so the entry is guaranteed to expire.
	time.Sleep(3 * ttl)
	assert.False(t, cache.Exist("key1"))
}

0 comments on commit d91f157

Please sign in to comment.