From 69f067e564e79e0f0c97992118132e33d7f06115 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Tue, 29 Oct 2024 15:33:18 +0100 Subject: [PATCH 1/2] job_fetcher: remove limits about HIMEM --- pandaharvester/commit_timestamp.py | 2 +- pandaharvester/harvesterbody/job_fetcher.py | 68 ++------------------- 2 files changed, 5 insertions(+), 65 deletions(-) diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index ec4a11f3..587e7b66 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "01-10-2024 08:19:30 on flin (by mightqxc)" +timestamp = "29-10-2024 14:33:19 on flin (by mightqxc)" diff --git a/pandaharvester/harvesterbody/job_fetcher.py b/pandaharvester/harvesterbody/job_fetcher.py index be194911..d836e370 100644 --- a/pandaharvester/harvesterbody/job_fetcher.py +++ b/pandaharvester/harvesterbody/job_fetcher.py @@ -48,10 +48,6 @@ def run(self): mainLog.debug(f"got {len(job_limit_to_fetch_dict)} queues") # get up to date queue configuration pandaQueueDict = PandaQueuesDict(filter_site_list=job_limit_to_fetch_dict.keys()) - # get job statistics - job_stats_dict = self.dbProxy.get_job_stats_full() - if job_stats_dict is None: - mainLog.warning(f"cannot get job stats") # loop over all queues for queueName, value_dict in job_limit_to_fetch_dict.items(): n_jobs = value_dict["jobs"] @@ -71,12 +67,11 @@ def run(self): if n_jobs == 0: tmpLog.debug("no job to fetch; skip") continue - # get jobs + # prodsourcelabel try: is_grandly_unified_queue = pandaQueueDict.is_grandly_unified_queue(siteName) except Exception: is_grandly_unified_queue = False - default_prodSourceLabel = queueConfig.get_source_label(is_gu=is_grandly_unified_queue) # randomize prodsourcelabel if configured pdpm = getattr(queueConfig, "prodSourceLabelRandomWeightsPermille", {}) @@ -89,54 +84,6 @@ def run(self): new_key = str(key).lstrip("resource_type_limits.") if isinstance(val, int): resource_type_limits_dict[new_key] = val - # FIXME: all parts about HIMEM are temporary as HIMEM rtypes and parameters will be replaced or reimplemented - # compute cores of active (submitted and running) jobs - n_jobs_rem = n_jobs - n_cores_rem = n_cores - pq_mcore_corecount = pandaQueueDict.get("corecount", 8) or 8 - rt_n_jobs_dict = {} - rt_n_cores_dict = { - "normal": { - "starting": 0, - "running": 0, - }, - "HIMEM": { - "starting": 0, - "running": 0, - }, - } - if job_stats_dict and queueName in job_stats_dict: - for tmp_rt, val_dict in job_stats_dict[queueName].items(): - if tmp_rt == "_total": - continue - for tmp_status in ["starting", "running"]: - increment = val_dict["cores"][tmp_status] - if rt_mapper.is_high_memory_resource_type(tmp_rt): - rt_n_cores_dict["HIMEM"][tmp_status] += increment - else: - rt_n_cores_dict["normal"][tmp_status] += increment - # compute n_jobs to fetch for resource types - for j, resource_type in enumerate(random.sample(list(all_resource_types), k=len(all_resource_types))): - # corecount - rt_corecount = 1 - if not rt_mapper.is_single_core_resource_type(resource_type): - rt_corecount = pq_mcore_corecount - # compute n jobs to get for this resource type - rt_n_jobs = min(n_jobs_rem / (len(all_resource_types) - j), n_cores_rem // rt_corecount) - if job_stats_dict and queueName in job_stats_dict: - pq_rt_job_stats_dict = job_stats_dict[queueName].get(resource_type, {}).get("jobs", {}) - rt_n_active_jobs = pq_rt_job_stats_dict.get("starting", 0) + pq_rt_job_stats_dict.get("running", 0) - if resource_type in 
resource_type_limits_dict: - # capped by limit of specific resource type - rt_n_jobs = min(rt_n_jobs, resource_type_limits_dict[resource_type] - rt_n_active_jobs) - if "HIMEM" in resource_type_limits_dict and rt_mapper.is_high_memory_resource_type(resource_type): - # capped by total cores of HIMEM - rt_n_active_himem_cores = rt_n_cores_dict["HIMEM"]["starting"] + rt_n_cores_dict["HIMEM"]["running"] - rt_n_jobs = min(rt_n_jobs, (resource_type_limits_dict["HIMEM"] - rt_n_active_himem_cores) / rt_corecount) - rt_n_jobs = max(rt_n_jobs, 0) - rt_n_jobs_dict[resource_type] = rt_n_jobs - n_jobs_rem -= rt_n_jobs - n_cores_rem -= rt_n_jobs * rt_corecount # function to call get jobs def _get_jobs(resource_type=None, n_jobs=0): @@ -216,16 +163,9 @@ def _get_jobs(resource_type=None, n_jobs=0): return len(jobs) # call get jobs - if all([val > 0 for val in rt_n_jobs_dict.values()]): - # no n_jobs limit on any resourcetypes, call get_jobs without constraint - _get_jobs(resource_type=None, n_jobs=n_jobs) - else: - # call get_jobs for each resourcetype with calculated rt_n_jobs - n_jobs_rem = n_jobs - for resource_type, rt_n_jobs in rt_n_jobs_dict.items(): - n_jobs_to_get = max(min(round(rt_n_jobs), n_jobs_rem), 0) - got_n_jobs = _get_jobs(resource_type=resource_type, n_jobs=n_jobs_to_get) - n_jobs_rem -= got_n_jobs + _get_jobs(n_jobs=n_jobs) + + # done loop mainLog.debug("done") # check if being terminated if self.terminated(harvester_config.jobfetcher.sleepTime): From 1c2fff646dc2de00a77c05f5e2855639493516d3 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Tue, 29 Oct 2024 15:35:02 +0100 Subject: [PATCH 2/2] v0.5.13 --- pandaharvester/commit_timestamp.py | 2 +- pandaharvester/panda_pkg_info.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index 587e7b66..911d6de3 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "29-10-2024 14:33:19 on flin (by mightqxc)" +timestamp = "29-10-2024 14:35:03 on flin (by mightqxc)" diff --git a/pandaharvester/panda_pkg_info.py b/pandaharvester/panda_pkg_info.py index 23e8883a..6b3f6d42 100644 --- a/pandaharvester/panda_pkg_info.py +++ b/pandaharvester/panda_pkg_info.py @@ -1 +1 @@ -release_version = "0.5.12" +release_version = "0.5.13"
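
For illustration, a minimal standalone sketch of the fetch flow that PATCH 1/2 leaves in place: the per-resource-type / HIMEM partitioning is gone, and each queue simply requests its configured number of jobs in one call. All names below (fetch_jobs_for_queue, dummy_get_jobs, the queue-limit dict) are hypothetical stand-ins for illustration, not the actual Harvester API.

    # Illustrative sketch only: hypothetical stand-ins, not Harvester code.
    # After this patch the job fetcher no longer splits a queue's quota across
    # resource types (normal vs HIMEM); it issues a single request per queue.

    def fetch_jobs_for_queue(queue_name, n_jobs, get_jobs):
        """Request up to n_jobs for one queue; return how many were obtained."""
        if n_jobs <= 0:
            # nothing to fetch for this queue; skip it
            return 0
        # single unconstrained call, no per-resource-type split
        return get_jobs(queue_name=queue_name, n_jobs=n_jobs)


    if __name__ == "__main__":
        # dummy backend that pretends every requested job was returned
        def dummy_get_jobs(queue_name, n_jobs):
            print(f"fetching {n_jobs} jobs for {queue_name}")
            return n_jobs

        job_limit_to_fetch = {"QUEUE_A": 5, "QUEUE_B": 0}
        for name, limit in job_limit_to_fetch.items():
            fetch_jobs_for_queue(name, limit, dummy_get_jobs)

Running the sketch fetches 5 jobs for QUEUE_A and skips QUEUE_B, mirroring the simplified `_get_jobs(n_jobs=n_jobs)` call that replaces the removed per-resource-type accounting.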