From 56ba1f990473a2401a2cbb3c0ea39e0bd9df4c23 Mon Sep 17 00:00:00 2001
From: Eirini Koutsaniti
Date: Tue, 16 Jul 2024 08:21:49 +0300
Subject: [PATCH] Fix compression/extraction of large files (#114)

* Extend poll_task to return the system that is associated with the task
* Add information for xfer system in the internal transfers
* Format fixes
---
 firecrest/AsyncClient.py    | 75 ++++++++++++++++++++++++-------------
 firecrest/BasicClient.py    | 72 ++++++++++++++++++++++-------------
 firecrest/types.py          |  7 ++++
 tests/test_compute.py       | 31 ++++++++++++---
 tests/test_extras.py        |  9 +++++
 tests/test_extras_async.py  |  6 +++
 tests/test_storage.py       | 15 ++++++++
 tests/test_storage_async.py |  4 ++
 8 files changed, 161 insertions(+), 58 deletions(-)

diff --git a/firecrest/AsyncClient.py b/firecrest/AsyncClient.py
index d715291..1d8f259 100644
--- a/firecrest/AsyncClient.py
+++ b/firecrest/AsyncClient.py
@@ -77,7 +77,7 @@ async def poll_task(self, final_status, sleep_times):
         resp = await self._client._task_safe(self._task_id, self._responses)
         logger.info(f'Status of {self._task_id} is {resp["status"]}')

-        return resp["data"]
+        return resp["data"], resp.get("system", "")


 class AsyncFirecrest:
@@ -888,8 +888,9 @@ async def compress(
             dereference
         )
         jobid = job_info['jobid']
+        xfer_system = job_info["system"]
         active_jobs = await self.poll_active(
-            machine,
+            xfer_system,
             [jobid]
         )
         intervals = (2**i for i in itertools.count(start=0))
@@ -899,7 +900,7 @@
         ):
             await asyncio.sleep(next(intervals))
             active_jobs = await self.poll_active(
-                machine,
+                xfer_system,
                 [jobid]
             )

@@ -913,7 +914,7 @@
         )
         err_output = await self.head(
-            machine,
+            xfer_system,
             job_info['job_file_err']
         )
         if (err_output != ''):
@@ -983,8 +984,9 @@ async def extract(
             extension
         )
         jobid = job_info['jobid']
+        xfer_system = job_info["system"]
         active_jobs = await self.poll_active(
-            machine,
+            xfer_system,
             [jobid]
         )
         intervals = (2**i for i in itertools.count(start=0))
@@ -994,7 +996,7 @@
         ):
             await asyncio.sleep(next(intervals))
             active_jobs = await self.poll_active(
-                machine,
+                xfer_system,
                 [jobid]
             )

@@ -1008,7 +1010,7 @@
         )
         err_output = await self.head(
-            machine,
+            xfer_system,
             job_info['job_file_err']
         )
         if (err_output != ''):
@@ -1398,7 +1400,7 @@ async def submit(
         logger.info(f"Job submission task: {json_response['task_id']}")
         t = ComputeTask(self, json_response["task_id"], [resp])
-        result = await t.poll_task("200", iter(self.polling_sleep_times))
+        result = (await t.poll_task("200", iter(self.polling_sleep_times)))[0]
         # Inject taskid in the result
         result["firecrest_taskid"] = json_response["task_id"]
         return result
@@ -1450,7 +1452,7 @@ async def poll(
         json_response = self._json_response([resp], 200)
         logger.info(f"Job polling task: {json_response['task_id']}")
         t = ComputeTask(self, json_response["task_id"], [resp])
-        res = await t.poll_task("200", iter(self.polling_sleep_times))
+        res = (await t.poll_task("200", iter(self.polling_sleep_times)))[0]
         # When there is no job in the sacct output firecrest will return an empty dictionary instead of list
         if isinstance(res, dict):
             return list(res.values())
@@ -1499,7 +1501,9 @@ async def poll_active(
         json_response = self._json_response([resp], 200)
         logger.info(f"Job active polling task: {json_response['task_id']}")
         t = ComputeTask(self, json_response["task_id"], [resp])
-        dict_result = await t.poll_task("200", iter(self.polling_sleep_times))
+        dict_result = (
+            await t.poll_task("200", iter(self.polling_sleep_times))
+        )[0]
         if len(jobids):
             ret = [i for i in dict_result.values() if i["jobid"] in jobids]
         else:
@@ -1534,7 +1538,7 @@ async def nodes(
         )
         json_response = self._json_response([resp], 200)
         t = ComputeTask(self, json_response["task_id"], [resp])
-        result = await t.poll_task("200", iter(self.polling_sleep_times))
+        result = (await t.poll_task("200", iter(self.polling_sleep_times)))[0]
         return result

     async def partitions(
@@ -1564,7 +1568,7 @@ async def partitions(
         )
         json_response = self._json_response([resp], 200)
         t = ComputeTask(self, json_response["task_id"], [resp])
-        result = await t.poll_task("200", iter(self.polling_sleep_times))
+        result = (await t.poll_task("200", iter(self.polling_sleep_times)))[0]
         return result

     async def reservations(
@@ -1592,7 +1596,7 @@ async def reservations(
         json_response = self._json_response([resp], 200)
         t = ComputeTask(self, json_response["task_id"], [resp])
         result = await t.poll_task("200", iter(self.polling_sleep_times))
-        return result
+        return result[0]

     async def cancel(self, machine: str, job_id: str | int) -> str:
         """Cancels running job.
@@ -1611,7 +1615,7 @@ async def cancel(self, machine: str, job_id: str | int) -> str:
         json_response = self._json_response([resp], 200)
         logger.info(f"Job cancellation task: {json_response['task_id']}")
         t = ComputeTask(self, json_response["task_id"], [resp])
-        return await t.poll_task("200", iter(self.polling_sleep_times))
+        return (await t.poll_task("200", iter(self.polling_sleep_times)))[0]

     # Storage
     async def _internal_transfer(
@@ -1665,7 +1669,7 @@ async def submit_move_job(
         time: Optional[str] = None,
         stage_out_job_id: Optional[str] = None,
         account: Optional[str] = None,
-    ) -> t.JobSubmit:
+    ) -> t.InternalTransferJobSubmit:
         """Move files between internal CSCS file systems.
         Rename/Move source_path to target_path.
         Possible to stage-out jobs providing the SLURM ID of a production job.
@@ -1697,7 +1701,10 @@
         )
         logger.info(f"Job submission task: {json_response['task_id']}")
         t = ComputeTask(self, json_response["task_id"], resp)
-        return await t.poll_task("200", iter(self.polling_sleep_times))
+        job_info = await t.poll_task("200", iter(self.polling_sleep_times))
+        result = job_info[0]
+        result.update({"system": job_info[1]})
+        return result

     async def submit_copy_job(
         self,
@@ -1708,7 +1715,7 @@ async def submit_copy_job(
         time: Optional[str] = None,
         stage_out_job_id: Optional[str] = None,
         account: Optional[str] = None,
-    ) -> t.JobSubmit:
+    ) -> t.InternalTransferJobSubmit:
         """Copy files between internal CSCS file systems.
         Copy source_path to target_path.
         Possible to stage-out jobs providing the SLURM Id of a production job.
@@ -1740,7 +1747,10 @@
         )
         logger.info(f"Job submission task: {json_response['task_id']}")
         t = ComputeTask(self, json_response["task_id"], resp)
-        return await t.poll_task("200", iter(self.polling_sleep_times))
+        job_info = await t.poll_task("200", iter(self.polling_sleep_times))
+        result = job_info[0]
+        result.update({"system": job_info[1]})
+        return result

     async def submit_compress_job(
         self,
@@ -1752,7 +1762,7 @@ async def submit_compress_job(
         time: Optional[str] = None,
         stage_out_job_id: Optional[str] = None,
         account: Optional[str] = None,
-    ) -> t.JobSubmit:
+    ) -> t.InternalTransferJobSubmit:
         """Compress files using gzip compression.
         You can name the output file as you like, but typically these files have a .tar.gz extension.
         Possible to stage-out jobs providing the SLURM Id of a production job.
@@ -1787,7 +1797,10 @@ async def submit_compress_job(
         )
         logger.info(f"Job submission task: {json_response['task_id']}")
         t = ComputeTask(self, json_response["task_id"], resp)
-        return await t.poll_task("200", iter(self.polling_sleep_times))
+        job_info = await t.poll_task("200", iter(self.polling_sleep_times))
+        result = job_info[0]
+        result.update({"system": job_info[1]})
+        return result

     async def submit_extract_job(
         self,
@@ -1799,7 +1812,7 @@ async def submit_extract_job(
         time: Optional[str] = None,
         stage_out_job_id: Optional[str] = None,
         account: Optional[str] = None,
-    ) -> t.JobSubmit:
+    ) -> t.InternalTransferJobSubmit:
         """Extract files.
         If you don't select the extension, FirecREST will try to guess the right command based on the extension of the sourcePath.
         Supported extensions are `.zip`, `.tar`, `.tgz`, `.gz` and `.bz2`.
@@ -1831,10 +1844,14 @@
             stage_out_job_id,
             account,
             resp,
+            extension=extension,
         )
         logger.info(f"Job submission task: {json_response['task_id']}")
         t = ComputeTask(self, json_response["task_id"], resp)
-        return await t.poll_task("200", iter(self.polling_sleep_times))
+        job_info = await t.poll_task("200", iter(self.polling_sleep_times))
+        result = job_info[0]
+        result.update({"system": job_info[1]})
+        return result

     async def submit_rsync_job(
         self,
@@ -1845,7 +1862,7 @@ async def submit_rsync_job(
         time: Optional[str] = None,
         stage_out_job_id: Optional[str] = None,
         account: Optional[str] = None,
-    ) -> t.JobSubmit:
+    ) -> t.InternalTransferJobSubmit:
         """Transfer files between internal CSCS file systems.
         Transfer source_path to target_path.
         Possible to stage-out jobs providing the SLURM Id of a production job.
@@ -1877,7 +1894,10 @@
         )
         logger.info(f"Job submission task: {json_response['task_id']}")
         t = ComputeTask(self, json_response["task_id"], resp)
-        return await t.poll_task("200", iter(self.polling_sleep_times))
+        job_info = await t.poll_task("200", iter(self.polling_sleep_times))
+        result = job_info[0]
+        result.update({"system": job_info[1]})
+        return result

     async def submit_delete_job(
         self,
@@ -1887,7 +1907,7 @@ async def submit_delete_job(
         time: Optional[str] = None,
         stage_out_job_id: Optional[str] = None,
         account: Optional[str] = None,
-    ) -> t.JobSubmit:
+    ) -> t.InternalTransferJobSubmit:
         """Remove files in internal CSCS file systems.
         Remove file in target_path.
         Possible to stage-out jobs providing the SLURM Id of a production job.
@@ -1918,7 +1938,10 @@ async def submit_delete_job(
         )
         logger.info(f"Job submission task: {json_response['task_id']}")
         t = ComputeTask(self, json_response["task_id"], resp)
-        return await t.poll_task("200", iter(self.polling_sleep_times))
+        job_info = await t.poll_task("200", iter(self.polling_sleep_times))
+        result = job_info[0]
+        result.update({"system": job_info[1]})
+        return result

     async def external_upload(
         self, machine: str, source_path: str, target_path: str
diff --git a/firecrest/BasicClient.py b/firecrest/BasicClient.py
index 45c583e..a846bc4 100644
--- a/firecrest/BasicClient.py
+++ b/firecrest/BasicClient.py
@@ -396,7 +396,7 @@ def _poll_tasks(self, task_id: str, final_status, sleep_time):
         resp = self._task_safe(task_id)
         logger.info(f'Status of {task_id} is {resp["status"]}')

-        return resp["data"]
+        return resp["data"], resp.get("system", "")

     # Status
     def all_services(self) -> List[t.Service]:
@@ -656,8 +656,9 @@ def compress(
             dereference
         )
         jobid = job_info['jobid']
+        xfer_system = job_info["system"]
         active_jobs = self.poll_active(
-            machine,
+            xfer_system,
             [jobid]
         )
         intervals = (2**i for i in itertools.count(start=0))
@@ -667,7 +668,7 @@
         ):
             time.sleep(next(intervals))
             active_jobs = self.poll_active(
-                machine,
+                xfer_system,
                 [jobid]
             )

@@ -681,7 +682,7 @@
         )
         err_output = self.head(
-            machine,
+            xfer_system,
             job_info['job_file_err']
         )
         if (err_output != ''):
@@ -753,8 +754,9 @@ def extract(
             extension
         )
         jobid = job_info['jobid']
+        xfer_system = job_info["system"]
         active_jobs = self.poll_active(
-            machine,
+            xfer_system,
             [jobid]
         )
         intervals = (2**i for i in itertools.count(start=0))
@@ -764,7 +766,7 @@
         ):
             time.sleep(next(intervals))
             active_jobs = self.poll_active(
-                machine,
+                xfer_system,
                 [jobid]
             )

@@ -778,7 +780,7 @@
         )
         err_output = self.head(
-            machine,
+            xfer_system,
             job_info['job_file_err']
         )
         if (err_output != ''):
@@ -1221,7 +1223,7 @@ def submit(
         # Inject taskid in the result
         result = self._poll_tasks(
             json_response["task_id"], "200", iter(self.polling_sleep_times)
-        )
+        )[0]
         result["firecrest_taskid"] = json_response["task_id"]
         return result

@@ -1258,7 +1260,7 @@ def poll(
         logger.info(f"Job polling task: {json_response['task_id']}")
         res = self._poll_tasks(
             json_response["task_id"], "200", iter(self.polling_sleep_times)
-        )
+        )[0]
         # When there is no job in the sacct output firecrest will return an empty dictionary instead of list
         if isinstance(res, dict):
             return list(res.values())
@@ -1295,7 +1297,7 @@ def poll_active(
         logger.info(f"Job active polling task: {json_response['task_id']}")
         dict_result = self._poll_tasks(
             json_response["task_id"], "200", iter(self.polling_sleep_times)
-        )
+        )[0]
         return list(dict_result.values())

     def nodes(
@@ -1327,7 +1329,7 @@ def nodes(
         json_response = self._json_response(self._current_method_requests, 200)
         result = self._poll_tasks(
             json_response["task_id"], "200", iter(self.polling_sleep_times)
-        )
+        )[0]
         return result

     def partitions(
@@ -1359,7 +1361,7 @@ def partitions(
         json_response = self._json_response(self._current_method_requests, 200)
         result = self._poll_tasks(
             json_response["task_id"], "200", iter(self.polling_sleep_times)
-        )
+        )[0]
         return result

     def reservations(
@@ -1388,7 +1390,7 @@ def reservations(
         json_response = self._json_response(self._current_method_requests, 200)
         result = self._poll_tasks(
             json_response["task_id"], "200", iter(self.polling_sleep_times)
-        )
+        )[0]
         return result

     def cancel(self, machine: str, job_id: str | int) -> str:
@@ -1411,7 +1413,7 @@ def cancel(self, machine: str, job_id: str | int) -> str:
         logger.info(f"Job cancellation task: {json_response['task_id']}")
         return self._poll_tasks(
             json_response["task_id"], "200", iter(self.polling_sleep_times)
-        )
+        )[0]

     # Storage
     def _internal_transfer(
@@ -1464,7 +1466,7 @@ def submit_move_job(
         time: Optional[str] = None,
         stage_out_job_id: Optional[str] = None,
         account: Optional[str] = None,
-    ) -> t.JobSubmit:
+    ) -> t.InternalTransferJobSubmit:
         """Move files between internal CSCS file systems.
         Rename/Move source_path to target_path.
         Possible to stage-out jobs providing the SLURM ID of a production job.
@@ -1494,9 +1496,12 @@ def submit_move_job(
             account,
         )
         logger.info(f"Job submission task: {json_response['task_id']}")
-        return self._poll_tasks(
+        transfer_info = self._poll_tasks(
             json_response["task_id"], "200", iter(self.polling_sleep_times)
         )
+        result = transfer_info[0]
+        result.update({"system": transfer_info[1]})
+        return result

     def submit_copy_job(
         self,
@@ -1507,7 +1512,7 @@ def submit_copy_job(
         time: Optional[str] = None,
         stage_out_job_id: Optional[str] = None,
         account: Optional[str] = None,
-    ) -> t.JobSubmit:
+    ) -> t.InternalTransferJobSubmit:
         """Copy files between internal CSCS file systems.
         Copy source_path to target_path.
         Possible to stage-out jobs providing the SLURM Id of a production job.
@@ -1537,9 +1542,12 @@ def submit_copy_job(
             account,
         )
         logger.info(f"Job submission task: {json_response['task_id']}")
-        return self._poll_tasks(
+        transfer_info = self._poll_tasks(
             json_response["task_id"], "200", iter(self.polling_sleep_times)
         )
+        result = transfer_info[0]
+        result.update({"system": transfer_info[1]})
+        return result

     def submit_rsync_job(
         self,
@@ -1550,7 +1558,7 @@ def submit_rsync_job(
         time: Optional[str] = None,
         stage_out_job_id: Optional[str] = None,
         account: Optional[str] = None,
-    ) -> t.JobSubmit:
+    ) -> t.InternalTransferJobSubmit:
         """Transfer files between internal CSCS file systems.
         Transfer source_path to target_path.
         Possible to stage-out jobs providing the SLURM Id of a production job.
@@ -1580,9 +1588,12 @@ def submit_rsync_job(
             account,
         )
         logger.info(f"Job submission task: {json_response['task_id']}")
-        return self._poll_tasks(
+        transfer_info = self._poll_tasks(
             json_response["task_id"], "200", iter(self.polling_sleep_times)
         )
+        result = transfer_info[0]
+        result.update({"system": transfer_info[1]})
+        return result

     def submit_delete_job(
         self,
@@ -1592,7 +1603,7 @@ def submit_delete_job(
         time: Optional[str] = None,
         stage_out_job_id: Optional[str] = None,
         account: Optional[str] = None,
-    ) -> t.JobSubmit:
+    ) -> t.InternalTransferJobSubmit:
         """Remove files in internal CSCS file systems.
         Remove file in target_path.
         Possible to stage-out jobs providing the SLURM Id of a production job.
@@ -1621,9 +1632,12 @@ def submit_delete_job(
             account,
         )
         logger.info(f"Job submission task: {json_response['task_id']}")
-        return self._poll_tasks(
+        transfer_info = self._poll_tasks(
             json_response["task_id"], "200", iter(self.polling_sleep_times)
         )
+        result = transfer_info[0]
+        result.update({"system": transfer_info[1]})
+        return result

     def external_upload(
         self, machine: str, source_path: str, target_path: str
@@ -1669,7 +1683,7 @@ def submit_compress_job(
         time: Optional[str] = None,
         stage_out_job_id: Optional[str] = None,
         account: Optional[str] = None,
-    ) -> t.JobSubmit:
+    ) -> t.InternalTransferJobSubmit:
         """Compress files using gzip compression.
         You can name the output file as you like, but typically these files have a .tar.gz extension.
         Possible to stage-out jobs providing the SLURM Id of a production job.
@@ -1702,9 +1716,12 @@ def submit_compress_job(
             dereference=dereference,
         )
         logger.info(f"Job submission task: {json_response['task_id']}")
-        return self._poll_tasks(
+        transfer_info = self._poll_tasks(
             json_response["task_id"], "200", iter(self.polling_sleep_times)
         )
+        result = transfer_info[0]
+        result.update({"system": transfer_info[1]})
+        return result

     def submit_extract_job(
         self,
@@ -1716,7 +1733,7 @@ def submit_extract_job(
         time: Optional[str] = None,
         stage_out_job_id: Optional[str] = None,
         account: Optional[str] = None,
-    ) -> t.JobSubmit:
+    ) -> t.InternalTransferJobSubmit:
         """Extract files.
         If you don't select the extension, FirecREST will try to guess the right command based on the extension of the sourcePath.
         Supported extensions are `.zip`, `.tar`, `.tgz`, `.gz` and `.bz2`.
@@ -1750,9 +1767,12 @@ def submit_extract_job(
             extension
         )
         logger.info(f"Job submission task: {json_response['task_id']}")
-        return self._poll_tasks(
+        transfer_info = self._poll_tasks(
             json_response["task_id"], "200", iter(self.polling_sleep_times)
         )
+        result = transfer_info[0]
+        result.update({"system": transfer_info[1]})
+        return result

     # Reservation
     def all_reservations(self, machine: str) -> List[dict]:
diff --git a/firecrest/types.py b/firecrest/types.py
index ac9e3fa..0d60d7f 100644
--- a/firecrest/types.py
+++ b/firecrest/types.py
@@ -66,6 +66,7 @@ class Task(TypedDict):
     last_modify: str
     service: str
     status: str
+    system: str
     task_id: str
     updated_at: str
     user: str
@@ -182,6 +183,12 @@ class JobSubmit(TypedDict):
     result: str


+class InternalTransferJobSubmit(JobSubmit):
+    """A transfer job submit record, from `storage/xfer-internal/{op}`"""
+
+    system: str
+
+
 class Id(TypedDict):
     name: str
     id: str
diff --git a/tests/test_compute.py b/tests/test_compute.py
index 2794fc7..041329e 100644
--- a/tests/test_compute.py
+++ b/tests/test_compute.py
@@ -456,6 +456,7 @@ def tasks_handler(request: Request):
             "last_modify": "2021-12-04T11:52:10",
             "service": "compute",
             "status": "100",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -487,6 +488,7 @@ def tasks_handler(request: Request):
             "last_modify": "2021-12-04T11:52:11",
             "service": "compute",
             "status": "200",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -504,6 +506,7 @@ def tasks_handler(request: Request):
             "last_modify": "2021-12-04T11:52:11",
             "service": "compute",
             "status": "400",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -527,6 +530,7 @@ def tasks_handler(request: Request):
             "last_modify": "2021-12-04T11:52:10",
             "service": "compute",
             "status": "100",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -560,6 +564,7 @@ def tasks_handler(request: Request):
             "last_modify": "2021-12-04T11:52:11",
             "service": "compute",
             "status": "200",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -577,6 +582,7 @@ def tasks_handler(request: Request):
             "last_modify": "2021-12-04T11:52:11",
             "service": "compute",
             "status": "400",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -601,6 +607,7 @@ def tasks_handler(request: Request):
             "last_modify": "2021-12-04T11:52:10",
             "service": "compute",
             "status": "100",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -643,6 +650,7 @@ def tasks_handler(request: Request):
             "last_modify": "2021-12-06T09:53:48",
             "service": "compute",
             "status": "200",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -673,6 +681,7 @@ def tasks_handler(request: Request):
             "last_modify": "2021-12-06T09:53:48",
             "service": "compute",
             "status": "200",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -690,6 +699,7 @@ def tasks_handler(request: Request):
             "last_modify": "2021-12-06T09:53:48",
             "service": "compute",
             "status": "200",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -707,6 +717,7 @@ def tasks_handler(request: Request):
             "last_modify": "2021-12-06T09:47:22",
             "service": "compute",
             "status": "400",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -730,6 +741,7 @@ def tasks_handler(request: Request):
             "last_modify": "2021-12-04T11:52:10",
             "service": "compute",
             "status": "100",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -765,6 +777,7 @@ def tasks_handler(request: Request):
             "last_modify": "2021-12-06T09:53:48",
             "service": "compute",
             "status": "200",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -817,6 +830,7 @@ def tasks_handler(request: Request):
             "last_modify": "2021-12-06T09:53:48",
             "service": "compute",
             "status": "200",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -834,6 +848,7 @@ def tasks_handler(request: Request):
             "last_modify": "2021-12-06T09:47:22",
             "service": "compute",
             "status": "400",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -857,6 +872,7 @@ def tasks_handler(request: Request):
             "last_modify": "2021-12-04T11:52:10",
             "service": "compute",
             "status": "100",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -874,6 +890,7 @@ def tasks_handler(request: Request):
             "last_modify": "2021-12-06T10:42:06",
             "service": "compute",
             "status": "200",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -891,6 +908,7 @@ def tasks_handler(request: Request):
             "last_modify": "2021-12-06T10:32:26",
             "service": "compute",
             "status": "400",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -908,6 +926,7 @@ def tasks_handler(request: Request):
             "last_modify": "2021-12-06T10:39:47",
             "service": "compute",
             "status": "400",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -936,7 +955,7 @@
             "last_modify": "2024-04-16T09:47:06",
             "service": "compute",
             "status": "200",
-            "system": "cluster",
+            "system": "cluster1",
             "task_id": "nodes_info",
             "task_url": "/tasks/nodes_info",
             "updated_at": "2024-04-16T09:47:06",
@@ -956,7 +975,7 @@
             "last_modify": "2024-04-16T09:56:14",
             "service": "compute",
             "status": "400",
-            "system": "cluster",
+            "system": "cluster1",
             "task_id": "info_unknown_node",
             "task_url": "/tasks/info_unknown_node",
             "updated_at": "2024-04-16T09:56:14",
@@ -998,7 +1017,7 @@
             "last_modify": "2024-04-24T12:02:25",
             "service": "compute",
             "status": "200",
-            "system": "cluster",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"/tasks/{taskid}",
             "updated_at": "2024-04-24T12:02:25",
@@ -1018,7 +1037,7 @@
             "last_modify": "2024-04-24T12:02:25",
             "service": "compute",
             "status": "400",
-            "system": "cluster",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"/tasks/{taskid}",
             "updated_at": "2024-04-24T12:17:14",
@@ -1055,7 +1074,7 @@
             "last_modify": "2024-04-24T12:02:25",
             "service": "compute",
             "status": "200",
-            "system": "cluster",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"/tasks/{taskid}",
             "updated_at": "2024-04-24T12:02:25",
@@ -1075,7 +1094,7 @@
             "last_modify": "2024-04-24T12:02:25",
             "service": "compute",
             "status": "200",
-            "system": "cluster",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"/tasks/{taskid}",
             "updated_at": "2024-04-24T12:17:14",
diff --git a/tests/test_extras.py b/tests/test_extras.py
index 2e39b77..1c1c079 100644
--- a/tests/test_extras.py
+++ b/tests/test_extras.py
@@ -133,6 +133,7 @@ def tasks_handler(request: Request):
             "last_modify": "2022-08-16T07:18:54",
             "service": "storage",
             "status": "114",
+            "system": "cluster1",
             "task_id": "taskid_1",
             "task_url": "TASK_IP/tasks/taskid_1",
             "updated_at": "2022-08-16T07:18:54",
@@ -146,6 +147,7 @@ def tasks_handler(request: Request):
             "last_modify": "2022-08-16T07:18:54",
             "service": "storage",
             "status": "112",
+            "system": "cluster1",
             "task_id": "taskid_2",
             "task_url": "TASK_IP/tasks/taskid_2",
             "updated_at": "2022-08-16T07:18:54",
@@ -159,6 +161,7 @@ def tasks_handler(request: Request):
             "last_modify": "2022-08-16T07:18:54",
             "service": "storage",
             "status": "111",
+            "system": "cluster1",
             "task_id": "taskid_3",
             "task_url": "TASK_IP/tasks/taskid_3",
             "updated_at": "2022-08-16T07:18:54",
@@ -224,6 +227,7 @@ def test_all_tasks(valid_client):
             "last_modify": "2022-08-16T07:18:54",
             "service": "storage",
             "status": "114",
+            "system": "cluster1",
             "task_id": "taskid_1",
             "task_url": "TASK_IP/tasks/taskid_1",
             "updated_at": "2022-08-16T07:18:54",
@@ -237,6 +241,7 @@ def test_all_tasks(valid_client):
             "last_modify": "2022-08-16T07:18:54",
             "service": "storage",
             "status": "112",
+            "system": "cluster1",
             "task_id": "taskid_2",
             "task_url": "TASK_IP/tasks/taskid_2",
             "updated_at": "2022-08-16T07:18:54",
@@ -250,6 +255,7 @@ def test_all_tasks(valid_client):
             "last_modify": "2022-08-16T07:18:54",
             "service": "storage",
             "status": "111",
+            "system": "cluster1",
             "task_id": "taskid_3",
             "task_url": "TASK_IP/tasks/taskid_3",
             "updated_at": "2022-08-16T07:18:54",
@@ -281,6 +287,7 @@ def test_subset_tasks(valid_client):
             "last_modify": "2022-08-16T07:18:54",
             "service": "storage",
             "status": "114",
+            "system": "cluster1",
             "task_id": "taskid_1",
             "task_url": "TASK_IP/tasks/taskid_1",
             "updated_at": "2022-08-16T07:18:54",
@@ -294,6 +301,7 @@ def test_subset_tasks(valid_client):
             "last_modify": "2022-08-16T07:18:54",
             "service": "storage",
             "status": "111",
+            "system": "cluster1",
             "task_id": "taskid_3",
             "task_url": "TASK_IP/tasks/taskid_3",
             "updated_at": "2022-08-16T07:18:54",
@@ -324,6 +332,7 @@ def test_one_task(valid_client):
             "last_modify": "2022-08-16T07:18:54",
             "service": "storage",
             "status": "112",
+            "system": "cluster1",
             "task_id": "taskid_2",
             "task_url": "TASK_IP/tasks/taskid_2",
             "updated_at": "2022-08-16T07:18:54",
diff --git a/tests/test_extras_async.py b/tests/test_extras_async.py
index f19dd37..2c7bca3 100644
--- a/tests/test_extras_async.py
+++ b/tests/test_extras_async.py
@@ -69,6 +69,7 @@ async def test_all_tasks(valid_client):
             "last_modify": "2022-08-16T07:18:54",
             "service": "storage",
             "status": "114",
+            "system": "cluster1",
             "task_id": "taskid_1",
             "task_url": "TASK_IP/tasks/taskid_1",
             "updated_at": "2022-08-16T07:18:54",
@@ -82,6 +83,7 @@ async def test_all_tasks(valid_client):
             "last_modify": "2022-08-16T07:18:54",
             "service": "storage",
             "status": "112",
+            "system": "cluster1",
             "task_id": "taskid_2",
             "task_url": "TASK_IP/tasks/taskid_2",
             "updated_at": "2022-08-16T07:18:54",
@@ -95,6 +97,7 @@ async def test_all_tasks(valid_client):
             "last_modify": "2022-08-16T07:18:54",
             "service": "storage",
             "status": "111",
+            "system": "cluster1",
             "task_id": "taskid_3",
             "task_url": "TASK_IP/tasks/taskid_3",
             "updated_at": "2022-08-16T07:18:54",
@@ -115,6 +118,7 @@ async def test_subset_tasks(valid_client):
             "last_modify": "2022-08-16T07:18:54",
             "service": "storage",
             "status": "114",
+            "system": "cluster1",
             "task_id": "taskid_1",
             "task_url": "TASK_IP/tasks/taskid_1",
             "updated_at": "2022-08-16T07:18:54",
@@ -128,6 +132,7 @@ async def test_subset_tasks(valid_client):
             "last_modify": "2022-08-16T07:18:54",
             "service": "storage",
             "status": "111",
+            "system": "cluster1",
             "task_id": "taskid_3",
             "task_url": "TASK_IP/tasks/taskid_3",
             "updated_at": "2022-08-16T07:18:54",
@@ -147,6 +152,7 @@ async def test_one_task(valid_client):
             "last_modify": "2022-08-16T07:18:54",
             "service": "storage",
             "status": "112",
+            "system": "cluster1",
             "task_id": "taskid_2",
             "task_url": "TASK_IP/tasks/taskid_2",
             "updated_at": "2022-08-16T07:18:54",
diff --git a/tests/test_storage.py b/tests/test_storage.py
index 69ae179..468eca9 100644
--- a/tests/test_storage.py
+++ b/tests/test_storage.py
@@ -210,6 +210,7 @@ def storage_tasks_handler(request: Request):
             "last_modify": "2021-12-04T11:52:10",
             "service": "compute",
             "status": "100",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -235,6 +236,7 @@ def storage_tasks_handler(request: Request):
             "last_modify": "2021-12-06T13:48:52",
             "service": "compute",
             "status": "200",
+            "system": "cluster1",
             "task_id": "6f514b060ca036917f4194964b6e949c",
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -254,6 +256,7 @@ def storage_tasks_handler(request: Request):
             "last_modify": "2021-12-04T11:52:10",
             "service": "storage",
             "status": "100",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -272,6 +275,7 @@ def storage_tasks_handler(request: Request):
             "last_modify": "2021-12-04T11:52:10",
             "service": "storage",
             "status": "116",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -289,6 +293,7 @@ def storage_tasks_handler(request: Request):
             "last_modify": "2021-12-04T11:52:10",
             "service": "storage",
             "status": "117",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -310,6 +315,7 @@ def storage_tasks_handler(request: Request):
             "last_modify": "2021-12-04T11:52:10",
             "service": "storage",
             "status": "117",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -329,6 +335,7 @@ def storage_tasks_handler(request: Request):
             "last_modify": "2021-12-04T11:52:10",
             "service": "storage",
             "status": "100",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "user": "username",
@@ -347,6 +354,7 @@ def storage_tasks_handler(request: Request):
             "msg": "Waiting for Presigned URL to upload file to staging area (OpenStack Swift)",
             "source": "/path/to/local/source",
             "status": "110",
+            "system": "cluster1",
             "system_addr": "machine_addr",
             "system_name": "cluster1",
             "target": "/path/to/remote/destination",
@@ -358,6 +366,7 @@ def storage_tasks_handler(request: Request):
             "last_modify": "2022-11-23T09:07:50",
             "service": "storage",
             "status": "110",
+            "system": "cluster1",
             "task_id": "taskid",
             "task_url": f"TASK_IP/tasks/{taskid}",
             "updated_at": "2022-11-23T09:07:50",
@@ -393,6 +402,7 @@ def storage_tasks_handler(request: Request):
             },
             "source": "/path/to/local/source",
             "status": "111",
+            "system": "cluster1",
             "system_addr": "machine_addr",
             "system_name": "cluster1",
             "target": "/path/to/remote/destination",
@@ -404,6 +414,7 @@ def storage_tasks_handler(request: Request):
             "last_modify": "2022-11-23T09:18:43",
             "service": "storage",
             "status": "111",
+            "system": "cluster1",
             "task_id": taskid,
             "task_url": f"TASK_IP/tasks/{taskid}",
             "updated_at": "2022-11-23T09:18:43",
@@ -466,6 +477,7 @@ def test_internal_transfer(valid_client):
         "job_file_out": "/path/to/firecrest/internal_transfer_id/job-35363861.out",
         "jobid": 35363861,
         "result": "Job submitted",
+        "system": "cluster1",
     }

     # cp job
@@ -486,6 +498,7 @@ def test_internal_transfer(valid_client):
         "job_file_out": "/path/to/firecrest/internal_transfer_id/job-35363861.out",
         "jobid": 35363861,
         "result": "Job submitted",
+        "system": "cluster1",
     }

     # rsync job
@@ -506,6 +519,7 @@ def test_internal_transfer(valid_client):
         "job_file_out": "/path/to/firecrest/internal_transfer_id/job-35363861.out",
         "jobid": 35363861,
         "result": "Job submitted",
+        "system": "cluster1",
     }

     # rm job
@@ -525,6 +539,7 @@ def test_internal_transfer(valid_client):
         "job_file_out": "/path/to/firecrest/internal_transfer_id/job-35363861.out",
         "jobid": 35363861,
         "result": "Job submitted",
+        "system": "cluster1",
     }
diff --git a/tests/test_storage_async.py b/tests/test_storage_async.py
index 1bb8d5b..8da80d9 100644
--- a/tests/test_storage_async.py
+++ b/tests/test_storage_async.py
@@ -92,6 +92,7 @@ async def test_internal_transfer(valid_client):
         "job_file_out": "/path/to/firecrest/internal_transfer_id/job-35363861.out",
         "jobid": 35363861,
         "result": "Job submitted",
+        "system": "cluster1",
     }

     # cp job
@@ -112,6 +113,7 @@ async def test_internal_transfer(valid_client):
         "job_file_out": "/path/to/firecrest/internal_transfer_id/job-35363861.out",
         "jobid": 35363861,
         "result": "Job submitted",
+        "system": "cluster1",
     }

     # rsync job
@@ -132,6 +134,7 @@ async def test_internal_transfer(valid_client):
         "job_file_out": "/path/to/firecrest/internal_transfer_id/job-35363861.out",
         "jobid": 35363861,
         "result": "Job submitted",
+        "system": "cluster1",
     }

     # rm job
@@ -151,6 +154,7 @@ async def test_internal_transfer(valid_client):
         "job_file_out": "/path/to/firecrest/internal_transfer_id/job-35363861.out",
        "jobid": 35363861,
         "result": "Job submitted",
+        "system": "cluster1",
     }
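
For illustration only (not part of the patch): a minimal sketch of how the "system" field added to the xfer-internal submission records might be used from the synchronous client, mirroring what compress()/extract() now do internally. The credentials, machine name and paths below are placeholder assumptions, not values taken from this repository.

    import time
    import firecrest as fc

    # Placeholder auth/URL values; replace with a real deployment's settings.
    auth = fc.ClientCredentialsAuth("<client_id>", "<client_secret>", "<token_uri>")
    client = fc.Firecrest(firecrest_url="<firecrest_url>", authorization=auth)

    # Submit the compression as a transfer job.
    job = client.submit_compress_job(
        "cluster1", "/path/to/large/dir", "/path/to/large/dir.tar.gz"
    )

    # The submission record now carries the system that actually runs the
    # transfer job, which may differ from the machine passed to the call.
    xfer_system = job["system"]

    # Poll the job on the xfer system rather than on the original machine.
    while client.poll_active(xfer_system, [job["jobid"]]):
        time.sleep(2)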