From abc9224c763ebe7acb9ff2b2dbea3e0772c7cf0b Mon Sep 17 00:00:00 2001 From: Manuel Giffels Date: Thu, 3 Aug 2023 10:12:55 +0200 Subject: [PATCH 01/20] Add fixed date helper function to test utilities --- tests/utilities/utilities.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/utilities/utilities.py b/tests/utilities/utilities.py index 66691ab4..d9c0b79c 100644 --- a/tests/utilities/utilities.py +++ b/tests/utilities/utilities.py @@ -1,5 +1,7 @@ from tardis.utilities.attributedict import AttributeDict +from datetime import datetime + import asyncio import socket @@ -19,6 +21,10 @@ def get_free_port(): # from https://gist.github.com/dbrgn/3979133 return port +def get_fixed_datetime(): + return datetime.fromtimestamp(12345689) + + def mock_executor_run_command(stdout, stderr="", exit_code=0, raise_exception=None): def decorator(func): def wrapper(self): From 3a55c5d25240cd17520c149c77984a23da0b5f0a Mon Sep 17 00:00:00 2001 From: Manuel Giffels Date: Thu, 3 Aug 2023 10:49:41 +0200 Subject: [PATCH 02/20] Add comments to specify the meaning of created and updated timestamps --- tardis/resources/drone.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tardis/resources/drone.py b/tardis/resources/drone.py index 43ba3ec6..b16f822d 100644 --- a/tardis/resources/drone.py +++ b/tardis/resources/drone.py @@ -45,8 +45,8 @@ def __init__( machine_type=self.site_agent.machine_type, obs_machine_meta_data_translation_mapping=self.batch_system_agent.machine_meta_data_translation_mapping, # noqa B950 remote_resource_uuid=remote_resource_uuid, - created=created or datetime.now(), - updated=updated or datetime.now(), + created=created or datetime.now(), # timestamp drone creation + updated=updated or datetime.now(), # timestamp last drone state update drone_uuid=drone_uuid or self.site_agent.drone_uuid(uuid.uuid4().hex[:10]), ) From be276812d959feeac5e2e5ee6d3cb2ebe0df0abd Mon Sep 17 00:00:00 2001 From: Manuel Giffels Date: Thu, 3 Aug 2023 10:55:09 +0200 Subject: [PATCH 03/20] Remove updates on created and updated timestamps from the OpenStack adapter --- tardis/adapters/sites/openstack.py | 3 --- tests/adapters_t/sites_t/test_openstack.py | 13 +++++++++---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/tardis/adapters/sites/openstack.py b/tardis/adapters/sites/openstack.py index 398eb3a7..025b8712 100644 --- a/tardis/adapters/sites/openstack.py +++ b/tardis/adapters/sites/openstack.py @@ -18,7 +18,6 @@ from asyncio import TimeoutError from contextlib import contextmanager -from datetime import datetime from functools import partial import logging @@ -54,8 +53,6 @@ def __init__(self, machine_type: str, site_name: str): ) translator_functions = StaticMapping( - created=lambda date: datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ"), - updated=lambda date: datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ"), status=lambda x, translator=StaticMapping( BUILD=ResourceStatus.Booting, ACTIVE=ResourceStatus.Running, diff --git a/tests/adapters_t/sites_t/test_openstack.py b/tests/adapters_t/sites_t/test_openstack.py index ca4c9307..d46f4497 100644 --- a/tests/adapters_t/sites_t/test_openstack.py +++ b/tests/adapters_t/sites_t/test_openstack.py @@ -6,8 +6,7 @@ from tardis.exceptions.tardisexceptions import TardisResourceStatusUpdateFailed from tardis.utilities.attributedict import AttributeDict from tardis.interfaces.siteadapter import ResourceStatus -from tests.utilities.utilities import async_return -from tests.utilities.utilities import run_async +from tests.utilities.utilities import async_return, run_async from aiohttp import ClientConnectionError from aiohttp import ContentTypeError @@ -71,7 +70,11 @@ def setUp(self): self.get_return_value = AttributeDict( server=AttributeDict( - name="testsite-089123", id="029312-1231-123123", status="ACTIVE" + name="testsite-089123", + id="029312-1231-123123", + status="ACTIVE", + created="2023-07-31T12:46:24Z", + updated="2023-07-31T12:46:50Z", ) ) openstack_api.servers.get.return_value = async_return( @@ -147,7 +150,9 @@ def test_resource_status(self): run_async( self.openstack_adapter.resource_status, resource_attributes=AttributeDict( - remote_resource_uuid="029312-1231-123123" + drone_uuid="testsite-089123", + remote_resource_uuid="029312-1231-123123", + resource_status=ResourceStatus.Booting, ), ), AttributeDict( From 5452a50f39184705d515711002e314284ae9df4e Mon Sep 17 00:00:00 2001 From: Manuel Giffels Date: Thu, 3 Aug 2023 11:49:58 +0200 Subject: [PATCH 04/20] Remove return value of stop and terminate resources --- tardis/adapters/sites/openstack.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tardis/adapters/sites/openstack.py b/tardis/adapters/sites/openstack.py index 025b8712..94f1ed09 100644 --- a/tardis/adapters/sites/openstack.py +++ b/tardis/adapters/sites/openstack.py @@ -85,22 +85,20 @@ async def resource_status( logger.debug(f"{self.site_name} servers get returned {response}") return self.handle_response(response["server"]) - async def stop_resource(self, resource_attributes: AttributeDict): + async def stop_resource(self, resource_attributes: AttributeDict) -> None: await self.nova.init_api(timeout=60) params = {"os-stop": None} response = await self.nova.servers.run_action( resource_attributes.remote_resource_uuid, **params ) logger.debug(f"{self.site_name} servers stop returned {response}") - return response - async def terminate_resource(self, resource_attributes: AttributeDict): + async def terminate_resource(self, resource_attributes: AttributeDict) -> None: await self.nova.init_api(timeout=60) response = await self.nova.servers.force_delete( resource_attributes.remote_resource_uuid ) logger.debug(f"{self.site_name} servers terminate returned {response}") - return response @contextmanager def handle_exceptions(self): From 6a032a18b5e9a9d5ac48c848e57f66cb2db075b0 Mon Sep 17 00:00:00 2001 From: Manuel Giffels Date: Thu, 3 Aug 2023 11:51:26 +0200 Subject: [PATCH 05/20] Remove updates on created and updated timestamps from the Slurm adapter --- tardis/adapters/sites/slurm.py | 26 +++----- tests/adapters_t/sites_t/test_slurm.py | 87 +++++++------------------- 2 files changed, 30 insertions(+), 83 deletions(-) diff --git a/tardis/adapters/sites/slurm.py b/tardis/adapters/sites/slurm.py index 049147aa..ffb47421 100644 --- a/tardis/adapters/sites/slurm.py +++ b/tardis/adapters/sites/slurm.py @@ -6,7 +6,6 @@ from ...interfaces.siteadapter import SiteAdapter from ...utilities.staticmapping import StaticMapping from ...utilities.attributedict import AttributeDict -from ...utilities.attributedict import convert_to_attribute_dict from ...utilities.executors.shellexecutor import ShellExecutor from ...utilities.asynccachemap import AsyncCacheMap from ...utilities.utils import ( @@ -19,7 +18,6 @@ from asyncio import TimeoutError from contextlib import contextmanager from functools import partial -from datetime import datetime import logging import re @@ -127,13 +125,11 @@ async def deploy_resource( logger.debug(f"{self.site_name} sbatch returned {result}") pattern = re.compile(r"^Submitted batch job (\d*)", flags=re.MULTILINE) remote_resource_uuid = int(pattern.findall(result.stdout)[0]) - resource_attributes.update( + + return AttributeDict( remote_resource_uuid=remote_resource_uuid, - created=datetime.now(), - updated=datetime.now(), resource_status=ResourceStatus.Booting, ) - return resource_attributes async def resource_status( self, resource_attributes: AttributeDict @@ -156,20 +152,12 @@ async def resource_status( "State": "COMPLETED", } logger.debug(f"{self.site_name} has status {resource_status}.") - resource_attributes.update(updated=datetime.now()) - return convert_to_attribute_dict( - {**resource_attributes, **self.handle_response(resource_status)} - ) - async def terminate_resource(self, resource_attributes: AttributeDict): + return self.handle_response(resource_status) + + async def terminate_resource(self, resource_attributes: AttributeDict) -> None: request_command = f"scancel {resource_attributes.remote_resource_uuid}" await self._executor.run_command(request_command) - resource_attributes.update( - resource_status=ResourceStatus.Stopped, updated=datetime.now() - ) - return self.handle_response( - {"JobId": resource_attributes.remote_resource_uuid}, **resource_attributes - ) def sbatch_cmdline_options(self, drone_uuid, machine_meta_data_translation_mapping): sbatch_options = self.machine_type_configuration.get( @@ -205,9 +193,9 @@ def sbatch_cmdline_options(self, drone_uuid, machine_meta_data_translation_mappi ), ) - async def stop_resource(self, resource_attributes: AttributeDict): + async def stop_resource(self, resource_attributes: AttributeDict) -> None: logger.debug("Slurm jobs cannot be stopped gracefully. Terminating instead.") - return await self.terminate_resource(resource_attributes) + await self.terminate_resource(resource_attributes) @contextmanager def handle_exceptions(self): diff --git a/tests/adapters_t/sites_t/test_slurm.py b/tests/adapters_t/sites_t/test_slurm.py index 13a5c29b..9aee9fbd 100644 --- a/tests/adapters_t/sites_t/test_slurm.py +++ b/tests/adapters_t/sites_t/test_slurm.py @@ -5,7 +5,7 @@ from tardis.exceptions.executorexceptions import CommandExecutionFailure from tardis.interfaces.siteadapter import ResourceStatus from tardis.utilities.attributedict import AttributeDict -from tests.utilities.utilities import mock_executor_run_command +from tests.utilities.utilities import mock_executor_run_command, run_async from tests.utilities.utilities import run_async from unittest import TestCase @@ -68,15 +68,6 @@ class TestSlurmAdapter(TestCase): mock_config_patcher = None mock_executor_patcher = None - def check_attribute_dicts( - self, expected_attributes, returned_attributes, exclude=tuple() - ): - for key in expected_attributes.keys(): - if key not in exclude: - self.assertEqual( - getattr(returned_attributes, key), getattr(expected_attributes, key) - ) - @classmethod def setUpClass(cls): cls.mock_config_patcher = patch("tardis.interfaces.siteadapter.Configuration") @@ -153,11 +144,6 @@ def test_start_up_command_deprecation_warning(self): @mock_executor_run_command(TEST_DEPLOY_RESOURCE_RESPONSE) def test_deploy_resource(self): - expected_resource_attributes = self.resource_attributes - expected_resource_attributes.update( - created=datetime.now(), updated=datetime.now() - ) - resource_attributes = AttributeDict( machine_type="test2large", site_name="TestSite", @@ -169,19 +155,11 @@ def test_deploy_resource(self): drone_uuid="testsite-1390065", ) - returned_resource_attributes = run_async( - self.slurm_adapter.deploy_resource, resource_attributes - ) - - self.assertLess( - returned_resource_attributes.created - expected_resource_attributes.created, - timedelta(seconds=1), - ) - - self.check_attribute_dicts( - expected_resource_attributes, - returned_resource_attributes, - exclude=("created", "updated"), + self.assertDictEqual( + AttributeDict( + remote_resource_uuid=1390065, resource_status=ResourceStatus.Booting + ), + run_async(self.slurm_adapter.deploy_resource, resource_attributes), ) self.mock_executor.return_value.run_command.assert_called_with( @@ -247,26 +225,14 @@ def test_site_name(self): @mock_executor_run_command(TEST_RESOURCE_STATUS_RESPONSE) def test_resource_status(self): - expected_resource_attributes = self.resource_attributes - expected_resource_attributes.update(updated=datetime.now()) - - returned_resource_attributes = run_async( - self.slurm_adapter.resource_status, - resource_attributes=self.resource_attributes, - ) - - self.assertLess( - ( - returned_resource_attributes.updated - - expected_resource_attributes.updated + self.assertDictEqual( + AttributeDict( + resource_status=ResourceStatus.Booting, remote_resource_uuid=1390065 + ), + run_async( + self.slurm_adapter.resource_status, + resource_attributes=self.resource_attributes, ), - timedelta(seconds=1), - ) - - self.check_attribute_dicts( - expected_resource_attributes, - returned_resource_attributes, - exclude=("created", "updated"), ) self.mock_executor.return_value.run_command.assert_called_with( @@ -279,18 +245,14 @@ def test_update_resource_status(self): self.resource_attributes["resource_status"], ResourceStatus.Booting ) - return_resource_attributes = run_async( - self.slurm_adapter.resource_status, - resource_attributes=self.resource_attributes, - ) - - self.assertEqual( - return_resource_attributes["resource_status"], ResourceStatus.Running - ) - - self.assertEqual( - return_resource_attributes["drone_uuid"], - self.resource_attributes["drone_uuid"], + self.assertDictEqual( + AttributeDict( + resource_status=ResourceStatus.Running, remote_resource_uuid=1390065 + ), + run_async( + self.slurm_adapter.resource_status, + resource_attributes=self.resource_attributes, + ), ) self.mock_executor.return_value.run_command.assert_called_with( @@ -402,14 +364,11 @@ def test_resource_status_update_failed(self): AttributeDict(remote_resource_uuid="1390065"), ) - self.check_attribute_dicts( + self.assertDictEqual( AttributeDict( - remote_resource_uuid=1390065, - resource_status=ResourceStatus.Running, - updated=datetime.now(), + remote_resource_uuid=1390065, resource_status=ResourceStatus.Running ), response, - exclude=("updated",), ) self.mock_executor.return_value.run_command.assert_called_with( From 013bb369333f581a9cb76cc081d0e13857602643 Mon Sep 17 00:00:00 2001 From: Manuel Giffels Date: Thu, 3 Aug 2023 12:37:09 +0200 Subject: [PATCH 06/20] Remove updates on created and updated timestamps from the Moab adapter --- tardis/adapters/sites/moab.py | 50 ++--------- tests/adapters_t/sites_t/test_moab.py | 117 ++++++++------------------ 2 files changed, 46 insertions(+), 121 deletions(-) diff --git a/tardis/adapters/sites/moab.py b/tardis/adapters/sites/moab.py index eeab67b4..1dfc19e0 100644 --- a/tardis/adapters/sites/moab.py +++ b/tardis/adapters/sites/moab.py @@ -6,7 +6,6 @@ from ...interfaces.siteadapter import SiteAdapter from ...utilities.staticmapping import StaticMapping from ...utilities.attributedict import AttributeDict -from ...utilities.attributedict import convert_to_attribute_dict from ...utilities.executors.shellexecutor import ShellExecutor from ...utilities.asynccachemap import AsyncCacheMap from ...utilities.utils import ( @@ -18,11 +17,9 @@ from asyncio import TimeoutError from contextlib import contextmanager from functools import partial -from datetime import datetime import asyncssh import logging -import re import warnings from xml.dom import minidom @@ -133,27 +130,11 @@ async def deploy_resource( logger.debug(f"{self.site_name} servers create returned {result}") remote_resource_uuid = int(result.stdout) - resource_attributes.update( + + return AttributeDict( remote_resource_uuid=remote_resource_uuid, - created=datetime.now(), - updated=datetime.now(), resource_status=ResourceStatus.Booting, ) - return resource_attributes - - @staticmethod - def check_remote_resource_uuid(resource_attributes, regex, response): - pattern = re.compile(regex, flags=re.MULTILINE) - remote_resource_uuid = int(pattern.findall(response)[0]) - if remote_resource_uuid != int(resource_attributes.remote_resource_uuid): - raise TardisError( - f"Failed to terminate {resource_attributes.remote_resource_uuid}." - ) - else: - resource_attributes.update( - resource_status=ResourceStatus.Stopped, updated=datetime.now() - ) - return remote_resource_uuid async def resource_status( self, resource_attributes: AttributeDict @@ -176,40 +157,27 @@ async def resource_status( "State": "Completed", } logger.debug(f"{self.site_name} has status {resource_status}.") - resource_attributes.update(updated=datetime.now()) - return convert_to_attribute_dict( - {**resource_attributes, **self.handle_response(resource_status)} - ) - async def terminate_resource(self, resource_attributes: AttributeDict): + return self.handle_response(resource_status) + + async def terminate_resource(self, resource_attributes: AttributeDict) -> None: request_command = f"canceljob {resource_attributes.remote_resource_uuid}" try: response = await self._executor.run_command(request_command) except CommandExecutionFailure as cf: if cf.exit_code == 1: logger.warning( - f"{self.site_name} servers terminate returned {cf.stdout}" - ) - remote_resource_uuid = self.check_remote_resource_uuid( - resource_attributes, - r"ERROR: invalid job specified \((\d*)\)", - cf.stderr, + f"{self.site_name} servers terminate returned {cf.stdout}." + "Potentially already terminated." ) else: raise cf else: logger.debug(f"{self.site_name} servers terminate returned {response}") - remote_resource_uuid = self.check_remote_resource_uuid( - resource_attributes, r"^job \'(\d*)\' cancelled", response.stdout - ) - - return self.handle_response( - {"SystemJID": remote_resource_uuid}, **resource_attributes - ) - async def stop_resource(self, resource_attributes: AttributeDict): + async def stop_resource(self, resource_attributes: AttributeDict) -> None: logger.debug("MOAB jobs cannot be stopped gracefully. Terminating instead.") - return await self.terminate_resource(resource_attributes) + await self.terminate_resource(resource_attributes) def msub_cmdline_options(self, drone_uuid, machine_meta_data_translation_mapping): sbatch_options = self.machine_type_configuration.get( diff --git a/tests/adapters_t/sites_t/test_moab.py b/tests/adapters_t/sites_t/test_moab.py index 53a67609..7c46f80b 100644 --- a/tests/adapters_t/sites_t/test_moab.py +++ b/tests/adapters_t/sites_t/test_moab.py @@ -225,28 +225,16 @@ def test_start_up_command_deprecation_warning(self): @mock_executor_run_command(TEST_DEPLOY_RESOURCE_RESPONSE) def test_deploy_resource(self): - expected_resource_attributes = self.resource_attributes - expected_resource_attributes.update( - created=datetime.now(), updated=datetime.now() - ) - return_resource_attributes = run_async( - self.moab_adapter.deploy_resource, - resource_attributes=self.resource_attributes, - ) - if ( - return_resource_attributes.created - expected_resource_attributes.created - > timedelta(seconds=1) - or return_resource_attributes.updated - expected_resource_attributes.updated - > timedelta(seconds=1) - ): - raise Exception("Creation time or update time wrong!") - del ( - expected_resource_attributes.created, - expected_resource_attributes.updated, - return_resource_attributes.created, - return_resource_attributes.updated, + self.assertDictEqual( + AttributeDict( + remote_resource_uuid=4761849, resource_status=ResourceStatus.Booting + ), + run_async( + self.moab_adapter.deploy_resource, + resource_attributes=self.resource_attributes, + ), ) - self.assertEqual(return_resource_attributes, expected_resource_attributes) + self.mock_executor.return_value.run_command.assert_called_with( "msub -j oe -m p -l walltime=02:00:00:00,mem=120gb,nodes=1:ppn=20 -v TardisDroneCores=128,TardisDroneMemory=120,TardisDroneDisk=100,TardisDroneUuid=testsite-abcdef startVM.py" # noqa: B950 ) @@ -260,12 +248,11 @@ def test_deploy_resource_w_submit_options(self): ) ) - moab_adapter = MoabAdapter(machine_type="test2large", site_name="TestSite") - run_async( - moab_adapter.deploy_resource, + self.moab_adapter.deploy_resource, resource_attributes=self.resource_attributes, ) + self.mock_executor.return_value.run_command.assert_called_with( "msub -M someone@somewhere.com -j oe -m p -l walltime=02:00:00:00,mem=120gb,nodes=1:ppn=20 -v TardisDroneCores=128,TardisDroneMemory=120,TardisDroneDisk=100,TardisDroneUuid=testsite-abcdef --timeout=60 startVM.py" # noqa: B950 ) @@ -283,19 +270,15 @@ def test_site_name(self): @mock_executor_run_command(TEST_RESOURCE_STATUS_RESPONSE) def test_resource_status(self): - expected_resource_attributes = self.resource_attributes - expected_resource_attributes.update(updated=datetime.now()) - return_resource_attributes = run_async( - self.moab_adapter.resource_status, - resource_attributes=self.resource_attributes, + self.assertDictEqual( + AttributeDict( + remote_resource_uuid=4761849, resource_status=ResourceStatus.Booting + ), + run_async( + self.moab_adapter.resource_status, + resource_attributes=self.resource_attributes, + ), ) - if ( - return_resource_attributes.updated - expected_resource_attributes.updated - > timedelta(seconds=1) - ): - raise Exception("Update time wrong!") - del expected_resource_attributes.updated, return_resource_attributes.updated - self.assertEqual(return_resource_attributes, expected_resource_attributes) @mock_executor_run_command(TEST_RESOURCE_STATE_TRANSLATION_RESPONSE) def test_resource_state_translation(self): @@ -316,49 +299,36 @@ def test_resource_status_update(self): self.assertEqual( self.resource_attributes["resource_status"], ResourceStatus.Booting ) - return_resource_attributes = run_async( - self.moab_adapter.resource_status, - resource_attributes=self.resource_attributes, - ) - self.assertEqual( - return_resource_attributes["resource_status"], ResourceStatus.Running + + self.assertDictEqual( + AttributeDict( + remote_resource_uuid=4761849, resource_status=ResourceStatus.Running + ), + run_async( + self.moab_adapter.resource_status, + resource_attributes=self.resource_attributes, + ), ) @mock_executor_run_command(TEST_TERMINATE_RESOURCE_RESPONSE) def test_stop_resource(self): - expected_resource_attributes = self.resource_attributes - expected_resource_attributes.update( - updated=datetime.now(), resource_status=ResourceStatus.Stopped - ) - return_resource_attributes = run_async( + run_async( self.moab_adapter.stop_resource, resource_attributes=self.resource_attributes, ) - if ( - return_resource_attributes.updated - expected_resource_attributes.updated - > timedelta(seconds=1) - ): - raise Exception("Update time wrong!") - del expected_resource_attributes.updated, return_resource_attributes.updated - self.assertEqual(return_resource_attributes, expected_resource_attributes) + self.mock_executor.return_value.run_command.assert_called_with( + "canceljob 4761849" + ) @mock_executor_run_command(TEST_TERMINATE_RESOURCE_RESPONSE) def test_terminate_resource(self): - expected_resource_attributes = self.resource_attributes - expected_resource_attributes.update( - updated=datetime.now(), resource_status=ResourceStatus.Stopped - ) - return_resource_attributes = run_async( + run_async( self.moab_adapter.terminate_resource, resource_attributes=self.resource_attributes, ) - if ( - return_resource_attributes.updated - expected_resource_attributes.updated - > timedelta(seconds=1) - ): - raise Exception("Update time wrong!") - del expected_resource_attributes.updated, return_resource_attributes.updated - self.assertEqual(return_resource_attributes, expected_resource_attributes) + self.mock_executor.return_value.run_command.assert_called_with( + "canceljob 4761849" + ) @mock_executor_run_command( "", @@ -372,18 +342,11 @@ def test_terminate_resource(self): ), ) def test_terminate_dead_resource(self): - expected_resource_attributes = self.resource_attributes - expected_resource_attributes.update( - updated=datetime.now(), resource_status=ResourceStatus.Stopped - ) with self.assertLogs(level=logging.WARNING): - return_resource_attributes = run_async( + run_async( self.moab_adapter.terminate_resource, resource_attributes=self.resource_attributes, ) - self.assertEqual( - return_resource_attributes["resource_status"], ResourceStatus.Stopped - ) @mock_executor_run_command( "", @@ -461,9 +424,3 @@ def test_exception_handling(to_raise, to_catch): for to_raise, to_catch in matrix: test_exception_handling(to_raise, to_catch) - - def test_check_remote_resource_uuid(self): - with self.assertRaises(TardisError): - self.moab_adapter.check_remote_resource_uuid( - AttributeDict(remote_resource_uuid=1), regex=r"^(\d)$", response="2" - ) From 7053619897693813a1d4f1a515bd859a3843e33e Mon Sep 17 00:00:00 2001 From: Manuel Giffels Date: Thu, 3 Aug 2023 13:14:39 +0200 Subject: [PATCH 07/20] The drone_uuid is static, so remove it from responses in Kubernetes adapter --- tardis/adapters/sites/kubernetes.py | 14 +++++--------- tests/adapters_t/sites_t/test_kubernetes.py | 5 ----- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/tardis/adapters/sites/kubernetes.py b/tardis/adapters/sites/kubernetes.py index 7ef09ce7..ee7243bd 100644 --- a/tardis/adapters/sites/kubernetes.py +++ b/tardis/adapters/sites/kubernetes.py @@ -21,7 +21,7 @@ def __init__(self, machine_type: str, site_name: str): self._machine_type = machine_type self._site_name = site_name key_translator = StaticMapping( - remote_resource_uuid="uid", drone_uuid="name", resource_status="type" + remote_resource_uuid="uid", resource_status="type" ) translator_functions = StaticMapping( created=lambda date: datetime.strptime(date, "%Y-%m-%dT%H:%M:%S%z"), @@ -113,7 +113,6 @@ async def deploy_resource( ) response = { "uid": response_temp.metadata.uid, - "name": response_temp.metadata.name, "type": "Booting", } if self.machine_type_configuration.hpa: @@ -163,23 +162,21 @@ async def resource_status( response = {"uid": response_uid, "name": response_name, "type": response_type} return self.handle_response(response) - async def stop_resource(self, resource_attributes: AttributeDict): + async def stop_resource(self, resource_attributes: AttributeDict) -> None: body = await self.client.read_namespaced_deployment( name=resource_attributes.drone_uuid, namespace=self.machine_type_configuration.namespace, ) body.spec.replicas = 0 - response = await self.client.replace_namespaced_deployment( + await self.client.replace_namespaced_deployment( name=resource_attributes.drone_uuid, namespace=self.machine_type_configuration.namespace, body=body, ) - return response - async def terminate_resource(self, resource_attributes: AttributeDict): - response = None + async def terminate_resource(self, resource_attributes: AttributeDict) -> None: try: - response = await self.client.delete_namespaced_deployment( + await self.client.delete_namespaced_deployment( name=resource_attributes.drone_uuid, namespace=self.machine_type_configuration.namespace, body=k8s_client.V1DeleteOptions( @@ -200,7 +197,6 @@ async def terminate_resource(self, resource_attributes: AttributeDict): if ex.status != 404: logger.warning(f"deleting hpa failed: {ex}") raise - return response @contextmanager def handle_exceptions(self): diff --git a/tests/adapters_t/sites_t/test_kubernetes.py b/tests/adapters_t/sites_t/test_kubernetes.py index 223a7d6c..07919434 100644 --- a/tests/adapters_t/sites_t/test_kubernetes.py +++ b/tests/adapters_t/sites_t/test_kubernetes.py @@ -167,7 +167,6 @@ def test_deploy_resource(self, mocked_aiohttp): ), AttributeDict( remote_resource_uuid="123456", - drone_uuid="testsite-089123", resource_status=ResourceStatus.Booting, ), ) @@ -199,7 +198,6 @@ def test_resource_status(self, mocked_aiohttp): ), AttributeDict( remote_resource_uuid="123456", - drone_uuid="testsite-089123", resource_status=ResourceStatus.Running, ), ) @@ -216,7 +214,6 @@ def test_resource_status(self, mocked_aiohttp): ), AttributeDict( remote_resource_uuid="123456", - drone_uuid="testsite-089123", resource_status=ResourceStatus.Stopped, ), ) @@ -230,7 +227,6 @@ def test_resource_status(self, mocked_aiohttp): ), AttributeDict( remote_resource_uuid="123456", - drone_uuid="testsite-089123", resource_status=ResourceStatus.Booting, ), ) @@ -244,7 +240,6 @@ def test_resource_status(self, mocked_aiohttp): ), AttributeDict( remote_resource_uuid="123456", - drone_uuid="testsite-089123", resource_status=ResourceStatus.Deleted, ), ) From f0ce5fcdcf8a7fe20c301e707fb68b6acdb82ec5 Mon Sep 17 00:00:00 2001 From: Manuel Giffels Date: Thu, 3 Aug 2023 15:55:08 +0200 Subject: [PATCH 08/20] Remove updates on created and updated timestamps from the HTCondor adapter --- tardis/adapters/sites/htcondor.py | 18 +++++------------- .../sites_t/test_htcondorsiteadapter.py | 6 ++---- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/tardis/adapters/sites/htcondor.py b/tardis/adapters/sites/htcondor.py index a92d62d3..b6e133ff 100644 --- a/tardis/adapters/sites/htcondor.py +++ b/tardis/adapters/sites/htcondor.py @@ -17,7 +17,6 @@ ) from contextlib import contextmanager -from datetime import datetime from functools import partial from string import Template @@ -213,8 +212,6 @@ def __init__( key_translator = StaticMapping( remote_resource_uuid="JobId", resource_status="JobStatus", - created="created", - updated="updated", ) # HTCondor uses digits to indicate job states and digit as variable names @@ -264,7 +261,7 @@ async def deploy_resource( job_id = await self._condor_submit(submit_jdl) response = AttributeDict(JobId=job_id) - response.update(self.create_timestamps()) + return self.handle_response(response) async def resource_status( @@ -287,29 +284,24 @@ async def resource_status( else: return self.handle_response(resource_status) - async def stop_resource(self, resource_attributes: AttributeDict): + async def stop_resource(self, resource_attributes: AttributeDict) -> None: """ Stopping machines is equivalent to suspending jobs in HTCondor, therefore condor_suspend is called! """ resource_uuid = resource_attributes.remote_resource_uuid if await self._condor_suspend(resource_attributes): - return self.handle_response(AttributeDict(JobId=resource_uuid)) + return logger.debug(f"condor_suspend failed for {resource_uuid}") raise TardisResourceStatusUpdateFailed - async def terminate_resource(self, resource_attributes: AttributeDict): + async def terminate_resource(self, resource_attributes: AttributeDict) -> None: resource_uuid = resource_attributes.remote_resource_uuid if await self._condor_rm(resource_attributes): - return self.handle_response(AttributeDict(JobId=resource_uuid)) + return logger.debug(f"condor_rm failed for {resource_uuid}") raise TardisResourceStatusUpdateFailed - @staticmethod - def create_timestamps(): - now = datetime.now() - return AttributeDict(created=now, updated=now) - @contextmanager def handle_exceptions(self): try: diff --git a/tests/adapters_t/sites_t/test_htcondorsiteadapter.py b/tests/adapters_t/sites_t/test_htcondorsiteadapter.py index ea908d5d..cc2fb1ce 100644 --- a/tests/adapters_t/sites_t/test_htcondorsiteadapter.py +++ b/tests/adapters_t/sites_t/test_htcondorsiteadapter.py @@ -165,8 +165,6 @@ def test_deploy_resource_htcondor_obs(self): ), ) self.assertEqual(response.remote_resource_uuid, "1351043.0") - self.assertFalse(response.created - datetime.now() > timedelta(seconds=1)) - self.assertFalse(response.updated - datetime.now() > timedelta(seconds=1)) _, kwargs = self.mock_executor.return_value.run_command.call_args self.assertEqual( @@ -361,7 +359,7 @@ def test_stop_resource(self): response = run_async( self.adapter.stop_resource, AttributeDict(remote_resource_uuid="1351043.0") ) - self.assertEqual(response.remote_resource_uuid, "1351043.0") + self.assertIsNone(response) @mock_executor_run_command( stdout="", @@ -403,7 +401,7 @@ def test_terminate_resource(self): self.adapter.terminate_resource, AttributeDict(remote_resource_uuid="1351043.0"), ) - self.assertEqual(response.remote_resource_uuid, "1351043.0") + self.assertIsNone(response) @mock_executor_run_command(stdout=CONDOR_RM_FAILED_OUTPUT) def test_terminate_resource_failed_redo(self): From 03c1d56ee07693c968f6e790ac380ab74e5608fe Mon Sep 17 00:00:00 2001 From: Manuel Giffels Date: Thu, 3 Aug 2023 16:09:13 +0200 Subject: [PATCH 09/20] Add grace period for drones not yet in asynchronously updated batchsystem status --- tardis/adapters/sites/htcondor.py | 16 +++++++++++++--- tardis/adapters/sites/moab.py | 14 +++++++++++--- tardis/adapters/sites/slurm.py | 15 ++++++++++++--- 3 files changed, 36 insertions(+), 9 deletions(-) diff --git a/tardis/adapters/sites/htcondor.py b/tardis/adapters/sites/htcondor.py index b6e133ff..9a737109 100644 --- a/tardis/adapters/sites/htcondor.py +++ b/tardis/adapters/sites/htcondor.py @@ -273,13 +273,23 @@ async def resource_status( resource_status = self._htcondor_queue[resource_uuid] except KeyError: # In case the created timestamp is after last update timestamp of the - # asynccachemap, no decision about the current state can be given, - # since map is updated asynchronously. + # asynccachemap plus a grace period of max(10, bulk_delay) seconds, no + # decision about the current state can be given, since map is updated + # asynchronously. + bulk_delay = getattr(self.configuration, "bulk_delay", 1) if ( self._htcondor_queue.last_update - resource_attributes.created - ).total_seconds() < 0: + ).total_seconds() < max(bulk_delay, 10): + logger.debug( + "Time difference between drone creation and last_update of" + f"htcondor_queue is less then {max(bulk_delay, 10)} s." + ) raise TardisResourceStatusUpdateFailed from None else: + logger.debug( + f"Cannot find {resource_uuid} in htcondor_queue assuming" + "drone is already deleted." + ) return AttributeDict(resource_status=ResourceStatus.Deleted) else: return self.handle_response(resource_status) diff --git a/tardis/adapters/sites/moab.py b/tardis/adapters/sites/moab.py index 1dfc19e0..0cad11c4 100644 --- a/tardis/adapters/sites/moab.py +++ b/tardis/adapters/sites/moab.py @@ -141,17 +141,25 @@ async def resource_status( ) -> AttributeDict: await self._moab_status.update_status() # In case the created timestamp is after last update timestamp of the - # asynccachemap, no decision about the current state can be given, - # since map is updated asynchronously. + # asynccachemap plus 10 s grace period, no decision about the current + # state can be given, since map is updated asynchronously. try: resource_uuid = resource_attributes.remote_resource_uuid resource_status = self._moab_status[str(resource_uuid)] except KeyError as err: if ( self._moab_status._last_update - resource_attributes.created - ).total_seconds() < 0: + ).total_seconds() < 10: + logger.debug( + "Time difference between drone creation and last_update of" + "moab_status is less then 10 s." + ) raise TardisResourceStatusUpdateFailed from err else: + logger.debug( + f"Cannot find {resource_uuid} in moab_status assuming" + "drone is already deleted." + ) resource_status = { "JobID": resource_attributes.remote_resource_uuid, "State": "Completed", diff --git a/tardis/adapters/sites/slurm.py b/tardis/adapters/sites/slurm.py index ffb47421..786a7d78 100644 --- a/tardis/adapters/sites/slurm.py +++ b/tardis/adapters/sites/slurm.py @@ -141,12 +141,21 @@ async def resource_status( except KeyError: if ( self._slurm_status.last_update - resource_attributes.created - ).total_seconds() < 0: + ).total_seconds() < 10: # In case the created timestamp is after last update timestamp of the - # asynccachemap, no decision about the current state can be given, - # since map is updated asynchronously. Just retry later on. + # asynccachemap plus 10 s grace period, no decision about the current + # state can be given, since map is updated asynchronously. + # Just retry later on. + logger.debug( + "Time difference between drone creation and last_update of" + "slurm_status is less then 10 s." + ) raise TardisResourceStatusUpdateFailed from None else: + logger.debug( + f"Cannot find {resource_uuid} in slurm_status assuming" + "drone is already deleted." + ) resource_status = { "JobID": resource_attributes.remote_resource_uuid, "State": "COMPLETED", From 12139127564092f28e39aab6c11c9aab67190cd3 Mon Sep 17 00:00:00 2001 From: Manuel Giffels Date: Thu, 3 Aug 2023 16:34:19 +0200 Subject: [PATCH 10/20] Remove updates on created and updated timestamps from the cloudstack adapter --- tardis/adapters/sites/cloudstack.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tardis/adapters/sites/cloudstack.py b/tardis/adapters/sites/cloudstack.py index efa618f4..0f45f142 100644 --- a/tardis/adapters/sites/cloudstack.py +++ b/tardis/adapters/sites/cloudstack.py @@ -12,7 +12,6 @@ from CloudStackAIO.CloudStack import CloudStackClientException from contextlib import contextmanager -from datetime import datetime from functools import partial import asyncio @@ -37,8 +36,6 @@ def __init__(self, machine_type: str, site_name: str): ) translator_functions = StaticMapping( - created=lambda date: datetime.strptime(date, "%Y-%m-%dT%H:%M:%S%z"), - updated=lambda date: datetime.strptime(date, "%Y-%m-%dT%H:%M:%S%z"), state=lambda x, translator=StaticMapping( Present=ResourceStatus.Booting, Running=ResourceStatus.Running, From b71dece2c7dae8a5730f24f318c1203fe735b765 Mon Sep 17 00:00:00 2001 From: Manuel Giffels Date: Thu, 3 Aug 2023 16:39:11 +0200 Subject: [PATCH 11/20] Revert "Add fixed date helper function to test utilities" This reverts commit 47c10d72ec460dff452c78f8b586152562480521. --- tests/utilities/utilities.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tests/utilities/utilities.py b/tests/utilities/utilities.py index d9c0b79c..66691ab4 100644 --- a/tests/utilities/utilities.py +++ b/tests/utilities/utilities.py @@ -1,7 +1,5 @@ from tardis.utilities.attributedict import AttributeDict -from datetime import datetime - import asyncio import socket @@ -21,10 +19,6 @@ def get_free_port(): # from https://gist.github.com/dbrgn/3979133 return port -def get_fixed_datetime(): - return datetime.fromtimestamp(12345689) - - def mock_executor_run_command(stdout, stderr="", exit_code=0, raise_exception=None): def decorator(func): def wrapper(self): From 8a57e7700cf1c65086764c13960370d53f8badcd Mon Sep 17 00:00:00 2001 From: Manuel Giffels Date: Fri, 4 Aug 2023 09:38:15 +0200 Subject: [PATCH 12/20] Refactoring of the FakeSiteAdapter --- tardis/adapters/sites/fakesite.py | 60 +++---------- tests/adapters_t/sites_t/test_fakesite.py | 100 ++++++++++++++-------- 2 files changed, 77 insertions(+), 83 deletions(-) diff --git a/tardis/adapters/sites/fakesite.py b/tardis/adapters/sites/fakesite.py index 656e3844..81a76384 100644 --- a/tardis/adapters/sites/fakesite.py +++ b/tardis/adapters/sites/fakesite.py @@ -23,9 +23,6 @@ def __init__(self, machine_type: str, site_name: str) -> None: key_translator = StaticMapping( remote_resource_uuid="remote_resource_uuid", resource_status="resource_status", - created="created", - updated="updated", - resource_boot_time="resource_boot_time", ) self.handle_response = partial( @@ -34,72 +31,43 @@ def __init__(self, machine_type: str, site_name: str) -> None: translator_functions=StaticMapping(), ) - self._stopped_n_terminated_resources = {} - async def deploy_resource( self, resource_attributes: AttributeDict ) -> AttributeDict: await asyncio.sleep(self._api_response_delay.get_value()) - now = datetime.now() response = AttributeDict( remote_resource_uuid=uuid4().hex, resource_status=ResourceStatus.Booting, - created=now, - updated=now, - resource_boot_time=self._resource_boot_time.get_value(), ) return self.handle_response(response) - def get_resource_boot_time(self, resource_attributes: AttributeDict) -> float: - try: - return resource_attributes.resource_boot_time - except AttributeError: - # In case tardis is restarted, resource_boot_time is not set, so re-set - resource_boot_time = resource_attributes[ - "resource_boot_time" - ] = self._resource_boot_time.get_value() - return resource_boot_time - async def resource_status( self, resource_attributes: AttributeDict ) -> AttributeDict: await asyncio.sleep(self._api_response_delay.get_value()) - try: # check if resource has been stopped or terminated - resource_status = self._stopped_n_terminated_resources[ - resource_attributes.drone_uuid - ] - except KeyError: - pass - else: - return self.handle_response(AttributeDict(resource_status=resource_status)) - created_time = resource_attributes.created - resource_boot_time = self.get_resource_boot_time(resource_attributes) - # check if resource is already running - if (datetime.now() - created_time) > timedelta(seconds=resource_boot_time): + resource_boot_time = self._resource_boot_time.get_value() + # check if resource should already run + if (datetime.now() - created_time) > timedelta( + seconds=resource_boot_time + ) and resource_attributes.resource_status is ResourceStatus.Booting: return self.handle_response( AttributeDict(resource_status=ResourceStatus.Running) ) - return self.handle_response(resource_attributes) + return AttributeDict() # do not change anything - async def stop_resource(self, resource_attributes: AttributeDict): + async def stop_resource(self, resource_attributes: AttributeDict) -> None: await asyncio.sleep(self._api_response_delay.get_value()) - self._stopped_n_terminated_resources[ - resource_attributes.drone_uuid - ] = ResourceStatus.Stopped - return self.handle_response( - AttributeDict(resource_status=ResourceStatus.Stopped) - ) + # update resource status manually to ResourceStatus.Stopped, so that + # the life cycle comes to an end. + resource_attributes.resource_status = ResourceStatus.Stopped - async def terminate_resource(self, resource_attributes: AttributeDict): + async def terminate_resource(self, resource_attributes: AttributeDict) -> None: await asyncio.sleep(self._api_response_delay.get_value()) - self._stopped_n_terminated_resources[ - resource_attributes.drone_uuid - ] = ResourceStatus.Deleted - return self.handle_response( - AttributeDict(resource_status=ResourceStatus.Deleted) - ) + # update resource status manually to ResourceStatus.Deleted, so that + # the life cycle is ended. + resource_attributes.resource_status = ResourceStatus.Deleted @contextmanager def handle_exceptions(self) -> None: diff --git a/tests/adapters_t/sites_t/test_fakesite.py b/tests/adapters_t/sites_t/test_fakesite.py index 55bb489d..5b6d21e8 100644 --- a/tests/adapters_t/sites_t/test_fakesite.py +++ b/tests/adapters_t/sites_t/test_fakesite.py @@ -44,8 +44,6 @@ def machine_type_configuration(self): def test_deploy_resource(self): response = run_async(self.adapter.deploy_resource, AttributeDict()) self.assertEqual(response.resource_status, ResourceStatus.Booting) - self.assertFalse(response.created - datetime.now() > timedelta(seconds=1)) - self.assertFalse(response.updated - datetime.now() > timedelta(seconds=1)) def test_machine_meta_data(self): self.assertEqual( @@ -59,52 +57,80 @@ def test_site_name(self): self.assertEqual(self.adapter.site_name, "TestSite") def test_resource_status(self): - # test tardis restart, where resource_boot_time is not set - response = run_async( - self.adapter.resource_status, - AttributeDict( - created=datetime.now(), - resource_status=ResourceStatus.Booting, - drone_uuid="test-123", + # Booting resource, where boot time is not yet over + resource_attributes = AttributeDict( + created=datetime.now(), + resource_status=ResourceStatus.Booting, + drone_uuid="test-123", + ) + self.assertEqual( + AttributeDict(), # no update require, resource still in Booting state + run_async( + self.adapter.resource_status, + resource_attributes, ), ) - self.assertEqual(response.resource_status, ResourceStatus.Booting) - deploy_response = run_async(self.adapter.deploy_resource, AttributeDict()) - deploy_response.update(AttributeDict(drone_uuid="test-123")) - response = run_async(self.adapter.resource_status, deploy_response) - self.assertEqual(response.resource_status, ResourceStatus.Booting) + # Booting resource, where boot time is already over + created_timestamp = datetime.now() - timedelta(seconds=100) + resource_attributes.update(AttributeDict(created=created_timestamp)) - past_timestamp = datetime.now() - timedelta(seconds=100) - deploy_response.update( - AttributeDict(created=past_timestamp, drone_uuid="test-123") + self.assertDictEqual( + AttributeDict(resource_status=ResourceStatus.Running), + run_async(self.adapter.resource_status, resource_attributes), ) - response = run_async(self.adapter.resource_status, deploy_response) - self.assertEqual(response.resource_status, ResourceStatus.Running) - # test stopped resources - response.update(AttributeDict(drone_uuid="test-123")) - response = run_async(self.adapter.stop_resource, response) - self.assertEqual(response.resource_status, ResourceStatus.Stopped) + # test resource status of stop resources + resource_attributes.update( + AttributeDict(resource_status=ResourceStatus.Stopped) + ) + self.assertEqual( + AttributeDict(), + run_async(self.adapter.resource_status, resource_attributes), + ) + self.assertDictEqual( + AttributeDict( + created=created_timestamp, + resource_status=ResourceStatus.Stopped, + drone_uuid="test-123", + ), + resource_attributes, + ) - # test terminated resources - response.update(AttributeDict(drone_uuid="test-123")) - response = run_async(self.adapter.terminate_resource, response) - self.assertEqual(response.resource_status, ResourceStatus.Deleted) + # test resource status of terminated resources + resource_attributes.update( + AttributeDict(resource_status=ResourceStatus.Deleted) + ) + self.assertEqual( + AttributeDict(), + run_async(self.adapter.resource_status, resource_attributes), + ) + self.assertDictEqual( + AttributeDict( + created=created_timestamp, + resource_status=ResourceStatus.Deleted, + drone_uuid="test-123", + ), + resource_attributes, + ) def test_stop_resource(self): - deploy_response = run_async(self.adapter.deploy_resource, AttributeDict()) - deploy_response.update(AttributeDict(drone_uuid="test-123")) - run_async(self.adapter.stop_resource, deploy_response) - response = run_async(self.adapter.resource_status, deploy_response) - self.assertEqual(response.resource_status, ResourceStatus.Stopped) + resource_attributes = AttributeDict( + drone_uuid="test-123", + created=datetime.now(), + resource_status=ResourceStatus.Running, + ) + run_async(self.adapter.stop_resource, resource_attributes) + self.assertEqual(resource_attributes.resource_status, ResourceStatus.Stopped) def test_terminate_resource(self): - deploy_response = run_async(self.adapter.deploy_resource, AttributeDict()) - deploy_response.update(AttributeDict(drone_uuid="test-123")) - run_async(self.adapter.terminate_resource, deploy_response) - response = run_async(self.adapter.resource_status, deploy_response) - self.assertEqual(response.resource_status, ResourceStatus.Deleted) + resource_attributes = AttributeDict( + drone_uuid="test-123", + created=datetime.now(), + resource_status=ResourceStatus.Running, + ) + run_async(self.adapter.terminate_resource, resource_attributes) + self.assertEqual(resource_attributes.resource_status, ResourceStatus.Deleted) def test_exception_handling(self): def test_exception_handling(raise_it, catch_it): From 233374db0f0ad3b73eacda3da4a716dfe43cc0c1 Mon Sep 17 00:00:00 2001 From: Manuel Giffels Date: Fri, 4 Aug 2023 09:40:16 +0200 Subject: [PATCH 13/20] Update git settings to ignore .vscode --- .gitignore | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 8b76bbe2..d1c9b658 100644 --- a/.gitignore +++ b/.gitignore @@ -116,8 +116,11 @@ test_scripts/ cobald*.yml *tardis.yml -#Ignore cloudinit files +# Ignore cloudinit files cloudinit/ -#Ignore logs -*.log* +# Ignore logs +*.log + +# Igonre vscode +.vscode* From 75d9fa9e87f9afdf1e82f02267c53c0482b0b2e2 Mon Sep 17 00:00:00 2001 From: Manuel Giffels Date: Fri, 4 Aug 2023 09:49:49 +0200 Subject: [PATCH 14/20] Update the SiteAdapter interface, that stop and terminate resources do return None --- tardis/interfaces/siteadapter.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tardis/interfaces/siteadapter.py b/tardis/interfaces/siteadapter.py index 27fb0eba..351a9e6f 100644 --- a/tardis/interfaces/siteadapter.py +++ b/tardis/interfaces/siteadapter.py @@ -273,27 +273,25 @@ class variable. ) from ae @abstractmethod - async def stop_resource(self, resource_attributes: AttributeDict): + async def stop_resource(self, resource_attributes: AttributeDict) -> None: """ Abstract method to define the interface to stop resources at a resource provider. :param resource_attributes: Contains describing attributes of the resource, defined in the :py:class:`~tardis.resources.drone.Drone` implementation! :type resource_attributes: AttributeDict - :return: Contains updated describing attributes of the resource. - :rtype: AttributeDict + :return: None """ raise NotImplementedError @abstractmethod - async def terminate_resource(self, resource_attributes: AttributeDict): + async def terminate_resource(self, resource_attributes: AttributeDict) -> None: """ Abstract method to define the interface to terminate resources at a resource provider. :param resource_attributes: Contains describing attributes of the resource, defined in the :py:class:`~tardis.resources.drone.Drone` implementation! :type resource_attributes: AttributeDict - :return: Contains updated describing attributes of the resource. - :rtype: AttributeDict + :return: None """ raise NotImplementedError From d0a0fcbff7ef3e476fb41d3c44866a34340c45ae Mon Sep 17 00:00:00 2001 From: Manuel Giffels Date: Tue, 8 Aug 2023 18:07:49 +0200 Subject: [PATCH 15/20] Use AsyncBulkCall to update job states in HTCondor site adapter --- tardis/adapters/sites/htcondor.py | 70 ++++++++----------- .../sites_t/test_htcondorsiteadapter.py | 46 +++++------- 2 files changed, 46 insertions(+), 70 deletions(-) diff --git a/tardis/adapters/sites/htcondor.py b/tardis/adapters/sites/htcondor.py index 9a737109..4e1e6f41 100644 --- a/tardis/adapters/sites/htcondor.py +++ b/tardis/adapters/sites/htcondor.py @@ -1,11 +1,10 @@ -from typing import Iterable, Tuple, Awaitable +from typing import Iterable, Tuple, Awaitable, Mapping from ...exceptions.executorexceptions import CommandExecutionFailure from ...exceptions.tardisexceptions import TardisError from ...exceptions.tardisexceptions import TardisResourceStatusUpdateFailed from ...interfaces.siteadapter import SiteAdapter from ...interfaces.siteadapter import ResourceStatus from ...interfaces.executor import Executor -from ...utilities.asynccachemap import AsyncCacheMap from ...utilities.attributedict import AttributeDict from ...utilities.staticmapping import StaticMapping from ...utilities.executors.shellexecutor import ShellExecutor @@ -35,18 +34,23 @@ def _job_id(resource_uuid: str) -> str: return resource_uuid if "." in resource_uuid else f"{resource_uuid}.0" -async def htcondor_queue_updater(executor): - attributes = dict( - Owner="Owner", JobStatus="JobStatus", ClusterId="ClusterId", ProcId="ProcId" - ) +async def condor_q( + *resource_attributes: Tuple[AttributeDict, ...], executor: Executor +) -> Iterable[Mapping]: + attributes = dict(JobStatus="JobStatus", ClusterId="ClusterId", ProcId="ProcId") attributes_string = " ".join(attributes.values()) - queue_command = f"condor_q -af:t {attributes_string}" + + remote_resource_ids = " ".join( + _job_id(resource.remote_resource_uuid) for resource in resource_attributes + ) + + queue_command = f"condor_q {remote_resource_ids} -af:t {attributes_string}" htcondor_queue = {} try: condor_queue = await executor.run_command(queue_command) except CommandExecutionFailure as cf: - logger.warning(f"htcondor_queue_update failed: {cf}") + logger.warning(f"{queue_command} failed with: {cf}") raise else: for row in csv_parser( @@ -57,7 +61,18 @@ async def htcondor_queue_updater(executor): ): row["JobId"] = f"{row['ClusterId']}.{row['ProcId']}" htcondor_queue[row["JobId"]] = row - return htcondor_queue + + return ( + htcondor_queue.get( + _job_id(resource.remote_resource_uuid), + # assume that jobs that do not show up (anymore) in condor_q have + # JobStatus 4 (Deleted) + { + "JobStatus": "4", + }, + ) + for resource in resource_attributes + ) JDL = str @@ -208,6 +223,11 @@ def __init__( ) for tool in (condor_submit, condor_suspend, condor_rm) ) + self._condor_q = AsyncBulkCall( + partial(condor_q, executor=self._executor), + size=bulk_size, + delay=bulk_delay, + ) key_translator = StaticMapping( remote_resource_uuid="JobId", @@ -229,11 +249,6 @@ def __init__( translator_functions=translator_functions, ) - self._htcondor_queue = AsyncCacheMap( - update_coroutine=partial(htcondor_queue_updater, self._executor), - max_age=self.configuration.max_age * 60, - ) - async def deploy_resource( self, resource_attributes: AttributeDict ) -> AttributeDict: @@ -267,32 +282,7 @@ async def deploy_resource( async def resource_status( self, resource_attributes: AttributeDict ) -> AttributeDict: - await self._htcondor_queue.update_status() - try: - resource_uuid = _job_id(resource_attributes.remote_resource_uuid) - resource_status = self._htcondor_queue[resource_uuid] - except KeyError: - # In case the created timestamp is after last update timestamp of the - # asynccachemap plus a grace period of max(10, bulk_delay) seconds, no - # decision about the current state can be given, since map is updated - # asynchronously. - bulk_delay = getattr(self.configuration, "bulk_delay", 1) - if ( - self._htcondor_queue.last_update - resource_attributes.created - ).total_seconds() < max(bulk_delay, 10): - logger.debug( - "Time difference between drone creation and last_update of" - f"htcondor_queue is less then {max(bulk_delay, 10)} s." - ) - raise TardisResourceStatusUpdateFailed from None - else: - logger.debug( - f"Cannot find {resource_uuid} in htcondor_queue assuming" - "drone is already deleted." - ) - return AttributeDict(resource_status=ResourceStatus.Deleted) - else: - return self.handle_response(resource_status) + return self.handle_response(await self._condor_q(resource_attributes)) async def stop_resource(self, resource_attributes: AttributeDict) -> None: """ diff --git a/tests/adapters_t/sites_t/test_htcondorsiteadapter.py b/tests/adapters_t/sites_t/test_htcondorsiteadapter.py index cc2fb1ce..328065b5 100644 --- a/tests/adapters_t/sites_t/test_htcondorsiteadapter.py +++ b/tests/adapters_t/sites_t/test_htcondorsiteadapter.py @@ -7,8 +7,6 @@ from tests.utilities.utilities import mock_executor_run_command from tests.utilities.utilities import run_async -from datetime import datetime -from datetime import timedelta from unittest import TestCase from unittest.mock import patch @@ -38,13 +36,14 @@ RequestMemory = 32768 """ -CONDOR_Q_OUTPUT_IDLE = "test\t1\t1351043\t0" -CONDOR_Q_OUTPUT_RUN = "test\t2\t1351043\t0" -CONDOR_Q_OUTPUT_REMOVING = "test\t3\t1351043\t0" -CONDOR_Q_OUTPUT_COMPLETED = "test\t4\t1351043\t0" -CONDOR_Q_OUTPUT_HELD = "test\t5\t1351043\t0" -CONDOR_Q_OUTPUT_TRANSFERING_OUTPUT = "test\t6\t1351043\t0" -CONDOR_Q_OUTPUT_SUSPENDED = "test\t7\t1351043\t0" +CONDOR_Q_OUTPUT_IDLE = "1\t1351043\t0" +CONDOR_Q_OUTPUT_RUN = "2\t1351043\t0" +CONDOR_Q_OUTPUT_REMOVING = "3\t1351043\t0" +CONDOR_Q_OUTPUT_COMPLETED = "4\t1351043\t0" +CONDOR_Q_OUTPUT_HELD = "5\t1351043\t0" +CONDOR_Q_OUTPUT_TRANSFERING_OUTPUT = "6\t1351043\t0" +CONDOR_Q_OUTPUT_SUSPENDED = "7\t1351043\t0" +CONDOR_Q_OUTPUT_DOES_NOT_EXISTS = "1\t1351042\t0" CONDOR_RM_OUTPUT = "Job 1351043.0 marked for removal" CONDOR_RM_FAILED_OUTPUT = "Job 1351043.0 not found" @@ -323,35 +322,22 @@ def test_resource_status_unexpanded(self): message="Failed", stdout="Failed", stderr="Failed", exit_code=2 ), ) - def test_resource_status_raise_future(self): - future_timestamp = datetime.now() + timedelta(minutes=1) + def test_resource_status_command_execution_error(self): with self.assertLogs(level=logging.WARNING): - with self.assertRaises(TardisResourceStatusUpdateFailed): + with self.assertRaises(CommandExecutionFailure): run_async( self.adapter.resource_status, AttributeDict( - remote_resource_uuid="1351043.0", created=future_timestamp + remote_resource_uuid="1351043.0", ), ) - @mock_executor_run_command( - stdout="", - raise_exception=CommandExecutionFailure( - message="Failed", stdout="Failed", stderr="Failed", exit_code=2 - ), - ) - def test_resource_status_raise_past(self): - # Update interval is 10 minutes, so set last update back by 11 minutes - # in order to execute condor_q command and creation date to 12 minutes ago - past_timestamp = datetime.now() - timedelta(minutes=12) - self.adapter._htcondor_queue._last_update = datetime.now() - timedelta( - minutes=11 + @mock_executor_run_command(stdout=CONDOR_Q_OUTPUT_DOES_NOT_EXISTS) + def test_resource_status_already_deleted(self): + response = run_async( + self.adapter.resource_status, + AttributeDict(remote_resource_uuid="1351043.0"), ) - with self.assertLogs(level=logging.WARNING): - response = run_async( - self.adapter.resource_status, - AttributeDict(remote_resource_uuid="1351043.0", created=past_timestamp), - ) self.assertEqual(response.resource_status, ResourceStatus.Deleted) @mock_executor_run_command(stdout=CONDOR_SUSPEND_OUTPUT) From 66624c27e4d71415fdf3cd3ee40a7d479ed36d7c Mon Sep 17 00:00:00 2001 From: Manuel Giffels Date: Wed, 9 Aug 2023 15:34:31 +0200 Subject: [PATCH 16/20] Use AsyncBulkCall to update job states in Slurm site adapter --- tardis/adapters/sites/slurm.py | 72 ++++++++++++-------------- tests/adapters_t/sites_t/test_slurm.py | 68 ++++++------------------ 2 files changed, 49 insertions(+), 91 deletions(-) diff --git a/tardis/adapters/sites/slurm.py b/tardis/adapters/sites/slurm.py index 786a7d78..ff6ed652 100644 --- a/tardis/adapters/sites/slurm.py +++ b/tardis/adapters/sites/slurm.py @@ -2,12 +2,13 @@ from ...exceptions.tardisexceptions import TardisError from ...exceptions.tardisexceptions import TardisTimeout from ...exceptions.tardisexceptions import TardisResourceStatusUpdateFailed +from ...interfaces.executor import Executor from ...interfaces.siteadapter import ResourceStatus from ...interfaces.siteadapter import SiteAdapter from ...utilities.staticmapping import StaticMapping +from ...utilities.asyncbulkcall import AsyncBulkCall from ...utilities.attributedict import AttributeDict from ...utilities.executors.shellexecutor import ShellExecutor -from ...utilities.asynccachemap import AsyncCacheMap from ...utilities.utils import ( convert_to, drone_environment_to_str, @@ -18,6 +19,7 @@ from asyncio import TimeoutError from contextlib import contextmanager from functools import partial +from typing import Iterable, Mapping, Tuple import logging import re @@ -26,10 +28,17 @@ logger = logging.getLogger("cobald.runtime.tardis.adapters.sites.slurm") -async def slurm_status_updater(executor): +async def squeue( + *resource_attributes: Tuple[AttributeDict, ...], executor: Executor +) -> Iterable[Mapping]: attributes = dict(JobId="%A", Host="%N", State="%T") attributes_string = "|".join(attributes.values()) - cmd = f'squeue -o "{attributes_string}" -h -t all' + + remote_resource_ids = ",".join( + str(resource.remote_resource_uuid) for resource in resource_attributes + ) + + cmd = f'squeue -o "{attributes_string}" -h -t all --job={remote_resource_ids}' slurm_resource_status = {} logger.debug("Slurm status update is started.") @@ -45,7 +54,18 @@ async def slurm_status_updater(executor): row["State"] = row["State"].strip() slurm_resource_status[row["JobId"]] = row logger.debug("Slurm status update finished.") - return slurm_resource_status + + return ( + slurm_resource_status.get( + str(resource.remote_resource_uuid), + # assume that jobs that do not show up (anymore) in squeue have + # State COMPLETED (Deleted) + { + "State": "COMPLETED", + }, + ) + for resource in resource_attributes + ) class SlurmAdapter(SiteAdapter): @@ -67,11 +87,6 @@ def __init__(self, machine_type: str, site_name: str): self._executor = getattr(self.configuration, "executor", ShellExecutor()) - self._slurm_status = AsyncCacheMap( - update_coroutine=partial(slurm_status_updater, self._executor), - max_age=self.configuration.StatusUpdate * 60, - ) - key_translator = StaticMapping( remote_resource_uuid="JobId", resource_status="State" ) @@ -107,6 +122,15 @@ def __init__(self, machine_type: str, site_name: str): translator_functions=translator_functions, ) + bulk_size = getattr(self.configuration, "bulk_size", 100) + bulk_delay = getattr(self.configuration, "bulk_delay", 1.0) + + self._squeue = AsyncBulkCall( + partial(squeue, executor=self._executor), + size=bulk_size, + delay=bulk_delay, + ) + async def deploy_resource( self, resource_attributes: AttributeDict ) -> AttributeDict: @@ -134,35 +158,7 @@ async def deploy_resource( async def resource_status( self, resource_attributes: AttributeDict ) -> AttributeDict: - await self._slurm_status.update_status() - try: - resource_uuid = resource_attributes.remote_resource_uuid - resource_status = self._slurm_status[str(resource_uuid)] - except KeyError: - if ( - self._slurm_status.last_update - resource_attributes.created - ).total_seconds() < 10: - # In case the created timestamp is after last update timestamp of the - # asynccachemap plus 10 s grace period, no decision about the current - # state can be given, since map is updated asynchronously. - # Just retry later on. - logger.debug( - "Time difference between drone creation and last_update of" - "slurm_status is less then 10 s." - ) - raise TardisResourceStatusUpdateFailed from None - else: - logger.debug( - f"Cannot find {resource_uuid} in slurm_status assuming" - "drone is already deleted." - ) - resource_status = { - "JobID": resource_attributes.remote_resource_uuid, - "State": "COMPLETED", - } - logger.debug(f"{self.site_name} has status {resource_status}.") - - return self.handle_response(resource_status) + return self.handle_response(await self._squeue(resource_attributes)) async def terminate_resource(self, resource_attributes: AttributeDict) -> None: request_command = f"scancel {resource_attributes.remote_resource_uuid}" diff --git a/tests/adapters_t/sites_t/test_slurm.py b/tests/adapters_t/sites_t/test_slurm.py index 9aee9fbd..3e391452 100644 --- a/tests/adapters_t/sites_t/test_slurm.py +++ b/tests/adapters_t/sites_t/test_slurm.py @@ -6,12 +6,10 @@ from tardis.interfaces.siteadapter import ResourceStatus from tardis.utilities.attributedict import AttributeDict from tests.utilities.utilities import mock_executor_run_command, run_async -from tests.utilities.utilities import run_async from unittest import TestCase from unittest.mock import MagicMock, patch -from datetime import datetime, timedelta from warnings import filterwarnings import asyncio @@ -95,6 +93,7 @@ def setUp(self): self.test_site_config.StatusUpdate = 10 self.test_site_config.MachineTypeConfiguration = self.machine_type_configuration self.test_site_config.executor = self.mock_executor.return_value + self.test_site_config.bulk_delay = 0.01 self.slurm_adapter = SlurmAdapter( machine_type="test2large", site_name="TestSite" @@ -236,7 +235,7 @@ def test_resource_status(self): ) self.mock_executor.return_value.run_command.assert_called_with( - 'squeue -o "%A|%N|%T" -h -t all' + 'squeue -o "%A|%N|%T" -h -t all --job=1390065' ) @mock_executor_run_command(TEST_RESOURCE_STATUS_RESPONSE_RUNNING) @@ -256,7 +255,7 @@ def test_update_resource_status(self): ) self.mock_executor.return_value.run_command.assert_called_with( - 'squeue -o "%A|%N|%T" -h -t all' + 'squeue -o "%A|%N|%T" -h -t all --job=1390065' ) @mock_executor_run_command(TEST_RESOURCE_STATUS_RESPONSE_ALL_STATES) @@ -296,54 +295,28 @@ def test_resource_state_translation(self): ) self.assertEqual(returned_resource_attributes.resource_status, value) - self.mock_executor.return_value.run_command.called_once() + self.mock_executor.return_value.run_command.called_once() - self.mock_executor.return_value.run_command.assert_called_with( - 'squeue -o "%A|%N|%T" -h -t all' - ) - - def test_resource_status_raise_update_failed(self): - # Update interval is 10 minutes, so turn back last update by 2 minutes - # and creation date to current date, so that - # TardisResourceStatusUpdateFailed is raised - created_timestamp = datetime.now() - new_timestamp = datetime.now() - timedelta(minutes=2) - - self.slurm_adapter._slurm_status._last_update = new_timestamp - - with self.assertRaises(TardisResourceStatusUpdateFailed): - run_async( - self.slurm_adapter.resource_status, - AttributeDict( - remote_resource_uuid=1351043, - resource_state=ResourceStatus.Booting, - created=created_timestamp, - ), + self.mock_executor.return_value.run_command.assert_called_with( + f'squeue -o "%A|%N|%T" -h -t all --job={job_id}' ) + self.mock_executor.reset_mock() + @mock_executor_run_command("") def test_resource_status_of_completed_jobs(self): - # Update interval is 10 minutes, so turn back last update by 11 minutes - # and creation date to 12 minutes ago. => squeue should be executed - # The empty string returned by squeue represents a resource is already - # gone. So, ResourceStatus returned should be Deleted. - past_timestamp = datetime.now() - timedelta(minutes=12) - new_timestamp = datetime.now() - timedelta(minutes=11) - self.slurm_adapter._slurm_status._last_update = new_timestamp - response = run_async( self.slurm_adapter.resource_status, AttributeDict( resource_id="1390065", remote_resource_uuid="1351043", - created=past_timestamp, ), ) self.assertEqual(response.resource_status, ResourceStatus.Deleted) self.mock_executor.return_value.run_command.assert_called_with( - 'squeue -o "%A|%N|%T" -h -t all' + 'squeue -o "%A|%N|%T" -h -t all --job=1351043' ) @mock_executor_run_command( @@ -353,26 +326,15 @@ def test_resource_status_of_completed_jobs(self): ), ) def test_resource_status_update_failed(self): - # set previous data, should be returned when update fails - self.slurm_adapter._slurm_status._data = { - "1390065": {"JobId": "1390065", "Host": "fh2n1552", "State": "RUNNING"} - } - with self.assertLogs(level=logging.WARNING): - response = run_async( - self.slurm_adapter.resource_status, - AttributeDict(remote_resource_uuid="1390065"), - ) - - self.assertDictEqual( - AttributeDict( - remote_resource_uuid=1390065, resource_status=ResourceStatus.Running - ), - response, - ) + with self.assertRaises(CommandExecutionFailure): + run_async( + self.slurm_adapter.resource_status, + AttributeDict(remote_resource_uuid="1390065"), + ) self.mock_executor.return_value.run_command.assert_called_with( - 'squeue -o "%A|%N|%T" -h -t all' + 'squeue -o "%A|%N|%T" -h -t all --job=1390065' ) @mock_executor_run_command(stdout="", stderr="", exit_code=0) From 3bfd2ca1f0e066f0e3f0f033d3b940eef1526171 Mon Sep 17 00:00:00 2001 From: Manuel Giffels Date: Wed, 9 Aug 2023 18:54:41 +0200 Subject: [PATCH 17/20] Use AsyncBulkCall to update job states in Moab site adapter --- tardis/adapters/sites/moab.py | 63 ++++++++++++--------------- tests/adapters_t/sites_t/test_moab.py | 28 +++++------- 2 files changed, 40 insertions(+), 51 deletions(-) diff --git a/tardis/adapters/sites/moab.py b/tardis/adapters/sites/moab.py index 0cad11c4..b9184238 100644 --- a/tardis/adapters/sites/moab.py +++ b/tardis/adapters/sites/moab.py @@ -2,12 +2,13 @@ from ...exceptions.tardisexceptions import TardisError from ...exceptions.tardisexceptions import TardisTimeout from ...exceptions.tardisexceptions import TardisResourceStatusUpdateFailed +from ...interfaces.executor import Executor from ...interfaces.siteadapter import ResourceStatus from ...interfaces.siteadapter import SiteAdapter from ...utilities.staticmapping import StaticMapping +from ...utilities.asyncbulkcall import AsyncBulkCall from ...utilities.attributedict import AttributeDict from ...utilities.executors.shellexecutor import ShellExecutor -from ...utilities.asynccachemap import AsyncCacheMap from ...utilities.utils import ( convert_to, drone_environment_to_str, @@ -17,6 +18,7 @@ from asyncio import TimeoutError from contextlib import contextmanager from functools import partial +from typing import Iterable, Mapping, Tuple import asyncssh import logging @@ -26,7 +28,9 @@ logger = logging.getLogger("cobald.runtime.tardis.adapters.sites.moab") -async def moab_status_updater(executor): +async def showq( + *resource_attributes: Tuple[AttributeDict, ...], executor: Executor +) -> Iterable[Mapping]: cmd = "showq --xml -w user=$(whoami) && showq -c --xml -w user=$(whoami)" logger.debug("Moab status update is running.") response = await executor.run_command(cmd) @@ -45,7 +49,18 @@ async def moab_status_updater(executor): "State": line.attributes["State"].value, } logger.debug("Moab status update completed") - return moab_resource_status + + return ( + moab_resource_status.get( + str(resource.remote_resource_uuid), + # assume that jobs that do not show up (anymore) in squeue have + # State Completed (Deleted) + { + "State": "Completed", + }, + ) + for resource in resource_attributes + ) class MoabAdapter(SiteAdapter): @@ -67,10 +82,6 @@ def __init__(self, machine_type: str, site_name: str): self._executor = getattr(self.configuration, "executor", ShellExecutor()) - self._moab_status = AsyncCacheMap( - update_coroutine=partial(moab_status_updater, self._executor), - max_age=self.configuration.StatusUpdate * 60, - ) key_translator = StaticMapping( remote_resource_uuid="JobID", resource_status="State" ) @@ -118,6 +129,15 @@ def __init__(self, machine_type: str, site_name: str): translator_functions=translator_functions, ) + bulk_size = getattr(self.configuration, "bulk_size", 100) + bulk_delay = getattr(self.configuration, "bulk_delay", 1.0) + + self._showq = AsyncBulkCall( + partial(showq, executor=self._executor), + size=bulk_size, + delay=bulk_delay, + ) + async def deploy_resource( self, resource_attributes: AttributeDict ) -> AttributeDict: @@ -139,34 +159,7 @@ async def deploy_resource( async def resource_status( self, resource_attributes: AttributeDict ) -> AttributeDict: - await self._moab_status.update_status() - # In case the created timestamp is after last update timestamp of the - # asynccachemap plus 10 s grace period, no decision about the current - # state can be given, since map is updated asynchronously. - try: - resource_uuid = resource_attributes.remote_resource_uuid - resource_status = self._moab_status[str(resource_uuid)] - except KeyError as err: - if ( - self._moab_status._last_update - resource_attributes.created - ).total_seconds() < 10: - logger.debug( - "Time difference between drone creation and last_update of" - "moab_status is less then 10 s." - ) - raise TardisResourceStatusUpdateFailed from err - else: - logger.debug( - f"Cannot find {resource_uuid} in moab_status assuming" - "drone is already deleted." - ) - resource_status = { - "JobID": resource_attributes.remote_resource_uuid, - "State": "Completed", - } - logger.debug(f"{self.site_name} has status {resource_status}.") - - return self.handle_response(resource_status) + return self.handle_response(await self._showq(resource_attributes)) async def terminate_resource(self, resource_attributes: AttributeDict) -> None: request_command = f"canceljob {resource_attributes.remote_resource_uuid}" diff --git a/tests/adapters_t/sites_t/test_moab.py b/tests/adapters_t/sites_t/test_moab.py index 7c46f80b..c948cc42 100644 --- a/tests/adapters_t/sites_t/test_moab.py +++ b/tests/adapters_t/sites_t/test_moab.py @@ -11,7 +11,7 @@ from unittest import TestCase from unittest.mock import MagicMock, patch -from datetime import datetime, timedelta +from datetime import datetime from warnings import filterwarnings import asyncio @@ -167,6 +167,7 @@ def setUp(self): self.test_site_config.StatusUpdate = 10 self.test_site_config.MachineTypeConfiguration = self.machine_type_configuration self.test_site_config.executor = self.mock_executor.return_value + self.test_site_config.bulk_delay = 0.01 self.moab_adapter = MoabAdapter(machine_type="test2large", site_name="TestSite") @@ -362,35 +363,30 @@ def test_terminate_resource_error(self): resource_attributes=self.resource_attributes, ) - def test_resource_status_raise(self): - # Update interval is 10 minutes, so set last update back by 2 minutes in - # order to execute sacct command and creation date to current date - created_timestamp = datetime.now() - new_timestamp = datetime.now() - timedelta(minutes=2) - self.moab_adapter._moab_status._last_update = new_timestamp - with self.assertRaises(TardisResourceStatusUpdateFailed): + @mock_executor_run_command( + stdout="", + raise_exception=CommandExecutionFailure( + message="Failed", stdout="Failed", stderr="Failed", exit_code=2 + ), + ) + def test_resource_status_update_failed(self): + with self.assertRaises(CommandExecutionFailure): run_async( self.moab_adapter.resource_status, AttributeDict( resource_id=1351043, remote_resource_uuid=1351043, resource_state=ResourceStatus.Booting, - created=created_timestamp, ), ) - def test_resource_status_raise_past(self): - # Update interval is 10 minutes, so set last update back by 11 minutes - # in order to execute sacct command and creation date to 12 minutes ago - creation_timestamp = datetime.now() - timedelta(minutes=12) - last_update_timestamp = datetime.now() - timedelta(minutes=11) - self.moab_adapter._moab_status._last_update = last_update_timestamp + @mock_executor_run_command(TEST_RESOURCE_STATUS_RESPONSE_RUNNING) + def test_resource_status_of_completed_jobs(self): response = run_async( self.moab_adapter.resource_status, AttributeDict( resource_id=1390065, remote_resource_uuid=1351043, - created=creation_timestamp, ), ) self.assertEqual(response.resource_status, ResourceStatus.Deleted) From 360b6b927deba8752d979c0bebfea1509b6f02bc Mon Sep 17 00:00:00 2001 From: Manuel Giffels Date: Wed, 9 Aug 2023 18:58:14 +0200 Subject: [PATCH 18/20] Speed up HTCondor site adapter's unittest --- tests/adapters_t/sites_t/test_htcondorsiteadapter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/adapters_t/sites_t/test_htcondorsiteadapter.py b/tests/adapters_t/sites_t/test_htcondorsiteadapter.py index 328065b5..eeda2062 100644 --- a/tests/adapters_t/sites_t/test_htcondorsiteadapter.py +++ b/tests/adapters_t/sites_t/test_htcondorsiteadapter.py @@ -127,7 +127,7 @@ def setUp(self): test_site_config.MachineTypeConfiguration = self.machine_type_configuration test_site_config.executor = self.mock_executor.return_value test_site_config.bulk_size = 100 - test_site_config.bulk_delay = 0.1 + test_site_config.bulk_delay = 0.01 test_site_config.max_age = 10 self.adapter = HTCondorAdapter(machine_type="test2large", site_name="TestSite") From 76994969486f7dca7c4c3aea2f3d910cf43e4faa Mon Sep 17 00:00:00 2001 From: Manuel Giffels Date: Thu, 10 Aug 2023 17:02:48 +0200 Subject: [PATCH 19/20] Fix issue with restarting a FakeSite --- tardis/adapters/sites/fakesite.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tardis/adapters/sites/fakesite.py b/tardis/adapters/sites/fakesite.py index 81a76384..7cbff619 100644 --- a/tardis/adapters/sites/fakesite.py +++ b/tardis/adapters/sites/fakesite.py @@ -51,7 +51,13 @@ async def resource_status( # check if resource should already run if (datetime.now() - created_time) > timedelta( seconds=resource_boot_time - ) and resource_attributes.resource_status is ResourceStatus.Booting: + ) and resource_attributes.get( + "resource_status", + ResourceStatus.Booting + # When cobald is restarted, "resource_status" is not set. Since this is a + # FakeAdapter, when can safely start the cycle again by assuming + # ResourceStatus.Booting and let TARDIS manage the drone's life cycle + ) is ResourceStatus.Booting: return self.handle_response( AttributeDict(resource_status=ResourceStatus.Running) ) From 58fb461d81267cca4f82f851e8c824143b05aeec Mon Sep 17 00:00:00 2001 From: Manuel Giffels Date: Wed, 4 Oct 2023 08:38:23 +0200 Subject: [PATCH 20/20] Adjust documentation to new bulk handling --- docs/source/adapters/site.rst | 18 ++++++++++++++---- docs/source/changelog.rst | 4 ++-- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/docs/source/adapters/site.rst b/docs/source/adapters/site.rst index 438b69c3..f1e61802 100644 --- a/docs/source/adapters/site.rst +++ b/docs/source/adapters/site.rst @@ -316,7 +316,13 @@ Available adapter configuration options +----------------+------------------------------------------------------------------------------------------------+-----------------+ | Option | Short Description | Requirement | +================+================================================================================================+=================+ - | StatusUpdate | The result of the status call is cached for `StatusUpdate` in minutes. | **Required** | + | bulk_size | Maximum number of jobs to handle per bulk invocation of the ``showq`` command. | **Optional** | + + + + + + | | Default: 100 | | + +----------------+------------------------------------------------------------------------------------------------+-----------------+ + | bulk_delay | Maximum duration in seconds to wait per bulk invocation of the ``showq`` command. | **Optional** | + + + + + + | | Default: 1.0 | | +----------------+------------------------------------------------------------------------------------------------+-----------------+ | StartupCommand | The command executed in the batch job. (**Deprecated:** Moved to MachineTypeConfiguration!) | **Deprecated** | +----------------+------------------------------------------------------------------------------------------------+-----------------+ @@ -347,7 +353,6 @@ Available adapter configuration options username: clown client_keys: - /opt/tardis/ssh/tardis - StatusUpdate: 2 MachineTypes: - singularity_d2.large - singularity_d1.large @@ -468,7 +473,13 @@ Available adapter configuration options +----------------+---------------------------------------------------------------------------------------------+-----------------+ | Option | Short Description | Requirement | +================+=============================================================================================+=================+ - | StatusUpdate | The result of the status call is cached for `StatusUpdate` in minutes. | **Required** | + | bulk_size | Maximum number of jobs to handle per bulk invocation of the ``squeue`` command. | **Optional** | + + + + + + | | Default: 100 | | + +----------------+---------------------------------------------------------------------------------------------+-----------------+ + | bulk_delay | Maximum duration in seconds to wait per bulk invocation of the ``squeue`` command. | **Optional** | + + + + + + | | Default: 1.0 | | +----------------+---------------------------------------------------------------------------------------------+-----------------+ | StartUpCommand | The command executed in the batch job. (**Deprecated:** Moved to MachineTypeConfiguration!) | **Deprecated** | +----------------+---------------------------------------------------------------------------------------------+-----------------+ @@ -511,7 +522,6 @@ Available machine type configuration options username: billy client_keys: - /opt/tardis/ssh/tardis - StatusUpdate: 2 MachineTypes: - one_day - twelve_hours diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index a9b6a850..0f43080b 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -1,4 +1,4 @@ -.. Created by changelog.py at 2023-08-14, command +.. Created by changelog.py at 2023-10-04, command '/Users/giffler/.cache/pre-commit/repor6pnmwlm/py_env-python3.10/bin/changelog docs/source/changes compile --categories Added Changed Fixed Security Deprecated --output=docs/source/changelog.rst' based on the format of 'https://keepachangelog.com/' @@ -6,7 +6,7 @@ CHANGELOG ######### -[Unreleased] - 2023-08-14 +[Unreleased] - 2023-10-04 ========================= Deprecated