Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Let tasks change visibility timeout #62

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyqs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .decorator import task # noqa

__title__ = 'pyqs'
__version__ = '0.1.2'
__version__ = '0.1.4'
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to bump this, we will do so when it is released.

5 changes: 5 additions & 0 deletions pyqs/decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ def wrapper(*args, **kwargs):


class task(object):
""" Decorator that enables sqs based task execution. If the function
accepts an optional `_context` argument, an instance of TaskContext is
passed to the task function. The context allows the function to do things
like change message visibility. """

def __init__(self, queue=None, delay_seconds=None,
custom_function_path=None):
self.queue_name = queue
Expand Down
18 changes: 18 additions & 0 deletions pyqs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pickle

import boto3
from datetime import timedelta


def decode_message(message):
Expand Down Expand Up @@ -36,3 +37,20 @@ def get_aws_region_name():
region_name = 'us-east-1'

return region_name


class TaskContext(object):
""" Tasks may optionally accept a _context variable. If they do, an
instance of this object is passed as the context. """

def __init__(self, conn, queue_url, receipt_handle):
self.conn = conn
self.queue_url = queue_url
self.receipt_handle = receipt_handle

def change_message_visibility(self, timeout=timedelta(minutes=10)):
self.conn.change_message_visibility(
QueueUrl=self.queue_url,
ReceiptHandle=self.receipt_handle,
VisibilityTimeout=int(timeout.total_seconds())
)
28 changes: 26 additions & 2 deletions pyqs/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@
import time

from multiprocessing import Event, Process, Queue

try:
from queue import Empty, Full
except ImportError:
from Queue import Empty, Full

try:
from inspect import getfullargspec as get_args
except ImportError:
from inspect import getargspec as get_args

import boto3

from pyqs.utils import get_aws_region_name, decode_message
from pyqs.utils import get_aws_region_name, decode_message, TaskContext

MESSAGE_DOWNLOAD_BATCH_SIZE = 10
LONG_POLLING_INTERVAL = 20
Expand Down Expand Up @@ -180,6 +186,7 @@ def process_message(self):
full_task_path = message_body['task']
args = message_body['args']
kwargs = message_body['kwargs']
receipt_handle = message['ReceiptHandle']

task_name = full_task_path.split(".")[-1]
task_path = ".".join(full_task_path.split(".")[:-1])
Expand All @@ -188,6 +195,15 @@ def process_message(self):

task = getattr(task_module, task_name)

# if the task accepts the optional _context argument, pass it the TaskContext
if '_context' in get_args(task).args:
kwargs = dict(kwargs)
kwargs['_context'] = TaskContext(
conn=self.conn,
queue_url=queue_url,
receipt_handle=receipt_handle
)

current_time = time.time()
if int(current_time - fetch_time) >= timeout:
logger.warning(
Expand All @@ -214,12 +230,20 @@ def process_message(self):
traceback.format_exc(),
)
)

# since the task failed, mark it is available again quickly (10 seconds)
self.conn.change_message_visibility(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle,
VisibilityTimeout=10
)

return True
else:
end_time = time.clock()
self.conn.delete_message(
QueueUrl=queue_url,
ReceiptHandle=message['ReceiptHandle']
ReceiptHandle=receipt_handle
)
logger.info(
"Processed task {} in {:.4f} seconds with args: {} "
Expand Down