Fix compression/extraction of large files (#114)
* Extend poll_task to return the system associated with the task

* Add xfer system information to the internal transfers

* Format fixes
ekouts authored Jul 16, 2024
1 parent 6f7cc66 commit 56ba1f9
Showing 8 changed files with 161 additions and 58 deletions.
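In effect, poll_task now resolves to a (data, system) tuple, and the internal-transfer submit_*_job methods fold that system into their result, so follow-up calls such as poll_active and head can target the staging (xfer) system instead of the user-supplied machine. Below is a minimal sketch of the new calling convention against the patched AsyncFirecrest client; the helper name, machine name, and paths are hypothetical, not part of the commit:

async def compress_on_xfer_system(client, machine):
    # Sketch only: `client` is an AsyncFirecrest instance; `machine`
    # and the paths are placeholders.
    # submit_compress_job now returns the job info merged with the
    # "system" that actually runs the transfer job.
    job_info = await client.submit_compress_job(
        machine, "/scratch/data", "/scratch/data.tar.gz"
    )
    # Poll the xfer system reported by the submission, not `machine`.
    return await client.poll_active(job_info["system"], [job_info["jobid"]])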
75 changes: 49 additions & 26 deletions firecrest/AsyncClient.py
@@ -77,7 +77,7 @@ async def poll_task(self, final_status, sleep_times):
resp = await self._client._task_safe(self._task_id, self._responses)

logger.info(f'Status of {self._task_id} is {resp["status"]}')
return resp["data"]
return resp["data"], resp.get("system", "")


class AsyncFirecrest:
@@ -888,8 +888,9 @@ async def compress(
dereference
)
jobid = job_info['jobid']
+ xfer_system = job_info["system"]
active_jobs = await self.poll_active(
- machine,
+ xfer_system,
[jobid]
)
intervals = (2**i for i in itertools.count(start=0))
@@ -899,7 +900,7 @@
):
await asyncio.sleep(next(intervals))
active_jobs = await self.poll_active(
- machine,
+ xfer_system,
[jobid]
)

@@ -913,7 +914,7 @@
)

err_output = await self.head(
- machine,
+ xfer_system,
job_info['job_file_err']
)
if (err_output != ''):
@@ -983,8 +984,9 @@ async def extract(
extension
)
jobid = job_info['jobid']
+ xfer_system = job_info["system"]
active_jobs = await self.poll_active(
- machine,
+ xfer_system,
[jobid]
)
intervals = (2**i for i in itertools.count(start=0))
@@ -994,7 +996,7 @@
):
await asyncio.sleep(next(intervals))
active_jobs = await self.poll_active(
- machine,
+ xfer_system,
[jobid]
)

@@ -1008,7 +1010,7 @@
)

err_output = await self.head(
- machine,
+ xfer_system,
job_info['job_file_err']
)
if (err_output != ''):
@@ -1398,7 +1400,7 @@ async def submit(
logger.info(f"Job submission task: {json_response['task_id']}")
t = ComputeTask(self, json_response["task_id"], [resp])

- result = await t.poll_task("200", iter(self.polling_sleep_times))
+ result = (await t.poll_task("200", iter(self.polling_sleep_times)))[0]
# Inject taskid in the result
result["firecrest_taskid"] = json_response["task_id"]
return result
@@ -1450,7 +1452,7 @@ async def poll(
json_response = self._json_response([resp], 200)
logger.info(f"Job polling task: {json_response['task_id']}")
t = ComputeTask(self, json_response["task_id"], [resp])
- res = await t.poll_task("200", iter(self.polling_sleep_times))
+ res = (await t.poll_task("200", iter(self.polling_sleep_times)))[0]
# When there is no job in the sacct output firecrest will return an empty dictionary instead of list
if isinstance(res, dict):
return list(res.values())
@@ -1499,7 +1501,9 @@ async def poll_active(
json_response = self._json_response([resp], 200)
logger.info(f"Job active polling task: {json_response['task_id']}")
t = ComputeTask(self, json_response["task_id"], [resp])
- dict_result = await t.poll_task("200", iter(self.polling_sleep_times))
+ dict_result = (
+     await t.poll_task("200", iter(self.polling_sleep_times))
+ )[0]
if len(jobids):
ret = [i for i in dict_result.values() if i["jobid"] in jobids]
else:
@@ -1534,7 +1538,7 @@ async def nodes(
)
json_response = self._json_response([resp], 200)
t = ComputeTask(self, json_response["task_id"], [resp])
- result = await t.poll_task("200", iter(self.polling_sleep_times))
+ result = (await t.poll_task("200", iter(self.polling_sleep_times)))[0]
return result

async def partitions(
@@ -1564,7 +1568,7 @@ async def partitions(
)
json_response = self._json_response([resp], 200)
t = ComputeTask(self, json_response["task_id"], [resp])
- result = await t.poll_task("200", iter(self.polling_sleep_times))
+ result = (await t.poll_task("200", iter(self.polling_sleep_times)))[0]
return result

async def reservations(
@@ -1592,7 +1596,7 @@ async def reservations(
json_response = self._json_response([resp], 200)
t = ComputeTask(self, json_response["task_id"], [resp])
result = await t.poll_task("200", iter(self.polling_sleep_times))
- return result
+ return result[0]

async def cancel(self, machine: str, job_id: str | int) -> str:
"""Cancels running job.
@@ -1611,7 +1615,7 @@ async def cancel(self, machine: str, job_id: str | int) -> str:
json_response = self._json_response([resp], 200)
logger.info(f"Job cancellation task: {json_response['task_id']}")
t = ComputeTask(self, json_response["task_id"], [resp])
- return await t.poll_task("200", iter(self.polling_sleep_times))
+ return (await t.poll_task("200", iter(self.polling_sleep_times)))[0]

# Storage
async def _internal_transfer(
@@ -1665,7 +1669,7 @@ async def submit_move_job(
time: Optional[str] = None,
stage_out_job_id: Optional[str] = None,
account: Optional[str] = None,
- ) -> t.JobSubmit:
+ ) -> t.InternalTransferJobSubmit:
"""Move files between internal CSCS file systems.
Rename/Move source_path to target_path.
Possible to stage-out jobs providing the SLURM ID of a production job.
@@ -1697,7 +1701,10 @@
)
logger.info(f"Job submission task: {json_response['task_id']}")
t = ComputeTask(self, json_response["task_id"], resp)
- return await t.poll_task("200", iter(self.polling_sleep_times))
+ job_info = await t.poll_task("200", iter(self.polling_sleep_times))
+ result = job_info[0]
+ result.update({"system": job_info[1]})
+ return result

async def submit_copy_job(
self,
@@ -1708,7 +1715,7 @@
time: Optional[str] = None,
stage_out_job_id: Optional[str] = None,
account: Optional[str] = None,
- ) -> t.JobSubmit:
+ ) -> t.InternalTransferJobSubmit:
"""Copy files between internal CSCS file systems.
Copy source_path to target_path.
Possible to stage-out jobs providing the SLURM Id of a production job.
@@ -1740,7 +1747,10 @@
)
logger.info(f"Job submission task: {json_response['task_id']}")
t = ComputeTask(self, json_response["task_id"], resp)
- return await t.poll_task("200", iter(self.polling_sleep_times))
+ job_info = await t.poll_task("200", iter(self.polling_sleep_times))
+ result = job_info[0]
+ result.update({"system": job_info[1]})
+ return result

async def submit_compress_job(
self,
@@ -1752,7 +1762,7 @@
time: Optional[str] = None,
stage_out_job_id: Optional[str] = None,
account: Optional[str] = None,
- ) -> t.JobSubmit:
+ ) -> t.InternalTransferJobSubmit:
"""Compress files using gzip compression.
You can name the output file as you like, but typically these files have a .tar.gz extension.
Possible to stage-out jobs providing the SLURM Id of a production job.
@@ -1787,7 +1797,10 @@
)
logger.info(f"Job submission task: {json_response['task_id']}")
t = ComputeTask(self, json_response["task_id"], resp)
- return await t.poll_task("200", iter(self.polling_sleep_times))
+ job_info = await t.poll_task("200", iter(self.polling_sleep_times))
+ result = job_info[0]
+ result.update({"system": job_info[1]})
+ return result

async def submit_extract_job(
self,
@@ -1799,7 +1812,7 @@
time: Optional[str] = None,
stage_out_job_id: Optional[str] = None,
account: Optional[str] = None,
- ) -> t.JobSubmit:
+ ) -> t.InternalTransferJobSubmit:
"""Extract files.
If you don't select the extension, FirecREST will try to guess the right command based on the extension of the sourcePath.
Supported extensions are `.zip`, `.tar`, `.tgz`, `.gz` and `.bz2`.
@@ -1831,10 +1844,14 @@
stage_out_job_id,
account,
resp,
+ extension=extension,
)
logger.info(f"Job submission task: {json_response['task_id']}")
t = ComputeTask(self, json_response["task_id"], resp)
- return await t.poll_task("200", iter(self.polling_sleep_times))
+ job_info = await t.poll_task("200", iter(self.polling_sleep_times))
+ result = job_info[0]
+ result.update({"system": job_info[1]})
+ return result

async def submit_rsync_job(
self,
@@ -1845,7 +1862,7 @@
time: Optional[str] = None,
stage_out_job_id: Optional[str] = None,
account: Optional[str] = None,
- ) -> t.JobSubmit:
+ ) -> t.InternalTransferJobSubmit:
"""Transfer files between internal CSCS file systems.
Transfer source_path to target_path.
Possible to stage-out jobs providing the SLURM Id of a production job.
@@ -1877,7 +1894,10 @@
)
logger.info(f"Job submission task: {json_response['task_id']}")
t = ComputeTask(self, json_response["task_id"], resp)
- return await t.poll_task("200", iter(self.polling_sleep_times))
+ job_info = await t.poll_task("200", iter(self.polling_sleep_times))
+ result = job_info[0]
+ result.update({"system": job_info[1]})
+ return result

async def submit_delete_job(
self,
@@ -1887,7 +1907,7 @@
time: Optional[str] = None,
stage_out_job_id: Optional[str] = None,
account: Optional[str] = None,
- ) -> t.JobSubmit:
+ ) -> t.InternalTransferJobSubmit:
"""Remove files in internal CSCS file systems.
Remove file in target_path.
Possible to stage-out jobs providing the SLURM Id of a production job.
@@ -1918,7 +1938,10 @@
)
logger.info(f"Job submission task: {json_response['task_id']}")
t = ComputeTask(self, json_response["task_id"], resp)
- return await t.poll_task("200", iter(self.polling_sleep_times))
+ job_info = await t.poll_task("200", iter(self.polling_sleep_times))
+ result = job_info[0]
+ result.update({"system": job_info[1]})
+ return result

async def external_upload(
self, machine: str, source_path: str, target_path: str
(Diffs for the remaining 7 changed files are not shown.)