Skip to content

Commit

Permalink
Merge pull request #383 from PanDAWMS/master
Browse files Browse the repository at this point in the history
Sync
  • Loading branch information
EdwardKaravakis authored Jul 25, 2024
2 parents ed0111d + fc72402 commit 8489be7
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 17 deletions.
26 changes: 17 additions & 9 deletions pandaserver/taskbuffer/OraDBProxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -23425,15 +23425,16 @@ def ups_load_worker_stats(self):
tmpLog.debug("done")
return worker_stats_dict

def get_average_memory_workers(self, queue, harvester_id):
def get_average_memory_workers(self, queue, harvester_id, target):
"""
        Calculates the average memory for running and queued workers at a particular PanDA queue

:param queue: name of the PanDA queue
:param worker_stats_harvester: worker statistics for the particular harvester instance
:param harvester_id: string with the harvester ID serving the queue
:param target: memory target for the queue in MB. This value is only used in the logging

:return: average memory
:return: average_memory_running_submitted, average_memory_running
"""

comment = " /* DBProxy.get_average_memory_workers */"
Expand Down Expand Up @@ -23492,9 +23493,11 @@ def get_average_memory_workers(self, queue, harvester_id):
except TypeError:
average_memory_running = 0

tmp_logger.debug(
f"Queue {queue} and harvester_id {harvester_id} currently has ({average_memory_running_submitted}, {average_memory_running}) "
f"MB of average memory workers"
tmp_logger.info(
f"computingsite={queue} and harvester_id={harvester_id} currently has "
f"meanrss_running_submitted={average_memory_running_submitted} "
f"meanrss_running={average_memory_running} "
f"meanrss_target={target} MB"
)
return average_memory_running_submitted, average_memory_running

Expand Down Expand Up @@ -23539,7 +23542,10 @@ def ups_new_worker_distribution(self, queue, worker_stats):
tmp_log.debug("No resource type limits")
pass
try:
average_memory_target = pq_data_des["params"]["average_memory"]
if pq_data_des["meanrss"] != 0:
average_memory_target = pq_data_des["meanrss"]
else:
tmp_log.debug("meanrss is 0, not using it as average_memory_target")
except KeyError:
tmp_log.debug("No average memory defined")
pass
Expand Down Expand Up @@ -23573,10 +23579,12 @@ def ups_new_worker_distribution(self, queue, worker_stats):
# If the site defined a memory target, calculate the memory requested by running and queued workers
resource_types_under_target = []
if average_memory_target:
average_memory_workers_running_submitted, average_memory_workers_running = self.get_average_memory_workers(queue, harvester_id)
average_memory_workers_running_submitted, average_memory_workers_running = self.get_average_memory_workers(
queue, harvester_id, average_memory_target
)
# if the queue is over memory, we will only submit lower workers in the next cycle
if average_memory_target < min(average_memory_workers_running_submitted, average_memory_workers_running):
resource_types_under_target = self.__resource_spec_mapper.filter_out_high_memory_resourcetypes()
if average_memory_target < max(average_memory_workers_running_submitted, average_memory_workers_running):
resource_types_under_target = self.__resource_spec_mapper.filter_out_high_memory_resourcetypes(memory_threshold=average_memory_target)
tmp_log.debug(f"Accepting {resource_types_under_target} resource types to respect mean memory target")
else:
tmp_log.debug(f"Accepting all resource types as under memory target")
Expand Down
8 changes: 4 additions & 4 deletions pandaserver/taskbuffer/ResourceSpec.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ def is_multi_core(self, resource_name):
return resource_type.is_multi_core()
return False

def is_high_memory(self, resource_name):
def is_high_memory(self, resource_name, memory_threshold=HIMEM_THRESHOLD):
    """
    Decide whether the given resource type counts as high memory.

    :param resource_name: name of the resource type to look up
    :param memory_threshold: RAM-per-core limit in MB above which a type is high memory
    :return: True when the matching resource type has no maxrampercore limit
             (treated as unbounded) or its maxrampercore exceeds the threshold,
             False otherwise
    """
    for known_type in self.resource_types:
        if known_type.resource_name != resource_name:
            continue
        # no per-core cap means the type can grow arbitrarily large
        unbounded = known_type.maxrampercore is None
        if unbounded or known_type.maxrampercore > memory_threshold:
            return True
    return False

Expand All @@ -42,11 +42,11 @@ def translate_resourcetype_to_cores(self, resource_name, cores_queue):

return 1

def filter_out_high_memory_resourcetypes(self):
def filter_out_high_memory_resourcetypes(self, memory_threshold=HIMEM_THRESHOLD):
    """
    List the resource type names that stay within the memory threshold.

    :param memory_threshold: RAM-per-core limit in MB passed through to is_high_memory
    :return: list of resource type names that are not considered high memory
    """
    low_memory_names = [
        resource_type.resource_name
        for resource_type in self.resource_types
        if not self.is_high_memory(resource_type.resource_name, memory_threshold=memory_threshold)
    ]
    return low_memory_names
Expand Down
4 changes: 2 additions & 2 deletions pandaserver/taskbuffer/TaskBuffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3748,11 +3748,11 @@ def ups_load_worker_stats(self):
return ret

# get the distribution of new workers to submit
def get_average_memory_workers(self, queue, harvester_id):
def get_average_memory_workers(self, queue, harvester_id, target):
# get DBproxy
proxy = self.proxyPool.getProxy()
# exec
ret = proxy.get_average_memory_workers(queue, harvester_id)
ret = proxy.get_average_memory_workers(queue, harvester_id, target)
# release proxy
self.proxyPool.putProxy(proxy)
# return
Expand Down
6 changes: 4 additions & 2 deletions pandaserver/taskbuffer/workflow_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@

# import PandaLogger before idds modules not to change message levels of other modules
from pandacommon.pandalogger.PandaLogger import PandaLogger
from ruamel.yaml import YAML

from pandaserver.srvcore.CoreUtils import clean_user_id, commands_get_status_output
from pandaserver.srvcore.MailUtils import MailUtils
from pandaserver.workflow import pcwl_utils, workflow_utils
from pandaserver.workflow.snakeparser import Parser
from ruamel import yaml

_logger = PandaLogger().getLogger("workflow_processor")

Expand Down Expand Up @@ -174,7 +175,8 @@ def core_exec(sandbox_url, log_token, dump_workflow, ops_file, user_name, test_m
workflow_name = ops["data"].get("workflow_name")
nodes, root_in = pcwl_utils.parse_workflow_file(ops["data"]["workflowSpecFile"], tmpLog)
with open(ops["data"]["workflowInputFile"]) as workflow_input:
data = yaml.safe_load(workflow_input)
yaml = YAML(typ="safe", pure=True)
data = yaml.load(workflow_input)
# noinspection DuplicatedCode
s_id, t_nodes, nodes = pcwl_utils.resolve_nodes(nodes, root_in, data, 0, set(), ops["data"]["outDS"], tmpLog)
workflow_utils.set_workflow_outputs(nodes)
Expand Down

0 comments on commit 8489be7

Please sign in to comment.