diff --git a/docs/source/UserGuide/WebPortalReference/PilotMonitor/index.rst b/docs/source/UserGuide/WebPortalReference/PilotMonitor/index.rst index 35cf86f8da1..a61627e05bd 100644 --- a/docs/source/UserGuide/WebPortalReference/PilotMonitor/index.rst +++ b/docs/source/UserGuide/WebPortalReference/PilotMonitor/index.rst @@ -90,9 +90,6 @@ The following columns are provided: **Benchmark** Estimation of the power of the Worker Node CPU which is running the Pilot Job. If 0, the estimation was not possible. -**TaskQueueID** - Internal DIRAC WMS identifier of the Task Queue for which the Pilot Job is sent. - **PilotID** Internal DIRAC WMS Pilot Job identifier diff --git a/src/DIRAC/ConfigurationSystem/Client/Helpers/Resources.py b/src/DIRAC/ConfigurationSystem/Client/Helpers/Resources.py index 983062fabf2..990e9009c94 100644 --- a/src/DIRAC/ConfigurationSystem/Client/Helpers/Resources.py +++ b/src/DIRAC/ConfigurationSystem/Client/Helpers/Resources.py @@ -252,10 +252,7 @@ def getQueues(siteList=None, ceList=None, ceTypeList=None, community=None, tags= for site in sites: if siteList and site not in siteList: continue - if community: - comList = gConfig.getValue(f"/Resources/Sites/{grid}/{site}/VO", []) - if comList and community.lower() not in [cl.lower() for cl in comList]: - continue + siteCEParameters = {} result = gConfig.getOptionsDict(f"/Resources/Sites/{grid}/{site}/CEs") if result["OK"]: @@ -272,10 +269,7 @@ def getQueues(siteList=None, ceList=None, ceTypeList=None, community=None, tags= continue if ceList and ce not in ceList: continue - if community: - comList = gConfig.getValue(f"/Resources/Sites/{grid}/{site}/CEs/{ce}/VO", []) - if comList and community.lower() not in [cl.lower() for cl in comList]: - continue + ceOptionsDict = dict(siteCEParameters) result = gConfig.getOptionsDict(f"/Resources/Sites/{grid}/{site}/CEs/{ce}") if not result["OK"]: @@ -287,9 +281,23 @@ def getQueues(siteList=None, ceList=None, ceTypeList=None, community=None, tags= queues = result["Value"] for queue in queues: if community: - comList = gConfig.getValue(f"/Resources/Sites/{grid}/{site}/CEs/{ce}/Queues/{queue}/VO", []) + # Community can be defined on site, CE or queue level + paths = [ + f"/Resources/Sites/{grid}/{site}/CEs/{ce}/Queues/{queue}/VO", + f"/Resources/Sites/{grid}/{site}/CEs/{ce}/VO", + f"/Resources/Sites/{grid}/{site}/VO", + ] + + # Try each path in order, stopping when we find a non-empty list + for path in paths: + comList = gConfig.getValue(path, []) + if comList: + break + + # If we found a list and the community is not in it, skip this iteration if comList and community.lower() not in [cl.lower() for cl in comList]: continue + if tags: queueTags = gConfig.getValue(f"/Resources/Sites/{grid}/{site}/CEs/{ce}/Queues/{queue}/Tag", []) queueTags = set(ceTags + queueTags) diff --git a/src/DIRAC/Interfaces/scripts/dirac_admin_get_job_pilots.py b/src/DIRAC/Interfaces/scripts/dirac_admin_get_job_pilots.py index cc6701d2af4..00b5304c43e 100755 --- a/src/DIRAC/Interfaces/scripts/dirac_admin_get_job_pilots.py +++ b/src/DIRAC/Interfaces/scripts/dirac_admin_get_job_pilots.py @@ -16,8 +16,7 @@ 'PilotJobReference': 'https://marlb.in2p3.fr:9000/biq6KT45Q', 'PilotStamp': '', 'Status': 'Done', - 'SubmissionTime': datetime.datetime(2011, 2, 21, 12, 27, 52), - 'TaskQueueID': 399L}} + 'SubmissionTime': datetime.datetime(2011, 2, 21, 12, 27, 52)}} """ from DIRAC.Core.Base.Script import Script diff --git a/src/DIRAC/Interfaces/scripts/dirac_admin_get_pilot_info.py b/src/DIRAC/Interfaces/scripts/dirac_admin_get_pilot_info.py index b3cbaa7cd5c..224606957fc 100755 --- a/src/DIRAC/Interfaces/scripts/dirac_admin_get_pilot_info.py +++ b/src/DIRAC/Interfaces/scripts/dirac_admin_get_pilot_info.py @@ -15,8 +15,7 @@ 'PilotJobReference': 'https://marlb.in2p3.fr:9000/2KHFrQjkw', 'PilotStamp': '', 'Status': 'Done', - 'SubmissionTime': datetime.datetime(2011, 2, 21, 12, 27, 52), - 'TaskQueueID': 399L}} + 'SubmissionTime': datetime.datetime(2011, 2, 21, 12, 27, 52)}} """ from DIRAC.Core.Base.Script import Script diff --git a/src/DIRAC/MonitoringSystem/Client/Types/PilotsHistory.py b/src/DIRAC/MonitoringSystem/Client/Types/PilotsHistory.py index 1aa3ac06e60..ff101940541 100644 --- a/src/DIRAC/MonitoringSystem/Client/Types/PilotsHistory.py +++ b/src/DIRAC/MonitoringSystem/Client/Types/PilotsHistory.py @@ -17,7 +17,7 @@ def __init__(self): super().__init__() - self.keyFields = ["TaskQueueID", "GridSite", "GridType", "Status"] + self.keyFields = ["GridSite", "GridType", "Status"] self.monitoringFields = ["NumOfPilots"] @@ -25,7 +25,6 @@ def __init__(self): self.addMapping( { - "TaskQueueID": {"type": "keyword"}, "GridSite": {"type": "keyword"}, "GridType": {"type": "keyword"}, "Status": {"type": "keyword"}, diff --git a/src/DIRAC/Resources/Computing/AREXComputingElement.py b/src/DIRAC/Resources/Computing/AREXComputingElement.py index 524d9420392..3d8622d5e66 100755 --- a/src/DIRAC/Resources/Computing/AREXComputingElement.py +++ b/src/DIRAC/Resources/Computing/AREXComputingElement.py @@ -618,7 +618,11 @@ def getCEStatus(self): self.log.error("Failed getting the status of the CE.", result["Message"]) return S_ERROR("Failed getting the status of the CE") response = result["Value"] - ceData = response.json() + try: + ceData = response.json() + except requests.JSONDecodeError: + self.log.exception("Failed decoding the status of the CE") + return S_ERROR(f"Failed decoding the status of the CE") # Look only in the relevant section out of the headache queueInfo = ceData["Domains"]["AdminDomain"]["Services"]["ComputingService"]["ComputingShare"] diff --git a/src/DIRAC/Resources/Computing/CloudComputingElement.py b/src/DIRAC/Resources/Computing/CloudComputingElement.py index 0559b2ff95d..e98f9b5aec9 100644 --- a/src/DIRAC/Resources/Computing/CloudComputingElement.py +++ b/src/DIRAC/Resources/Computing/CloudComputingElement.py @@ -355,7 +355,7 @@ def _getMetadata(self, executableFile, pilotStamp=""): template = yaml.safe_load(template_fd) for filedef in template["write_files"]: if filedef["content"] == "PROXY_STR": - filedef["content"] = self.proxy + filedef["content"] = self.proxy.dumpAllToString()["Value"] elif filedef["content"] == "EXECUTABLE_STR": filedef["content"] = exe_str elif "STAMP_STR" in filedef["content"]: @@ -398,11 +398,7 @@ def _renewCloudProxy(self): if not res["OK"]: self.log.error("Could not download proxy", res["Message"]) return False - resdump = res["Value"].dumpAllToString() - if not resdump["OK"]: - self.log.error("Failed to dump proxy to string", resdump["Message"]) - return False - self.proxy = resdump["Value"] + self.proxy = res["Value"] self.valid = datetime.datetime.utcnow() + proxyLifetime * datetime.timedelta(seconds=1) return True diff --git a/src/DIRAC/Resources/Computing/ComputingElement.py b/src/DIRAC/Resources/Computing/ComputingElement.py index b8d07072abb..219c0bd6bed 100755 --- a/src/DIRAC/Resources/Computing/ComputingElement.py +++ b/src/DIRAC/Resources/Computing/ComputingElement.py @@ -263,6 +263,7 @@ def available(self): self.log.verbose("Max Number of Jobs:", maxTotalJobs) self.log.verbose("Max Waiting Jobs:", maxWaitingJobs) + result["CEInfoDict"] = ceInfoDict # If we reached the maximum number of total jobs, then the CE is not available totalJobs = runningJobs + waitingJobs if totalJobs >= maxTotalJobs: @@ -283,8 +284,6 @@ def available(self): if availableProcessors is not None: additionalJobs = min(additionalJobs, availableProcessors) result["Value"] = additionalJobs - - result["CEInfoDict"] = ceInfoDict return result ############################################################################# diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py index 069d8126352..8c9af36ca58 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py @@ -9,9 +9,9 @@ """ import datetime import os -import socket from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Any import DIRAC from DIRAC import S_ERROR, S_OK, gConfig @@ -24,6 +24,8 @@ from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getCESiteMapping, getQueues from DIRAC.Core.Base.AgentModule import AgentModule +from DIRAC.Core.Security import X509Chain +from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.Core.Utilities.TimeUtilities import second, toEpochMilliSeconds from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager @@ -42,14 +44,10 @@ pilotWrapperScript, ) from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved +from DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy import WAITING_SUPPORTED_JOBS MAX_PILOTS_TO_SUBMIT = 100 -# Submission policies -AGGRESSIVE_FILLING = "AggressiveFilling" -WAITING_SUPPORTED_JOBS = "WaitingSupportedJobs" -SUBMISSION_POLICIES = [AGGRESSIVE_FILLING, WAITING_SUPPORTED_JOBS] - class SiteDirector(AgentModule): """SiteDirector class provides an implementation of a DIRAC agent. @@ -135,7 +133,6 @@ def beginExecution(self): result = self._loadSubmissionPolicy() if not result: return result - self.submissionPolicy = result["Value"]() # Flags self.sendAccounting = self.am_getOption("SendPilotAccounting", self.sendAccounting) @@ -192,14 +189,22 @@ def _loadSubmissionPolicy(self): """Load a submission policy""" objectLoader = ObjectLoader() result = objectLoader.loadObject( - f"WorkloadManagementSystem.Agent.SiteDirector.{self.submissionPolicyName}", self.submissionPolicyName + "WorkloadManagementSystem.Utilities.SubmissionPolicy", f"{self.submissionPolicyName}Policy" ) if not result["OK"]: self.log.error(f"Failed to load submission policy: {result['Message']}") return result - return S_OK(result["Value"]) - def _buildQueueDict(self, siteNames, ces, ceTypes, tags): + self.submissionPolicy = result["Value"]() + return S_OK() + + def _buildQueueDict( + self, + siteNames: list[str] | None = None, + ces: list[str] | None = None, + ceTypes: list[str] | None = None, + tags: list[str] | None = None, + ): """Build the queueDict dictionary containing information about the queues that will be provisioned""" # Get details about the resources result = getQueues(community=self.vo, siteList=siteNames, ceList=ces, ceTypeList=ceTypes, tags=tags) @@ -210,6 +215,7 @@ def _buildQueueDict(self, siteNames, ces, ceTypes, tags): result = getQueuesResolved( siteDict=result["Value"], queueCECache=self.queueCECache, + vo=self.vo, instantiateCEs=True, ) if not result["OK"]: @@ -260,15 +266,14 @@ def execute(self): It basically just submits pilots and gets their status """ - result = self.submitPilots() - if not result["OK"]: - self.log.error("Errors in the pilot submission:", result["Message"]) - return result + submissionResult = self.submitPilots() + monitoringResult = self.monitorPilots() - result = self.monitorPilots() - if not result["OK"]: - self.log.error("Errors in pilot monitoring:", result["Message"]) - return result + if not submissionResult["OK"]: + return submissionResult + + if not monitoringResult["OK"]: + return monitoringResult return S_OK() @@ -297,7 +302,7 @@ def submitPilots(self): return S_OK() - def _submitPilotsPerQueue(self, queueName): + def _submitPilotsPerQueue(self, queueName: str): """Submit pilots within a given computing elements :return: S_OK/S_ERROR @@ -326,6 +331,13 @@ def _submitPilotsPerQueue(self, queueName): # Get CE instance ce = self.queueDict[queueName]["CE"] + # Set credentials + cpuTime = queueCPUTime + 86400 + result = self._setCredentials(ce, cpuTime) + if not result["OK"]: + self.log.error("Failed to set credentials:", result["Message"]) + return result + # Get the number of available slots on the target site/queue totalSlots = self._getQueueSlots(queueName) if totalSlots <= 0: @@ -334,18 +346,11 @@ def _submitPilotsPerQueue(self, queueName): self.log.info(f"{queueName}: to submit={totalSlots}") # Apply the submission policy - totalSlots = self.submissionPolicy.apply(totalSlots) + totalSlots = self.submissionPolicy.apply(totalSlots, ceParameters=self.queueDict[queueName]["CE"].ceParameters) # Limit the number of pilots to submit to self.maxPilotsToSubmit pilotsToSubmit = min(self.maxPilotsToSubmit, totalSlots) - # Set credentials - cpuTime = queueCPUTime + 86400 - result = self._setCredentials(ce, cpuTime) - if not result["OK"]: - self.log.error("Failed to set credentials:", result["Message"]) - return result - # Now really submitting result = self._submitPilotsToQueue(pilotsToSubmit, ce, queueName) if not result["OK"]: @@ -354,7 +359,7 @@ def _submitPilotsPerQueue(self, queueName): pilotList, stampDict = result["Value"] # updating the pilotAgentsDB... done by default but maybe not strictly necessary - result = self._addPilotReference(queueName, pilotList, stampDict) + result = self._addPilotReferences(queueName, pilotList, stampDict) if not result["OK"]: return result @@ -362,7 +367,7 @@ def _submitPilotsPerQueue(self, queueName): self.log.info("Total number of pilots submitted in this cycle", f"{len(pilotList)} to {queueName}") return S_OK() - def _getQueueSlots(self, queue): + def _getQueueSlots(self, queue: str): """Get the number of available slots in the queue""" ce = self.queueDict[queue]["CE"] ceName = self.queueDict[queue]["CEName"] @@ -411,14 +416,12 @@ def _getQueueSlots(self, queue): totalSlots = min((maxTotalJobs - totalJobs), (maxWaitingJobs - waitingJobs)) return totalSlots - def _submitPilotsToQueue(self, pilotsToSubmit, ce, queue): + def _submitPilotsToQueue(self, pilotsToSubmit: int, ce: ComputingElement, queue: str): """Method that really submits the pilots to the ComputingElements' queue :param pilotsToSubmit: number of pilots to submit. - :type pilotsToSubmit: int :param ce: computing element object to where we submit - :type ce: ComputingElement - :param str queue: queue where to submit + :param queue: queue where to submit :return: S_OK/S_ERROR. If S_OK, returns tuple with (pilotList, stampDict) @@ -513,18 +516,14 @@ def _submitPilotsToQueue(self, pilotsToSubmit, ce, queue): return S_OK((pilotList, stampDict)) - def _addPilotReference(self, queue, pilotList, stampDict): + def _addPilotReferences(self, queue: str, pilotList: list[str], stampDict: dict[str, str]): """Add pilotReference to pilotAgentsDB - :param str queue: the queue name + :param queue: the queue name :param pilotList: list of pilots - :type pilotList: list :param stampDict: dictionary of pilots timestamps - :type stampDict: dict - - :return: None """ - result = self.pilotAgentsDB.addPilotReference( + result = self.pilotAgentsDB.addPilotReferences( pilotList, self.pilotGroup, self.queueDict[queue]["CEType"], @@ -548,18 +547,19 @@ def _addPilotReference(self, queue, pilotList, stampDict): return result return S_OK() - def _getExecutable(self, queue, proxy=None, jobExecDir="", envVariables=None, **kwargs): + def _getExecutable( + self, queue: str, proxy: X509Chain = None, jobExecDir: str = "", envVariables: dict[str, str] = None + ): """Prepare the full executable for queue - :param str queue: queue name - :param bool proxy: flag that say if to bundle or not the proxy - :param str jobExecDir: pilot execution dir (normally an empty string) + :param queue: queue name + :param proxy: flag that say if to bundle or not the proxy + :param jobExecDir: pilot execution dir (normally an empty string) :returns: a string the options for the pilot - :rtype: str """ - pilotOptions = self._getPilotOptions(queue, **kwargs) + pilotOptions = self._getPilotOptions(queue) if not pilotOptions: self.log.warn("Pilots will be submitted without additional options") pilotOptions = [] @@ -584,13 +584,12 @@ def _getExecutable(self, queue, proxy=None, jobExecDir="", envVariables=None, ** ) return executable - def _getPilotOptions(self, queue, **kwargs): + def _getPilotOptions(self, queue: str) -> list[str]: """Prepare pilot options - :param str queue: queue name + :param queue: queue name :returns: pilotOptions is a list of strings, each one is an option to the dirac-pilot script invocation - :rtype: list """ queueDict = self.queueDict[queue]["ParametersDict"] pilotOptions = [] @@ -663,16 +662,22 @@ def _getPilotOptions(self, queue, **kwargs): return pilotOptions - def _writePilotScript(self, workingDirectory, pilotOptions, proxy=None, pilotExecDir="", envVariables=None): + def _writePilotScript( + self, + workingDirectory: str, + pilotOptions: str, + proxy: X509Chain = None, + pilotExecDir: str = "", + envVariables: dict[str, str] = None, + ): """Bundle together and write out the pilot executable script, admix the proxy if given - :param str workingDirectory: pilot wrapper working directory - :param str pilotOptions: options with which to start the pilot - :param str proxy: proxy file we are going to bundle - :param str pilotExecDir: pilot executing directory + :param workingDirectory: pilot wrapper working directory + :param pilotOptions: options with which to start the pilot + :param proxy: proxy file we are going to bundle + :param pilotExecDir: pilot executing directory :returns: file name of the pilot wrapper created - :rtype: str """ try: @@ -717,17 +722,16 @@ def monitorPilots(self): return S_OK() - def _monitorPilotsPerQueue(self, queueName): + def _monitorPilotsPerQueue(self, queue: str): """Update status of pilots in transient state for a given queue :param queue: queue name - :param proxy: proxy to check the pilot status and renewals """ - ce = self.queueDict[queueName]["CE"] - ceName = self.queueDict[queueName]["CEName"] - queueName = self.queueDict[queueName]["QueueName"] - ceType = self.queueDict[queueName]["CEType"] - siteName = self.queueDict[queueName]["Site"] + ce = self.queueDict[queue]["CE"] + ceName = self.queueDict[queue]["CEName"] + queueName = self.queueDict[queue]["QueueName"] + ceType = self.queueDict[queue]["CEType"] + siteName = self.queueDict[queue]["Site"] # Select pilots in a transient states result = self.pilotAgentsDB.selectPilots( @@ -769,11 +773,14 @@ def _monitorPilotsPerQueue(self, queueName): return result pilotCEDict = result["Value"] - # Update pilot status in DB - abortedPilots = self._updatePilotStatus(pilotRefs, pilotDict, pilotCEDict) + # Get updated pilots + updatedPilots = self._getUpdatedPilotStatus(pilotDict, pilotCEDict) # If something wrong in the queue, make a pause for the job submission + abortedPilots = self._getAbortedPilots(updatedPilots) if abortedPilots: - self.failedQueues[queueName] += 1 + self.failedQueues[queue] += 1 + # Update the status of the pilots in the DB + self._updatePilotsInDB(updatedPilots) # FIXME: seems like it is only used by the CloudCE? Couldn't it be called from CloudCE.getJobStatus()? if callable(getattr(ce, "cleanupPilots", None)): @@ -812,46 +819,43 @@ def _monitorPilotsPerQueue(self, queueName): return S_OK() - def _updatePilotStatus(self, pilotRefs, pilotDict, pilotCEDict): - """Really updates the pilots status - - :return: number of aborted pilots, flag for getting the pilot output - """ - abortedPilots = 0 + def _getUpdatedPilotStatus(self, pilotDict: dict[str, Any], pilotCEDict: dict[str, Any]) -> dict[str, str]: + """Get the updated pilots, from a list of pilots, and their new status""" + updatedPilots = {} + for pilotReference, pilotInfo in pilotDict.items(): + oldStatus = pilotInfo["Status"] + sinceLastUpdate = datetime.datetime.utcnow() - pilotInfo["LastUpdateTime"] + ceStatus = pilotCEDict.get(pilotReference, oldStatus) + + if oldStatus != ceStatus: + # Normal case: update the pilot status to the new value + updatedPilots[pilotReference] = ceStatus + continue - for pRef in pilotRefs: - newStatus = "" - oldStatus = pilotDict[pRef]["Status"] - lastUpdateTime = pilotDict[pRef]["LastUpdateTime"] - sinceLastUpdate = datetime.datetime.utcnow() - lastUpdateTime + if oldStatus == ceStatus and ceStatus == PilotStatus.UNKNOWN and sinceLastUpdate > 3600 * second: + # Pilots are not reachable, we mark them as aborted + updatedPilots[pilotReference] = PilotStatus.ABORTED + continue - ceStatus = pilotCEDict.get(pRef, oldStatus) + return updatedPilots - if oldStatus == ceStatus and ceStatus != PilotStatus.UNKNOWN: - # Normal status did not change, continue - continue - if ceStatus == oldStatus == PilotStatus.UNKNOWN: - if sinceLastUpdate < 3600 * second: - # Allow 1 hour of Unknown status assuming temporary problems on the CE - continue - newStatus = PilotStatus.ABORTED - elif ceStatus == PilotStatus.UNKNOWN and oldStatus not in PilotStatus.PILOT_FINAL_STATES: - # Possible problems on the CE, let's keep the Unknown status for a while - newStatus = PilotStatus.UNKNOWN - elif ceStatus != PilotStatus.UNKNOWN: - # Update the pilot status to the new value - newStatus = ceStatus - - if newStatus: - self.log.info("Updating status", f"to {newStatus} for pilot {pRef}") - result = self.pilotAgentsDB.setPilotStatus(pRef, newStatus, "", "Updated by SiteDirector") - if not result["OK"]: - self.log.error(result["Message"]) - if newStatus == "Aborted": - abortedPilots += 1 + def _getAbortedPilots(self, pilotsDict: dict[str, str]) -> list[str]: + """Get aborted pilots from a list of pilots and their status""" + abortedPilots = [] + for pilotReference, status in pilotsDict.items(): + if status == PilotStatus.ABORTED: + abortedPilots.append(pilotReference) return abortedPilots + def _updatePilotsInDB(self, updatedPilotsDict: dict[str, str]): + """Update the status of the pilots in the DB""" + for pilotReference, newStatus in updatedPilotsDict.items(): + self.log.info("Updating status", f"to {newStatus} for pilot {pilotReference}") + result = self.pilotAgentsDB.setPilotStatus(pilotReference, newStatus, "", "Updated by SiteDirector") + if not result["OK"]: + self.log.error(result["Message"]) + ##################################################################################### def __getPilotToken(self, audience: str, scope: list[str] = None): @@ -923,7 +927,7 @@ def _setCredentials(self, ce: ComputingElement, proxyMinimumRequiredValidity: in ##################################################################################### - def _sendPilotAccounting(self, pilotDict): + def _sendPilotAccounting(self, pilotDict: dict[str, Any]): """Send pilot accounting record""" for pRef in pilotDict: self.log.verbose("Preparing accounting record", f"for pilot {pRef}") @@ -974,15 +978,17 @@ def _sendPilotAccounting(self, pilotDict): return S_OK() - def _sendPilotSubmissionAccounting(self, siteName, ceName, queueName, numTotal, numSucceeded, status): + def _sendPilotSubmissionAccounting( + self, siteName: str, ceName: str, queueName: str, numTotal: int, numSucceeded: int, status: str + ): """Send pilot submission accounting record - :param str siteName: Site name - :param str ceName: CE name - :param str queueName: queue Name - :param int numTotal: Total number of submission - :param int numSucceeded: Total number of submission succeeded - :param str status: 'Succeeded' or 'Failed' + :param siteName: Site name + :param ceName: CE name + :param queueName: queue Name + :param numTotal: Total number of submission + :param numSucceeded: Total number of submission succeeded + :param status: 'Succeeded' or 'Failed' :returns: S_OK / S_ERROR """ @@ -1011,15 +1017,17 @@ def _sendPilotSubmissionAccounting(self, siteName, ceName, queueName, numTotal, return result return S_OK() - def _sendPilotSubmissionMonitoring(self, siteName, ceName, queueName, numTotal, numSucceeded, status): + def _sendPilotSubmissionMonitoring( + self, siteName: str, ceName: str, queueName: str, numTotal: int, numSucceeded: int, status: str + ): """Sends pilot submission records to monitoring - :param str siteName: Site name - :param str ceName: CE name - :param str queueName: queue Name - :param int numTotal: Total number of submission - :param int numSucceeded: Total number of submission succeeded - :param str status: 'Succeeded' or 'Failed' + :param siteName: Site name + :param ceName: CE name + :param queueName: queue Name + :param numTotal: Total number of submission + :param numSucceeded: Total number of submission succeeded + :param status: 'Succeeded' or 'Failed' :returns: S_OK / S_ERROR """ diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py index 0d842dc1ea8..96582bab4f5 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py @@ -43,7 +43,7 @@ class StatesAccountingAgent(AgentModule): __renameFieldsMapping = {"JobType": "JobSplitType"} # PilotsHistory fields - __pilotsMapping = ["TaskQueueID", "GridSite", "GridType", "Status", "NumOfPilots"] + __pilotsMapping = ["GridSite", "GridType", "Status", "NumOfPilots"] def initialize(self): """Standard initialization""" diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py index ad343c7e89a..9d00e622243 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py @@ -2,214 +2,271 @@ """ # pylint: disable=protected-access -# imports import datetime +import os import pytest -from unittest.mock import MagicMock +from diraccfg import CFG -from DIRAC import gLogger +from DIRAC import gLogger, gConfig +from DIRAC.ConfigurationSystem.Client import ConfigurationData +from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus -# sut from DIRAC.WorkloadManagementSystem.Agent.SiteDirector import SiteDirector +from DIRAC.WorkloadManagementSystem.Client import PilotStatus +from DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy import SUBMISSION_POLICIES -mockAM = MagicMock() -mockGCReply = MagicMock() -mockGCReply.return_value = "TestSetup" -mockOPSObject = MagicMock() -mockOPSObject.getValue.return_value = "123" -mockOPSReply = MagicMock() -mockOPSReply.return_value = "123" -mockOPS = MagicMock() -mockOPS.return_value = mockOPSObject -# mockOPS.Operations = mockOPSObject -mockPM = MagicMock() -mockPM.requestToken.return_value = {"OK": True, "Value": ("token", 1)} -mockPMReply = MagicMock() -mockPMReply.return_value = {"OK": True, "Value": ("token", 1)} - -mockCSGlobalReply = MagicMock() -mockCSGlobalReply.return_value = "TestSetup" -mockResourcesReply = MagicMock() -mockResourcesReply.return_value = {"OK": True, "Value": ["x86_64-slc6", "x86_64-slc5"]} +CONFIG = """ +Resources +{ + Sites + { + LCG + { + LCG.Site1.com + { + VO = dteam + CEs + { + ce1.site1.com + { + architecture = x86_64 + OS = linux_AlmaLinux_9 + CEType = HTCondorCE + LocalCEType = Singularity + MaxRAM = 6974 + Queues + { + condor + { + MaxTotalJobs = 1000 + MaxWaitingJobs = 100 + maxCPUTime = 1152 + VO = dteam + NumberOfProcessors = 1 + } + } + Tag = Token + } + ce2.site1.com + { + architecture = x86_64 + OS = linux_AlmaLinux_9 + CEType = HTCondorCE + LocalCEType = Singularity + MaxRAM = 6974 + Queues + { + condor + { + MaxTotalJobs = 1000 + MaxWaitingJobs = 100 + maxCPUTime = 1152 + VO = dteam + NumberOfProcessors = 1 + } + } + } + } + } + LCG.Site2.site2 + { + CEs + { + ce1.site2.com + { + architecture = x86_64 + OS = linux_AlmaLinux_9 + CEType = HTCondorCE + LocalCEType = Singularity + MaxRAM = 6974 + Queues + { + condor + { + MaxTotalJobs = 1000 + MaxWaitingJobs = 100 + maxCPUTime = 1152 + VO = dteam + NumberOfProcessors = 1 + } + } + Tag = Token + } + } + } + } + DIRAC + { + DIRAC.Site3.site3 + { + CEs + { + ce1.site3.com + { + architecture = x86_64 + OS = linux_AlmaLinux_9 + CEType = HTCondorCE + LocalCEType = Singularity + MaxRAM = 6974 + Queues + { + condor + { + MaxTotalJobs = 1000 + MaxWaitingJobs = 100 + maxCPUTime = 1152 + VO = dteam + NumberOfProcessors = 1 + } + } + Tag = Token + } + } + } + } + } +} +""" -mockPilotAgentsDB = MagicMock() -mockPilotAgentsDB.setPilotStatus.return_value = {"OK": True} -gLogger.setLevel("DEBUG") +@pytest.fixture +def config(): + """Load a fake configuration""" + ConfigurationData.localCFG = CFG() + cfg = CFG() + cfg.loadFromBuffer(CONFIG) + gConfig.loadCFG(cfg) @pytest.fixture -def sd(mocker): - """mocker for SiteDirector""" +def sd(mocker, config): + """Basic configuration of a SiteDirector. It tests the _buildQueueDict() method at the same time""" mocker.patch("DIRAC.WorkloadManagementSystem.Agent.SiteDirector.AgentModule.__init__") - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.SiteDirector.gConfig.getValue", side_effect=mockGCReply) - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.SiteDirector.Operations", side_effect=mockOPS) + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.SiteDirector.Operations.getValue", return_value="123") + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.SiteDirector.getPilotAgentsDB") + + usableSites = ( + gConfig.getSections("Resources/Sites/LCG")["Value"] + gConfig.getSections("Resources/Sites/DIRAC")["Value"] + ) mocker.patch( - "DIRAC.WorkloadManagementSystem.Agent.SiteDirector.gProxyManager.requestToken", side_effect=mockPMReply + "DIRAC.WorkloadManagementSystem.Agent.SiteDirector.SiteStatus.getUsableSites", return_values=usableSites ) - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.SiteDirector.AgentModule", side_effect=mockAM) sd = SiteDirector() + + # Set logger sd.log = gLogger - sd.am_getOption = mockAM sd.log.setLevel("DEBUG") - sd.rpcMatcher = MagicMock() - sd.rssClient = MagicMock() - sd.pilotAgentsDB = MagicMock() + + # Set basic parameters sd.workingDirectory = "" - sd.queueDict = { - "aQueue": { - "Site": "LCG.CERN.cern", - "CEName": "aCE", - "CEType": "SSH", - "QueueName": "aQueue", - "ParametersDict": { - "CPUTime": 12345, - "Community": "lhcb", - "OwnerGroup": ["lhcb_user"], - "Setup": "LHCb-Production", - "Site": "LCG.CERN.cern", - }, - } - } + + # Set VO + sd.vo = "dteam" + + # Set queueDict + sd.siteClient = SiteStatus() + sd._buildQueueDict() return sd -def test__getPilotOptions(sd): - """Testing SiteDirector()._getPilotOptions()""" - res = sd._getPilotOptions("aQueue") - assert {"-S TestSetup", "-V 123", "-l 123", "-n LCG.CERN.cern"} <= set(res) - - -@pytest.mark.parametrize( - "mockMatcherReturnValue, expected, anyExpected, sitesExpected", - [ - ({"OK": False, "Message": "boh"}, False, True, set()), - ({"OK": True, "Value": None}, False, True, set()), - ({"OK": True, "Value": {"1": {"Jobs": 10}, "2": {"Jobs": 20}}}, True, True, set()), - ({"OK": True, "Value": {"1": {"Jobs": 10, "Sites": ["Site1"]}, "2": {"Jobs": 20}}}, True, True, {"Site1"}), - ( - {"OK": True, "Value": {"1": {"Jobs": 10, "Sites": ["Site1", "Site2"]}, "2": {"Jobs": 20}}}, - True, - True, - {"Site1", "Site2"}, - ), - ( - { - "OK": True, - "Value": {"1": {"Jobs": 10, "Sites": ["Site1", "Site2"]}, "2": {"Jobs": 20, "Sites": ["Site1"]}}, - }, - True, - False, - {"Site1", "Site2"}, - ), - ( - { - "OK": True, - "Value": {"1": {"Jobs": 10, "Sites": ["Site1", "Site2"]}, "2": {"Jobs": 20, "Sites": ["ANY"]}}, - }, - True, - False, - {"Site1", "Site2", "ANY"}, - ), - ( - { - "OK": True, - "Value": {"1": {"Jobs": 10, "Sites": ["Site1", "Site2"]}, "2": {"Jobs": 20, "Sites": ["ANY", "Site3"]}}, - }, - True, - False, - {"Site1", "Site2", "Site3", "ANY"}, - ), - ( - { - "OK": True, - "Value": {"1": {"Jobs": 10, "Sites": ["Site1", "Site2"]}, "2": {"Jobs": 20, "Sites": ["Any", "Site3"]}}, - }, - True, - False, - {"Site1", "Site2", "Site3", "Any"}, - ), - ( - { - "OK": True, - "Value": { - "1": {"Jobs": 10, "Sites": ["Site1", "Site2"]}, - "2": {"Jobs": 20, "Sites": ["NotAny", "Site2"]}, - }, - }, - True, - False, - {"Site1", "Site2", "NotAny"}, - ), - ], -) -def test__ifAndWhereToSubmit(sd, mockMatcherReturnValue, expected, anyExpected, sitesExpected): - """Testing SiteDirector()._ifAndWhereToSubmit()""" - sd.matcherClient = MagicMock() - sd.matcherClient.getMatchingTaskQueues.return_value = mockMatcherReturnValue - res = sd._ifAndWhereToSubmit() - assert res[0] == expected - if res[0]: - assert res == (expected, anyExpected, sitesExpected, set()) - - -def test__allowedToSubmit(sd): - """Testing SiteDirector()._allowedToSubmit()""" - submit = sd._allowedToSubmit("aQueue", True, {"LCG.CERN.cern"}, set()) - assert submit is False - - sd.siteMaskList = ["LCG.CERN.cern", "DIRAC.CNAF.it"] - submit = sd._allowedToSubmit("aQueue", True, {"LCG.CERN.cern"}, set()) - assert submit is True - - sd.rssFlag = True - submit = sd._allowedToSubmit("aQueue", True, {"LCG.CERN.cern"}, set()) - assert submit is False - - sd.ceMaskList = ["aCE", "anotherCE"] - submit = sd._allowedToSubmit("aQueue", True, {"LCG.CERN.cern"}, set()) - assert submit is True - - -def test__submitPilotsToQueue(sd): - """Testing SiteDirector()._submitPilotsToQueue()""" - # Create a MagicMock that does not have the workingDirectory - # attribute (https://cpython-test-docs.readthedocs.io/en/latest/library/unittest.mock.html#deleting-attributes) - # This is to use the SiteDirector's working directory, not the CE one - ceMock = MagicMock() - del ceMock.workingDirectory - - sd.queueCECache = {"aQueue": {"CE": ceMock}} - sd.queueSlots = {"aQueue": {"AvailableSlots": 10}} - assert sd._submitPilotsToQueue(1, MagicMock(), "aQueue")["OK"] - - -@pytest.mark.parametrize( - "pilotRefs, pilotDict, pilotCEDict, expected", - [ - ([], {}, {}, (0, [])), - ( - ["aPilotRef"], - {"aPilotRef": {"Status": "Running", "LastUpdateTime": datetime.datetime(2000, 1, 1).utcnow()}}, - {}, - (0, []), - ), - ( - ["aPilotRef"], - {"aPilotRef": {"Status": "Running", "LastUpdateTime": datetime.datetime(2000, 1, 1).utcnow()}}, - {"aPilotRef": "Running"}, - (0, []), - ), - ( - ["aPilotRef"], - {"aPilotRef": {"Status": "Running", "LastUpdateTime": datetime.datetime(2000, 1, 1).utcnow()}}, - {"aPilotRef": "Unknown"}, - (0, []), - ), - ], -) -def test__updatePilotStatus(sd, pilotRefs, pilotDict, pilotCEDict, expected): - """Testing SiteDirector()._updatePilotStatus()""" - res = sd._updatePilotStatus(pilotRefs, pilotDict, pilotCEDict) - assert res == expected +@pytest.fixture(scope="session") +def pilotWrapperDirectory(tmp_path_factory): + """Create a temporary directory""" + fn = tmp_path_factory.mktemp("pilotWrappers") + return fn + + +def test_loadSubmissionPolicy(sd): + """Load each submission policy and call it""" + for submissionPolicyName in SUBMISSION_POLICIES: + # Load the submission policy + sd.submissionPolicyName = submissionPolicyName + res = sd._loadSubmissionPolicy() + assert res["OK"] + + # Call the submission policy with predefined parameters + targetQueue = "ce1.site1.com_condor" + res = sd.submissionPolicy.apply(50, ceParameters=sd.queueDict[targetQueue]["CE"].ceParameters) + assert res >= 0 and res <= 50 + + +def test_getPilotWrapper(mocker, sd, pilotWrapperDirectory): + """Get pilot options for a specific queue and check the result, then generate the pilot wrapper""" + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.SiteDirector.gConfig.getValue", return_value="TestSetup") + + # Get pilot options + pilotOptions = sd._getPilotOptions("ce1.site1.com_condor") + assert { + "--preinstalledEnv=123", + "--pythonVersion=3", + "--wnVO=dteam", + "-n LCG.Site1.com", + "-N ce1.site1.com", + "-Q condor", + "-S TestSetup", + "-V 123", + "-l 123", + "-e 1,2,3", + } == set(pilotOptions) + + # Write pilot script + res = sd._writePilotScript(pilotWrapperDirectory, pilotOptions) + + # Make sure the file exists + assert os.path.exists(res) and os.path.isfile(res) + + +def test_updatePilotStatus(sd): + """Updating the status of some fake pilot references""" + # 1. We have not submitted any pilots, there is nothing to update + pilotDict = {} + pilotCEDict = {} + res = sd._getUpdatedPilotStatus(pilotDict, pilotCEDict) + assert not res + + res = sd._getAbortedPilots(res) + assert not res + + # 2. We just submitted a pilot, the remote system has not had the time to register the pilot + pilotDict["pilotRef1"] = {"Status": PilotStatus.SUBMITTED, "LastUpdateTime": datetime.datetime.utcnow()} + pilotCEDict = {} + res = sd._getUpdatedPilotStatus(pilotDict, pilotCEDict) + assert not res + + res = sd._getAbortedPilots(res) + assert not res + + # 3. The pilot is now registered + pilotCEDict["pilotRef1"] = PilotStatus.SUBMITTED + res = sd._getUpdatedPilotStatus(pilotDict, pilotCEDict) + assert not res + + res = sd._getAbortedPilots(res) + assert not res + + # 4. The pilot waits in the queue of the remote CE + pilotCEDict["pilotRef1"] = PilotStatus.WAITING + res = sd._getUpdatedPilotStatus(pilotDict, pilotCEDict) + assert res == {"pilotRef1": PilotStatus.WAITING} + + res = sd._getAbortedPilots(res) + assert not res + pilotDict["pilotRef1"]["Status"] = PilotStatus.WAITING + + # 5. CE issue: the pilot status becomes unknown + pilotCEDict["pilotRef1"] = PilotStatus.UNKNOWN + res = sd._getUpdatedPilotStatus(pilotDict, pilotCEDict) + assert res == {"pilotRef1": PilotStatus.UNKNOWN} + + res = sd._getAbortedPilots(res) + assert not res + pilotDict["pilotRef1"]["Status"] = PilotStatus.UNKNOWN + + # 6. Engineers do not manage to fix the issue, the CE is still under maintenance + pilotDict["pilotRef1"]["LastUpdateTime"] = datetime.datetime.utcnow() - datetime.timedelta(seconds=3610) + res = sd._getUpdatedPilotStatus(pilotDict, pilotCEDict) + assert res == {"pilotRef1": PilotStatus.ABORTED} + + res = sd._getAbortedPilots(res) + assert res == ["pilotRef1"] diff --git a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg index fbbfb60f4bf..b8e84a85e2f 100644 --- a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg +++ b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg @@ -282,49 +282,33 @@ Agents VO = # VO treated (leave empty for auto-discovery) Community = - # Grid Environment (leave empty for auto-discovery) - GridEnv = # the DN of the certificate proxy used to submit pilots. If not found here, what is in Operations/Pilot section of the CS will be used PilotDN = # the group of the certificate proxy used to submit pilots. If not found here, what is in Operations/Pilot section of the CS will be used PilotGroup = - # List of sites that will be treated by this SiteDirector ("any" can refer to any Site defined in the CS) - Site = any - # List of CE types that will be treated by this SiteDirector ("any" can refer to any CE defined in the CS) - CETypes = any - # List of CEs that will be treated by this SiteDirector ("any" can refer to any type of CE defined in the CS) - CEs = any + # List of sites that will be treated by this SiteDirector (No value can refer to any Site defined in the CS) + Site = + # List of CEs that will be treated by this SiteDirector (No value can refer to any CE defined in the CS) + CEs = + # List of CE types that will be treated by this SiteDirector (No value can refer to any type of CE defined in the CS) + CETypes = # List of Tags that are required to be present in the CE/Queue definition Tags = + # How many cycles to skip if queue is not working + FailedQueueCycleFactor = 10 # The maximum length of a queue (in seconds). Default: 3 days MaxQueueLength = 259200 - # Log level of the pilots - PilotLogLevel = INFO # Max number of pilots to submit per cycle MaxPilotsToSubmit = 100 - # Check, or not, for the waiting pilots already submitted - PilotWaitingFlag = True - # How many cycels to skip if queue is not working - FailedQueueCycleFactor = 10 - # Every N cycles we update the pilots status - PilotStatusUpdateCycleFactor = 10 - # Every N cycles we update the number of available slots in the queues - AvailableSlotsUpdateCycleFactor = 10 - # Maximum number of times the Site Director is going to try to get a pilot output before stopping - MaxRetryGetPilotOutput = 3 - # To submit pilots to empty sites in any case - AddPilotsToEmptySites = False - # Should the SiteDirector consider platforms when deciding to submit pilots? - CheckPlatform = False - # Attribute used to define if the status of the pilots will be updated - UpdatePilotStatus = True - # Boolean value used to indicate if the pilot output will be or not retrieved - GetPilotOutput = False # Boolean value that indicates if the pilot job will send information for accounting SendPilotAccounting = True + # Submission policy to apply + SubmissionPolicy = WaitingSupportedJobs + # Working directory containing the pilot files if not set in the CE + WorkDirectory = } ##END ##BEGIN PushJobAgent diff --git a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py index e53bc883135..7d2e87056e9 100755 --- a/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py +++ b/src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py @@ -5,7 +5,7 @@ Available methods are: - addPilotReference() + addPilotReferences() setPilotStatus() deletePilot() clearPilots() @@ -39,7 +39,7 @@ def __init__(self, parentLogger=None): ########################################################################################## - def addPilotReference(self, pilotRef, ownerGroup, gridType="DIRAC", pilotStampDict={}): + def addPilotReferences(self, pilotRef, ownerGroup, gridType="DIRAC", pilotStampDict={}): """Add a new pilot job reference""" for ref in pilotRef: stamp = "" @@ -58,7 +58,7 @@ def addPilotReference(self, pilotRef, ownerGroup, gridType="DIRAC", pilotStampDi return result if "lastRowId" not in result: - return S_ERROR("PilotAgentsDB.addPilotReference: Failed to retrieve a new Id.") + return S_ERROR("PilotAgentsDB.addPilotReferences: Failed to retrieve a new Id.") return S_OK() @@ -276,7 +276,6 @@ def getPilotInfo(self, pilotRef=False, conn=False, paramNames=[], pilotID=False) "SubmissionTime", "PilotID", "LastUpdateTime", - "TaskQueueID", "GridSite", "PilotStamp", "Queue", @@ -468,31 +467,6 @@ def getJobsForPilot(self, pilotID): resDict[row[0]].append(row[1]) return S_OK(resDict) - ########################################################################################## - def getPilotsForTaskQueue(self, taskQueueID, gridType=None, limit=None): - """Get IDs of Pilot Agents that were submitted for the given taskQueue, - specify optionally the grid type, results are sorted by Submission time - an Optional limit can be set. - """ - - if gridType: - req = f"SELECT PilotID FROM PilotAgents WHERE TaskQueueID={taskQueueID} AND GridType='{gridType}' " - else: - req = f"SELECT PilotID FROM PilotAgents WHERE TaskQueueID={taskQueueID} " - - req += "ORDER BY SubmissionTime DESC " - - if limit: - req += f"LIMIT {limit}" - - result = self._query(req) - if not result["OK"]: - return result - if result["Value"]: - pilotList = [x[0] for x in result["Value"]] - return S_OK(pilotList) - return S_ERROR(f"PilotJobReferences for TaskQueueID {taskQueueID} not found") - ########################################################################################## def getPilotsForJobID(self, jobID): """Get ID of Pilot Agent that is running a given JobID""" @@ -1044,7 +1018,6 @@ def getPilotMonitorWeb(self, selectDict, sortList, startItem, maxItems): "PilotID", "LastUpdateTime", "CurrentJobID", - "TaskQueueID", "GridSite", ] @@ -1082,7 +1055,7 @@ def getPilotMonitorWeb(self, selectDict, sortList, startItem, maxItems): def getSummarySnapshot(self, requestedFields=False): """Get the summary snapshot for a given combination""" if not requestedFields: - requestedFields = ["TaskQueueID", "GridSite", "GridType", "Status"] + requestedFields = ["GridSite", "GridType", "Status"] valueFields = ["COUNT(PilotID)"] defString = ", ".join(requestedFields) valueString = ", ".join(valueFields) diff --git a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py index a7c060c60fe..06aaee3de0a 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py @@ -74,12 +74,12 @@ def export_getCurrentPilotCounters(cls, attrDict={}): return S_OK(resultDict) ########################################################################################## - types_addPilotReference = [list, str] + types_addPilotReferences = [list, str] @classmethod - def export_addPilotReference(cls, pilotRef, ownerGroup, gridType="DIRAC", pilotStampDict={}): + def export_addPilotReferences(cls, pilotRef, ownerGroup, gridType="DIRAC", pilotStampDict={}): """Add a new pilot job reference""" - return cls.pilotAgentsDB.addPilotReference(pilotRef, ownerGroup, gridType, pilotStampDict) + return cls.pilotAgentsDB.addPilotReferences(pilotRef, ownerGroup, gridType, pilotStampDict) ############################################################################## types_getPilotOutput = [str] @@ -336,41 +336,12 @@ def export_getGroupedPilotSummary(cls, columnList): @classmethod def export_getPilots(cls, jobID): - """Get pilot references and their states for : - - those pilots submitted for the TQ where job is sitting - - (or) the pilots executing/having executed the Job - """ - - pilots = [] + """Get pilots executing/having executed the Job""" result = cls.pilotAgentsDB.getPilotsForJobID(int(jobID)) - if not result["OK"]: - if result["Message"].find("not found") == -1: - return S_ERROR("Failed to get pilot: " + result["Message"]) - else: - pilots += result["Value"] - if not pilots: - # Pilots were not found try to look in the Task Queue - taskQueueID = 0 - try: - result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.TaskQueueDB", "TaskQueueDB") - if not result["OK"]: - return result - tqDB = result["Value"]() - except RuntimeError as excp: - return S_ERROR(f"Can't connect to DB: {excp}") - result = tqDB.getTaskQueueForJob(int(jobID)) - if result["OK"] and result["Value"]: - taskQueueID = result["Value"] - if taskQueueID: - result = cls.pilotAgentsDB.getPilotsForTaskQueue(taskQueueID, limit=10) - if not result["OK"]: - return S_ERROR("Failed to get pilot: " + result["Message"]) - pilots += result["Value"] - - if not pilots: - return S_ERROR("Failed to get pilot for Job %d" % int(jobID)) - - return cls.pilotAgentsDB.getPilotInfo(pilotID=pilots) + if not result["OK"] or not result["Value"]: + return S_ERROR(f"Failed to get pilot for Job {int(jobID)}: {result.get('Message', '')}") + + return cls.pilotAgentsDB.getPilotInfo(pilotID=result["Value"]) ############################################################################## types_killPilot = [[str, list]] diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py b/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py index e508fd0aaca..226c4183a38 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py @@ -11,7 +11,7 @@ from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory -def getQueuesResolved(siteDict, queueCECache, checkPlatform=False, instantiateCEs=False): +def getQueuesResolved(siteDict, queueCECache, vo=None, checkPlatform=False, instantiateCEs=False): """Get the list of relevant CEs (what is in siteDict) and their descriptions. The main goal of this method is to return a dictionary of queues """ @@ -30,6 +30,8 @@ def getQueuesResolved(siteDict, queueCECache, checkPlatform=False, instantiateCE queueDict[queueName]["ParametersDict"]["Queue"] = queue queueDict[queueName]["ParametersDict"]["GridCE"] = ce queueDict[queueName]["ParametersDict"]["Site"] = site + if vo: + queueDict[queueName]["ParametersDict"]["Community"] = vo # Evaluate the CPU limit of the queue according to the Glue convention computeQueueCPULimit(queueDict[queueName]["ParametersDict"]) diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py b/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py index 61503a03dc0..ea816e68a88 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py @@ -169,11 +169,7 @@ def _setUpWorkloadCE(self, numberOfProcessorsPayload=1): if not result["OK"]: return result proxy = result["Value"]["chain"] - result = proxy.getRemainingSecs() - if not result["OK"]: - return result - lifetime_secs = result["Value"] - workloadCE.setProxy(proxy, lifetime_secs) + workloadCE.setProxy(proxy) return S_OK(workloadCE) diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py b/src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py index 29450ffa67a..2ae82f6d34b 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py @@ -3,29 +3,34 @@ from DIRAC.WorkloadManagementSystem.Client.MatcherClient import MatcherClient +# Submission policies +AGGRESSIVE_FILLING = "AggressiveFilling" +WAITING_SUPPORTED_JOBS = "WaitingSupportedJobs" +SUBMISSION_POLICIES = [AGGRESSIVE_FILLING, WAITING_SUPPORTED_JOBS] + + class SubmissionPolicy(ABC): """Abstract class to define a submission strategy.""" @abstractmethod - def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int: + def apply(self, availableSlots: int, **kwargs) -> int: """Method to redefine in the concrete subclasses :param availableSlots: slots available for new pilots - :param queueName: the name of the targeted queue - :param queueInfo: a dictionary of attributes related to the queue - :param vo: VO """ - pass + if availableSlots < 0: + raise RuntimeError("Available slots cannot be negative") -class AgressiveFillingPolicy(SubmissionPolicy): - def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int: +class AggressiveFillingPolicy(SubmissionPolicy): + def apply(self, availableSlots: int, **kwargs) -> int: """All the available slots should be filled up. Should be employed for sites that are always processing jobs. * Pros: would quickly fill up a queue * Cons: would consume a lot of CPU hours for nothing if pilots do not match jobs """ + super().apply(availableSlots, **kwargs) return availableSlots @@ -34,22 +39,18 @@ def __init__(self) -> None: super().__init__() self.matcherClient = MatcherClient() - def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int: + def apply(self, availableSlots: int, **kwargs) -> int: """Fill up available slots only if waiting supported jobs exist. Should be employed for sites that are used from time to time (targeting specific Task Queues). * Pros: submit pilots only if necessary, and quickly fill up the queue if needed * Cons: would create some unused pilots in all the sites supervised by this policy and targeting a same task queue - """ - # Prepare CE dictionary from the queue info - ce = queueInfo["CE"] - ceDict = ce.ceParameters - ceDict["GridCE"] = queueInfo["CEName"] - if vo: - ceDict["Community"] = vo + :param ceParameters: CE parameters + """ + super().apply(availableSlots, **kwargs) # Get Task Queues related to the CE - result = self.matcherClient.getMatchingTaskQueues(ceDict) + result = self.matcherClient.getMatchingTaskQueues(kwargs["ceParameters"]) if not result["OK"]: return 0 taskQueueDict = result["Value"] diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_SubmissionPolicy.py b/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_SubmissionPolicy.py new file mode 100644 index 00000000000..66f3ce5209b --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_SubmissionPolicy.py @@ -0,0 +1,91 @@ +""" Test class for Submission policy +""" +# pylint: disable=protected-access + +import pytest +from DIRAC.Core.Utilities.ReturnValues import S_OK + +from DIRAC.WorkloadManagementSystem.Client import PilotStatus +from DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy import ( + SUBMISSION_POLICIES, + AggressiveFillingPolicy, + WaitingSupportedJobsPolicy, +) + + +def test_AggressiveFillingPolicy(): + """Make sure it always return the number of slots provided""" + policy = AggressiveFillingPolicy() + + # 1. We want to submit 50 elements + numberToSubmit = policy.apply(50) + assert numberToSubmit == 50 + + # 2. We want to submit 0 element + numberToSubmit = policy.apply(0) + assert numberToSubmit == 0 + + # 3. We want to submit -10 elements + with pytest.raises(RuntimeError): + numberToSubmit = policy.apply(-10) + + +def test_WaitingSupportedJobsPolicy(mocker): + """Make sure it returns the min between the available slots and the jobs available""" + policy = WaitingSupportedJobsPolicy() + + # 1. We want to submit 50 elements without specifying the CE parameters + with pytest.raises(KeyError): + numberToSubmit = policy.apply(50) + + # 2. We want to submit 50 elements but there are no waiting job + # Because it requires an access to a DB, we mock the value returned by the Matcher + mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", return_value=S_OK({}) + ) + numberToSubmit = policy.apply(50, ceParameters={}) + assert numberToSubmit == 0 + + # 3. We want to submit 50 elements and we have 10 similar waiting jobs + mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", + return_value=S_OK({"TQ1": {"Jobs": 10}}), + ) + numberToSubmit = policy.apply(50, ceParameters={}) + assert numberToSubmit == 10 + + # 4. We want to submit 50 elements and we have 10 waiting jobs, split into 2 task queues + mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", + return_value=S_OK({"TQ1": {"Jobs": 8}, "TQ2": {"Jobs": 2}}), + ) + numberToSubmit = policy.apply(50, ceParameters={}) + assert numberToSubmit == 10 + + # 5. We want to submit 50 elements and we have 60 similar waiting jobs + mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", + return_value=S_OK({"TQ1": {"Jobs": 60}}), + ) + numberToSubmit = policy.apply(50, ceParameters={}) + assert numberToSubmit == 50 + + # 6. We want to submit 50 elements and we have 60 waiting jobs, split into 2 task queues + mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", + return_value=S_OK({"TQ1": {"Jobs": 35}, "TQ2": {"Jobs": 25}}), + ) + numberToSubmit = policy.apply(50, ceParameters={}) + assert numberToSubmit == 50 + + # 6. We want to submit 50 elements and we have 60 waiting jobs, split into 2 task queues + mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", + return_value=S_OK({"TQ1": {"Jobs": 35}, "TQ2": {"Jobs": 25}}), + ) + numberToSubmit = policy.apply(50, ceParameters={}) + assert numberToSubmit == 50 + + # 7. We want to submit -10 elements + with pytest.raises(RuntimeError): + numberToSubmit = policy.apply(-10, ceParameters={}) diff --git a/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py b/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py index f1187ba088c..301d71b2131 100644 --- a/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py +++ b/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py @@ -25,10 +25,8 @@ class Params: def __init__(self): """C'or""" self.status = False - self.taskQueueID = 0 self.switches = [ ("", "status=", "sets the pilot status", self.setStatus), - ("t:", "taskQueueID=", "sets the taskQueueID", self.setTaskQueueID), ] def setStatus(self, value): @@ -45,19 +43,6 @@ def setStatus(self, value): self.status = value return S_OK() - def setTaskQueueID(self, value): - """sets self.taskQueueID - - :param value: option argument - - :return: S_OK()/S_ERROR() - """ - try: - self.taskQueueID = int(value) - except ValueError: - return S_ERROR("TaskQueueID has to be a number") - return S_OK() - @Script() def main(): @@ -89,7 +74,7 @@ def main(): if not DErrno.cmpError(res, DErrno.EWMSNOPILOT): gLogger.error(res["Message"]) DIRACExit(1) - res = pmc.addPilotTQRef([pilotRef], params.taskQueueID, ownerGroup, gridType, {pilotRef: pilotStamp}) + res = pmc.addPilotReferences([pilotRef], ownerGroup, gridType, {pilotRef: pilotStamp}) if not res["OK"]: gLogger.error(res["Message"]) DIRACExit(1) diff --git a/tests/Integration/Monitoring/Test_MonitoringReporter.py b/tests/Integration/Monitoring/Test_MonitoringReporter.py index 66b1f39771c..fe6ebf273f1 100644 --- a/tests/Integration/Monitoring/Test_MonitoringReporter.py +++ b/tests/Integration/Monitoring/Test_MonitoringReporter.py @@ -905,7 +905,6 @@ pilotsHistoryData = [ { - "TaskQueueID": "1240451", "GridSite": "LCG.CNAF.it", "GridType": "", "Status": "failed", @@ -913,7 +912,6 @@ "timestamp": 1649161714000, }, { - "TaskQueueID": "12401", "GridSite": "LCG.CNAF.it", "GridType": "", "Status": "failed", diff --git a/tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py b/tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py index a9f1c43874f..51cecfab9ca 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py @@ -36,7 +36,7 @@ def preparePilots(stateCount, testSite, testCE, testGroup): for i in range(nPilots): pilotRef.append("pilotRef_" + str(i)) - res = paDB.addPilotReference( + res = paDB.addPilotReferences( pilotRef, testGroup, ) @@ -79,7 +79,7 @@ def cleanUpPilots(pilotRef): def test_basic(): """usual insert/verify""" - res = paDB.addPilotReference(["pilotRef"], "ownerGroup") + res = paDB.addPilotReferences(["pilotRef"], "ownerGroup") assert res["OK"] is True res = paDB.deletePilot("pilotRef") diff --git a/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py b/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py index f97bf7be961..9d0069ff670 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py +++ b/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py @@ -27,7 +27,7 @@ def test_PilotsDB(): for jobID in ["aPilot", "anotherPilot"]: pilots.deletePilots(jobID) - res = pilots.addPilotTQRef(["aPilot"], 1, "a/owner/Group") + res = pilots.addPilotReferences(["aPilot"], "a/owner/Group") assert res["OK"], res["Message"] res = pilots.getCurrentPilotCounters({}) assert res["OK"], res["Message"] @@ -38,7 +38,7 @@ def test_PilotsDB(): assert res["OK"], res["Message"] assert res["Value"] == {} - res = pilots.addPilotTQRef(["anotherPilot"], 1, "a/owner/Group") + res = pilots.addPilotReferences(["anotherPilot"], "a/owner/Group") assert res["OK"], res["Message"] res = pilots.storePilotOutput("anotherPilot", "This is an output", "this is an error") assert res["OK"], res["Message"]