diff --git a/arango/async.py b/arango/async.py index 7318751e..819dcf97 100644 --- a/arango/async.py +++ b/arango/async.py @@ -26,6 +26,10 @@ class AsyncExecution(Connection): instance (which holds the result of the request) is returned each time an API request is queued, otherwise ``None`` is returned :type return_result: bool + + .. warning:: + Asynchronous execution is currently an experimental feature and is not + thread-safe. """ def __init__(self, connection, return_result=True): diff --git a/arango/batch.py b/arango/batch.py index 7e14ac62..e6193ff5 100644 --- a/arango/batch.py +++ b/arango/batch.py @@ -28,6 +28,10 @@ class BatchExecution(Connection): so far are committed even if an exception is raised before existing out of the context (default: ``False``) :type commit_on_error: bool + + .. warning:: + Batch execution is currently an experimental feature and is not + thread-safe. """ def __init__(self, connection, return_result=True, commit_on_error=False): diff --git a/arango/client.py b/arango/client.py index 5bd44da7..8c743757 100644 --- a/arango/client.py +++ b/arango/client.py @@ -955,13 +955,13 @@ def revoke_user_access(self, username, database): def async_jobs(self, status, count=None): """Return the IDs of asynchronous jobs with the specified status. - :param status: the job status (``"pending"`` or ``"done"``) + :param status: The job status (``"pending"`` or ``"done"``). :type status: str | unicode - :param count: the maximum number of job IDs to return + :param count: The maximum number of job IDs to return. :type count: int - :returns: the list of job IDs + :returns: The list of job IDs. :rtype: [str] - :raises arango.exceptions.AsyncJobListError: if the retrieval fails + :raises arango.exceptions.AsyncJobListError: If the retrieval fails. .. note:: Only the root user can access this method. 
For non-root users, @@ -979,13 +979,13 @@ def async_jobs(self, status, count=None): def clear_async_jobs(self, threshold=None): """Delete asynchronous job results from the server. - :param threshold: if specified, only the job results created prior to + :param threshold: If specified, only the job results created prior to the threshold (a unix timestamp) are deleted, otherwise *all* job - results are deleted + results are deleted. :type threshold: int - :returns: whether the deletion of results was successful + :returns: Whether the deletion of results was successful. :rtype: bool - :raises arango.exceptions.AsyncJobClearError: if the operation fails + :raises arango.exceptions.AsyncJobClearError: If the operation fails. .. note:: Async jobs currently queued or running are not stopped. diff --git a/arango/database.py b/arango/database.py index c51d3645..3d2828a7 100644 --- a/arango/database.py +++ b/arango/database.py @@ -1150,13 +1150,13 @@ def revoke_user_access(self, username, database=None): def async_jobs(self, status, count=None): """Return the IDs of asynchronous jobs with the specified status. - :param status: the job status (``"pending"`` or ``"done"``) + :param status: The job status (``"pending"`` or ``"done"``). :type status: str | unicode - :param count: the maximum number of job IDs to return + :param count: The maximum number of job IDs to return. :type count: int - :returns: the list of job IDs + :returns: The list of job IDs. :rtype: [str] - :raises arango.exceptions.AsyncJobListError: if the retrieval fails + :raises arango.exceptions.AsyncJobListError: If the retrieval fails. """ res = self._conn.get( '/_api/job/{}'.format(status), @@ -1169,13 +1169,13 @@ def async_jobs(self, status, count=None): def clear_async_jobs(self, threshold=None): """Delete asynchronous job results from the server. 
- :param threshold: if specified, only the job results created prior to + :param threshold: If specified, only the job results created prior to the threshold (a unix timestamp) are deleted, otherwise *all* job - results are deleted + results are deleted. :type threshold: int - :returns: whether the deletion of results was successful + :returns: Whether the deletion of results was successful. :rtype: bool - :raises arango.exceptions.AsyncJobClearError: if the operation fails + :raises arango.exceptions.AsyncJobClearError: If the operation fails. .. note:: Async jobs currently queued or running are not stopped. @@ -1190,3 +1190,71 @@ def clear_async_jobs(self, threshold=None): if res.status_code in HTTP_OK: return True raise AsyncJobClearError(res) + + ############### + # Pregel Jobs # + ############### + + def create_pregel_job(self, algorithm, graph): + """Start/create a Pregel job. + + :param algorithm: The name of the algorithm (e.g. ``"pagerank"``). + :type algorithm: str | unicode + :param graph: The name of the graph. + :type graph: str | unicode + :returns: The ID of the Pregel job. + :rtype: int + :raises arango.exceptions.PregelJobCreateError: If the operation fails. + + """ + res = self._conn.post( + '/_api/control_pregel', + data={ + 'algorithm': algorithm, + 'graphName': graph, + } + ) + if res.status_code in HTTP_OK: + return res.body + raise PregelJobCreateError(res) + + def pregel_job(self, job_id): + """Return the details of a Pregel job. + + :param job_id: The Pregel job ID. + :type job_id: int + :returns: The details of the Pregel job. + :rtype: dict + :raises arango.exceptions.PregelJobGetError: If the lookup fails. 
+ """ + res = self._conn.get( + '/_api/control_pregel/{}'.format(job_id) + ) + if res.status_code in HTTP_OK: + return { + 'aggregators': res.body['aggregators'], + 'edge_count': res.body.get('edgeCount'), + 'gss': res.body['gss'], + 'received_count': res.body['receivedCount'], + 'send_count': res.body['sendCount'], + 'state': res.body['state'], + 'total_runtime': res.body['totalRuntime'], + 'vertex_count': res.body.get('vertexCount') + } + raise PregelJobGetError(res) + + def delete_pregel_job(self, job_id): + """Cancel/delete a Pregel job. + + :param job_id: The Pregel job ID. + :type job_id: int + :returns: ``True`` if the Pregel job was successfully cancelled. + :rtype: bool + :raises arango.exceptions.PregelJobDeleteError: If the deletion fails. + """ + res = self._conn.delete( + '/_api/control_pregel/{}'.format(job_id) + ) + if res.status_code in HTTP_OK: + return True + raise PregelJobDeleteError(res) diff --git a/arango/exceptions.py b/arango/exceptions.py index 4038b3e0..74b6a9a0 100644 --- a/arango/exceptions.py +++ b/arango/exceptions.py @@ -448,6 +448,21 @@ class AsyncJobResultError(ArangoError): class AsyncJobClearError(ArangoError): """Failed to delete the asynchronous job result from the server.""" +##################### +# Pregel Exceptions # +##################### + +class PregelJobCreateError(ArangoError): + """Failed to start/create a Pregel job.""" + + +class PregelJobGetError(ArangoError): + """Failed to retrieve a Pregel job.""" + + +class PregelJobDeleteError(ArangoError): + """Failed to cancel/delete a Pregel job.""" + ########################### # Cluster Test Exceptions # diff --git a/arango/version.py b/arango/version.py index 57385738..74b94b16 100644 --- a/arango/version.py +++ b/arango/version.py @@ -1 +1 @@ -VERSION = '3.10.1' +VERSION = '3.11.0' diff --git a/docs/async.rst b/docs/async.rst index 067dd795..506aed15 100644 --- a/docs/async.rst +++ b/docs/async.rst @@ -12,6 +12,10 @@ fire-and-forget style. 
The results of the requests can be retrieved later via The user should be mindful of the server-side memory while using asynchronous executions with a large number of requests. +.. warning:: + Asynchronous execution is currently an experimental feature and is not + thread-safe. + Here is an example showing how asynchronous executions can be used: .. code-block:: python diff --git a/docs/batch.rst b/docs/batch.rst index 14d13b3c..ec542693 100644 --- a/docs/batch.rst +++ b/docs/batch.rst @@ -12,6 +12,10 @@ retrieved via :ref:`BatchJob` objects. The user should be mindful of the client-side memory while using batch executions with a large number of requests. +.. warning:: + Batch execution is currently an experimental feature and is not + thread-safe. + Here is an example showing how batch executions can be used: .. code-block:: python diff --git a/docs/index.rst b/docs/index.rst index 38090325..db495026 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -47,13 +47,6 @@ You may need to use ``sudo`` depending on your environment. .. _PyPi: https://pypi.python.org/pypi/python-arango .. _GitHub: https://github.com/joowani/python-arango -A Note on Thread Safety and Eventlet -======== - -This driver should be compatible with eventlet for the most part. By default, python-arango makes API calls using the requests library, which eventlet seems to be able to monkey patch. - -Assuming that, all python-arango APIs except Batch Execution and Asynchronous Execution should be thread-safe. - Contents ======== @@ -76,6 +69,8 @@ Contents user task wal + pregel + threading errors logging classes diff --git a/docs/logging.rst b/docs/logging.rst index 057521ef..b0810be1 100644 --- a/docs/logging.rst +++ b/docs/logging.rst @@ -48,7 +48,7 @@ The logging output for above would look something like this: In order to see the full request information, turn on logging for the requests_ library which **python-arango** uses under the hood: -.. 
_requests: https://github.com/kennethreitz/requests +.. _requests: https://github.com/requests/requests .. code-block:: python diff --git a/docs/pregel.rst b/docs/pregel.rst new file mode 100644 index 00000000..9aa94f69 --- /dev/null +++ b/docs/pregel.rst @@ -0,0 +1,38 @@ +.. _pregel-page: + +Pregel +------ + +**Python-arango** provides APIs for distributed iterative graph processing +(Pregel). For more information, please refer to the ArangoDB manual +`here <https://docs.arangodb.com/3.2/Manual/Graphs/Pregel/>`__. + +Here is an example showing how Pregel jobs can be started, fetched or cancelled: + +.. code-block:: python + + from arango import ArangoClient + + client = ArangoClient() + db = client.db('my_database') + db.create_graph('my_graph') + + # Create and start a new Pregel job + job_id = db.create_pregel_job(algorithm='pagerank', graph='my_graph') + + # Get the details of a Pregel job by its ID + job = db.pregel_job(job_id) + print(job['aggregators']) + print(job['edge_count']) + print(job['gss']) + print(job['received_count']) + print(job['send_count']) + print(job['state']) + print(job['total_runtime']) + print(job['vertex_count']) + + # Delete/cancel a Pregel job by its ID + db.delete_pregel_job(job_id) + +Refer to class :class:`arango.database.Database` for more details on the methods +for Pregel jobs. diff --git a/docs/threading.rst b/docs/threading.rst new file mode 100644 index 00000000..a7038d1c --- /dev/null +++ b/docs/threading.rst @@ -0,0 +1,24 @@ +.. _multithreading-page: + +Multithreading +-------------- + + +Notes on Eventlet +================= + +**Python-arango** should be compatible with eventlet_ *for the most part*. +By default, **python-arango** makes API calls to ArangoDB using the requests_ +library which can be monkeypatched: + +.. code-block:: python + + import eventlet + requests = eventlet.import_patched("requests") + +.. _requests: https://github.com/requests/requests +.. 
_eventlet: http://eventlet.net + +Assuming the requests library is used and monkeypatched properly, all +python-arango APIs except :ref:`Batch Execution <batch-page>` and +:ref:`Async Execution <async-page>` should be thread-safe. diff --git a/tests/test_pregel.py b/tests/test_pregel.py new file mode 100644 index 00000000..becfa209 --- /dev/null +++ b/tests/test_pregel.py @@ -0,0 +1,88 @@ +from __future__ import absolute_import, unicode_literals + + +import pytest + +from arango import ArangoClient +from arango.exceptions import ( + PregelJobCreateError, + PregelJobGetError, + PregelJobDeleteError +) + +from .utils import ( + generate_db_name, + generate_col_name, + generate_graph_name, +) + +arango_client = ArangoClient() +db_name = generate_db_name() +db = arango_client.create_database(db_name) +graph_name = generate_graph_name() +graph = db.create_graph(graph_name) +from_col_name = generate_col_name() +to_col_name = generate_col_name() +edge_col_name = generate_col_name() +graph.create_vertex_collection(from_col_name) +graph.create_vertex_collection(to_col_name) +graph.create_edge_definition( + edge_col_name, [from_col_name], [to_col_name] +) + + +def teardown_module(*_): + arango_client.delete_database(db_name, ignore_missing=True) + + +@pytest.mark.order1 +def test_start_pregel_job(): + # Test start_pregel_job with page rank algorithm (happy path) + job_id = db.create_pregel_job('pagerank', graph_name) + assert isinstance(job_id, int) + + # Test start_pregel_job with unsupported algorithm + with pytest.raises(PregelJobCreateError): + db.create_pregel_job('unsupported_algorithm', graph_name) + + +@pytest.mark.order2 +def test_get_pregel_job(): + # Create a test Pregel job + job_id = db.create_pregel_job('pagerank', graph_name) + + # Test pregel_job with existing job ID (happy path) + job = db.pregel_job(job_id) + assert isinstance(job['aggregators'], dict) + assert isinstance(job['gss'], int) + assert isinstance(job['received_count'], int) + assert isinstance(job['send_count'], int) + 
assert isinstance(job['total_runtime'], float) + assert job['state'] == 'running' + assert 'edge_count' in job + assert 'vertex_count' in job + + # Test pregel_job with an invalid job ID + with pytest.raises(PregelJobGetError): + db.pregel_job(-1) + + +@pytest.mark.order3 +def test_delete_pregel_job(): + # Create a test Pregel job + job_id = db.create_pregel_job('pagerank', graph_name) + + # Get the newly created job + job = db.pregel_job(job_id) + assert job['state'] == 'running' + + # Test delete_pregel_job with existing job ID (happy path) + assert db.delete_pregel_job(job_id) == True + + # The fetch for the same job should now fail + with pytest.raises(PregelJobGetError): + db.pregel_job(job_id) + + # Test delete_pregel_job with an invalid job ID + with pytest.raises(PregelJobDeleteError): + db.delete_pregel_job(-1)