Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'retry' to opt_keys in 'async_task' #619

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

kennyhei
Copy link
Contributor

@kennyhei kennyhei commented Sep 24, 2021

Hi again @Koed00

I'd like to add retry to opt_keys in async_task. We have implemented our own custom ORM broker that makes use of custom retry value like this:

def _timeout(retry=None):
    retry = retry or Conf.RETRY
    return timezone.now() - timedelta(seconds=retry)


class ORM(BaseORM):

    def enqueue(self, task):
        data = SignedPackage.loads(task)
        retry = data.get('retry')
        package = self.get_connection().create(
            key=self.list_key, payload=task, lock=_timeout(retry)
        )
        return package.pk

    def dequeue(self):
        # Query queued tasks using default _timeout
        tasks = self.get_connection().filter(key=self.list_key, lock__lt=_timeout())[
            0:Conf.BULK
        ]
        if tasks:
            task_list = []
            for task in tasks:
                # Check here if task has custom timeout. If lock
                # is greater than custom timeout time, skip
                retry = task.task().get('retry')
                if task.lock > _timeout(retry):
                    continue
                if (
                    self.get_connection()
                    .filter(id=task.id, lock=task.lock)
                    .update(lock=timezone.now())
                ):
                    task_list.append((task.pk, task.payload))
                # else don't process, as another cluster has been faster than us on that task
            return task_list
        # empty queue, spare the cpu
        sleep(Conf.POLL)

This isn't at the moment possible because retry is missing from opt_keys and thus not added in the task payload data.

@jasonbodily
Copy link

I see this as a valuable feature. We have a few tasks that can take several minutes. I don't think it appropriate to raise the retry for all tasks in our app (including some that take a few seconds) to several minutes. It is curious that the timeout can be configured per task, but not the retry.

@kennyhei
Copy link
Contributor Author

kennyhei commented Oct 14, 2021

@jasonbodily For now I have solved this by checking whether task has custom timeout and then creating retry variable on the fly which equals to timeout + 5 minutes:

def _timeout(retry=None):
    retry = retry or Conf.RETRY
    return timezone.now() - timedelta(seconds=retry)

class ORM(BaseORM):

    def _get_retry(self, data):
        # Retry 5 minutes after timeout
        if not data.get('timeout'):
            return None
        return timeout + 300

    def enqueue(self, task):
        data = SignedPackage.loads(task)
        retry = self._get_retry(data)
        package = self.get_connection().create(
            key=self.list_key, payload=task, lock=_timeout(retry)
        )
        return package.pk

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants