add the ability to override conf (#27)
Add the ability to supply a different config file and fix several bugs.

* Fill in Pitzer available memory.
* Use nproc instead of assuming core counts.
* Fix workers so they actually run on the nodes.
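The override file named in the new spark_configuration_file field is read one line at a time, and each line is split on whitespace into a property name and a value. A minimal sketch of that parsing, mirroring the extra_spark_config helper added in template/before.sh.erb (the path and property values below are made-up examples, not part of the commit):

  # Hypothetical override file at conf_path, e.g.:
  #   spark.executor.memory        8G
  #   spark.driver.memory          4G
  #   spark.driver.maxResultSize   2G
  conf_path = File.expand_path("~/my-spark-overrides.conf")

  extra = {}
  File.readlines(conf_path).each do |line|
    key, value = line.split(' ')   # first token is the property name, second the value
    extra[key] = value
  end
  extra = extra.compact            # drops the nil entry a blank line would produce
  # extra => {"spark.executor.memory"=>"8G", "spark.driver.memory"=>"4G", "spark.driver.maxResultSize"=>"2G"}

Because this hash is merged over the app's default spark_config, any property named in the file replaces the built-in default with the same key.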
johrstrom authored Apr 11, 2023
1 parent 954cf58 commit 3802af8
Showing 2 changed files with 56 additions and 17 deletions.
form.yml.erb: 3 changes (3 additions, 0 deletions)
@@ -15,6 +15,7 @@ form:
- bc_num_slots
- node_type
- num_workers
+ - spark_configuration_file
- only_driver_on_root
- include_tutorials
- bc_email_on_started
@@ -60,6 +61,8 @@ attributes:
data-min-num-workers-for-cluster-pitzer: 20,
data-max-num-workers-for-cluster-pitzer: 80,
]
+ spark_configuration_file:
+   help: Override defaults with a new configuration file. Leave blank to use defaults.
only_driver_on_root:
widget: "check_box"
label: Only launch the driver on the master node.
template/before.sh.erb: 70 changes (53 additions, 17 deletions)
@@ -1,8 +1,35 @@
<%-
- avail_cores = context.node_type.include?("hugemem") ? 48 : 28
- avail_mem = context.node_type.include?("hugemem") ? 1528 : 120
num_workers = context.num_workers.to_i
- torque_cluster = OodAppkit.clusters[context.cluster].job_config[:adapter] == 'torque'
+ total_tasks = num_workers * context.bc_num_slots.to_i
+
+ def avail_mem
+   if context.cluster == 'pitzer'
+     if context.node_type == 'hugemem'
+       1492
+     elsif context.node_type == 'largemem'
+       760
+     else
+       184
+     end
+   else
+     context.node_type == "hugemem" ? 1492 : 120
+   end
+ end
+
+ def extra_spark_config
+   if context.spark_configuration_file.empty?
+     {}
+   else
+     {}.tap do |extra|
+       File.readlines(context.spark_configuration_file.strip).each do |line|
+         key, value = line.split(' ')
+         extra[key] = value
+       end
+     rescue StandardError
+       {}
+     end.compact
+   end
+ end
+

spark_config = {
"spark.ui.reverseProxy" => "true",
@@ -15,7 +42,21 @@
"spark.ui.enabled" => "false",
# So we need to disable the ability to kill applications
"spark.ui.killEnabled" => "false",
- }
+ }.merge(extra_spark_config)
+
+ def pyspark_submit_args
+   args = []
+   args.concat(["--executor-memory #{avail_mem/context.num_workers.to_i}G"]) unless extra_spark_config.has_key?('spark.executor.memory')
+   args.concat(['--conf spark.driver.maxResultSize=0']) unless extra_spark_config.has_key?("spark.driver.maxResultSize")
+
+   if extra_spark_config.has_key?('spark.driver.memory')
+     args.concat(["--driver-memory #{extra_spark_config['spark.driver.memory']}"])
+   else
+     args.concat(["--driver-memory #{context.only_driver_on_root == "1" ? avail_mem : 2}G"])
+   end
+
+   args
+ end
-%>
# Export the module function if it exists
[[ $(type -t module) == "function" ]] && export -f module
@@ -168,11 +209,12 @@ sed 's/^ \{2\}//' > "${SPARK_WORKER_SCRIPT}" << EOL
export SPARK_WORKER_PORT=\$(find_port \${SPARK_WORKER_HOST})
export SPARK_WORKER_WEBUI_PORT=\$(find_port \${SPARK_WORKER_HOST})
export SPARK_WORKER_DIR="${PWD}/work"
- export SPARK_WORKER_CORES=<%= avail_cores / num_workers %>
- export WORKER_LOG_FILE="${LOG_ROOT}/spark-worker-\${SPARK_WORKER_HOST}-\${1}.out"
+ export SPARK_WORKER_CORES=$(($(nproc)/<%= num_workers %>))
+
+ export WORKER_LOG_FILE="${LOG_ROOT}/spark-worker-\${SPARK_WORKER_HOST}-\${SLURM_TASK_PID}.out"

# Launch worker
echo "Launching worker #\${1} on \${SPARK_WORKER_HOST}:\${SPARK_WORKER_PORT}..."
echo "Launching worker \${SLURM_TASK_PID} on \${SPARK_WORKER_HOST}:\${SPARK_WORKER_PORT}..."
set -x
"\${SPARK_HOME}/bin"/spark-class "org.apache.spark.deploy.worker.Worker" \\
--properties-file "${SPARK_CONFIG_FILE}" \\
@@ -184,13 +226,7 @@ chmod 700 "${SPARK_WORKER_SCRIPT}"

# Launch workers
echo "Launching workers..."
- <%- if torque_cluster %>
- for ((i=0; i<<%= num_workers %>; i++)); do
-   pbsdsh -u -- "${SPARK_WORKER_SCRIPT}" ${i} &
- done
- <%- else -%>
- srun --export ALL --ntasks <%= num_workers %> "${SPARK_WORKER_SCRIPT}" &
- <%- end -%>
+ srun --export ALL --ntasks <%= total_tasks %> "${SPARK_WORKER_SCRIPT}" &

#
# Launch Jupyter with PySpark
@@ -217,10 +253,10 @@ sed 's/^ \{2\}//' > "${PYTHON_WRAPPER_FILE}" << EOL
export PYTHONSTARTUP="\${SPARK_HOME}/python/pyspark/shell.py"
export PYSPARK_SUBMIT_ARGS=" \\
--master spark://${SPARK_MASTER_HOST}:${SPARK_MASTER_PORT} \\
- --driver-memory <%= context.only_driver_on_root == "1" ? avail_mem : 2 %>G \\
- --executor-memory <%= avail_mem / num_workers %>G \\
- --conf spark.driver.maxResultSize=0 \\
--properties-file \"${SPARK_CONFIG_FILE}\" \\
+ <%- pyspark_submit_args.each do |arg| -%>
+ <%= arg %> \\
+ <%- end -%>
pyspark-shell \\
"

