From 3802af8e641a1970559036c5c7f08cdeb3c2dee5 Mon Sep 17 00:00:00 2001
From: Jeff Ohrstrom
Date: Tue, 11 Apr 2023 13:26:30 -0400
Subject: [PATCH] add the ability to override conf (#27)

Add the ability to supply a different config file and fix several bugs.

* fill in pitzer available memory.
* use nproc instead of assuming cores.
* fix workers to actually run on nodes.
---
 form.yml.erb           |  3 ++
 template/before.sh.erb | 70 ++++++++++++++++++++++++++++++++----------
 2 files changed, 56 insertions(+), 17 deletions(-)

diff --git a/form.yml.erb b/form.yml.erb
index e3f78da..6b05c6c 100644
--- a/form.yml.erb
+++ b/form.yml.erb
@@ -15,6 +15,7 @@ form:
   - bc_num_slots
   - node_type
   - num_workers
+  - spark_configuration_file
   - only_driver_on_root
   - include_tutorials
   - bc_email_on_started
@@ -60,6 +61,8 @@ attributes:
       data-min-num-workers-for-cluster-pitzer: 20,
       data-max-num-workers-for-cluster-pitzer: 80,
     ]
+  spark_configuration_file:
+    help: Override defaults with a new configuration file. Leave blank to use defaults.
   only_driver_on_root:
     widget: "check_box"
     label: Only launch the driver on the master node.
diff --git a/template/before.sh.erb b/template/before.sh.erb
index c3b0ade..cf33ac9 100755
--- a/template/before.sh.erb
+++ b/template/before.sh.erb
@@ -1,8 +1,35 @@
 <%-
-  avail_cores = context.node_type.include?("hugemem") ? 48 : 28
-  avail_mem = context.node_type.include?("hugemem") ? 1528 : 120
   num_workers = context.num_workers.to_i
-  torque_cluster = OodAppkit.clusters[context.cluster].job_config[:adapter] == 'torque'
+  total_tasks = num_workers * context.bc_num_slots.to_i
+
+  def avail_mem
+    if context.cluster == 'pitzer'
+      if context.node_type == 'hugemem'
+        1492
+      elsif context.node_type == 'largemem'
+        760
+      else
+        184
+      end
+    else
+      context.node_type == "hugemem" ? 1492 : 120
+    end
+  end
+
+  def extra_spark_config
+    if context.spark_configuration_file.empty?
+      {}
+    else
+      {}.tap do |extra|
+        File.readlines(context.spark_configuration_file.strip).each do |line|
+          key, value = line.split(' ')
+          extra[key] = value
+        end
+      rescue StandardError
+        {}
+      end.compact
+    end
+  end
 
   spark_config = {
     "spark.ui.reverseProxy" => "true",
@@ -15,7 +42,21 @@
     "spark.ui.enabled" => "false",
     # So we need to disable the ability to kill applications
     "spark.ui.killEnabled" => "false",
-  }
+  }.merge(extra_spark_config)
+
+  def pyspark_submit_args
+    args = []
+    args.concat(["--executor-memory #{avail_mem/context.num_workers.to_i}G"]) unless extra_spark_config.has_key?('spark.executor.memory')
+    args.concat(['--conf spark.driver.maxResultSize=0']) unless extra_spark_config.has_key?("spark.driver.maxResultSize")
+
+    if extra_spark_config.has_key?('spark.driver.memory')
+      args.concat(["--driver-memory #{extra_spark_config['spark.driver.memory']}"])
+    else
+      args.concat(["--driver-memory #{context.only_driver_on_root == "1" ? avail_mem : 2}G"])
+    end
+
+    args
+  end
 -%>
 # Export the module function if it exists
 [[ $(type -t module) == "function" ]] && export -f module
@@ -168,11 +209,12 @@ sed 's/^ \{2\}//' > "${SPARK_WORKER_SCRIPT}" << EOL
   export SPARK_WORKER_PORT=\$(find_port \${SPARK_WORKER_HOST})
   export SPARK_WORKER_WEBUI_PORT=\$(find_port \${SPARK_WORKER_HOST})
   export SPARK_WORKER_DIR="${PWD}/work"
-  export SPARK_WORKER_CORES=<%= avail_cores / num_workers %>
-  export WORKER_LOG_FILE="${LOG_ROOT}/spark-worker-\${SPARK_WORKER_HOST}-\${1}.out"
+  export SPARK_WORKER_CORES=$(($(nproc)/<%= num_workers %>))
+
+  export WORKER_LOG_FILE="${LOG_ROOT}/spark-worker-\${SPARK_WORKER_HOST}-\${SLURM_TASK_PID}.out"
 
   # Launch worker
-  echo "Launching worker #\${1} on \${SPARK_WORKER_HOST}:\${SPARK_WORKER_PORT}..."
+  echo "Launching worker \${SLURM_TASK_PID} on \${SPARK_WORKER_HOST}:\${SPARK_WORKER_PORT}..."
   set -x
   "\${SPARK_HOME}/bin"/spark-class "org.apache.spark.deploy.worker.Worker" \\
     --properties-file "${SPARK_CONFIG_FILE}" \\
@@ -184,13 +226,7 @@ chmod 700 "${SPARK_WORKER_SCRIPT}"
 
 # Launch workers
 echo "Launching workers..."
-<%- if torque_cluster %>
-for ((i=0; i<<%= num_workers %>; i++)); do
-  pbsdsh -u -- "${SPARK_WORKER_SCRIPT}" ${i} &
-done
-<%- else -%>
-srun --export ALL --ntasks <%= num_workers %> "${SPARK_WORKER_SCRIPT}" &
-<%- end -%>
+srun --export ALL --ntasks <%= total_tasks %> "${SPARK_WORKER_SCRIPT}" &
 
 #
 # Launch Jupyter with PySpark
@@ -217,10 +253,10 @@ sed 's/^ \{2\}//' > "${PYTHON_WRAPPER_FILE}" << EOL
   export PYTHONSTARTUP="\${SPARK_HOME}/python/pyspark/shell.py"
   export PYSPARK_SUBMIT_ARGS=" \\
     --master spark://${SPARK_MASTER_HOST}:${SPARK_MASTER_PORT} \\
-    --driver-memory <%= context.only_driver_on_root == "1" ? avail_mem : 2 %>G \\
-    --executor-memory <%= avail_mem / num_workers %>G \\
-    --conf spark.driver.maxResultSize=0 \\
     --properties-file \"${SPARK_CONFIG_FILE}\" \\
+    <%- pyspark_submit_args.each do |arg| -%>
+    <%= arg %> \\
+    <%- end -%>
     pyspark-shell \\
   "