From 5c87e005383fad371beb3b5d8311c5c763bab210 Mon Sep 17 00:00:00 2001 From: aldbr Date: Wed, 20 Sep 2023 15:19:43 +0200 Subject: [PATCH] feat: add more SD tests --- .../Computing/AREXComputingElement.py | 6 +- .../Computing/CloudComputingElement.py | 8 +- .../Resources/Computing/ComputingElement.py | 3 +- .../Agent/SiteDirector.py | 233 +++++----- .../Agent/test/Test_Agent_SiteDirector.py | 428 ++++++++++-------- .../ConfigTemplate.cfg | 40 +- .../Service/PilotManagerHandler.py | 6 +- .../Utilities/QueueUtilities.py | 4 +- .../Utilities/RemoteRunner.py | 6 +- .../Utilities/SubmissionPolicy.py | 33 +- .../Utilities/test/Test_SubmissionPolicy.py | 91 ++++ .../scripts/dirac_admin_add_pilot.py | 2 +- .../Test_PilotAgentsDB.py | 4 +- .../Test_PilotsClient.py | 4 +- 14 files changed, 499 insertions(+), 369 deletions(-) create mode 100644 src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_SubmissionPolicy.py diff --git a/src/DIRAC/Resources/Computing/AREXComputingElement.py b/src/DIRAC/Resources/Computing/AREXComputingElement.py index ccadb75a276..54cef13eec6 100755 --- a/src/DIRAC/Resources/Computing/AREXComputingElement.py +++ b/src/DIRAC/Resources/Computing/AREXComputingElement.py @@ -608,7 +608,11 @@ def getCEStatus(self): self.log.error("Failed getting the status of the CE.", result["Message"]) return S_ERROR("Failed getting the status of the CE") response = result["Value"] - ceData = response.json() + try: + ceData = response.json() + except requests.JSONDecodeError: + self.log.exception("Failed decoding the status of the CE") + return S_ERROR(f"Failed decoding the status of the CE") # Look only in the relevant section out of the headache queueInfo = ceData["Domains"]["AdminDomain"]["Services"]["ComputingService"]["ComputingShare"] diff --git a/src/DIRAC/Resources/Computing/CloudComputingElement.py b/src/DIRAC/Resources/Computing/CloudComputingElement.py index ee19162938d..3e40f5b7bfa 100644 --- a/src/DIRAC/Resources/Computing/CloudComputingElement.py +++ b/src/DIRAC/Resources/Computing/CloudComputingElement.py @@ -340,7 +340,7 @@ def _getMetadata(self, executableFile, pilotStamp=""): template = yaml.safe_load(template_fd) for filedef in template["write_files"]: if filedef["content"] == "PROXY_STR": - filedef["content"] = self.proxy + filedef["content"] = self.proxy.dumpAllToString()["Value"] elif filedef["content"] == "EXECUTABLE_STR": filedef["content"] = exe_str elif "STAMP_STR" in filedef["content"]: @@ -383,11 +383,7 @@ def _renewCloudProxy(self): if not res["OK"]: self.log.error("Could not download proxy", res["Message"]) return False - resdump = res["Value"].dumpAllToString() - if not resdump["OK"]: - self.log.error("Failed to dump proxy to string", resdump["Message"]) - return False - self.proxy = resdump["Value"] + self.proxy = res["Value"] self.valid = datetime.datetime.utcnow() + proxyLifetime * datetime.timedelta(seconds=1) return True diff --git a/src/DIRAC/Resources/Computing/ComputingElement.py b/src/DIRAC/Resources/Computing/ComputingElement.py index b8d07072abb..219c0bd6bed 100755 --- a/src/DIRAC/Resources/Computing/ComputingElement.py +++ b/src/DIRAC/Resources/Computing/ComputingElement.py @@ -263,6 +263,7 @@ def available(self): self.log.verbose("Max Number of Jobs:", maxTotalJobs) self.log.verbose("Max Waiting Jobs:", maxWaitingJobs) + result["CEInfoDict"] = ceInfoDict # If we reached the maximum number of total jobs, then the CE is not available totalJobs = runningJobs + waitingJobs if totalJobs >= maxTotalJobs: @@ -283,8 +284,6 @@ def available(self): if availableProcessors is not None: additionalJobs = min(additionalJobs, availableProcessors) result["Value"] = additionalJobs - - result["CEInfoDict"] = ceInfoDict return result ############################################################################# diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py index 95ace3580c2..99803896fcd 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py @@ -9,9 +9,9 @@ """ import datetime import os -import socket from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Any import DIRAC from DIRAC import S_ERROR, S_OK, gConfig @@ -22,6 +22,7 @@ from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getCESiteMapping, getQueues from DIRAC.Core.Base.AgentModule import AgentModule +from DIRAC.Core.Security import X509Chain from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader from DIRAC.Core.Utilities.TimeUtilities import second, toEpochMilliSeconds from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager @@ -40,14 +41,10 @@ pilotWrapperScript, ) from DIRAC.WorkloadManagementSystem.Utilities.QueueUtilities import getQueuesResolved +from DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy import WAITING_SUPPORTED_JOBS MAX_PILOTS_TO_SUBMIT = 100 -# Submission policies -AGGRESSIVE_FILLING = "AggressiveFilling" -WAITING_SUPPORTED_JOBS = "WaitingSupportedJobs" -SUBMISSION_POLICIES = [AGGRESSIVE_FILLING, WAITING_SUPPORTED_JOBS] - class SiteDirector(AgentModule): """SiteDirector class provides an implementation of a DIRAC agent. @@ -136,7 +133,6 @@ def beginExecution(self): result = self._loadSubmissionPolicy() if not result: return result - self.submissionPolicy = result["Value"]() # Flags self.sendAccounting = self.am_getOption("SendPilotAccounting", self.sendAccounting) @@ -193,14 +189,22 @@ def _loadSubmissionPolicy(self): """Load a submission policy""" objectLoader = ObjectLoader() result = objectLoader.loadObject( - f"WorkloadManagementSystem.Agent.SiteDirector.{self.submissionPolicyName}", self.submissionPolicyName + "WorkloadManagementSystem.Utilities.SubmissionPolicy", f"{self.submissionPolicyName}Policy" ) if not result["OK"]: self.log.error(f"Failed to load submission policy: {result['Message']}") return result - return S_OK(result["Value"]) - def _buildQueueDict(self, siteNames, ces, ceTypes, tags): + self.submissionPolicy = result["Value"]() + return S_OK() + + def _buildQueueDict( + self, + siteNames: list[str] | None = None, + ces: list[str] | None = None, + ceTypes: list[str] | None = None, + tags: list[str] | None = None, + ): """Build the queueDict dictionary containing information about the queues that will be provisioned""" # Get details about the resources result = getQueues(community=self.vo, siteList=siteNames, ceList=ces, ceTypeList=ceTypes, tags=tags) @@ -211,6 +215,7 @@ def _buildQueueDict(self, siteNames, ces, ceTypes, tags): result = getQueuesResolved( siteDict=result["Value"], queueCECache=self.queueCECache, + vo=self.vo, instantiateCEs=True, ) if not result["OK"]: @@ -261,15 +266,14 @@ def execute(self): It basically just submits pilots and gets their status """ - result = self.submitPilots() - if not result["OK"]: - self.log.error("Errors in the pilot submission:", result["Message"]) - return result + submissionResult = self.submitPilots() + monitoringResult = self.monitorPilots() - result = self.monitorPilots() - if not result["OK"]: - self.log.error("Errors in pilot monitoring:", result["Message"]) - return result + if not submissionResult["OK"]: + return submissionResult + + if not monitoringResult["OK"]: + return monitoringResult return S_OK() @@ -298,7 +302,7 @@ def submitPilots(self): return S_OK() - def _submitPilotsPerQueue(self, queueName): + def _submitPilotsPerQueue(self, queueName: str): """Submit pilots within a given computing elements :return: S_OK/S_ERROR @@ -327,6 +331,13 @@ def _submitPilotsPerQueue(self, queueName): # Get CE instance ce = self.queueDict[queueName]["CE"] + # Set credentials + cpuTime = queueCPUTime + 86400 + result = self._setCredentials(ce, cpuTime) + if not result["OK"]: + self.log.error("Failed to set credentials:", result["Message"]) + return result + # Get the number of available slots on the target site/queue totalSlots = self._getQueueSlots(queueName) if totalSlots <= 0: @@ -335,18 +346,11 @@ def _submitPilotsPerQueue(self, queueName): self.log.info(f"{queueName}: to submit={totalSlots}") # Apply the submission policy - totalSlots = self.submissionPolicy.apply(totalSlots) + totalSlots = self.submissionPolicy.apply(totalSlots, ceParameters=self.queueDict[queueName]["CE"].ceParameters) # Limit the number of pilots to submit to self.maxPilotsToSubmit pilotsToSubmit = min(self.maxPilotsToSubmit, totalSlots) - # Set credentials - cpuTime = queueCPUTime + 86400 - result = self._setCredentials(ce, cpuTime) - if not result["OK"]: - self.log.error("Failed to set credentials:", result["Message"]) - return result - # Now really submitting result = self._submitPilotsToQueue(pilotsToSubmit, ce, queueName) if not result["OK"]: @@ -355,7 +359,7 @@ def _submitPilotsPerQueue(self, queueName): pilotList, stampDict = result["Value"] # updating the pilotAgentsDB... done by default but maybe not strictly necessary - result = self._addPilotReference(queueName, pilotList, stampDict) + result = self._addPilotReferences(queueName, pilotList, stampDict) if not result["OK"]: return result @@ -363,7 +367,7 @@ def _submitPilotsPerQueue(self, queueName): self.log.info("Total number of pilots submitted in this cycle", f"{len(pilotList)} to {queueName}") return S_OK() - def _getQueueSlots(self, queue): + def _getQueueSlots(self, queue: str): """Get the number of available slots in the queue""" ce = self.queueDict[queue]["CE"] ceName = self.queueDict[queue]["CEName"] @@ -412,14 +416,12 @@ def _getQueueSlots(self, queue): totalSlots = min((maxTotalJobs - totalJobs), (maxWaitingJobs - waitingJobs)) return totalSlots - def _submitPilotsToQueue(self, pilotsToSubmit, ce, queue): + def _submitPilotsToQueue(self, pilotsToSubmit: int, ce: ComputingElement, queue: str): """Method that really submits the pilots to the ComputingElements' queue :param pilotsToSubmit: number of pilots to submit. - :type pilotsToSubmit: int :param ce: computing element object to where we submit - :type ce: ComputingElement - :param str queue: queue where to submit + :param queue: queue where to submit :return: S_OK/S_ERROR. If S_OK, returns tuple with (pilotList, stampDict) @@ -514,16 +516,12 @@ def _submitPilotsToQueue(self, pilotsToSubmit, ce, queue): return S_OK((pilotList, stampDict)) - def _addPilotReference(self, queue, pilotList, stampDict): + def _addPilotReference(self, queue: str, pilotList: list[str], stampDict: dict[str, str]): """Add pilotReference to pilotAgentsDB - :param str queue: the queue name + :param queue: the queue name :param pilotList: list of pilots - :type pilotList: list :param stampDict: dictionary of pilots timestamps - :type stampDict: dict - - :return: None """ result = self.pilotAgentsDB.addPilotReference( pilotList, @@ -549,18 +547,19 @@ def _addPilotReference(self, queue, pilotList, stampDict): return result return S_OK() - def _getExecutable(self, queue, proxy=None, jobExecDir="", envVariables=None, **kwargs): + def _getExecutable( + self, queue: str, proxy: X509Chain = None, jobExecDir: str = "", envVariables: dict[str, str] = None + ): """Prepare the full executable for queue - :param str queue: queue name - :param bool proxy: flag that say if to bundle or not the proxy - :param str jobExecDir: pilot execution dir (normally an empty string) + :param queue: queue name + :param proxy: flag that say if to bundle or not the proxy + :param jobExecDir: pilot execution dir (normally an empty string) :returns: a string the options for the pilot - :rtype: str """ - pilotOptions = self._getPilotOptions(queue, **kwargs) + pilotOptions = self._getPilotOptions(queue) if not pilotOptions: self.log.warn("Pilots will be submitted without additional options") pilotOptions = [] @@ -585,13 +584,12 @@ def _getExecutable(self, queue, proxy=None, jobExecDir="", envVariables=None, ** ) return executable - def _getPilotOptions(self, queue, **kwargs): + def _getPilotOptions(self, queue: str) -> list[str]: """Prepare pilot options - :param str queue: queue name + :param queue: queue name :returns: pilotOptions is a list of strings, each one is an option to the dirac-pilot script invocation - :rtype: list """ queueDict = self.queueDict[queue]["ParametersDict"] pilotOptions = [] @@ -657,16 +655,22 @@ def _getPilotOptions(self, queue, **kwargs): return pilotOptions - def _writePilotScript(self, workingDirectory, pilotOptions, proxy=None, pilotExecDir="", envVariables=None): + def _writePilotScript( + self, + workingDirectory: str, + pilotOptions: str, + proxy: X509Chain = None, + pilotExecDir: str = "", + envVariables: dict[str, str] = None, + ): """Bundle together and write out the pilot executable script, admix the proxy if given - :param str workingDirectory: pilot wrapper working directory - :param str pilotOptions: options with which to start the pilot - :param str proxy: proxy file we are going to bundle - :param str pilotExecDir: pilot executing directory + :param workingDirectory: pilot wrapper working directory + :param pilotOptions: options with which to start the pilot + :param proxy: proxy file we are going to bundle + :param pilotExecDir: pilot executing directory :returns: file name of the pilot wrapper created - :rtype: str """ try: @@ -711,17 +715,16 @@ def monitorPilots(self): return S_OK() - def _monitorPilotsPerQueue(self, queueName): + def _monitorPilotsPerQueue(self, queue: str): """Update status of pilots in transient state for a given queue :param queue: queue name - :param proxy: proxy to check the pilot status and renewals """ - ce = self.queueDict[queueName]["CE"] - ceName = self.queueDict[queueName]["CEName"] - queueName = self.queueDict[queueName]["QueueName"] - ceType = self.queueDict[queueName]["CEType"] - siteName = self.queueDict[queueName]["Site"] + ce = self.queueDict[queue]["CE"] + ceName = self.queueDict[queue]["CEName"] + queueName = self.queueDict[queue]["QueueName"] + ceType = self.queueDict[queue]["CEType"] + siteName = self.queueDict[queue]["Site"] # Select pilots in a transient states result = self.pilotAgentsDB.selectPilots( @@ -763,11 +766,14 @@ def _monitorPilotsPerQueue(self, queueName): return result pilotCEDict = result["Value"] - # Update pilot status in DB - abortedPilots = self._updatePilotStatus(pilotRefs, pilotDict, pilotCEDict) + # Get updated pilots + updatedPilots = self._getUpdatedPilotStatus(pilotDict, pilotCEDict) # If something wrong in the queue, make a pause for the job submission + abortedPilots = self._getAbortedPilots(updatedPilots) if abortedPilots: - self.failedQueues[queueName] += 1 + self.failedQueues[queue] += 1 + # Update the status of the pilots in the DB + self._updatePilotsInDB(updatedPilots) # FIXME: seems like it is only used by the CloudCE? Couldn't it be called from CloudCE.getJobStatus()? if callable(getattr(ce, "cleanupPilots", None)): @@ -806,46 +812,43 @@ def _monitorPilotsPerQueue(self, queueName): return S_OK() - def _updatePilotStatus(self, pilotRefs, pilotDict, pilotCEDict): - """Really updates the pilots status - - :return: number of aborted pilots, flag for getting the pilot output - """ - abortedPilots = 0 + def _getUpdatedPilotStatus(self, pilotDict: dict[str, Any], pilotCEDict: dict[str, Any]) -> dict[str, str]: + """Get the updated pilots, from a list of pilots, and their new status""" + updatedPilots = {} + for pilotReference, pilotInfo in pilotDict.items(): + oldStatus = pilotInfo["Status"] + sinceLastUpdate = datetime.datetime.utcnow() - pilotInfo["LastUpdateTime"] + ceStatus = pilotCEDict.get(pilotReference, oldStatus) + + if oldStatus != ceStatus: + # Normal case: update the pilot status to the new value + updatedPilots[pilotReference] = ceStatus + continue - for pRef in pilotRefs: - newStatus = "" - oldStatus = pilotDict[pRef]["Status"] - lastUpdateTime = pilotDict[pRef]["LastUpdateTime"] - sinceLastUpdate = datetime.datetime.utcnow() - lastUpdateTime + if oldStatus == ceStatus and ceStatus == PilotStatus.UNKNOWN and sinceLastUpdate > 3600 * second: + # Pilots are not reachable, we mark them as aborted + updatedPilots[pilotReference] = PilotStatus.ABORTED + continue - ceStatus = pilotCEDict.get(pRef, oldStatus) + return updatedPilots - if oldStatus == ceStatus and ceStatus != PilotStatus.UNKNOWN: - # Normal status did not change, continue - continue - if ceStatus == oldStatus == PilotStatus.UNKNOWN: - if sinceLastUpdate < 3600 * second: - # Allow 1 hour of Unknown status assuming temporary problems on the CE - continue - newStatus = PilotStatus.ABORTED - elif ceStatus == PilotStatus.UNKNOWN and oldStatus not in PilotStatus.PILOT_FINAL_STATES: - # Possible problems on the CE, let's keep the Unknown status for a while - newStatus = PilotStatus.UNKNOWN - elif ceStatus != PilotStatus.UNKNOWN: - # Update the pilot status to the new value - newStatus = ceStatus - - if newStatus: - self.log.info("Updating status", f"to {newStatus} for pilot {pRef}") - result = self.pilotAgentsDB.setPilotStatus(pRef, newStatus, "", "Updated by SiteDirector") - if not result["OK"]: - self.log.error(result["Message"]) - if newStatus == "Aborted": - abortedPilots += 1 + def _getAbortedPilots(self, pilotsDict: dict[str, str]) -> list[str]: + """Get aborted pilots from a list of pilots and their status""" + abortedPilots = [] + for pilotReference, status in pilotsDict.items(): + if status == PilotStatus.ABORTED: + abortedPilots.append(pilotReference) return abortedPilots + def _updatePilotsInDB(self, updatedPilotsDict: dict[str, str]): + """Update the status of the pilots in the DB""" + for pilotReference, newStatus in updatedPilotsDict.items(): + self.log.info("Updating status", f"to {newStatus} for pilot {pilotReference}") + result = self.pilotAgentsDB.setPilotStatus(pilotReference, newStatus, "", "Updated by SiteDirector") + if not result["OK"]: + self.log.error(result["Message"]) + ##################################################################################### def __getPilotToken(self, audience: str, scope: list[str] = None): @@ -909,7 +912,7 @@ def _setCredentials(self, ce: ComputingElement, proxyMinimumRequiredValidity: in ##################################################################################### - def _sendPilotAccounting(self, pilotDict): + def _sendPilotAccounting(self, pilotDict: dict[str, Any]): """Send pilot accounting record""" for pRef in pilotDict: self.log.verbose("Preparing accounting record", f"for pilot {pRef}") @@ -960,15 +963,17 @@ def _sendPilotAccounting(self, pilotDict): return S_OK() - def _sendPilotSubmissionAccounting(self, siteName, ceName, queueName, numTotal, numSucceeded, status): + def _sendPilotSubmissionAccounting( + self, siteName: str, ceName: str, queueName: str, numTotal: int, numSucceeded: int, status: str + ): """Send pilot submission accounting record - :param str siteName: Site name - :param str ceName: CE name - :param str queueName: queue Name - :param int numTotal: Total number of submission - :param int numSucceeded: Total number of submission succeeded - :param str status: 'Succeeded' or 'Failed' + :param siteName: Site name + :param ceName: CE name + :param queueName: queue Name + :param numTotal: Total number of submission + :param numSucceeded: Total number of submission succeeded + :param status: 'Succeeded' or 'Failed' :returns: S_OK / S_ERROR """ @@ -997,15 +1002,17 @@ def _sendPilotSubmissionAccounting(self, siteName, ceName, queueName, numTotal, return result return S_OK() - def _sendPilotSubmissionMonitoring(self, siteName, ceName, queueName, numTotal, numSucceeded, status): + def _sendPilotSubmissionMonitoring( + self, siteName: str, ceName: str, queueName: str, numTotal: int, numSucceeded: int, status: str + ): """Sends pilot submission records to monitoring - :param str siteName: Site name - :param str ceName: CE name - :param str queueName: queue Name - :param int numTotal: Total number of submission - :param int numSucceeded: Total number of submission succeeded - :param str status: 'Succeeded' or 'Failed' + :param siteName: Site name + :param ceName: CE name + :param queueName: queue Name + :param numTotal: Total number of submission + :param numSucceeded: Total number of submission succeeded + :param status: 'Succeeded' or 'Failed' :returns: S_OK / S_ERROR """ diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py index ad343c7e89a..1cad5ab2826 100644 --- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py +++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py @@ -2,214 +2,264 @@ """ # pylint: disable=protected-access -# imports import datetime +import os import pytest -from unittest.mock import MagicMock +from diraccfg import CFG -from DIRAC import gLogger +from DIRAC import gLogger, gConfig +from DIRAC.ConfigurationSystem.Client import ConfigurationData +from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus -# sut from DIRAC.WorkloadManagementSystem.Agent.SiteDirector import SiteDirector +from DIRAC.WorkloadManagementSystem.Client import PilotStatus +from DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy import SUBMISSION_POLICIES -mockAM = MagicMock() -mockGCReply = MagicMock() -mockGCReply.return_value = "TestSetup" -mockOPSObject = MagicMock() -mockOPSObject.getValue.return_value = "123" -mockOPSReply = MagicMock() -mockOPSReply.return_value = "123" -mockOPS = MagicMock() -mockOPS.return_value = mockOPSObject -# mockOPS.Operations = mockOPSObject -mockPM = MagicMock() -mockPM.requestToken.return_value = {"OK": True, "Value": ("token", 1)} -mockPMReply = MagicMock() -mockPMReply.return_value = {"OK": True, "Value": ("token", 1)} - -mockCSGlobalReply = MagicMock() -mockCSGlobalReply.return_value = "TestSetup" -mockResourcesReply = MagicMock() -mockResourcesReply.return_value = {"OK": True, "Value": ["x86_64-slc6", "x86_64-slc5"]} +CONFIG = """ +Resources +{ + Sites + { + LCG + { + LCG.Site1.com + { + CEs + { + ce1.site1.com + { + architecture = x86_64 + OS = linux_AlmaLinux_9 + CEType = HTCondorCE + LocalCEType = Singularity + MaxRAM = 6974 + Queues + { + condor + { + MaxTotalJobs = 1000 + MaxWaitingJobs = 100 + maxCPUTime = 1152 + VO = dteam + NumberOfProcessors = 1 + } + } + Tag = Token + } + ce2.site1.com + { + architecture = x86_64 + OS = linux_AlmaLinux_9 + CEType = HTCondorCE + LocalCEType = Singularity + MaxRAM = 6974 + Queues + { + condor + { + MaxTotalJobs = 1000 + MaxWaitingJobs = 100 + maxCPUTime = 1152 + VO = dteam + NumberOfProcessors = 1 + } + } + } + } + } + LCG.Site2.site2 + { + CEs + { + ce1.site2.com + { + architecture = x86_64 + OS = linux_AlmaLinux_9 + CEType = HTCondorCE + LocalCEType = Singularity + MaxRAM = 6974 + Queues + { + condor + { + MaxTotalJobs = 1000 + MaxWaitingJobs = 100 + maxCPUTime = 1152 + VO = dteam + NumberOfProcessors = 1 + } + } + Tag = Token + } + } + } + } + DIRAC + { + DIRAC.Site3.site3 + { + CEs + { + ce1.site3.com + { + architecture = x86_64 + OS = linux_AlmaLinux_9 + CEType = HTCondorCE + LocalCEType = Singularity + MaxRAM = 6974 + Queues + { + condor + { + MaxTotalJobs = 1000 + MaxWaitingJobs = 100 + maxCPUTime = 1152 + VO = dteam + NumberOfProcessors = 1 + } + } + Tag = Token + } + } + } + } + } +} +""" -mockPilotAgentsDB = MagicMock() -mockPilotAgentsDB.setPilotStatus.return_value = {"OK": True} -gLogger.setLevel("DEBUG") +@pytest.fixture +def config(): + """Load a fake configuration""" + ConfigurationData.localCFG = CFG() + cfg = CFG() + cfg.loadFromBuffer(CONFIG) + gConfig.loadCFG(cfg) @pytest.fixture -def sd(mocker): - """mocker for SiteDirector""" +def sd(mocker, config): + """Basic configuration of a SiteDirector. It tests the _buildQueueDict() method at the same time""" mocker.patch("DIRAC.WorkloadManagementSystem.Agent.SiteDirector.AgentModule.__init__") - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.SiteDirector.gConfig.getValue", side_effect=mockGCReply) - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.SiteDirector.Operations", side_effect=mockOPS) + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.SiteDirector.gConfig.getValue", return_value="TestSetup") + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.SiteDirector.Operations.getValue", return_value="123") + mocker.patch("DIRAC.WorkloadManagementSystem.Agent.SiteDirector.getPilotAgentsDB") + + usableSites = ( + gConfig.getSections("Resources/Sites/LCG")["Value"] + gConfig.getSections("Resources/Sites/DIRAC")["Value"] + ) mocker.patch( - "DIRAC.WorkloadManagementSystem.Agent.SiteDirector.gProxyManager.requestToken", side_effect=mockPMReply + "DIRAC.WorkloadManagementSystem.Agent.SiteDirector.SiteStatus.getUsableSites", return_values=usableSites ) - mocker.patch("DIRAC.WorkloadManagementSystem.Agent.SiteDirector.AgentModule", side_effect=mockAM) sd = SiteDirector() + + # Set logger sd.log = gLogger - sd.am_getOption = mockAM sd.log.setLevel("DEBUG") - sd.rpcMatcher = MagicMock() - sd.rssClient = MagicMock() - sd.pilotAgentsDB = MagicMock() + + # Set basic parameters sd.workingDirectory = "" - sd.queueDict = { - "aQueue": { - "Site": "LCG.CERN.cern", - "CEName": "aCE", - "CEType": "SSH", - "QueueName": "aQueue", - "ParametersDict": { - "CPUTime": 12345, - "Community": "lhcb", - "OwnerGroup": ["lhcb_user"], - "Setup": "LHCb-Production", - "Site": "LCG.CERN.cern", - }, - } - } + + # Set queueDict + sd.siteClient = SiteStatus() + sd._buildQueueDict() return sd -def test__getPilotOptions(sd): - """Testing SiteDirector()._getPilotOptions()""" - res = sd._getPilotOptions("aQueue") - assert {"-S TestSetup", "-V 123", "-l 123", "-n LCG.CERN.cern"} <= set(res) - - -@pytest.mark.parametrize( - "mockMatcherReturnValue, expected, anyExpected, sitesExpected", - [ - ({"OK": False, "Message": "boh"}, False, True, set()), - ({"OK": True, "Value": None}, False, True, set()), - ({"OK": True, "Value": {"1": {"Jobs": 10}, "2": {"Jobs": 20}}}, True, True, set()), - ({"OK": True, "Value": {"1": {"Jobs": 10, "Sites": ["Site1"]}, "2": {"Jobs": 20}}}, True, True, {"Site1"}), - ( - {"OK": True, "Value": {"1": {"Jobs": 10, "Sites": ["Site1", "Site2"]}, "2": {"Jobs": 20}}}, - True, - True, - {"Site1", "Site2"}, - ), - ( - { - "OK": True, - "Value": {"1": {"Jobs": 10, "Sites": ["Site1", "Site2"]}, "2": {"Jobs": 20, "Sites": ["Site1"]}}, - }, - True, - False, - {"Site1", "Site2"}, - ), - ( - { - "OK": True, - "Value": {"1": {"Jobs": 10, "Sites": ["Site1", "Site2"]}, "2": {"Jobs": 20, "Sites": ["ANY"]}}, - }, - True, - False, - {"Site1", "Site2", "ANY"}, - ), - ( - { - "OK": True, - "Value": {"1": {"Jobs": 10, "Sites": ["Site1", "Site2"]}, "2": {"Jobs": 20, "Sites": ["ANY", "Site3"]}}, - }, - True, - False, - {"Site1", "Site2", "Site3", "ANY"}, - ), - ( - { - "OK": True, - "Value": {"1": {"Jobs": 10, "Sites": ["Site1", "Site2"]}, "2": {"Jobs": 20, "Sites": ["Any", "Site3"]}}, - }, - True, - False, - {"Site1", "Site2", "Site3", "Any"}, - ), - ( - { - "OK": True, - "Value": { - "1": {"Jobs": 10, "Sites": ["Site1", "Site2"]}, - "2": {"Jobs": 20, "Sites": ["NotAny", "Site2"]}, - }, - }, - True, - False, - {"Site1", "Site2", "NotAny"}, - ), - ], -) -def test__ifAndWhereToSubmit(sd, mockMatcherReturnValue, expected, anyExpected, sitesExpected): - """Testing SiteDirector()._ifAndWhereToSubmit()""" - sd.matcherClient = MagicMock() - sd.matcherClient.getMatchingTaskQueues.return_value = mockMatcherReturnValue - res = sd._ifAndWhereToSubmit() - assert res[0] == expected - if res[0]: - assert res == (expected, anyExpected, sitesExpected, set()) - - -def test__allowedToSubmit(sd): - """Testing SiteDirector()._allowedToSubmit()""" - submit = sd._allowedToSubmit("aQueue", True, {"LCG.CERN.cern"}, set()) - assert submit is False - - sd.siteMaskList = ["LCG.CERN.cern", "DIRAC.CNAF.it"] - submit = sd._allowedToSubmit("aQueue", True, {"LCG.CERN.cern"}, set()) - assert submit is True - - sd.rssFlag = True - submit = sd._allowedToSubmit("aQueue", True, {"LCG.CERN.cern"}, set()) - assert submit is False - - sd.ceMaskList = ["aCE", "anotherCE"] - submit = sd._allowedToSubmit("aQueue", True, {"LCG.CERN.cern"}, set()) - assert submit is True - - -def test__submitPilotsToQueue(sd): - """Testing SiteDirector()._submitPilotsToQueue()""" - # Create a MagicMock that does not have the workingDirectory - # attribute (https://cpython-test-docs.readthedocs.io/en/latest/library/unittest.mock.html#deleting-attributes) - # This is to use the SiteDirector's working directory, not the CE one - ceMock = MagicMock() - del ceMock.workingDirectory - - sd.queueCECache = {"aQueue": {"CE": ceMock}} - sd.queueSlots = {"aQueue": {"AvailableSlots": 10}} - assert sd._submitPilotsToQueue(1, MagicMock(), "aQueue")["OK"] - - -@pytest.mark.parametrize( - "pilotRefs, pilotDict, pilotCEDict, expected", - [ - ([], {}, {}, (0, [])), - ( - ["aPilotRef"], - {"aPilotRef": {"Status": "Running", "LastUpdateTime": datetime.datetime(2000, 1, 1).utcnow()}}, - {}, - (0, []), - ), - ( - ["aPilotRef"], - {"aPilotRef": {"Status": "Running", "LastUpdateTime": datetime.datetime(2000, 1, 1).utcnow()}}, - {"aPilotRef": "Running"}, - (0, []), - ), - ( - ["aPilotRef"], - {"aPilotRef": {"Status": "Running", "LastUpdateTime": datetime.datetime(2000, 1, 1).utcnow()}}, - {"aPilotRef": "Unknown"}, - (0, []), - ), - ], -) -def test__updatePilotStatus(sd, pilotRefs, pilotDict, pilotCEDict, expected): - """Testing SiteDirector()._updatePilotStatus()""" - res = sd._updatePilotStatus(pilotRefs, pilotDict, pilotCEDict) - assert res == expected +@pytest.fixture(scope="session") +def pilotWrapperDirectory(tmp_path_factory): + """Create a temporary directory""" + fn = tmp_path_factory.mktemp("pilotWrappers") + return fn + + +def test_loadSubmissionPolicy(sd): + """Load each submission policy and call it""" + for submissionPolicyName in SUBMISSION_POLICIES: + # Load the submission policy + sd.submissionPolicyName = submissionPolicyName + res = sd._loadSubmissionPolicy() + assert res["OK"] + + # Call the submission policy with predefined parameters + targetQueue = "ce1.site1.com_condor" + res = sd.submissionPolicy.apply(50, ceParameters=sd.queueDict[targetQueue]["CE"].ceParameters) + assert res >= 0 and res <= 50 + + +def test_getPilotWrapper(sd, pilotWrapperDirectory): + """Get pilot options for a specific queue and check the result, then generate the pilot wrapper""" + # Get pilot options + pilotOptions = sd._getPilotOptions("ce1.site1.com_condor") + assert { + "--pythonVersion=3", + "-n LCG.Site1.com", + "-N ce1.site1.com", + "-Q condor", + "-S TestSetup", + "-V 123", + "-l 123", + "-e 1,2,3", + } == set(pilotOptions) + + # Write pilot script + res = sd._writePilotScript(pilotWrapperDirectory, pilotOptions) + + # Make sure the file exists + assert os.path.exists(res) and os.path.isfile(res) + + +def test_updatePilotStatus(sd): + """Updating the status of some fake pilot references""" + # 1. We have not submitted any pilots, there is nothing to update + pilotDict = {} + pilotCEDict = {} + res = sd._getUpdatedPilotStatus(pilotDict, pilotCEDict) + assert not res + + res = sd._getAbortedPilots(res) + assert not res + + # 2. We just submitted a pilot, the remote system has not had the time to register the pilot + pilotDict["pilotRef1"] = {"Status": PilotStatus.SUBMITTED, "LastUpdateTime": datetime.datetime.utcnow()} + pilotCEDict = {} + res = sd._getUpdatedPilotStatus(pilotDict, pilotCEDict) + assert not res + + res = sd._getAbortedPilots(res) + assert not res + + # 3. The pilot is now registered + pilotCEDict["pilotRef1"] = PilotStatus.SUBMITTED + res = sd._getUpdatedPilotStatus(pilotDict, pilotCEDict) + assert not res + + res = sd._getAbortedPilots(res) + assert not res + + # 4. The pilot waits in the queue of the remote CE + pilotCEDict["pilotRef1"] = PilotStatus.WAITING + res = sd._getUpdatedPilotStatus(pilotDict, pilotCEDict) + assert res == {"pilotRef1": PilotStatus.WAITING} + + res = sd._getAbortedPilots(res) + assert not res + pilotDict["pilotRef1"]["Status"] = PilotStatus.WAITING + + # 5. CE issue: the pilot status becomes unknown + pilotCEDict["pilotRef1"] = PilotStatus.UNKNOWN + res = sd._getUpdatedPilotStatus(pilotDict, pilotCEDict) + assert res == {"pilotRef1": PilotStatus.UNKNOWN} + + res = sd._getAbortedPilots(res) + assert not res + pilotDict["pilotRef1"]["Status"] = PilotStatus.UNKNOWN + + # 6. Engineers do not manage to fix the issue, the CE is still under maintenance + pilotDict["pilotRef1"]["LastUpdateTime"] = datetime.datetime.utcnow() - datetime.timedelta(seconds=3610) + res = sd._getUpdatedPilotStatus(pilotDict, pilotCEDict) + assert res == {"pilotRef1": PilotStatus.ABORTED} + + res = sd._getAbortedPilots(res) + assert res == ["pilotRef1"] diff --git a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg index b619a2cba9f..1f0d5262acf 100644 --- a/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg +++ b/src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg @@ -282,49 +282,33 @@ Agents Community = # Group treated (leave empty for auto-discovery) Group = - # Grid Environment (leave empty for auto-discovery) - GridEnv = # the DN of the certificate proxy used to submit pilots. If not found here, what is in Operations/Pilot section of the CS will be used PilotDN = # the group of the certificate proxy used to submit pilots. If not found here, what is in Operations/Pilot section of the CS will be used PilotGroup = - # List of sites that will be treated by this SiteDirector ("any" can refer to any Site defined in the CS) - Site = any - # List of CE types that will be treated by this SiteDirector ("any" can refer to any CE defined in the CS) - CETypes = any - # List of CEs that will be treated by this SiteDirector ("any" can refer to any type of CE defined in the CS) - CEs = any + # List of sites that will be treated by this SiteDirector (No value can refer to any Site defined in the CS) + Site = + # List of CEs that will be treated by this SiteDirector (No value can refer to any CE defined in the CS) + CEs = + # List of CE types that will be treated by this SiteDirector (No value can refer to any type of CE defined in the CS) + CETypes = # List of Tags that are required to be present in the CE/Queue definition Tags = + # How many cycles to skip if queue is not working + FailedQueueCycleFactor = 10 # The maximum length of a queue (in seconds). Default: 3 days MaxQueueLength = 259200 - # Log level of the pilots - PilotLogLevel = INFO # Max number of pilots to submit per cycle MaxPilotsToSubmit = 100 - # Check, or not, for the waiting pilots already submitted - PilotWaitingFlag = True - # How many cycels to skip if queue is not working - FailedQueueCycleFactor = 10 - # Every N cycles we update the pilots status - PilotStatusUpdateCycleFactor = 10 - # Every N cycles we update the number of available slots in the queues - AvailableSlotsUpdateCycleFactor = 10 - # Maximum number of times the Site Director is going to try to get a pilot output before stopping - MaxRetryGetPilotOutput = 3 - # To submit pilots to empty sites in any case - AddPilotsToEmptySites = False - # Should the SiteDirector consider platforms when deciding to submit pilots? - CheckPlatform = False - # Attribute used to define if the status of the pilots will be updated - UpdatePilotStatus = True - # Boolean value used to indicate if the pilot output will be or not retrieved - GetPilotOutput = False # Boolean value that indicates if the pilot job will send information for accounting SendPilotAccounting = True + # Submission policy to apply + SubmissionPolicy = WaitingSupportedJobs + # Working directory containing the pilot files if not set in the CE + WorkDirectory = } ##END ##BEGIN PushJobAgent diff --git a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py index ad76a03c57b..ea06aeef797 100644 --- a/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py +++ b/src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py @@ -70,12 +70,12 @@ def export_getCurrentPilotCounters(cls, attrDict={}): return S_OK(resultDict) ########################################################################################## - types_addPilotReference = [list, str] + types_addPilotReferences = [list, str] @classmethod - def export_addPilotReference(cls, pilotRef, ownerGroup, gridType="DIRAC", pilotStampDict={}): + def export_addPilotReferences(cls, pilotRef, ownerGroup, gridType="DIRAC", pilotStampDict={}): """Add a new pilot job reference""" - return cls.pilotAgentsDB.addPilotReference(pilotRef, ownerGroup, gridType, pilotStampDict) + return cls.pilotAgentsDB.addPilotReferences(pilotRef, ownerGroup, gridType, pilotStampDict) ############################################################################## types_getPilotOutput = [str] diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py b/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py index e508fd0aaca..226c4183a38 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/QueueUtilities.py @@ -11,7 +11,7 @@ from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory -def getQueuesResolved(siteDict, queueCECache, checkPlatform=False, instantiateCEs=False): +def getQueuesResolved(siteDict, queueCECache, vo=None, checkPlatform=False, instantiateCEs=False): """Get the list of relevant CEs (what is in siteDict) and their descriptions. The main goal of this method is to return a dictionary of queues """ @@ -30,6 +30,8 @@ def getQueuesResolved(siteDict, queueCECache, checkPlatform=False, instantiateCE queueDict[queueName]["ParametersDict"]["Queue"] = queue queueDict[queueName]["ParametersDict"]["GridCE"] = ce queueDict[queueName]["ParametersDict"]["Site"] = site + if vo: + queueDict[queueName]["ParametersDict"]["Community"] = vo # Evaluate the CPU limit of the queue according to the Glue convention computeQueueCPULimit(queueDict[queueName]["ParametersDict"]) diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py b/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py index 61503a03dc0..ea816e68a88 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/RemoteRunner.py @@ -169,11 +169,7 @@ def _setUpWorkloadCE(self, numberOfProcessorsPayload=1): if not result["OK"]: return result proxy = result["Value"]["chain"] - result = proxy.getRemainingSecs() - if not result["OK"]: - return result - lifetime_secs = result["Value"] - workloadCE.setProxy(proxy, lifetime_secs) + workloadCE.setProxy(proxy) return S_OK(workloadCE) diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py b/src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py index 29450ffa67a..2ae82f6d34b 100644 --- a/src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/SubmissionPolicy.py @@ -3,29 +3,34 @@ from DIRAC.WorkloadManagementSystem.Client.MatcherClient import MatcherClient +# Submission policies +AGGRESSIVE_FILLING = "AggressiveFilling" +WAITING_SUPPORTED_JOBS = "WaitingSupportedJobs" +SUBMISSION_POLICIES = [AGGRESSIVE_FILLING, WAITING_SUPPORTED_JOBS] + + class SubmissionPolicy(ABC): """Abstract class to define a submission strategy.""" @abstractmethod - def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int: + def apply(self, availableSlots: int, **kwargs) -> int: """Method to redefine in the concrete subclasses :param availableSlots: slots available for new pilots - :param queueName: the name of the targeted queue - :param queueInfo: a dictionary of attributes related to the queue - :param vo: VO """ - pass + if availableSlots < 0: + raise RuntimeError("Available slots cannot be negative") -class AgressiveFillingPolicy(SubmissionPolicy): - def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int: +class AggressiveFillingPolicy(SubmissionPolicy): + def apply(self, availableSlots: int, **kwargs) -> int: """All the available slots should be filled up. Should be employed for sites that are always processing jobs. * Pros: would quickly fill up a queue * Cons: would consume a lot of CPU hours for nothing if pilots do not match jobs """ + super().apply(availableSlots, **kwargs) return availableSlots @@ -34,22 +39,18 @@ def __init__(self) -> None: super().__init__() self.matcherClient = MatcherClient() - def apply(self, availableSlots: int, queueName: str, queueInfo: dict[str, str], vo: str) -> int: + def apply(self, availableSlots: int, **kwargs) -> int: """Fill up available slots only if waiting supported jobs exist. Should be employed for sites that are used from time to time (targeting specific Task Queues). * Pros: submit pilots only if necessary, and quickly fill up the queue if needed * Cons: would create some unused pilots in all the sites supervised by this policy and targeting a same task queue - """ - # Prepare CE dictionary from the queue info - ce = queueInfo["CE"] - ceDict = ce.ceParameters - ceDict["GridCE"] = queueInfo["CEName"] - if vo: - ceDict["Community"] = vo + :param ceParameters: CE parameters + """ + super().apply(availableSlots, **kwargs) # Get Task Queues related to the CE - result = self.matcherClient.getMatchingTaskQueues(ceDict) + result = self.matcherClient.getMatchingTaskQueues(kwargs["ceParameters"]) if not result["OK"]: return 0 taskQueueDict = result["Value"] diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_SubmissionPolicy.py b/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_SubmissionPolicy.py new file mode 100644 index 00000000000..66f3ce5209b --- /dev/null +++ b/src/DIRAC/WorkloadManagementSystem/Utilities/test/Test_SubmissionPolicy.py @@ -0,0 +1,91 @@ +""" Test class for Submission policy +""" +# pylint: disable=protected-access + +import pytest +from DIRAC.Core.Utilities.ReturnValues import S_OK + +from DIRAC.WorkloadManagementSystem.Client import PilotStatus +from DIRAC.WorkloadManagementSystem.Utilities.SubmissionPolicy import ( + SUBMISSION_POLICIES, + AggressiveFillingPolicy, + WaitingSupportedJobsPolicy, +) + + +def test_AggressiveFillingPolicy(): + """Make sure it always return the number of slots provided""" + policy = AggressiveFillingPolicy() + + # 1. We want to submit 50 elements + numberToSubmit = policy.apply(50) + assert numberToSubmit == 50 + + # 2. We want to submit 0 element + numberToSubmit = policy.apply(0) + assert numberToSubmit == 0 + + # 3. We want to submit -10 elements + with pytest.raises(RuntimeError): + numberToSubmit = policy.apply(-10) + + +def test_WaitingSupportedJobsPolicy(mocker): + """Make sure it returns the min between the available slots and the jobs available""" + policy = WaitingSupportedJobsPolicy() + + # 1. We want to submit 50 elements without specifying the CE parameters + with pytest.raises(KeyError): + numberToSubmit = policy.apply(50) + + # 2. We want to submit 50 elements but there are no waiting job + # Because it requires an access to a DB, we mock the value returned by the Matcher + mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", return_value=S_OK({}) + ) + numberToSubmit = policy.apply(50, ceParameters={}) + assert numberToSubmit == 0 + + # 3. We want to submit 50 elements and we have 10 similar waiting jobs + mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", + return_value=S_OK({"TQ1": {"Jobs": 10}}), + ) + numberToSubmit = policy.apply(50, ceParameters={}) + assert numberToSubmit == 10 + + # 4. We want to submit 50 elements and we have 10 waiting jobs, split into 2 task queues + mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", + return_value=S_OK({"TQ1": {"Jobs": 8}, "TQ2": {"Jobs": 2}}), + ) + numberToSubmit = policy.apply(50, ceParameters={}) + assert numberToSubmit == 10 + + # 5. We want to submit 50 elements and we have 60 similar waiting jobs + mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", + return_value=S_OK({"TQ1": {"Jobs": 60}}), + ) + numberToSubmit = policy.apply(50, ceParameters={}) + assert numberToSubmit == 50 + + # 6. We want to submit 50 elements and we have 60 waiting jobs, split into 2 task queues + mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", + return_value=S_OK({"TQ1": {"Jobs": 35}, "TQ2": {"Jobs": 25}}), + ) + numberToSubmit = policy.apply(50, ceParameters={}) + assert numberToSubmit == 50 + + # 6. We want to submit 50 elements and we have 60 waiting jobs, split into 2 task queues + mocker.patch( + "DIRAC.WorkloadManagementSystem.Client.MatcherClient.MatcherClient.getMatchingTaskQueues", + return_value=S_OK({"TQ1": {"Jobs": 35}, "TQ2": {"Jobs": 25}}), + ) + numberToSubmit = policy.apply(50, ceParameters={}) + assert numberToSubmit == 50 + + # 7. We want to submit -10 elements + with pytest.raises(RuntimeError): + numberToSubmit = policy.apply(-10, ceParameters={}) diff --git a/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py b/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py index 72ba14a3c17..f47d14bd81d 100644 --- a/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py +++ b/src/DIRAC/WorkloadManagementSystem/scripts/dirac_admin_add_pilot.py @@ -88,7 +88,7 @@ def main(): if not DErrno.cmpError(res, DErrno.EWMSNOPILOT): gLogger.error(res["Message"]) DIRACExit(1) - res = pmc.addPilotTQRef([pilotRef], params.taskQueueID, ownerGroup, gridType, {pilotRef: pilotStamp}) + res = pmc.addPilotReferences([pilotRef], ownerGroup, gridType, {pilotRef: pilotStamp}) if not res["OK"]: gLogger.error(res["Message"]) DIRACExit(1) diff --git a/tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py b/tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py index a9f1c43874f..51cecfab9ca 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py +++ b/tests/Integration/WorkloadManagementSystem/Test_PilotAgentsDB.py @@ -36,7 +36,7 @@ def preparePilots(stateCount, testSite, testCE, testGroup): for i in range(nPilots): pilotRef.append("pilotRef_" + str(i)) - res = paDB.addPilotReference( + res = paDB.addPilotReferences( pilotRef, testGroup, ) @@ -79,7 +79,7 @@ def cleanUpPilots(pilotRef): def test_basic(): """usual insert/verify""" - res = paDB.addPilotReference(["pilotRef"], "ownerGroup") + res = paDB.addPilotReferences(["pilotRef"], "ownerGroup") assert res["OK"] is True res = paDB.deletePilot("pilotRef") diff --git a/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py b/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py index f97bf7be961..9d0069ff670 100644 --- a/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py +++ b/tests/Integration/WorkloadManagementSystem/Test_PilotsClient.py @@ -27,7 +27,7 @@ def test_PilotsDB(): for jobID in ["aPilot", "anotherPilot"]: pilots.deletePilots(jobID) - res = pilots.addPilotTQRef(["aPilot"], 1, "a/owner/Group") + res = pilots.addPilotReferences(["aPilot"], "a/owner/Group") assert res["OK"], res["Message"] res = pilots.getCurrentPilotCounters({}) assert res["OK"], res["Message"] @@ -38,7 +38,7 @@ def test_PilotsDB(): assert res["OK"], res["Message"] assert res["Value"] == {} - res = pilots.addPilotTQRef(["anotherPilot"], 1, "a/owner/Group") + res = pilots.addPilotReferences(["anotherPilot"], "a/owner/Group") assert res["OK"], res["Message"] res = pilots.storePilotOutput("anotherPilot", "This is an output", "this is an error") assert res["OK"], res["Message"]