Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#97 helpers for more flexible tasks progression #98

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions huey_monitor/tqdm.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from django.core.exceptions import ValidationError
from django.utils import timezone
from django.db.models import F

from huey.api import Task

from huey_monitor.constants import TASK_MODEL_DESC_MAX_LENGTH
Expand Down Expand Up @@ -87,3 +89,41 @@ def __str__(self):
f'{self.task.name} - {self.desc} {self.total_progress}/{self.total}{self.unit}'
f' (divisor: {self.unit_divisor})'
)

def update_parent_progress(self, n=1):
"""
Increment the parent TaskModel progress information
as well as update_dt
"""
assert self.parent_task_id
update_task_progress(self.parent_task_id, n)

def make_complete(self):
"""
Update TaskModel.total based on TaskModel.progress_count
to mark task with unkown number of steps complete
"""

TaskModel.objects.filter(task_id=self.task.id).update(
total=self.total_progress
)
self.total = self.total_progress


def make_task_complete(task_id):
"""
Update the TaskModel.total corresponding to a given Huey task to match its current progress_count
Used to mark task with unkown number of steps complete
"""
TaskModel.objects.filter(task_id=task_id).update(
update_dt=timezone.now(), total=F('progress_count')
)

def update_task_progress(task_id, n=1):
"""
Increment the TaskModel.progress_count corresponding to a given Huey task
and update update_dt
"""
TaskModel.objects.filter(task_id=task_id).update(
update_dt=timezone.now(), progress_count=F('progress_count') + n
)
66 changes: 65 additions & 1 deletion huey_monitor_tests/test_app/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
import math
import sys
import time
import random

from bx_py_utils.iteration import chunk_iterable
from huey import crontab
from huey.contrib.djhuey import lock_task, periodic_task, task

from huey_monitor.models import TaskModel
from huey_monitor.tqdm import ProcessInfo
from huey_monitor.tqdm import ProcessInfo, make_task_complete


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -129,3 +130,66 @@ def parallel_task(task, total=2000, task_num=3, **info_kwargs):
logger.info('Start sub task no. %i', no)
time.sleep(5)
parallel_sub_task(parent_task_id=task.id, item_chunk=chunk, **info_kwargs)

@task(context=True, retries=1)
def sub_task_recursive(task, parent_task_id):
"""
Example of implementation for recursive tasks where final number of sub-tasks is unknown.
Each recursive sub-task will refer to the same task as their parent task
"""
logger.info('Recursive sub task started from main task: %s', parent_task_id)
TaskModel.objects.set_parent_task(
main_task_id=parent_task_id,
sub_task_id=task.id,
)
# let's consider we don't know yet the number of steps
process_info = ProcessInfo(task, desc='Recursive task execution', total=999)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. total=999 is a little bit ugly, isn't it?

Can we resolve this in a other way?


continue_with_next_step = True

while continue_with_next_step:
# we execute the step:
continue_with_next_step = random.randrange(100)<80
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just a normal for loop?

e.g.:

for _ in range(random.randrange(80,100):


# progress_count is incremented (default incrementation step is 1):
process_info.update()
# process_info.update(n=10) for incrementing by 10

# Update TaskModel.total based on TaskModel.progress_count :
# (was 999 before because final number of steps was unknown)
process_info.make_complete()

# progress_count of the parent task is incremented (default incrementation step is 1):
process_info.update_parent_progress()
# process_info.update_parent_progress(n=5) for incrementing by 5

# for convenience purpose, the function 'huey_monitor.tqdm.update_task_progress(task_id, n=1)'
# can also be called if 'process_info' is not declared in current code

# we now test if conditions are met to exit the recursive loop:

condition_for_recursive_loop_exit = random.randrange(10)>7
if condition_for_recursive_loop_exit:
logger.info('This was the last of the recursive sub tasks')

# Update TaskModel.total based on TaskModel.progress_count for the parent:
# (was 999 before because final number of sub-tasks was unknown)
make_task_complete(parent_task_id)
else:
# next recursive task is launched:
sub_task_recursive(parent_task_id=parent_task_id)


@task(context=True)
def main_task_recursive(task):
"""
Example of implementation for recursive tasks where final number of sub-tasks is unknown.
This is the parent task which launches the recursive search
and will act as parent task for all sub-tasks
"""
logger.info('Main task %s starts recursive sub tasks', task.id)
process_info = ProcessInfo(task, desc='Launching recursive tasks', total=999)
# we don't know yet the number of sub-tasks

sub_task_recursive(parent_task_id=task.id)