Draft: Performance mega boost - queue per app #1990

Open · wants to merge 2 commits into base: master
3 changes: 2 additions & 1 deletion Dockerfile
@@ -40,9 +40,10 @@ USER root
COPY --from=builder /usr/bin/spark-operator /usr/bin/
RUN apt-get update --allow-releaseinfo-change \
&& apt-get update \
&& apt-get install -y tini \
&& apt-get install -y tini htop \
&& rm -rf /var/lib/apt/lists/*

COPY entrypoint.sh /usr/bin/
COPY export-pprof.sh /usr/bin/

ENTRYPOINT ["/usr/bin/entrypoint.sh"]
2 changes: 2 additions & 0 deletions charts/spark-operator-chart/README.md
@@ -91,6 +91,8 @@ See [helm uninstall](https://helm.sh/docs/helm/helm_uninstall) for command documentation
| imagePullSecrets | list | `[]` | Image pull secrets |
| ingressUrlFormat | string | `""` | Ingress URL format. Requires the UI service to be enabled by setting `uiService.enable` to true. |
| istio.enabled | bool | `false` | When using `istio`, spark jobs need to run without a sidecar to properly terminate |
| k8sBurstQps | int | `200` | Kubernetes API client burst setting (maximum short-term request spike) |
| k8sQps | int | `100` | Kubernetes API client QPS setting (sustained queries per second) |
| labelSelectorFilter | string | `""` | A comma-separated list of key=value, or key labels to filter resources during watch and list based on the specified labels. |
| leaderElection.lockName | string | `"spark-operator-lock"` | Leader election lock name. Ref: https://github.com/kubeflow/spark-operator/blob/master/docs/user-guide.md#enabling-leader-election-for-high-availability. |
| leaderElection.lockNamespace | string | `""` | Optionally store the lock in another namespace. Defaults to operator's namespace |
5 changes: 5 additions & 0 deletions charts/spark-operator-chart/templates/deployment.yaml
@@ -74,6 +74,11 @@ spec:
args:
- -v={{ .Values.logLevel }}
- -logtostderr
- -max-queue-time-without-update-in-minutes=30
- -queue-cleaner-interval-in-minutes=10
Comment on lines +77 to +78

Contributor: Can we make these two values configurable? Would there be a possibility for users to modify these values?

Contributor: +1

Author: Sure I will.
- -enable-profiling=false
- -api-qps={{ .Values.k8sQps }}
- -api-burst={{ .Values.k8sBurstQps }}
{{- if eq (len $jobNamespaces) 1 }}
- -namespace={{ index $jobNamespaces 0 }}
{{- end }}
6 changes: 6 additions & 0 deletions charts/spark-operator-chart/values.yaml
@@ -60,6 +60,12 @@ sparkJobNamespaces:
# -- Operator concurrency, higher values might increase memory usage
controllerThreads: 10

# -- Kubernetes API client QPS setting (sustained queries per second)
k8sQps: 100

# -- Kubernetes API client burst setting (maximum short-term request spike)
k8sBurstQps: 200
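
Note: in client-go these two settings feed a token-bucket rate limiter — QPS is the sustained request rate and burst is the bucket size that absorbs short spikes. Below is a minimal sketch of where the values land, mirroring the `config.QPS`/`config.Burst` assignment in the main.go diff further down; the helper function and hard-coded values are illustrative, not part of this PR.

```go
package main

import (
	"log"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// newRateLimitedClient builds an in-cluster client whose rate limiter
// uses the two Helm values above. Illustrative helper, not PR code.
func newRateLimitedClient(qps float32, burst int) (*kubernetes.Clientset, error) {
	config, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}
	config.QPS = qps     // sustained requests per second (k8sQps)
	config.Burst = burst // short-lived spike allowance (k8sBurstQps)
	return kubernetes.NewForConfig(config)
}

func main() {
	// Outside a cluster, InClusterConfig fails; this is illustration only.
	if _, err := newRateLimitedClient(100, 200); err != nil {
		log.Fatal(err)
	}
}
```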

# -- Operator resync interval. Note that the operator will respond to events (e.g. create, update)
# unrelated to this setting
resyncInterval: 30
25 changes: 25 additions & 0 deletions docs/api-docs.md
@@ -2873,6 +2873,31 @@ string
</tr>
<tr>
<td>
<code>memoryLimit</code><br/>
<em>
string
</em>
</td>
<td>
<em>(Optional)</em>
<p>MemoryLimit is the pod memory limit, used in cases where we want to enforce a memory limit on the pod.</p>
</td>
</tr>
<tr>
<td>
<code>memoryRequestOverride</code><br/>
<em>
string
</em>
</td>
<td>
<em>(Optional)</em>
<p>MemoryRequestOverride is an optional parameter that allows setting a pod memory request
lower than the JVM memory plus overhead, which Spark uses by default.</p>
</td>
</tr>
<tr>
<td>
<code>gpu</code><br/>
<em>
<a href="#sparkoperator.k8s.io/v1beta2.GPUSpec">
41 changes: 41 additions & 0 deletions export-pprof.sh
@@ -0,0 +1,41 @@
#!/bin/bash

# Generate a timestamp
timestamp=$(date +%Y-%m-%d_%H-%M-%S)

# Function to fetch goroutine profile
fetch_goroutine_profile() {
Comment on fetch_goroutine_profile:

Contributor: Is this script used anywhere? Couldn't find the references or docs.

Author: This script is used for manual profiling, which we've added as part of this feature (disabled by default). I probably need to add some documentation on how to use it.

local profile_type=$1 # 'start' or 'end'
local profile_path="/tmp/goroutine-profile-debug1-$profile_type-$timestamp.prof"
echo "Starting to fetch Goroutine profile ($profile_type) at $(date)..."
curl "http://localhost:6060/debug/pprof/goroutine?debug=1" -o "$profile_path"
echo "Completed fetching Goroutine profile ($profile_type) at $(date). File saved to $profile_path"
}

echo "Starting to fetch CPU profile at $(date)..."
cpu_profile="/tmp/cpu-profile-30sec-$timestamp.prof"
curl "http://localhost:6060/debug/pprof/profile?seconds=30" -o "$cpu_profile" &
echo "CPU profile fetch initiated, running for 30 seconds..."

echo "Starting to fetch Trace profile at $(date)..."
trace_profile="/tmp/trace-profile-30sec-$timestamp.prof"
curl "http://localhost:6060/debug/pprof/trace?seconds=30" -o "$trace_profile" &
echo "Trace profile fetch initiated, running for 30 seconds..."

echo "Fetching initial Goroutine profile..."
fetch_goroutine_profile "start" &

# Wait for CPU and trace profiling to complete
wait

echo "Starting to fetch final Goroutine profile after waiting for other profiles to complete..."
fetch_goroutine_profile "end"

echo "All profiling data collected"

# Print the output locations of the collected profiles
echo "CPU profile output - $cpu_profile"
echo "Trace profile output - $trace_profile"
echo "Initial Goroutine profile output - /tmp/goroutine-profile-debug1-start-$timestamp.prof"
echo "Final Goroutine profile output - /tmp/goroutine-profile-debug1-end-$timestamp.prof"

81 changes: 52 additions & 29 deletions main.go
@@ -20,6 +20,8 @@ import (
"context"
"flag"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"strings"
@@ -49,37 +51,48 @@ import (
"github.com/kubeflow/spark-operator/pkg/controller/sparkapplication"
"github.com/kubeflow/spark-operator/pkg/util"
"github.com/kubeflow/spark-operator/pkg/webhook"

_ "net/http/pprof"
"runtime"
)

var (
master = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeConfig = flag.String("kubeConfig", "", "Path to a kube config. Only required if out-of-cluster.")
controllerThreads = flag.Int("controller-threads", 10, "Number of worker threads used by the SparkApplication controller.")
resyncInterval = flag.Int("resync-interval", 30, "Informer resync interval in seconds.")
namespace = flag.String("namespace", apiv1.NamespaceAll, "The Kubernetes namespace to manage. Will manage custom resource objects of the managed CRD types for the whole cluster if unset.")
labelSelectorFilter = flag.String("label-selector-filter", "", "A comma-separated list of key=value, or key labels to filter resources during watch and list based on the specified labels.")
enableWebhook = flag.Bool("enable-webhook", false, "Whether to enable the mutating admission webhook for admitting and patching Spark pods.")
webhookTimeout = flag.Int("webhook-timeout", 30, "Webhook Timeout in seconds before the webhook returns a timeout")
enableResourceQuotaEnforcement = flag.Bool("enable-resource-quota-enforcement", false, "Whether to enable ResourceQuota enforcement for SparkApplication resources. Requires the webhook to be enabled.")
ingressURLFormat = flag.String("ingress-url-format", "", "Ingress URL format.")
enableUIService = flag.Bool("enable-ui-service", true, "Enable Spark service UI.")
enableLeaderElection = flag.Bool("leader-election", false, "Enable Spark operator leader election.")
leaderElectionLockNamespace = flag.String("leader-election-lock-namespace", "spark-operator", "Namespace in which to create the ConfigMap for leader election.")
leaderElectionLockName = flag.String("leader-election-lock-name", "spark-operator-lock", "Name of the ConfigMap for leader election.")
leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Leader election lease duration.")
leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 14*time.Second, "Leader election renew deadline.")
leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 4*time.Second, "Leader election retry period.")
enableBatchScheduler = flag.Bool("enable-batch-scheduler", false, fmt.Sprintf("Enable batch schedulers for pods' scheduling, the available batch schedulers are: (%s).", strings.Join(batchscheduler.GetRegisteredNames(), ",")))
enableMetrics = flag.Bool("enable-metrics", false, "Whether to enable the metrics endpoint.")
metricsPort = flag.String("metrics-port", "10254", "Port for the metrics endpoint.")
metricsEndpoint = flag.String("metrics-endpoint", "/metrics", "Metrics endpoint.")
metricsPrefix = flag.String("metrics-prefix", "", "Prefix for the metrics.")
ingressClassName = flag.String("ingress-class-name", "", "Set ingressClassName for ingress resources created.")
metricsLabels util.ArrayFlags
metricsJobStartLatencyBuckets util.HistogramBuckets = util.DefaultJobStartLatencyBuckets
master = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeConfig = flag.String("kubeConfig", "", "Path to a kube config. Only required if out-of-cluster.")
controllerThreads = flag.Int("controller-threads", 10, "Number of application queue maps that will be created and used by the SparkApplication controller.")
enableProfiling = flag.Bool("enable-profiling", false, "Whether to enable the pprof profiling server.")
resyncInterval = flag.Int("resync-interval", 30, "Informer resync interval in seconds.")
namespace = flag.String("namespace", apiv1.NamespaceAll, "The Kubernetes namespace to manage. Will manage custom resource objects of the managed CRD types for the whole cluster if unset.")
labelSelectorFilter = flag.String("label-selector-filter", "", "A comma-separated list of key=value, or key labels to filter resources during watch and list based on the specified labels.")
enableWebhook = flag.Bool("enable-webhook", false, "Whether to enable the mutating admission webhook for admitting and patching Spark pods.")
webhookTimeout = flag.Int("webhook-timeout", 30, "Webhook Timeout in seconds before the webhook returns a timeout")
enableResourceQuotaEnforcement = flag.Bool("enable-resource-quota-enforcement", false, "Whether to enable ResourceQuota enforcement for SparkApplication resources. Requires the webhook to be enabled.")
ingressURLFormat = flag.String("ingress-url-format", "", "Ingress URL format.")
enableUIService = flag.Bool("enable-ui-service", true, "Enable Spark service UI.")
enableLeaderElection = flag.Bool("leader-election", false, "Enable Spark operator leader election.")
leaderElectionLockNamespace = flag.String("leader-election-lock-namespace", "spark-operator", "Namespace in which to create the ConfigMap for leader election.")
leaderElectionLockName = flag.String("leader-election-lock-name", "spark-operator-lock", "Name of the ConfigMap for leader election.")
leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Leader election lease duration.")
leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 14*time.Second, "Leader election renew deadline.")
leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 4*time.Second, "Leader election retry period.")
enableBatchScheduler = flag.Bool("enable-batch-scheduler", false, fmt.Sprintf("Enable batch schedulers for pods' scheduling, the available batch schedulers are: (%s).", strings.Join(batchscheduler.GetRegisteredNames(), ",")))
enableMetrics = flag.Bool("enable-metrics", false, "Whether to enable the metrics endpoint.")
metricsPort = flag.String("metrics-port", "10254", "Port for the metrics endpoint.")
metricsEndpoint = flag.String("metrics-endpoint", "/metrics", "Metrics endpoint.")
metricsPrefix = flag.String("metrics-prefix", "", "Prefix for the metrics.")
ingressClassName = flag.String("ingress-class-name", "", "Set ingressClassName for ingress resources created.")
maxQueueTimeWithoutUpdateInMinutes = flag.Int("max-queue-time-without-update-in-minutes", 30, "Sets the maximum time a queue can go without updates before it is considered stale and removed.")
queueCleanerIntervalInMinutes = flag.Int("queue-cleaner-interval-in-minutes", 10, "Sets the run interval for the queue cleaner.")
apiQps = flag.Float64("api-qps", 100.00, "Kubernetes API client QPS setting.")
apiBurst = flag.Int("api-burst", 200, "Kubernetes API client burst setting.")

metricsLabels util.ArrayFlags
metricsJobStartLatencyBuckets util.HistogramBuckets = util.DefaultJobStartLatencyBuckets
)

func main() {
flag.Var(&metricsLabels, "metrics-labels", "Labels for the metrics")
flag.Var(&metricsJobStartLatencyBuckets, "metrics-job-start-latency-buckets",
"Comma-separated boundary values (in seconds) for the job start latency histogram bucket; "+
@@ -91,6 +104,8 @@ func main() {
if err != nil {
glog.Fatal(err)
}
config.QPS = float32(*apiQps)
config.Burst = *apiBurst
kubeClient, err := clientset.NewForConfig(config)
if err != nil {
glog.Fatal(err)
@@ -102,6 +117,13 @@
stopCh := make(chan struct{}, 1)
startCh := make(chan struct{}, 1)

go func() {
if *enableProfiling {
runtime.SetMutexProfileFraction(1) // Enable mutex profiling
log.Println(http.ListenAndServe("localhost:6060", nil))
}
}()

if *enableLeaderElection {
podName := os.Getenv("POD_NAME")
hostname, err := os.Hostname()
@@ -191,7 +213,7 @@ func main() {
}

applicationController := sparkapplication.NewController(
crClient, kubeClient, crInformerFactory, podInformerFactory, metricConfig, *namespace, *ingressURLFormat, *ingressClassName, batchSchedulerMgr, *enableUIService)
crClient, kubeClient, crInformerFactory, podInformerFactory, metricConfig, *namespace, *ingressURLFormat, *ingressClassName, batchSchedulerMgr, *enableUIService, *controllerThreads)
scheduledApplicationController := scheduledsparkapplication.NewController(
crClient, kubeClient, apiExtensionsClient, crInformerFactory, clock.RealClock{})

@@ -233,8 +255,9 @@
}

glog.Info("Starting application controller goroutines")

if err = applicationController.Start(*controllerThreads, stopCh); err != nil {
queueCleanerIntervalDuration := time.Duration(*queueCleanerIntervalInMinutes) * time.Minute
maxQueueTimeWithoutUpdateDuration := time.Duration(*maxQueueTimeWithoutUpdateInMinutes) * time.Minute
if err = applicationController.Start(maxQueueTimeWithoutUpdateDuration, queueCleanerIntervalDuration, stopCh); err != nil {
glog.Fatal(err)
}
if err = scheduledApplicationController.Start(*controllerThreads, stopCh); err != nil {
@@ -248,7 +271,7 @@
}

glog.Info("Shutting down the Spark Operator")
applicationController.Stop()
applicationController.Stop(stopCh)
scheduledApplicationController.Stop()
if *enableWebhook {
if err := hook.Stop(); err != nil {
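The controller changes implementing the per-app queues are not shown in this part of the diff, so the following is only a hedged sketch of the pattern the new flags suggest: one rate-limited workqueue per application key, plus a cleaner that drops queues that have gone longer than max-queue-time-without-update-in-minutes without an update, checked every queue-cleaner-interval-in-minutes. All names below are hypothetical.

```go
package main

import (
	"sync"
	"time"

	"k8s.io/client-go/util/workqueue"
)

// appQueue pairs a per-application workqueue with its last-update time.
type appQueue struct {
	queue       workqueue.RateLimitingInterface
	lastUpdated time.Time
}

// queueManager owns one rate-limited workqueue per application key.
type queueManager struct {
	mu     sync.Mutex
	queues map[string]*appQueue
}

func newQueueManager() *queueManager {
	return &queueManager{queues: make(map[string]*appQueue)}
}

// getOrCreate returns the queue for appKey, creating it on first use and
// stamping the access time so the cleaner can spot idle queues.
func (m *queueManager) getOrCreate(appKey string) workqueue.RateLimitingInterface {
	m.mu.Lock()
	defer m.mu.Unlock()
	aq, ok := m.queues[appKey]
	if !ok {
		aq = &appQueue{queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())}
		m.queues[appKey] = aq
	}
	aq.lastUpdated = time.Now()
	return aq.queue
}

// runCleaner periodically shuts down and drops queues that have gone
// longer than maxIdle without an update.
func (m *queueManager) runCleaner(maxIdle, interval time.Duration, stopCh <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			m.mu.Lock()
			for key, aq := range m.queues {
				if time.Since(aq.lastUpdated) > maxIdle {
					aq.queue.ShutDown()
					delete(m.queues, key)
				}
			}
			m.mu.Unlock()
		case <-stopCh:
			return
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	m := newQueueManager()
	go m.runCleaner(30*time.Minute, 10*time.Minute, stopCh) // matches the flag defaults
	m.getOrCreate("default/spark-pi").AddRateLimited("sync")
	close(stopCh)
}
```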
7 changes: 7 additions & 0 deletions pkg/apis/sparkoperator.k8s.io/v1beta2/types.go
@@ -483,6 +483,13 @@ type SparkPodSpec struct {
// MemoryOverhead is the amount of off-heap memory to allocate in cluster mode, in MiB unless otherwise specified.
// +optional
MemoryOverhead *string `json:"memoryOverhead,omitempty"`
// MemoryLimit is the pod memory limit, used in cases where we want to enforce a memory limit on the pod.
// +optional
MemoryLimit *string `json:"memoryLimit,omitempty"`
// MemoryRequestOverride is an optional parameter that allows setting a pod memory request
// lower than the JVM memory plus overhead, which Spark uses by default.
// +optional
MemoryRequestOverride *string `json:"memoryRequestOverride,omitempty"`
// GPU specifies GPU requirement for the pod.
// +optional
GPU *GPUSpec `json:"gpu,omitempty"`
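A hedged usage sketch of the two new fields — the import path matches the file above, but the helper and the memory values are illustrative only:

```go
package main

import (
	"fmt"

	v1beta2 "github.com/kubeflow/spark-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
)

// examplePodSpec requests less memory than the usual JVM heap + overhead
// would imply, while still capping the pod with an explicit limit.
func examplePodSpec() v1beta2.SparkPodSpec {
	memoryRequest := "4g"
	memoryLimit := "6g"
	return v1beta2.SparkPodSpec{
		MemoryRequestOverride: &memoryRequest,
		MemoryLimit:           &memoryLimit,
	}
}

func main() {
	spec := examplePodSpec()
	fmt.Println(*spec.MemoryRequestOverride, *spec.MemoryLimit)
}
```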
15 changes: 15 additions & 0 deletions pkg/apis/sparkoperator.k8s.io/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default.