Skip to content

Commit

Permalink
[gantry] add functions _run_parallel and _parallel_worker.
Browse files Browse the repository at this point in the history
  • Loading branch information
shizunge committed May 31, 2024
1 parent 8c40808 commit 724cda0
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 29 deletions.
74 changes: 45 additions & 29 deletions src/lib-gantry.sh
Original file line number Diff line number Diff line change
Expand Up @@ -824,25 +824,40 @@ _update_single_service() {
return 0
}

# To run update in parallel
_update_worker() {
local INDEX="${1}"
local STATIC_VAR_LIST_NAME="${2}"
_parallel_worker() {
local FUNCTION="${1}"
local INDEX="${2}"
local STATIC_VAR_LIST_NAME="${3}"
local OLD_LOG_SCOPE="${LOG_SCOPE}"
LOG_SCOPE=$(attach_tag_to_log_scope "worker${INDEX}")
export LOG_SCOPE
local SERVICE_AND_IMAGE=
local SERVICE IMAGE
local ARGUMENTS=
while true; do
SERVICE_AND_IMAGE=$(_static_variable_pop_list "${STATIC_VAR_LIST_NAME}")
[ -z "${SERVICE_AND_IMAGE}" ] && break;
SERVICE=$(echo "${SERVICE_AND_IMAGE} " | cut -d ' ' -f 1)
IMAGE=$(echo "${SERVICE_AND_IMAGE} " | cut -d ' ' -f 2)
_update_single_service "${SERVICE}" "${IMAGE}"
ARGUMENTS=$(_static_variable_pop_list "${STATIC_VAR_LIST_NAME}")
[ -z "${ARGUMENTS}" ] && break;
# SC2086 (info): Double quote to prevent globbing and word splitting.
# shellcheck disable=SC2086
${FUNCTION} ${ARGUMENTS}
done
export LOG_SCOPE="${OLD_LOG_SCOPE}"
}

_run_parallel() {
local FUNCTION="${1}"
local NUM_WORKERS="${2}"
local STATIC_VAR_LIST_NAME="${3}"
log DEBUG "Run ${NUM_WORKERS} ${FUNCTION} in parallel."
local PIDS=
for INDEX in $(seq 0 $((NUM_WORKERS-1)) ); do
# All workers subscribe to the same list now.
_parallel_worker "${FUNCTION}" "${INDEX}" "${STATIC_VAR_LIST_NAME}" &
PIDS="${!} ${PIDS}"
done
# SC2086 (info): Double quote to prevent globbing and word splitting.
# shellcheck disable=SC2086
wait ${PIDS}
}

_get_services_filted() {
local SERVICES_FILTERS="${1}"
local SERVICES=
Expand Down Expand Up @@ -910,40 +925,41 @@ gantry_get_services_list() {
}

gantry_update_services_list() {
local NUM_WORKERS=
if ! NUM_WORKERS=$(gantry_read_number GANTRY_UPDATE_NUM_WORKERS 1); then
local UPDATE_NUM_WORKERS=
if ! UPDATE_NUM_WORKERS=$(gantry_read_number GANTRY_UPDATE_NUM_WORKERS 1); then
local ERROR_SERVICE="GANTRY_UPDATE_NUM_WORKERS-is-not-a-number"
_static_variable_add_unique_to_list STATIC_VAR_SERVICES_UPDATE_INPUT_ERROR "${ERROR_SERVICE}"
return 1
fi
local MANIFEST_NUM_WORKERS=
if ! MANIFEST_NUM_WORKERS=$(gantry_read_number GANTRY_MANIFEST_NUM_WORKERS 1); then
local ERROR_SERVICE="GANTRY_MANIFEST_NUM_WORKERS-is-not-a-number"
_static_variable_add_unique_to_list STATIC_VAR_SERVICES_UPDATE_INPUT_ERROR "${ERROR_SERVICE}"
return 1
fi
local LIST="${*}"
local NUM=
NUM=$(_get_number_of_elements "${LIST}")
log INFO "Inspecting ${NUM} service(s)."
local RUN_UPDATE=
for SERVICE in ${LIST}; do
# Immediately update self service after inspection, do not wait for other inspections to finish.
# This avoids running inspection on the same service twice, due to interruption from updating self, when running as a service.
# The self service is usually the first of the list.
RUN_UPDATE=$(_service_is_self "${SERVICE}" && echo "true" || echo "false")
_inspect_service "${SERVICE}" "${RUN_UPDATE}"
if _service_is_self "${SERVICE}"; then
# Immediately update self service after inspection, do not wait for other inspections to finish.
# This avoids running inspection on the same service twice, due to interruption from updating self, when running as a service.
# The self service is usually the first of the list.
local RUN_UPDATE=true
_inspect_service "${SERVICE}" "${RUN_UPDATE}"
continue
fi
_static_variable_add_unique_to_list STATIC_VAR_SERVICES_TO_INSPECT "${SERVICE}"
done
_run_parallel _inspect_service "${MANIFEST_NUM_WORKERS}" STATIC_VAR_SERVICES_TO_INSPECT

_report_services_from_static_variable STATIC_VAR_SERVICES_SKIP_JOB "Skip updating" "due to they are job(s)" | log_lines INFO
_report_services_from_static_variable STATIC_VAR_SERVICES_UPDATE_FAILED "Failed to inspect" | log_lines ERROR
_report_services_from_static_variable STATIC_VAR_SERVICES_NO_NEW_IMAGE "No new images for" | log_lines INFO
_report_services_from_static_variable STATIC_VAR_SERVICES_TO_UPDATE "Updating" | log_lines INFO

log DEBUG "NUM_WORKERS=${NUM_WORKERS}"
local PIDS=
for INDEX in $(seq 0 $((NUM_WORKERS-1)) ); do
# All workers subscribe to the same list now.
_update_worker "${INDEX}" STATIC_VAR_SERVICES_AND_IMAGES_TO_UPDATE &
PIDS="${!} ${PIDS}"
done
# SC2086 (info): Double quote to prevent globbing and word splitting.
# shellcheck disable=SC2086
wait ${PIDS}
_run_parallel _update_single_service "${UPDATE_NUM_WORKERS}" STATIC_VAR_SERVICES_AND_IMAGES_TO_UPDATE

local RETURN_VALUE=0
local FAILED_NUM=
Expand Down
2 changes: 2 additions & 0 deletions tests/spec_gantry_test_helper.sh
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ reset_gantry_env() {
fi
export GANTRY_SERVICES_SELF=
export GANTRY_MANIFEST_CMD=
export GANTRY_MANIFEST_NUM_WORKERS=
export GANTRY_MANIFEST_OPTIONS=
export GANTRY_ROLLBACK_OPTIONS=
export GANTRY_UPDATE_JOBS=
Expand Down Expand Up @@ -667,6 +668,7 @@ _run_gantry_container() {
--env "GANTRY_SERVICES_FILTERS=${GANTRY_SERVICES_FILTERS}" \
--env "GANTRY_SERVICES_SELF=${GANTRY_SERVICES_SELF}" \
--env "GANTRY_MANIFEST_CMD=${GANTRY_MANIFEST_CMD}" \
--env "GANTRY_MANIFEST_NUM_WORKERS=${GANTRY_MANIFEST_NUM_WORKERS}" \
--env "GANTRY_MANIFEST_OPTIONS=${GANTRY_MANIFEST_OPTIONS}" \
--env "GANTRY_ROLLBACK_ON_FAILURE=${GANTRY_ROLLBACK_ON_FAILURE}" \
--env "GANTRY_ROLLBACK_OPTIONS=${GANTRY_ROLLBACK_OPTIONS}" \
Expand Down

0 comments on commit 724cda0

Please sign in to comment.