Skip to content

Commit

Permalink
Remove updates on created and updated timestamps from the Moab adapter
Browse files: browse the repository at this point in the history
  • Loading branch information
giffels committed Aug 3, 2023
1 parent fa2675e commit 759c932
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 121 deletions.
50 changes: 9 additions & 41 deletions tardis/adapters/sites/moab.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from ...interfaces.siteadapter import SiteAdapter
from ...utilities.staticmapping import StaticMapping
from ...utilities.attributedict import AttributeDict
from ...utilities.attributedict import convert_to_attribute_dict
from ...utilities.executors.shellexecutor import ShellExecutor
from ...utilities.asynccachemap import AsyncCacheMap
from ...utilities.utils import (
Expand All @@ -18,11 +17,9 @@
from asyncio import TimeoutError
from contextlib import contextmanager
from functools import partial
from datetime import datetime

import asyncssh
import logging
import re
import warnings
from xml.dom import minidom

Expand Down Expand Up @@ -133,27 +130,11 @@ async def deploy_resource(
logger.debug(f"{self.site_name} servers create returned {result}")

remote_resource_uuid = int(result.stdout)
resource_attributes.update(

return AttributeDict(
remote_resource_uuid=remote_resource_uuid,
created=datetime.now(),
updated=datetime.now(),
resource_status=ResourceStatus.Booting,
)
return resource_attributes

@staticmethod
def check_remote_resource_uuid(resource_attributes, regex, response):
pattern = re.compile(regex, flags=re.MULTILINE)
remote_resource_uuid = int(pattern.findall(response)[0])
if remote_resource_uuid != int(resource_attributes.remote_resource_uuid):
raise TardisError(
f"Failed to terminate {resource_attributes.remote_resource_uuid}."
)
else:
resource_attributes.update(
resource_status=ResourceStatus.Stopped, updated=datetime.now()
)
return remote_resource_uuid

async def resource_status(
self, resource_attributes: AttributeDict
Expand All @@ -176,40 +157,27 @@ async def resource_status(
"State": "Completed",
}
logger.debug(f"{self.site_name} has status {resource_status}.")
resource_attributes.update(updated=datetime.now())
return convert_to_attribute_dict(
{**resource_attributes, **self.handle_response(resource_status)}
)

async def terminate_resource(self, resource_attributes: AttributeDict):
return self.handle_response(resource_status)

async def terminate_resource(self, resource_attributes: AttributeDict) -> None:
request_command = f"canceljob {resource_attributes.remote_resource_uuid}"
try:
response = await self._executor.run_command(request_command)
except CommandExecutionFailure as cf:
if cf.exit_code == 1:
logger.warning(
f"{self.site_name} servers terminate returned {cf.stdout}"
)
remote_resource_uuid = self.check_remote_resource_uuid(
resource_attributes,
r"ERROR: invalid job specified \((\d*)\)",
cf.stderr,
f"{self.site_name} servers terminate returned {cf.stdout}."
"Potentially already terminated."
)
else:
raise cf
else:
logger.debug(f"{self.site_name} servers terminate returned {response}")
remote_resource_uuid = self.check_remote_resource_uuid(
resource_attributes, r"^job \'(\d*)\' cancelled", response.stdout
)

return self.handle_response(
{"SystemJID": remote_resource_uuid}, **resource_attributes
)

async def stop_resource(self, resource_attributes: AttributeDict):
async def stop_resource(self, resource_attributes: AttributeDict) -> None:
logger.debug("MOAB jobs cannot be stopped gracefully. Terminating instead.")
return await self.terminate_resource(resource_attributes)
await self.terminate_resource(resource_attributes)

def msub_cmdline_options(self, drone_uuid, machine_meta_data_translation_mapping):
sbatch_options = self.machine_type_configuration.get(
Expand Down
117 changes: 37 additions & 80 deletions tests/adapters_t/sites_t/test_moab.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,28 +225,16 @@ def test_start_up_command_deprecation_warning(self):

@mock_executor_run_command(TEST_DEPLOY_RESOURCE_RESPONSE)
def test_deploy_resource(self):
expected_resource_attributes = self.resource_attributes
expected_resource_attributes.update(
created=datetime.now(), updated=datetime.now()
)
return_resource_attributes = run_async(
self.moab_adapter.deploy_resource,
resource_attributes=self.resource_attributes,
)
if (
return_resource_attributes.created - expected_resource_attributes.created
> timedelta(seconds=1)
or return_resource_attributes.updated - expected_resource_attributes.updated
> timedelta(seconds=1)
):
raise Exception("Creation time or update time wrong!")
del (
expected_resource_attributes.created,
expected_resource_attributes.updated,
return_resource_attributes.created,
return_resource_attributes.updated,
self.assertDictEqual(
AttributeDict(
remote_resource_uuid=4761849, resource_status=ResourceStatus.Booting
),
run_async(
self.moab_adapter.deploy_resource,
resource_attributes=self.resource_attributes,
),
)
self.assertEqual(return_resource_attributes, expected_resource_attributes)

self.mock_executor.return_value.run_command.assert_called_with(
"msub -j oe -m p -l walltime=02:00:00:00,mem=120gb,nodes=1:ppn=20 -v TardisDroneCores=128,TardisDroneMemory=120,TardisDroneDisk=100,TardisDroneUuid=testsite-abcdef startVM.py" # noqa: B950
)
Expand All @@ -260,12 +248,11 @@ def test_deploy_resource_w_submit_options(self):
)
)

moab_adapter = MoabAdapter(machine_type="test2large", site_name="TestSite")

run_async(
moab_adapter.deploy_resource,
self.moab_adapter.deploy_resource,
resource_attributes=self.resource_attributes,
)

self.mock_executor.return_value.run_command.assert_called_with(
"msub -M [email protected] -j oe -m p -l walltime=02:00:00:00,mem=120gb,nodes=1:ppn=20 -v TardisDroneCores=128,TardisDroneMemory=120,TardisDroneDisk=100,TardisDroneUuid=testsite-abcdef --timeout=60 startVM.py" # noqa: B950
)
Expand All @@ -283,19 +270,15 @@ def test_site_name(self):

@mock_executor_run_command(TEST_RESOURCE_STATUS_RESPONSE)
def test_resource_status(self):
expected_resource_attributes = self.resource_attributes
expected_resource_attributes.update(updated=datetime.now())
return_resource_attributes = run_async(
self.moab_adapter.resource_status,
resource_attributes=self.resource_attributes,
self.assertDictEqual(
AttributeDict(
remote_resource_uuid=4761849, resource_status=ResourceStatus.Booting
),
run_async(
self.moab_adapter.resource_status,
resource_attributes=self.resource_attributes,
),
)
if (
return_resource_attributes.updated - expected_resource_attributes.updated
> timedelta(seconds=1)
):
raise Exception("Update time wrong!")
del expected_resource_attributes.updated, return_resource_attributes.updated
self.assertEqual(return_resource_attributes, expected_resource_attributes)

@mock_executor_run_command(TEST_RESOURCE_STATE_TRANSLATION_RESPONSE)
def test_resource_state_translation(self):
Expand All @@ -316,49 +299,36 @@ def test_resource_status_update(self):
self.assertEqual(
self.resource_attributes["resource_status"], ResourceStatus.Booting
)
return_resource_attributes = run_async(
self.moab_adapter.resource_status,
resource_attributes=self.resource_attributes,
)
self.assertEqual(
return_resource_attributes["resource_status"], ResourceStatus.Running

self.assertDictEqual(
AttributeDict(
remote_resource_uuid=4761849, resource_status=ResourceStatus.Running
),
run_async(
self.moab_adapter.resource_status,
resource_attributes=self.resource_attributes,
),
)

@mock_executor_run_command(TEST_TERMINATE_RESOURCE_RESPONSE)
def test_stop_resource(self):
expected_resource_attributes = self.resource_attributes
expected_resource_attributes.update(
updated=datetime.now(), resource_status=ResourceStatus.Stopped
)
return_resource_attributes = run_async(
run_async(
self.moab_adapter.stop_resource,
resource_attributes=self.resource_attributes,
)
if (
return_resource_attributes.updated - expected_resource_attributes.updated
> timedelta(seconds=1)
):
raise Exception("Update time wrong!")
del expected_resource_attributes.updated, return_resource_attributes.updated
self.assertEqual(return_resource_attributes, expected_resource_attributes)
self.mock_executor.return_value.run_command.assert_called_with(
"canceljob 4761849"
)

@mock_executor_run_command(TEST_TERMINATE_RESOURCE_RESPONSE)
def test_terminate_resource(self):
expected_resource_attributes = self.resource_attributes
expected_resource_attributes.update(
updated=datetime.now(), resource_status=ResourceStatus.Stopped
)
return_resource_attributes = run_async(
run_async(
self.moab_adapter.terminate_resource,
resource_attributes=self.resource_attributes,
)
if (
return_resource_attributes.updated - expected_resource_attributes.updated
> timedelta(seconds=1)
):
raise Exception("Update time wrong!")
del expected_resource_attributes.updated, return_resource_attributes.updated
self.assertEqual(return_resource_attributes, expected_resource_attributes)
self.mock_executor.return_value.run_command.assert_called_with(
"canceljob 4761849"
)

@mock_executor_run_command(
"",
Expand All @@ -372,18 +342,11 @@ def test_terminate_resource(self):
),
)
def test_terminate_dead_resource(self):
expected_resource_attributes = self.resource_attributes
expected_resource_attributes.update(
updated=datetime.now(), resource_status=ResourceStatus.Stopped
)
with self.assertLogs(level=logging.WARNING):
return_resource_attributes = run_async(
run_async(
self.moab_adapter.terminate_resource,
resource_attributes=self.resource_attributes,
)
self.assertEqual(
return_resource_attributes["resource_status"], ResourceStatus.Stopped
)

@mock_executor_run_command(
"",
Expand Down Expand Up @@ -461,9 +424,3 @@ def test_exception_handling(to_raise, to_catch):

for to_raise, to_catch in matrix:
test_exception_handling(to_raise, to_catch)

def test_check_remote_resource_uuid(self):
with self.assertRaises(TardisError):
self.moab_adapter.check_remote_resource_uuid(
AttributeDict(remote_resource_uuid=1), regex=r"^(\d)$", response="2"
)

0 comments on commit 759c932

Please sign in to comment.