Skip to content

Commit

Permalink
Merge pull request #16665 from jdavcs/dev_docs_runner
Browse files Browse the repository at this point in the history
Clarify documentation on how to build a job runner
  • Loading branch information
mvdbeek authored Sep 10, 2023
2 parents f43f350 + 0ab66ae commit 19faa82
Showing 1 changed file with 80 additions and 67 deletions.
147 changes: 80 additions & 67 deletions doc/source/dev/build_a_job_runner.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ Build a job runner
A walk through the steps of building a runner for Galaxy
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In this tutorial, we will build a runner in a block by block fashion
(like building blocks), so we will divide the runner into
components based on their function.
In this tutorial, we will look at how to build a runner in a block by block
fashion (like building blocks), so we will divide the runner into components
based on their function.

We assume you are familiar with setting up and managing a local installation of Galaxy.

Expand All @@ -20,43 +20,53 @@ What is required to make a runner for Galaxy?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

`galaxy.jobs.runners.\_\_init\_\_.py <https://github.com/galaxyproject/galaxy/blob/dev/lib/galaxy/jobs/runners/__init__.py>`__
has the base runner implementation. To create a new runner, that base
runner must be inherited and only certain methods need to be
contains the base runner implementation. To create a new runner, the base
runner class (in most cases, ``AsynchronousJobRunner``) must be inherited and only certain methods need to be
overridden with your logic.

These are the methods that need to be implemented:

1. ``__init__(app, nworkers, **kwargs)``
1. ``queue_job(job_wrapper)``

2. ``queue_job(job_wrapper)``
2. ``check_watched_item(job_state)``

3. ``check_watched_item(job_state)``
3. ``stop_job(job)``

4. ``stop_job(job)``
4. ``recover(job, job_wrapper)``

5. ``recover(job, job_wrapper)``
In addition, you will almost certainly override the ``__init__(app, nworkers, **kwargs)``
method in order to add custom logic to initialize your runner. In
doing so, make sure to call the parent class constructor:

.. code-block:: python
super().__init__(app, nworkers, **kwargs)
Keep in mind that when you override a method, you should not reimplement any of
the logic that is present in the base class: the above call to ``super()``
ensures that all that such logic is handled automatically.

The big picture
---------------

The above methods are invoked at various stages of job execution in
Galaxy. These methods will act as a mediator between the Galaxy
framework and the external execution platform. To know when and how
framework and the external execution platform. To learn when and how
these methods are invoked, we will look at the implementation of
the parent class and process lifecycle of a runner.

Implementation of parent class (``galaxy.jobs.runners.__init__.py``)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

- .. rubric:: Class Inheritance structure
:name: class-inheritance-structure
Class Inheritance structure
---------------------------

.. image:: inherit.png
.. image:: inherit.png

- .. rubric:: The big picture!
:name: the-big-picture-1
The big picture
---------------

.. image:: runner_diag.png
.. image:: runner_diag.png

The whole process is divided into different stages for ease of
understanding.
Expand All @@ -65,7 +75,7 @@ Runner Methods in detail
~~~~~~~~~~~~~~~~~~~~~~~~

1. ``__init__`` method - STAGE 1
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
--------------------------------

Input params:

Expand Down Expand Up @@ -107,28 +117,28 @@ Have a look at the sample ``job_conf.xml``:
</destinations>
</job_conf>
The following steps are followed to manipulate the data in ``job_conf.xml``
The data in ``job_conf.xml`` is manipulated through the following steps:

A: Define structure of data under plugin tag (plugin tag in
**Step 1:** Define structure of data under plugins tag (in
``job_conf.xml``) as a dictionary.

.. code-block:: python
runner_param_specs = dict(user=dict(map=str), key=dict(map=str))
B: Update the dictionary structure in kwargs.
**Step 2:** Update the dictionary structure in kwargs.

.. code-block:: python
kwargs.update({'runner_param_specs': runner_param_specs})
C: Now call the parent constructor to assign the values.
**Step 3:** Now call the parent constructor to assign the values.

.. code-block:: python
super(GodockerJobRunner, self).__init__(app, nworkers, **kwargs)
super().__init__(app, nworkers, **kwargs)
D: The assigned values can be accessed in runner in the following way.
**Step 4:** The assigned values can be accessed in a runner in the following way.

.. code-block:: python
Expand All @@ -142,10 +152,10 @@ The output will be:
gosc
HELLOWORLD

E: Invoke the external API with the values obtained by the above method
**Step 5:** Invoke the external API with the values obtained by the above method
for initialization.

Finally the worker threads and monitor threads are invoked for galaxy to
Finally, the worker threads and monitor threads are invoked for galaxy to
listen for incoming tool submissions.

.. code-block:: python
Expand All @@ -154,51 +164,54 @@ listen for incoming tool submissions.
self._init_worker_threads()
2. ``queue_job`` method - STAGE 2
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
---------------------------------

Input params: ``job_wrapper`` (Object of
Input params: ``job_wrapper`` (Object of type
`galaxy.jobs.JobWrapper <https://github.com/galaxyproject/galaxy/blob/dev/lib/galaxy/jobs/__init__.py#L743>`__)

Output params: None

``galaxy.jobs.JobWrapper`` is a Wrapper around 'model.Job' with convenience
``galaxy.jobs.JobWrapper`` is a wrapper around 'model.Job' with convenience
methods for running processes and state management.

- Functioning of ``queue_job`` method.
Functioning of ``queue_job`` method
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The logic in the ``queue_job`` method follows these steps:

A. ``prepare_job()`` method is invoked to do some sanity checks that all runners' ``queue_job()`` methods are
likely to want to do and also to build runner command line for that
job. Initial state and configuration of the job are set and every
data is associated with **job\_wrapper**.
**Step 1.** ``prepare_job()`` method is invoked to do some sanity checks that all runners' ``queue_job()`` methods are
likely to want to do and also to build the runner command line for that
job. Initial state and configuration of the job are set and all
data is associated with **job\_wrapper**.

B. Submit job to the external runner and return the jobid. Accessing
jobs data (tool submitted in Galaxy webframework) is purely from
``job_wrapper``. eg: ``job_wrapper.get_state()`` -> gives state of a job
(queued/running/failed/success/...)
**Step 2.** Submit job to the external runner and return the job id. Accessing
jobs data (tool submitted in Galaxy webframework) is purely from
``job_wrapper``. eg: ``job_wrapper.get_state()`` -> gives state of a job
(queued/running/failed/success/...)

Let us look at a means of accessing external runner's configuration
present under destination tag of ``job_conf.xml`` in the above example.
Let us look at how to access the external runner's configuration
present under the destination tag of ``job_conf.xml`` in the above example.

.. code-block:: python
job_destination = job_wrapper.job_destination
docker_cpu = int(job_destination.params["docker_cpu"])
docker_ram = int(job_destination.params["docker_memory"])
A special case: User Story: A docker based external runner is present. A
A special case. User Story: a docker based external runner is present. A
default docker image for execution is set in ``job_conf.xml``. A tool can
also specify the docker image for its execution. Specification in tool
is given more priority than the default specification. To achieve such a
functionality. We can use the following statement:
also specify the docker image for its execution. Specification in the tool
is given more priority than the default specification. For this functionality
we can use the following statement:

.. code-block:: python
docker_image = self._find_container(job_wrapper).container_id
Note: This pre-written method is only for getting the external
image/container/os..
image/container/os.

C. After successful submission of a job to the external runner, submit the
**Step 3.** After successful submission of a job to the external runner, submit the
job to the Galaxy framework. To do that, make an object of type
AsynchronousJobState and put it in the ``monitor_queue``.

Expand All @@ -208,14 +221,14 @@ AsynchronousJobState and put it in the ``monitor_queue``.
self.monitor_queue.put(ajs)
3. ``check_watched_item`` method - STAGE 3
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
------------------------------------------

Input params: ``job_state`` (Object of
Input params: ``job_state`` (Object of type
`galaxy.jobs.runners.AsynchronousJobState <https://github.com/galaxyproject/galaxy/blob/dev/lib/galaxy/jobs/runners/__init__.py#L400>`__)

Output params: ``AsynchronousJobState`` object

Without going into much detail, assume there is a queue to track the status of every job. eg:
Without going into much detail, assume there is a queue to track the status of every job:

.. image:: queue.png
:align: center
Expand All @@ -236,15 +249,15 @@ Note: Iterating through the queue is already taken care of by the framework.

To inform Galaxy about the status of the job:

- Get the job status from external runner using the ``job_id``.
- Get the job status from the external runner using ``job_id``.

- Check if the job is queued/running/completed.. etc. A general structure is provided below.
- Check if the job is queued/running/completed, etc. A general structure is provided below.

- Call ``self.mark_as_finished(job_state)``, if the job has been successfully executed.
- Call ``self.mark_as_finished(job_state)`` if the job has been successfully executed.

- Call ``self.mark_as_failed(job_state)``, if the job has failed during execution.
- Call ``self.mark_as_failed(job_state)`` if the job has failed during execution.

- To change state of a job, change ``job_state.running`` and ``job_state.job_wrapper.change_state()``
- To change the state of a job, change ``job_state.running`` and call ``job_state.job_wrapper.change_state()``

.. code-block:: python
Expand Down Expand Up @@ -280,23 +293,23 @@ Note:
jobs (i.e. if it is running or pending). If it no longer needs to be watched (e.g. it has terminated
either successfully or with an error) it should return None.

``create_log_files()`` are nothing but copying the files (``error_file``,
``create_log_files()`` is nothing but copying the files (``error_file``,
``output_file``, ``exit_code_file``) from the external runner's directory to
the working directory of Galaxy.

Source of the files are from the output directory of your external
runner. Destination of the files will be:
The source of the files is the output directory of your external runner.
The destination of the files will be:

- output file -> ``job_state.output_file``.
- output file -> ``job_state.output_file``

- error file -> ``job_state.error_file``.
- error file -> ``job_state.error_file``

- exit code file -> ``job_state.exit_code_file``.
- exit code file -> ``job_state.exit_code_file``

4. ``stop_job`` method - STAGE 4
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
--------------------------------

Input params: job (Object of
Input params: job (Object of type
`galaxy.model.Job <https://github.com/galaxyproject/galaxy/blob/dev/lib/galaxy/model/__init__.py#L344>`__)

Output params: None
Expand All @@ -314,22 +327,22 @@ The ``job_id`` of the job to be deleted is accessed by
job.id
5. ``recover`` method - STAGE 5
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-------------------------------

Input params:

- ``job`` (Object of `galaxy.model.Job <https://github.com/galaxyproject/galaxy/blob/dev/lib/galaxy/model/__init__.py#L344>`__).
- ``job`` (Object of type `galaxy.model.Job <https://github.com/galaxyproject/galaxy/blob/dev/lib/galaxy/model/__init__.py#L344>`__).

- ``job_wrapper`` (Object of `galaxy.jobs.JobWrapper <https://github.com/galaxyproject/galaxy/blob/dev/lib/galaxy/jobs/__init__.py#L743>`__).
- ``job_wrapper`` (Object of type `galaxy.jobs.JobWrapper <https://github.com/galaxyproject/galaxy/blob/dev/lib/galaxy/jobs/__init__.py#L743>`__).


Output params: None

Functionality: Recovers any jobs stuck in a queued/running state when
Galaxy was started.
Galaxy starts.

This method is invoked by Galaxy at the time of startup. Jobs in Running
& Queued status in Galaxy are put in the ``monitor_queue`` by creating an
and Queued state in Galaxy are put in the ``monitor_queue`` by creating an
``AsynchronousJobState`` object.

The following is a generic code snippet for the ``recover`` method.
Expand Down

0 comments on commit 19faa82

Please sign in to comment.