Skip to content

Commit

Permalink
Merge pull request #251 from HSF/flin
Browse files Browse the repository at this point in the history
Revert workaround about PUSH respecting resource_type_limits; v0.5.13
  • Loading branch information
mightqxc authored Oct 31, 2024
2 parents c259a65 + 1c2fff6 commit 9b150df
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 66 deletions.
2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
timestamp = "01-10-2024 08:19:30 on flin (by mightqxc)"
timestamp = "29-10-2024 14:35:03 on flin (by mightqxc)"
68 changes: 4 additions & 64 deletions pandaharvester/harvesterbody/job_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ def run(self):
mainLog.debug(f"got {len(job_limit_to_fetch_dict)} queues")
# get up to date queue configuration
pandaQueueDict = PandaQueuesDict(filter_site_list=job_limit_to_fetch_dict.keys())
# get job statistics
job_stats_dict = self.dbProxy.get_job_stats_full()
if job_stats_dict is None:
mainLog.warning(f"cannot get job stats")
# loop over all queues
for queueName, value_dict in job_limit_to_fetch_dict.items():
n_jobs = value_dict["jobs"]
Expand All @@ -71,12 +67,11 @@ def run(self):
if n_jobs == 0:
tmpLog.debug("no job to fetch; skip")
continue
# get jobs
# prodsourcelabel
try:
is_grandly_unified_queue = pandaQueueDict.is_grandly_unified_queue(siteName)
except Exception:
is_grandly_unified_queue = False

default_prodSourceLabel = queueConfig.get_source_label(is_gu=is_grandly_unified_queue)
# randomize prodsourcelabel if configured
pdpm = getattr(queueConfig, "prodSourceLabelRandomWeightsPermille", {})
Expand All @@ -89,54 +84,6 @@ def run(self):
new_key = str(key).lstrip("resource_type_limits.")
if isinstance(val, int):
resource_type_limits_dict[new_key] = val
# FIXME: all parts about HIMEM are temporary as HIMEM rtypes and parameters will be replaced or reimplemented
# compute cores of active (submitted and running) jobs
n_jobs_rem = n_jobs
n_cores_rem = n_cores
pq_mcore_corecount = pandaQueueDict.get("corecount", 8) or 8
rt_n_jobs_dict = {}
rt_n_cores_dict = {
"normal": {
"starting": 0,
"running": 0,
},
"HIMEM": {
"starting": 0,
"running": 0,
},
}
if job_stats_dict and queueName in job_stats_dict:
for tmp_rt, val_dict in job_stats_dict[queueName].items():
if tmp_rt == "_total":
continue
for tmp_status in ["starting", "running"]:
increment = val_dict["cores"][tmp_status]
if rt_mapper.is_high_memory_resource_type(tmp_rt):
rt_n_cores_dict["HIMEM"][tmp_status] += increment
else:
rt_n_cores_dict["normal"][tmp_status] += increment
# compute n_jobs to fetch for resource types
for j, resource_type in enumerate(random.sample(list(all_resource_types), k=len(all_resource_types))):
# corecount
rt_corecount = 1
if not rt_mapper.is_single_core_resource_type(resource_type):
rt_corecount = pq_mcore_corecount
# compute n jobs to get for this resource type
rt_n_jobs = min(n_jobs_rem / (len(all_resource_types) - j), n_cores_rem // rt_corecount)
if job_stats_dict and queueName in job_stats_dict:
pq_rt_job_stats_dict = job_stats_dict[queueName].get(resource_type, {}).get("jobs", {})
rt_n_active_jobs = pq_rt_job_stats_dict.get("starting", 0) + pq_rt_job_stats_dict.get("running", 0)
if resource_type in resource_type_limits_dict:
# capped by limit of specific resource type
rt_n_jobs = min(rt_n_jobs, resource_type_limits_dict[resource_type] - rt_n_active_jobs)
if "HIMEM" in resource_type_limits_dict and rt_mapper.is_high_memory_resource_type(resource_type):
# capped by total cores of HIMEM
rt_n_active_himem_cores = rt_n_cores_dict["HIMEM"]["starting"] + rt_n_cores_dict["HIMEM"]["running"]
rt_n_jobs = min(rt_n_jobs, (resource_type_limits_dict["HIMEM"] - rt_n_active_himem_cores) / rt_corecount)
rt_n_jobs = max(rt_n_jobs, 0)
rt_n_jobs_dict[resource_type] = rt_n_jobs
n_jobs_rem -= rt_n_jobs
n_cores_rem -= rt_n_jobs * rt_corecount

# function to call get jobs
def _get_jobs(resource_type=None, n_jobs=0):
Expand Down Expand Up @@ -216,16 +163,9 @@ def _get_jobs(resource_type=None, n_jobs=0):
return len(jobs)

# call get jobs
if all([val > 0 for val in rt_n_jobs_dict.values()]):
# no n_jobs limit on any resourcetypes, call get_jobs without constraint
_get_jobs(resource_type=None, n_jobs=n_jobs)
else:
# call get_jobs for each resourcetype with calculated rt_n_jobs
n_jobs_rem = n_jobs
for resource_type, rt_n_jobs in rt_n_jobs_dict.items():
n_jobs_to_get = max(min(round(rt_n_jobs), n_jobs_rem), 0)
got_n_jobs = _get_jobs(resource_type=resource_type, n_jobs=n_jobs_to_get)
n_jobs_rem -= got_n_jobs
_get_jobs(n_jobs=n_jobs)

# done loop
mainLog.debug("done")
# check if being terminated
if self.terminated(harvester_config.jobfetcher.sleepTime):
Expand Down
2 changes: 1 addition & 1 deletion pandaharvester/panda_pkg_info.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version = "0.5.12"
release_version = "0.5.13"

0 comments on commit 9b150df

Please sign in to comment.