Skip to content

Commit

Permalink
Merge pull request #711 from binpash/parallel-pipelines-fix
Browse files Browse the repository at this point in the history
Make parallel_pipelines default, with a limit
  • Loading branch information
angelhof authored Dec 12, 2023
2 parents 8863a11 + 6a2d03d commit 0e96ac6
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 25 deletions.
21 changes: 18 additions & 3 deletions compiler/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from util import *


## Global
__version__ = "0.12.2" # FIXME add libdash version
GIT_TOP_CMD = [
Expand Down Expand Up @@ -209,10 +210,22 @@ def add_common_arguments(parser):
)
parser.add_argument(
"--parallel_pipelines",
help="Run multiple pipelines in parallel if they are safe to run",
help="(obsolete) Run multiple pipelines in parallel if they are safe to run. Now true by default. See --no_parallel_pipelines.",
action="store_true",
default=True,
)
parser.add_argument(
"--no_parallel_pipelines",
help="Disable parallel running of independent pipelines",
action="store_true",
default=False,
)
parser.add_argument(
"--parallel_pipelines_limit",
help="Maximum number of parallel independent pipelines",
type=int,
default=2,
)
parser.add_argument(
"--r_split_batch_size",
type=int,
Expand Down Expand Up @@ -301,10 +314,12 @@ def pass_common_arguments(pash_arguments):
arguments.append("--distributed_exec")
if pash_arguments.speculative:
arguments.append("--speculative")
if pash_arguments.parallel_pipelines:
arguments.append("--parallel_pipelines")
if pash_arguments.no_parallel_pipelines:
arguments.append("--no_parallel_pipelines")
if pash_arguments.daemon_communicates_through_unix_pipes:
arguments.append("--daemon_communicates_through_unix_pipes")
arguments.append("--parallel_pipelines_limit")
arguments.append(str(pash_arguments.parallel_pipelines_limit))
arguments.append("--r_split_batch_size")
arguments.append(str(pash_arguments.r_split_batch_size))
arguments.append("--debug")
Expand Down
6 changes: 3 additions & 3 deletions compiler/orchestrator_runtime/pash_init_setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export pash_checking_log_file=0
export pash_checking_debug_level=0
export pash_avoid_pash_runtime_completion_flag=0
export pash_profile_driven_flag=1
export pash_parallel_pipelines=0
export pash_no_parallel_pipelines=0
export pash_daemon_communicates_through_unix_pipes_flag=0
export pash_speculative_flag=0
export show_version=0
Expand Down Expand Up @@ -67,8 +67,8 @@ do
pash_checking_debug_level=1
fi

if [ "--parallel_pipelines" == "$item" ]; then
export pash_parallel_pipelines=1
if [ "--no_parallel_pipelines" == "$item" ]; then
export pash_no_parallel_pipelines=1
fi

if [ "--daemon_communicates_through_unix_pipes" == "$item" ]; then
Expand Down
21 changes: 16 additions & 5 deletions compiler/pash_compilation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,9 @@ def get_averages_per_width(self, input_ir_file):

## This adds the time measurement, or just removes the entry if there is no exec_time (for space reclamation)
def handle_time_measurement(self, process_id, exec_time):
## TODO: Could put those behind the profile_driven check too to not fill memory
assert self.process_id_input_ir_map[process_id].exec_time is None
## 2023-12-08 KK: When running parallel pipelines we receive two exit messages (attempts to reduce this to one caused a hang),
## so this assert does not hold
# assert self.process_id_input_ir_map[process_id].exec_time is None

## If we don't have the exec time, we do nothing
##
Expand Down Expand Up @@ -315,7 +316,11 @@ def compile_and_add(self, compiled_script_file, var_file, input_ir_file):
)

if not run_parallel:
## If we are not running in parallel everything has to finish first before scheduling for execution
self.wait_for_all()
else:
## Wait if we have more pipelines running than our current limit
self.wait_until_limit(config.pash_args.parallel_pipelines_limit)

if compile_success:
response = server_util.success_response(
Expand Down Expand Up @@ -367,18 +372,24 @@ def get_next_id(self):

def wait_for_all(self):
log(
"Waiting for all processes to finish. There are",
"Waiting for all processes to finish."
)
self.wait_until_limit(1)
self.unsafe_running = False

def wait_until_limit(self, limit: int):
log(
f"Waiting for less than {limit} processes to be running. There are",
self.running_procs,
"processes remaining.",
)
while self.running_procs > 0:
while self.running_procs >= limit:
input_cmd = self.get_input()
# must be exit command or something is wrong
if input_cmd.startswith("Exit:"):
self.handle_exit(input_cmd)
else:
raise Exception(f"Command should be exit but it was {input_cmd}")
self.unsafe_running = False

def handle_exit(self, input_cmd):
assert input_cmd.startswith("Exit:")
Expand Down
17 changes: 8 additions & 9 deletions compiler/pash_runtime.sh
Original file line number Diff line number Diff line change
Expand Up @@ -103,21 +103,14 @@ else
## Invoke the compiler and make any necessary preparations
source "$RUNTIME_DIR/pash_prepare_call_compiler.sh"

function run_parallel() {
trap inform_daemon_exit SIGTERM SIGINT EXIT
export SCRIPT_TO_EXECUTE="$pash_script_to_execute"
source "$RUNTIME_DIR/pash_restore_state_and_execute.sh"
inform_daemon_exit
}

## Check if there are traps set, and if so do not execute in parallel
## TODO: This might be an overkill but is conservative
traps_set=$(trap)
pash_redir_output echo "$$: (2) Traps set: $traps_set"
# Don't fork if compilation failed. The script might have effects on the shell state.
if [ "$pash_runtime_return_code" -ne 0 ] ||
## If parallel pipelines is not enabled we shouldn't fork
[ "$pash_parallel_pipelines" -eq 0 ] ||
## If parallel pipelines is disabled using a flag we shouldn't fork
[ "$pash_no_parallel_pipelines" -eq 1 ] ||
## If parallel pipelines is explicitly disabled (e.g., due to context), no forking
[ "$pash_disable_parallel_pipelines" -eq 1 ] ||
## If traps are set, no forking
Expand Down Expand Up @@ -147,6 +140,12 @@ else

pash_redir_output echo "$$: (5) BaSh script exited with ec: $pash_runtime_final_status"
else
function run_parallel() {
trap inform_daemon_exit SIGTERM SIGINT EXIT
export SCRIPT_TO_EXECUTE="$pash_script_to_execute"
source "$RUNTIME_DIR/pash_restore_state_and_execute.sh"
inform_daemon_exit
}
# Should we redirect errors as well?
# TODO: capturing the return state here isn't completely correct.
run_parallel "$@" <&0 &
Expand Down
2 changes: 1 addition & 1 deletion evaluation/tests/interface_tests/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ export PASH_TOP=${PASH_TOP:-$(git rev-parse --show-toplevel --show-superproject-
# time: print real in seconds, to simplify parsing

bash="bash"
pash="$PASH_TOP/pa.sh --parallel_pipelines --profile_driven"
pash="$PASH_TOP/pa.sh --profile_driven"

output_dir="$PASH_TOP/evaluation/tests/interface_tests/output"
rm -rf "$output_dir"
Expand Down
5 changes: 2 additions & 3 deletions evaluation/tests/test_evaluation_scripts.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,11 @@ n_inputs=(

if [ "$EXPERIMENTAL" -eq 1 ]; then
configurations=(
# "" # Commenting this out since the tests take a lot of time to finish
"--parallel_pipelines"
""
)
else
configurations=(
"--parallel_pipelines --profile_driven"
"--profile_driven"
)
fi

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
graphviz
libdash
pash-annotations==0.2.0
pash-annotations==0.2.2
shasta==0.1.0
sh-expand>=0.1.3

0 comments on commit 0e96ac6

Please sign in to comment.