Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate base task from redis logic #9

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 0 additions & 174 deletions celery_unique.py

This file was deleted.

8 changes: 8 additions & 0 deletions celery_unique/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from celery_unique.core import unique_task_factory
from celery_unique.core import UniqueTaskMixin


__all__ = [
'unique_task_factory',
'UniqueTaskMixin',
]
83 changes: 83 additions & 0 deletions celery_unique/backends.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
class BaseBackend:
"""
Abstract reference backend.

An abstract backend that defines the interface that other backends must
implement.
"""

def create_task_record(self, key, task_id, ttl): # pragma: no cover
"""
Creates a new record for the recently-published unique task.

:param str key: The unique key which identifies the task and its
configuration.
:param str task_id: The ID of the recently-published unique task.
:param ttl: The TTL for the record, which should be (approximately)
equal to the number of seconds remaining until the earliest time
that the task is expected to be executed by Celery.
"""
raise NotImplementedError()

def get_task_id(self, key): # pragma: no cover
"""
Returns the task_id for an exiting task
"""
raise NotImplementedError()

def revoke_extant_task(self, key): # pragma: no cover
"""
Deletes a task for a given key.

This deletes both the task and the cache entry.

:param key: The string (potentially) used by the backend as the key for
the record.
:type redis_key: str | unicode
"""
raise NotImplementedError()


class RedisBackend(BaseBackend):
"""
A uniqueness backend that uses redis as a key/value store.

See :class:`~.BaseBackend` for documentation on indivitual methods.
"""

def __init__(self, redis_client):
self.redis_client = redis_client

def get_task_id(self, key):
task_id = self.redis_client.get(key)
return task_id.decode() if task_id else None

def revoke_extant_task(self, key):
self.redis_client.delete(key)

def create_task_record(self, key, task_id, ttl):
self.redis_client.set(key, task_id, ex=ttl)


class InMemoryBackend(BaseBackend):
"""
Dummy backend which uses an in-memory store.

This is a dummy backend which uses an in-memory store. It is mostly
suitable for development and testing, and should not be using in production
environments.

See :class:`~.BaseBackend` for documentation on indivitual methods.
"""

def __init__(self):
self.tasks = {}

def get_task_id(self, key):
return self.tasks.get(key, None)

def revoke_extant_task(self, key):
self.tasks.pop(key, None)

def create_task_record(self, key, task_id, ttl=None):
self.tasks[key] = task_id
Loading