# Provenance (from the original git patch header):
#   commit:  cdfe46733cb6e8db340ed047b5f67b83441eb51d
#   author:  lukemartinlogan
#   date:    Sat, 30 Mar 2024 17:52:58 -0500
#   subject: [PATCH] Add spark exec
#   file:    jarvis_util/shell/spark_exec.py (new file)
"""
This module provides methods to execute a command through Apache Spark's
``spark-submit`` launcher. This module assumes Spark is installed on the
system and that ``spark-submit`` is reachable on the PATH. This class is
intended to be called from Exec, not by general users.
"""

from jarvis_util.jutil_manager import JutilManager
from jarvis_util.shell.local_exec import LocalExec
from .exec_info import ExecInfo, ExecType
from abc import abstractmethod


class SparkExec(LocalExec):
    """
    Wrap a command in a ``spark-submit`` invocation and run it via LocalExec.
    """

    def __init__(self, cmd, master_host, master_port,
                 driver_mem='1g', executor_mem='1g', scratch='/tmp',
                 replication=1, exec_info=None):
        """
        Build the ``spark-submit`` command line and hand it to LocalExec.

        :param cmd: The application (and its arguments) to submit. It is
            appended verbatim after the spark-submit options.
            NOTE(review): assumed to be a single string -- confirm callers.
        :param master_host: Host name used in the ``spark://`` master URL.
        :param master_port: Port used in the ``spark://`` master URL.
        :param driver_mem: Value passed to ``--driver-memory``.
        :param executor_mem: Value passed to ``--executor-memory``.
        :param scratch: Value assigned to ``spark.local.dir``.
        :param replication: Value assigned to ``spark.storage.replication``.
        :param exec_info: Forwarded unchanged to ``LocalExec.__init__``.
        """
        master_url = f'spark://{master_host}:{master_port}'
        sparkcmd = [
            'spark-submit',
            f'--master {master_url}',
            f'--driver-memory {driver_mem}',
            f'--executor-memory {executor_mem}',
            '--conf spark.speculation=false',
            f'--conf spark.storage.replication={replication}',
            f'--conf spark.local.dir={scratch}',
            cmd,
        ]
        # Join the assembled option list (not the raw ``cmd``): joining
        # ``cmd`` discarded spark-submit and all of its options.
        sparkcmd = ' '.join(sparkcmd)
        super().__init__(sparkcmd, exec_info)