Skip to content

Commit

Permalink
fix bug: update successors
Browse files Browse the repository at this point in the history
  • Loading branch information
Yacobolo committed Jul 28, 2023
1 parent ff6ab04 commit e5c4d70
Showing 1 changed file with 4 additions and 6 deletions.
10 changes: 4 additions & 6 deletions src/planbee/scheduler/task_batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,14 @@ class TaskBatchProcessor:

def __init__(self, task_graph: nx.DiGraph, task_dict: dict[str, Task]):
self.task_graph = task_graph
self.task_dict = task_dict
self.task_dict = deepcopy(task_dict)

def split_tasks_into_batches(self) -> dict[str, Task]:
"""
This function performs splitting of tasks into batches if necessary and returns
an updated task dictionary with possibly split tasks. Tasks are split only if
they have a batch size and the quantity is greater than the batch size.
"""
task_dict_copy = deepcopy(self.task_dict)

# get the order of tasks based on their dependencies
task_order = list(nx.topological_sort(self.task_graph))

Expand All @@ -39,13 +37,13 @@ def split_tasks_into_batches(self) -> dict[str, Task]:
self._update_predecessor_successor_relationships(current_task, task_splits)

# remove the current task from the task dictionary
del task_dict_copy[task_uid]
del self.task_dict[task_uid]

# add the split tasks to the task dictionary
for split_task in task_splits:
task_dict_copy[split_task.uid] = split_task
self.task_dict[split_task.uid] = split_task

return task_dict_copy
return self.task_dict

def _update_predecessor_successor_relationships(
self, task: Task, batches: list[Task]
Expand Down

0 comments on commit e5c4d70

Please sign in to comment.