Skip to content

Commit

Permalink
Add asynchronous client for pyfirecrest (#65)
Browse files Browse the repository at this point in the history
* ✨ Add typing

* make upload/download more permissive

* apply black

* Add types in to documentation

* Update types

* fix formatting

* Add undoc-members in types reference

* First implementation of AsyncFirecrest client

* Add httpx in the dependencies

* Add some basic example for the async client

* Set polling rate in example

* Fix import

* Fix external transfer

* Add example for external transfers

* Add async external objects in reference

* Expose async external objects

* Fix issue with mixed responses in case of errors

* Get the access token before stalling requests

* Add documentation for async client + refactor

* Decouple async and sync clients

* Split ExternalStorage objects to another file

* Fix typo

* Fix typo in filename

* Refactor status tests

* Add auth handler

* Refactor compute tests

* Refactor extra tests

* Refactor reservation tests

* Refactor storage tests

* Refactor utilities tests

* Duplicate compute testing for async version

* Add compute tests

* Add extra tests for async client

* Add reservation async tests + small fixes

* Add status tests for async client + small fixes

* Add utilities tests + fix async whoami

* Add storage tests for async version

* Remove unused imports and fix annotation

* Small fixes

* Fix type errors

* Small fix in docs

* Remove old installation instructions
  • Loading branch information
ekouts committed Aug 17, 2023
1 parent cb541bc commit 526ca8a
Show file tree
Hide file tree
Showing 31 changed files with 5,094 additions and 1,054 deletions.
89 changes: 89 additions & 0 deletions async-example.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Examples for asyncio with pyfirecrest

### Simple asynchronous workflow with the new client

Here is an example of how to use the `AsyncFirecrest` client with asyncio.

```python
import firecrest
import asyncio
import logging


# Setup variables before running the script
client_id = ""
client_secret = ""
token_uri = ""
firecrest_url = ""

machine = ""
local_script_path = ""

# Ignore this part, it is simply setup for logging
logger = logging.getLogger("simple_example")
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(message)s", datefmt="%H:%M:%S")
ch.setFormatter(formatter)
logger.addHandler(ch)

async def workflow(client, i):
logger.info(f"{i}: Starting workflow")
job = await client.submit(machine, local_script_path)
logger.info(f"{i}: Submitted job with jobid: {job['jobid']}")
while True:
poll_res = await client.poll_active(machine, [job["jobid"]])
if len(poll_res) < 1:
logger.info(f"{i}: Job {job['jobid']} is no longer active")
break

logger.info(f"{i}: Job {job['jobid']} status: {poll_res[0]['state']}")
await asyncio.sleep(30)

output = await client.view(machine, job["job_file_out"])
logger.info(f"{i}: job output: {output}")


async def main():
auth = firecrest.ClientCredentialsAuth(client_id, client_secret, token_uri)
client = firecrest.AsyncFirecrest(firecrest_url, authorization=auth)

# Set up the desired polling rate for each microservice. The float number
# represents the number of seconds between consecutive requests in each
# microservice. Default is 5 seconds for now.
client.time_between_calls = {
"compute": 5,
"reservations": 5,
"status": 5,
"storage": 5,
"tasks": 5,
"utilities": 5,
}

workflows = [workflow(client, i) for i in range(5)]
await asyncio.gather(*workflows)


asyncio.run(main())

```


### External transfers with `AsyncFirecrest`

The uploads and downloads work as before but you have to keep in mind which methods are coroutines.

```python
# Download
down_obj = await client.external_download("cluster", "/remote/path/to/the/file")
status = await down_obj.status
print(status)
await down_obj.finish_download("my_local_file")

# Upload
up_obj = await client.external_upload("cluster", "/path/to/local/file", "/remote/path/to/filesystem")
await up_obj.finish_upload()
status = await up_obj.status
print(status)
```
5 changes: 2 additions & 3 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ You can also clone it from `Github <https://github.com/eth-cscs/pyfirecrest>`__
:caption: Contents:

authorization
tutorial
reference
cli_reference
tutorial_index
reference_index

Contact
=======
Expand Down
29 changes: 29 additions & 0 deletions docs/source/reference_async.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
Asynchronous FirecREST objects
==============================

The library also provides an asynchronous API for the client:

The ``AsyncFirecrest`` class
****************************
.. autoclass:: firecrest.AsyncFirecrest
:members:
:undoc-members:
:show-inheritance:


The ``AsyncExternalDownload`` class
***********************************
.. autoclass:: firecrest.AsyncExternalDownload
:inherited-members:
:members:
:undoc-members:
:show-inheritance:


The ``AsyncExternalUpload`` class
*********************************
.. autoclass:: firecrest.AsyncExternalUpload
:inherited-members:
:members:
:undoc-members:
:show-inheritance:
4 changes: 2 additions & 2 deletions docs/source/reference.rst → docs/source/reference_basic.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Reference
=========
The basic client
================

The wrapper includes the ``Firecrest`` class, which is in practice a very basic client.
Together with the authorisation class it takes care of the token and makes the appropriate calls for each action.
Expand Down
File renamed without changes.
10 changes: 10 additions & 0 deletions docs/source/reference_index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Reference
=========

.. toctree::
:maxdepth: 2
:caption: Contents:

reference_basic
reference_async
reference_cli
124 changes: 124 additions & 0 deletions docs/source/tutorial_async.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
How to use the asynchronous API [experimental]
==============================================

In this tutorial, we will explore the asynchronous API of the pyFirecREST library.
Asynchronous programming is a powerful technique that allows you to write more efficient and responsive code by handling concurrent tasks without blocking the main execution flow.
This capability is particularly valuable when dealing with time-consuming operations such as network requests, I/O operations, or interactions with external services.

In order to take advantage of the asynchronous client you may need to make many changes in your existing code, so the effort is worth it when you develop a code from the start or if you need to make a large number of requests.
You could submit hundreds or thousands of jobs, set a reasonable rate and pyFirecREST will handle it in the background without going over the request rate limit or overflowing the system.

If you are already familiar with the synchronous version of pyFirecREST, you will find it quite straightforward to adapt to the asynchronous paradigm.

We will be going through an example that will use the `asyncio library <https://docs.python.org/3/library/asyncio.html>`__.
First you will need to create an ``AsyncFirecrest`` object, instead of the simple ``Firecrest`` object.

.. code-block:: Python
client = fc.AsyncFirecrest(
firecrest_url=<firecrest_url>,
authorization=MyAuthorizationClass()
)
As you can see in the reference, the methods of ``AsyncFirecrest`` have the same name as the ones from the simple client, with the same arguments and types, but you will need to use the async/await syntax when you call them.

Here is an example of the calls we saw in the previous section:

.. code-block:: Python
# Getting all the systems
systems = await client.all_systems()
print(systems)
# Getting the files of a directory
files = await client.list_files("cluster", "/home/test_user")
print(files)
# Submit a job
job = await client.submit("cluster", "script.sh")
print(job)
The uploads and downloads work as before but you have to keep in mind which methods are coroutines.

.. code-block:: Python
# Download
down_obj = await client.external_download("cluster", "/remote/path/to/the/file")
status = await down_obj.status
print(status)
await down_obj.finish_download("my_local_file")
# Upload
up_obj = await client.external_upload("cluster", "/path/to/local/file", "/remote/path/to/filesystem")
await up_obj.finish_upload()
status = await up_obj.status
print(status)
Here is a more complete example for how you could use the asynchronous client:


.. code-block:: Python
import firecrest
import asyncio
import logging
# Setup variables before running the script
client_id = ""
client_secret = ""
token_uri = ""
firecrest_url = ""
machine = ""
local_script_path = ""
# This is simply setup for logging, you can ignore it
logger = logging.getLogger("simple_example")
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(message)s", datefmt="%H:%M:%S")
ch.setFormatter(formatter)
logger.addHandler(ch)
async def workflow(client, i):
logger.info(f"{i}: Starting workflow")
job = await client.submit(machine, local_script_path)
logger.info(f"{i}: Submitted job with jobid: {job['jobid']}")
while True:
poll_res = await client.poll_active(machine, [job["jobid"]])
if len(poll_res) < 1:
logger.info(f"{i}: Job {job['jobid']} is no longer active")
break
logger.info(f"{i}: Job {job['jobid']} status: {poll_res[0]['state']}")
await asyncio.sleep(30)
output = await client.view(machine, job["job_file_out"])
logger.info(f"{i}: job output: {output}")
async def main():
auth = firecrest.ClientCredentialsAuth(client_id, client_secret, token_uri)
client = firecrest.AsyncFirecrest(firecrest_url, authorization=auth)
# Set up the desired polling rate for each microservice. The float number
# represents the number of seconds between consecutive requests in each
# microservice.
client.time_between_calls = {
"compute": 5,
"reservations": 5,
"status": 5,
"storage": 5,
"tasks": 5,
"utilities": 5,
}
workflows = [workflow(client, i) for i in range(5)]
await asyncio.gather(*workflows)
asyncio.run(main())
101 changes: 4 additions & 97 deletions docs/source/tutorial.rst → docs/source/tutorial_basic.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Tutorial
========
Simple tutorial
===============

Your starting point to use pyFirecREST will be the creation of a FirecREST object.
This is simply a mini client that, in cooperation with the authorization object, will take care of the necessary requests that need to be made and handle the responses.
Expand All @@ -9,7 +9,7 @@ For this tutorial we will assume the simplest kind of authorization class, where

.. code-block:: Python
import firecrest as f7t
import firecrest as fc
class MyAuthorizationClass:
def __init__(self):
Expand All @@ -19,7 +19,7 @@ For this tutorial we will assume the simplest kind of authorization class, where
return <TOKEN>
# Setup the client with the appropriate URL and the authorization class
client = f7t.Firecrest(firecrest_url=<firecrest_url>, authorization=MyAuthorizationClass())
client = fc.Firecrest(firecrest_url=<firecrest_url>, authorization=MyAuthorizationClass())
Simple blocking requests
Expand Down Expand Up @@ -216,96 +216,3 @@ The simplest way to do the uploading through pyFirecREST is as follows:
But, as before, you can get the necessary components for the upload from the ``object_storage_data`` property.
You can get the link, as well as all the necessary arguments for the request to Object Storage and the full command you could perform manually from the terminal.

Enable logging in your python code
----------------------------------

The simplest way to enable logging in your code would be to add this in the beginning of your file:

.. code-block:: Python
import logging
logging.basicConfig(
level=logging.INFO,
format="%(levelname)s:%(name)s:%(message)s",
)
pyFirecREST has all of it's messages in `INFO` level. If you want to avoid messages from other packages, you can do the following:

.. code-block:: Python
import logging
logging.basicConfig(
level=logging.WARNING,
format="%(levelname)s:%(name)s:%(message)s",
)
logging.getLogger("firecrest").setLevel(logging.INFO)
Handling of errors
------------------

The methods of the Firecrest, ExternalUpload and ExternalDownload objects can raise exceptions in case something goes wrong.
When the error comes from the response of some request pyFirecREST will raise ``FirecrestException``.
In these cases you can manually examine all the responses from the requests in order to get more information, when the message is not informative enough.
These responses are from the requests package of python and you can get all types of useful information from it, like the status code, the json response, the headers and more.
Here is an example of the code that will handle those failures.

.. code-block:: Python
try:
parameters = client.parameters()
print(f"Firecrest parameters: {parameters}")
except fc.FirecrestException as e:
# You can just print the exception to get more information about the type of error,
# for example an invalid or expired token.
print(e)
# Or you can manually examine the responses.
print(e.responses[-1])
except Exception as e:
# You might also get regular exceptions in some cases. For example when you are
# trying to upload a file that doesn't exist in your local filesystem.
pass
CLI support
-----------

After version 1.3.0, pyFirecREST comes together with a CLI but for now it can only be used with the `f7t.ClientCredentialsAuth` authentication class.

You will need to set the environment variables ``FIRECREST_CLIENT_ID``, ``FIRECREST_CLIENT_SECRET`` and ``AUTH_TOKEN_URL`` to set up the Client Credentials client, as well as ``FIRECREST_URL`` with the URL for the FirecREST instance you are using.

After that you can explore the capabilities of the CLI with the `--help` option:

.. code-block:: bash
firecrest --help
firecrest ls --help
firecrest submit --help
firecrest upload --help
firecrest download --help
firecrest submit-template --help
Some basic examples:

.. code-block:: bash
# Get the available systems
firecrest systems
# Get the parameters of different microservices of FirecREST
firecrest parameters
# List files of directory
firecrest ls cluster1 /home
# Submit a job
firecrest submit cluster script.sh
# Upload a "small" file (you can check the maximum size in `UTILITIES_MAX_FILE_SIZE` from the `parameters` command)
firecrest upload --type=direct cluster local_file.txt /path/to/cluster/fs
# Upload a "large" file
firecrest upload --type=external cluster local_file.txt /path/to/cluster/fs
# You will have to finish the upload with a second command that will be given in the output
Loading

0 comments on commit 526ca8a

Please sign in to comment.