Skip to content

Commit

Permalink
Update documentation on thread-safety and add API for pregel
Browse files Browse the repository at this point in the history
  • Loading branch information
joowani committed Aug 28, 2017
1 parent 9ca1045 commit 0a8a814
Show file tree
Hide file tree
Showing 13 changed files with 269 additions and 25 deletions.
4 changes: 4 additions & 0 deletions arango/async.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ class AsyncExecution(Connection):
instance (which holds the result of the request) is returned each
time an API request is queued, otherwise ``None`` is returned
:type return_result: bool
.. warning::
Asynchronous execution is currently an experimental feature and is not
thread-safe.
"""

def __init__(self, connection, return_result=True):
Expand Down
4 changes: 4 additions & 0 deletions arango/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ class BatchExecution(Connection):
so far are committed even if an exception is raised before exiting
out of the context (default: ``False``)
:type commit_on_error: bool
.. warning::
Batch execution is currently an experimental feature and is not
thread-safe.
"""

def __init__(self, connection, return_result=True, commit_on_error=False):
Expand Down
16 changes: 8 additions & 8 deletions arango/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -955,13 +955,13 @@ def revoke_user_access(self, username, database):
def async_jobs(self, status, count=None):
"""Return the IDs of asynchronous jobs with the specified status.
:param status: the job status (``"pending"`` or ``"done"``)
:param status: The job status (``"pending"`` or ``"done"``).
:type status: str | unicode
:param count: the maximum number of job IDs to return
:param count: The maximum number of job IDs to return.
:type count: int
:returns: the list of job IDs
:returns: The list of job IDs.
:rtype: [str]
:raises arango.exceptions.AsyncJobListError: if the retrieval fails
:raises arango.exceptions.AsyncJobListError: If the retrieval fails.
.. note::
Only the root user can access this method. For non-root users,
Expand All @@ -979,13 +979,13 @@ def async_jobs(self, status, count=None):
def clear_async_jobs(self, threshold=None):
"""Delete asynchronous job results from the server.
:param threshold: if specified, only the job results created prior to
:param threshold: If specified, only the job results created prior to
the threshold (a unix timestamp) are deleted, otherwise *all* job
results are deleted
results are deleted.
:type threshold: int
:returns: whether the deletion of results was successful
:returns: Whether the deletion of results was successful.
:rtype: bool
:raises arango.exceptions.AsyncJobClearError: if the operation fails
:raises arango.exceptions.AsyncJobClearError: If the operation fails.
.. note::
Async jobs currently queued or running are not stopped.
Expand Down
84 changes: 76 additions & 8 deletions arango/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -1150,13 +1150,13 @@ def revoke_user_access(self, username, database=None):
def async_jobs(self, status, count=None):
"""Return the IDs of asynchronous jobs with the specified status.
:param status: the job status (``"pending"`` or ``"done"``)
:param status: The job status (``"pending"`` or ``"done"``).
:type status: str | unicode
:param count: the maximum number of job IDs to return
:param count: The maximum number of job IDs to return.
:type count: int
:returns: the list of job IDs
:returns: The list of job IDs.
:rtype: [str]
:raises arango.exceptions.AsyncJobListError: if the retrieval fails
:raises arango.exceptions.AsyncJobListError: If the retrieval fails.
"""
res = self._conn.get(
'/_api/job/{}'.format(status),
Expand All @@ -1169,13 +1169,13 @@ def async_jobs(self, status, count=None):
def clear_async_jobs(self, threshold=None):
"""Delete asynchronous job results from the server.
:param threshold: if specified, only the job results created prior to
:param threshold: If specified, only the job results created prior to
the threshold (a unix timestamp) are deleted, otherwise *all* job
results are deleted
results are deleted.
:type threshold: int
:returns: whether the deletion of results was successful
:returns: Whether the deletion of results was successful.
:rtype: bool
:raises arango.exceptions.AsyncJobClearError: if the operation fails
:raises arango.exceptions.AsyncJobClearError: If the operation fails.
.. note::
Async jobs currently queued or running are not stopped.
Expand All @@ -1190,3 +1190,71 @@ def clear_async_jobs(self, threshold=None):
if res.status_code in HTTP_OK:
return True
raise AsyncJobClearError(res)

###############
# Pregel Jobs #
###############

def create_pregel_job(self, algorithm, graph):
    """Start/create a Pregel job.

    :param algorithm: The name of the algorithm (e.g. ``"pagerank"``).
    :type algorithm: str | unicode
    :param graph: The name of the graph.
    :type graph: str | unicode
    :returns: The ID of the Pregel job.
    :rtype: int
    :raises arango.exceptions.PregelJobCreateError: If the operation fails.
    """
    request_payload = {'algorithm': algorithm, 'graphName': graph}
    res = self._conn.post('/_api/control_pregel', data=request_payload)
    if res.status_code not in HTTP_OK:
        raise PregelJobCreateError(res)
    # The response body is the ID of the newly created job.
    return res.body

def pregel_job(self, job_id):
    """Return the details of a Pregel job.

    :param job_id: The Pregel job ID.
    :type job_id: int
    :returns: The details of the Pregel job.
    :rtype: dict
    :raises arango.exceptions.PregelJobGetError: If the lookup fails.
    """
    res = self._conn.get('/_api/control_pregel/{}'.format(job_id))
    if res.status_code not in HTTP_OK:
        raise PregelJobGetError(res)
    body = res.body
    # Translate the server's camelCase fields into snake_case keys.
    # ``edgeCount`` and ``vertexCount`` are read with ``get`` because
    # they may be absent from the response body.
    return {
        'aggregators': body['aggregators'],
        'edge_count': body.get('edgeCount'),
        'gss': body['gss'],
        'received_count': body['receivedCount'],
        'send_count': body['sendCount'],
        'state': body['state'],
        'total_runtime': body['totalRuntime'],
        'vertex_count': body.get('vertexCount'),
    }

def delete_pregel_job(self, job_id):
    """Cancel/delete a Pregel job.

    :param job_id: The Pregel job ID.
    :type job_id: int
    :returns: ``True`` if the Pregel job was successfully cancelled.
    :rtype: bool
    :raises arango.exceptions.PregelJobDeleteError: If the deletion fails.
    """
    res = self._conn.delete('/_api/control_pregel/{}'.format(job_id))
    if res.status_code not in HTTP_OK:
        raise PregelJobDeleteError(res)
    return True
15 changes: 15 additions & 0 deletions arango/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,21 @@ class AsyncJobResultError(ArangoError):
class AsyncJobClearError(ArangoError):
"""Failed to delete the asynchronous job result from the server."""

#####################
# Pregel Exceptions #
#####################

class PregelJobCreateError(ArangoError):
    """Failed to start/create a Pregel (distributed graph processing) job."""


class PregelJobGetError(ArangoError):
    """Failed to retrieve the details of a Pregel job."""


class PregelJobDeleteError(ArangoError):
    """Failed to cancel/delete a Pregel job."""


###########################
# Cluster Test Exceptions #
Expand Down
2 changes: 1 addition & 1 deletion arango/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = '3.10.1'
VERSION = '3.11.0'
4 changes: 4 additions & 0 deletions docs/async.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ fire-and-forget style. The results of the requests can be retrieved later via
The user should be mindful of the server-side memory while using
asynchronous executions with a large number of requests.

.. warning::
Asynchronous execution is currently an experimental feature and is not
thread-safe.

Here is an example showing how asynchronous executions can be used:

.. code-block:: python
Expand Down
4 changes: 4 additions & 0 deletions docs/batch.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ retrieved via :ref:`BatchJob` objects.
The user should be mindful of the client-side memory while using batch
executions with a large number of requests.

.. warning::
Batch execution is currently an experimental feature and is not
thread-safe.

Here is an example showing how batch executions can be used:

.. code-block:: python
Expand Down
9 changes: 2 additions & 7 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,6 @@ You may need to use ``sudo`` depending on your environment.
.. _PyPi: https://pypi.python.org/pypi/python-arango
.. _GitHub: https://github.com/joowani/python-arango

A Note on Thread Safety and Eventlet
========

This driver should be compatible with eventlet for the most part. By default, python-arango makes API calls using the requests library, which eventlet seems to be able to monkey patch.

Assuming that, all python-arango APIs except Batch Execution and Asynchronous Execution should be thread-safe.


Contents
========
Expand All @@ -76,6 +69,8 @@ Contents
user
task
wal
pregel
threading
errors
logging
classes
2 changes: 1 addition & 1 deletion docs/logging.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ The logging output for above would look something like this:
In order to see the full request information, turn on logging for the requests_
library which **python-arango** uses under the hood:

.. _requests: https://github.com/kennethreitz/requests
.. _requests: https://github.com/requests/requests

.. code-block:: python
Expand Down
38 changes: 38 additions & 0 deletions docs/pregel.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
.. _pregel-page:

Pregel
------

**Python-arango** provides APIs for distributed iterative graph processing
(Pregel). For more information, please refer to the ArangoDB manual
`here <https://docs.arangodb.com/Manual/Graphs/Pregel/>`__.

Here is an example showing how Pregel jobs can be started, fetched or cancelled:

.. code-block:: python
from arango import ArangoClient
client = ArangoClient()
db = client.db('my_database')
db.create_graph('my_graph')
# Create and start a new Pregel job
job_id = db.create_pregel_job(algorithm='pagerank', graph='my_graph')
# Get the details of a Pregel job by its ID
job = db.pregel_job(job_id)
print(job['aggregators'])
print(job['edge_count'])
print(job['gss'])
print(job['received_count'])
print(job['send_count'])
print(job['state'])
print(job['total_runtime'])
print(job['vertex_count'])
# Delete/cancel a Pregel job by its ID
db.delete_pregel_job(job_id)
Refer to class :class:`arango.database.Database` for more details on the methods
for Pregel jobs.
24 changes: 24 additions & 0 deletions docs/threading.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
.. _multithreading-page:

Multithreading
--------------


Notes on Eventlet
=================

**Python-arango** should be compatible with eventlet_ *for the most part*.
By default, **python-arango** makes API calls to ArangoDB using the requests_
library which can be monkeypatched:

.. code-block:: python
import eventlet
requests = eventlet.import_patched("requests")
.. _requests: https://github.com/requests/requests
.. _eventlet: http://eventlet.net

Assuming the requests library is used and monkeypatched properly, all
python-arango APIs except :ref:`Batch Execution <batch-page>` and
:ref:`Async Execution <async-page>` should be thread-safe.
88 changes: 88 additions & 0 deletions tests/test_pregel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from __future__ import absolute_import, unicode_literals


import pytest

from arango import ArangoClient
from arango.exceptions import (
PregelJobCreateError,
PregelJobGetError,
PregelJobDeleteError
)

from .utils import (
generate_db_name,
generate_col_name,
generate_graph_name,
)

# Module-level fixtures: a throwaway database and graph are created once
# at import time and shared by all tests in this module (the database is
# dropped again in teardown_module).
arango_client = ArangoClient()
db_name = generate_db_name()
db = arango_client.create_database(db_name)
graph_name = generate_graph_name()
graph = db.create_graph(graph_name)
from_col_name = generate_col_name()
to_col_name = generate_col_name()
edge_col_name = generate_col_name()
graph.create_vertex_collection(from_col_name)
graph.create_vertex_collection(to_col_name)
# Edge definition linking the two vertex collections.
graph.create_edge_definition(
    edge_col_name, [from_col_name], [to_col_name]
)


def teardown_module(*_):
    # Drop the database created at import time; ignore_missing keeps the
    # teardown quiet if the database was already removed.
    arango_client.delete_database(db_name, ignore_missing=True)


@pytest.mark.order1
def test_start_pregel_job():
    """Pregel jobs can be started, and unknown algorithms are rejected."""
    # Happy path: PageRank is a supported algorithm.
    new_job_id = db.create_pregel_job('pagerank', graph_name)
    assert isinstance(new_job_id, int)

    # An unsupported algorithm name must raise PregelJobCreateError.
    with pytest.raises(PregelJobCreateError):
        db.create_pregel_job('unsupported_algorithm', graph_name)


@pytest.mark.order2
def test_get_pregel_job():
    """Details of an existing Pregel job can be retrieved."""
    # Start a job so there is something to look up.
    new_job_id = db.create_pregel_job('pagerank', graph_name)

    # Happy path: fetching by a valid ID yields the expected fields.
    details = db.pregel_job(new_job_id)
    assert isinstance(details['aggregators'], dict)
    assert isinstance(details['gss'], int)
    assert isinstance(details['received_count'], int)
    assert isinstance(details['send_count'], int)
    assert isinstance(details['total_runtime'], float)
    assert details['state'] == 'running'
    assert 'edge_count' in details
    assert 'vertex_count' in details

    # A bogus job ID must raise PregelJobGetError.
    with pytest.raises(PregelJobGetError):
        db.pregel_job(-1)


@pytest.mark.order3
def test_delete_pregel_job():
    """Pregel jobs can be cancelled/deleted, and bad IDs are rejected."""
    # Start a job so there is something to delete.
    job_id = db.create_pregel_job('pagerank', graph_name)

    # Sanity check: the job exists and is running before deletion.
    job = db.pregel_job(job_id)
    assert job['state'] == 'running'

    # Happy path: deletion returns True.
    # (Fixed: was ``== True``, a PEP 8 / flake8 E712 violation.)
    assert db.delete_pregel_job(job_id) is True

    # Fetching the deleted job must now fail.
    with pytest.raises(PregelJobGetError):
        db.pregel_job(job_id)

    # Deleting a non-existent job must raise PregelJobDeleteError.
    with pytest.raises(PregelJobDeleteError):
        db.delete_pregel_job(-1)

0 comments on commit 0a8a814

Please sign in to comment.