From a46a0cbf4c15914c9c037cb3f57588c05648c466 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Wed, 9 Nov 2022 11:16:26 +0100 Subject: [PATCH] update slurm submitter --- .../harvestersubmitter/slurm_submitter.py | 68 +++++++++++++++++-- 1 file changed, 62 insertions(+), 6 deletions(-) diff --git a/pandaharvester/harvestersubmitter/slurm_submitter.py b/pandaharvester/harvestersubmitter/slurm_submitter.py index d6f764dc..559f1c1a 100644 --- a/pandaharvester/harvestersubmitter/slurm_submitter.py +++ b/pandaharvester/harvestersubmitter/slurm_submitter.py @@ -4,12 +4,15 @@ import six import os import stat +import datetime +from math import ceil try: import subprocess32 as subprocess except ImportError: import subprocess +from pandaharvester.harvesterconfig import harvester_config from pandaharvester.harvestercore import core_utils from pandaharvester.harvestercore.plugin_base import PluginBase @@ -28,7 +31,6 @@ def __init__(self, **kwarg): # submit workers def submit_workers(self, workspec_list): retList = [] - retStrList = [] for workSpec in workspec_list: # make logger tmpLog = self.make_logger(baseLogger, 'workerID={0}'.format(workSpec.workerID), @@ -75,6 +77,63 @@ def submit_workers(self, workspec_list): retList.append(tmpRetVal) return retList + def make_placeholder_map(self, workspec): + timeNow = datetime.datetime.utcnow() + + panda_queue_name = self.queueName + this_panda_queue_dict = dict() + + # get default information from queue info + n_core_per_node_from_queue = this_panda_queue_dict.get('corecount', 1) if this_panda_queue_dict.get('corecount', 1) else 1 + + # get override requirements from queue configured + try: + n_core_per_node = self.nCorePerNode if self.nCorePerNode else n_core_per_node_from_queue + except AttributeError: + n_core_per_node = n_core_per_node_from_queue + + n_core_total = workspec.nCore if workspec.nCore else n_core_per_node + request_ram = max(workspec.minRamCount, 1 * n_core_total) if workspec.minRamCount else 1 * n_core_total + request_disk = workspec.maxDiskCount * 1024 if workspec.maxDiskCount else 1 + request_walltime = workspec.maxWalltime if workspec.maxWalltime else 0 + + n_node = ceil(n_core_total / n_core_per_node) + request_ram_bytes = request_ram * 2 ** 20 + request_ram_per_core = ceil(request_ram * n_node / n_core_total) + request_ram_bytes_per_core = ceil(request_ram_bytes * n_node / n_core_total) + request_cputime = request_walltime * n_core_total + request_walltime_minute = ceil(request_walltime / 60) + request_cputime_minute = ceil(request_cputime / 60) + + placeholder_map = { + 'nCorePerNode': n_core_per_node, + 'nCoreTotal': n_core_total, + 'nNode': n_node, + 'requestRam': request_ram, + 'requestRamBytes': request_ram_bytes, + 'requestRamPerCore': request_ram_per_core, + 'requestRamBytesPerCore': request_ram_bytes_per_core, + 'requestDisk': request_disk, + 'requestWalltime': request_walltime, + 'requestWalltimeMinute': request_walltime_minute, + 'requestCputime': request_cputime, + 'requestCputimeMinute': request_cputime_minute, + 'accessPoint': workspec.accessPoint, + 'harvesterID': harvester_config.master.harvester_id, + 'workerID': workspec.workerID, + 'computingSite': workspec.computingSite, + 'pandaQueueName': panda_queue_name, + # 'x509UserProxy': x509_user_proxy, + 'logDir': self.logDir, + 'logSubDir': os.path.join(self.logDir, timeNow.strftime('%y-%m-%d_%H')), + 'jobType': workspec.jobType, + 'tokenDir': self.tokenDir, + 'tokenName': self.tokenName, + 'tokenOrigin': self.tokenOrigin, + 'submitMode': self.submitMode + } + return placeholder_map + # make batch script def make_batch_script(self, workspec): # template for batch script @@ -83,11 +142,8 @@ def make_batch_script(self, workspec): tmpFile.close() del tmpFile tmpFile = tempfile.NamedTemporaryFile(delete=False, suffix='_submit.sh', dir=workspec.get_access_point()) - tmpFile.write(six.b(self.template.format(nCorePerNode=self.nCorePerNode, - nNode=workspec.nCore // self.nCorePerNode, - accessPoint=workspec.accessPoint, - workerID=workspec.workerID)) - ) + placeholder = self.make_placeholder_map(workspec) + tmpFile.write(six.b(self.template.format(**placeholder))) tmpFile.close() # set execution bit and group permissions on the temp file