diff --git a/.gitignore b/.gitignore index 6b987e0..85f38dd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,9 @@ # Prototyping Notebooks notebooks/ +*.ipynb + +# Test data +inputs/ # vscode settings .vscode/ diff --git a/src/factryengine/models/task.py b/src/factryengine/models/task.py index cf04f20..046f4f0 100644 --- a/src/factryengine/models/task.py +++ b/src/factryengine/models/task.py @@ -1,4 +1,4 @@ -from pydantic import BaseModel, Field, model_validator, validator +from pydantic import BaseModel, Field, model_validator, validator, PrivateAttr from .resource import Resource, ResourceGroup @@ -44,15 +44,16 @@ def get_unique_resources(self) -> set[Resource]: class Task(BaseModel): - id: int + id: int | str name: str = "" duration: int = Field(gt=0) priority: int = Field(gt=0) assignments: list[Assignment] = [] constraints: set[Resource] = set() - predecessor_ids: set[int] = set() + predecessor_ids: set[int] | set[str] = set() predecessor_delay: int = Field(0, gt=0) quantity: int = Field(None, gt=0) + _batch_id: int = PrivateAttr(None) def __hash__(self): return hash(self.id) @@ -85,3 +86,12 @@ def set_name(cls, v, values) -> str: def get_id(self) -> int: """returns the task id""" return self.id + + @property + def batch_id(self): + """returns the batch id of the task""" + return self._batch_id + + def set_batch_id(self, batch_id): + """sets the batch id of the task""" + self._batch_id = batch_id diff --git a/src/factryengine/scheduler/heuristic_solver/main.py b/src/factryengine/scheduler/heuristic_solver/main.py index 4b5bb52..5b51052 100644 --- a/src/factryengine/scheduler/heuristic_solver/main.py +++ b/src/factryengine/scheduler/heuristic_solver/main.py @@ -74,24 +74,25 @@ def solve(self) -> list[dict]: except AllocationError as e: self.mark_task_as_unscheduled(task_id=task_id, error_message=str(e)) continue - + # update resource windows self.window_manager.update_resource_windows(allocated_resource_windows_dict) - # Append task values + task_values = { "task_id": task_id, "assigned_resource_ids": list(allocated_resource_windows_dict.keys()), "task_start": min( - start for start, _ in allocated_resource_windows_dict.values() + start for intervals in allocated_resource_windows_dict.values() for start, _ in intervals ), "task_end": max( - end for _, end in allocated_resource_windows_dict.values() + end for intervals in allocated_resource_windows_dict.values() for _, end in intervals ), "resource_intervals": allocated_resource_windows_dict.values(), } self.task_vars[task_id] = task_values + return list( self.task_vars.values() ) # Return values of the dictionary as a list diff --git a/src/factryengine/scheduler/heuristic_solver/matrix.py b/src/factryengine/scheduler/heuristic_solver/matrix.py index 98f8248..8180538 100644 --- a/src/factryengine/scheduler/heuristic_solver/matrix.py +++ b/src/factryengine/scheduler/heuristic_solver/matrix.py @@ -56,9 +56,12 @@ def trim_end(cls, original_matrix: "Matrix", trim_matrix: "Matrix") -> "Matrix": Trims a Matrix based on another """ new_intervals = original_matrix.intervals[: len(trim_matrix.intervals)] - # Check if intervals are the same - if not np.array_equal(new_intervals, trim_matrix.intervals): + # if not np.array_equal(new_intervals, trim_matrix.intervals): + # raise ValueError("All matrices must have the same intervals") + + # Used np.allclose to allow for small differences in the intervals + if not np.allclose(new_intervals, trim_matrix.intervals, atol=1e-8): raise ValueError("All matrices must have the same intervals") return cls( diff --git a/src/factryengine/scheduler/heuristic_solver/task_allocator.py b/src/factryengine/scheduler/heuristic_solver/task_allocator.py index 927c67f..b7084f5 100644 --- a/src/factryengine/scheduler/heuristic_solver/task_allocator.py +++ b/src/factryengine/scheduler/heuristic_solver/task_allocator.py @@ -31,6 +31,7 @@ def allocate_task( resource_windows_matrix=resource_windows_matrix, task_duration=task_duration, ) + if assignments and constraints: # update the resource matrix with the constraint matrix self._apply_constraint_to_resource_windows_matrix( @@ -44,27 +45,41 @@ def allocate_task( task_duration=task_duration, ) - # matrix to solve - matrix_to_solve = assignments_matrix or constraints_matrix + if assignments_matrix and constraints_matrix: + # find the solution for assignments + solution_matrix = self._solve_matrix( + matrix=assignments_matrix, + task_duration=task_duration, + ) + + # find the solution for constraints + constraints_solution = self._solve_matrix( + matrix=constraints_matrix, + task_duration=task_duration, + ) + else: + # matrix to solve + matrix_to_solve = assignments_matrix or constraints_matrix + + # find the solution + solution_matrix = self._solve_matrix( + matrix=matrix_to_solve, + task_duration=task_duration, + ) - # find the solution - solution_matrix = self._solve_matrix( - matrix=matrix_to_solve, - task_duration=task_duration, - ) # process solution to find allocated resource windows allocated_windows = self._get_resource_intervals( - matrix=solution_matrix, + matrix=solution_matrix ) # add constraints to allocated windows if constraints and assignments: constraints_matrix_trimmed = Matrix.trim_end( - original_matrix=constraints_matrix, trim_matrix=solution_matrix + original_matrix=constraints_solution, trim_matrix=solution_matrix ) allocated_windows.update( self._get_resource_intervals( - matrix=constraints_matrix_trimmed, + matrix=constraints_matrix_trimmed ) ) @@ -160,27 +175,130 @@ def _solve_task_end( return col0_value, other_columns_values def _get_resource_intervals( - self, - matrix: np.array, - ) -> dict[int, tuple[int, int]]: + self, matrix: Matrix + ) -> dict[int, list[tuple[int, int]]]: """ - gets the resource intervals from the solution matrix. + Extracts all the resource intervals used from the solution matrix, + including non-contiguous intervals and partial usage. """ - end_index = matrix.resource_matrix.shape[0] - 1 - resource_windows_dict = {} - # loop through resource ids and resource intervals - for resource_id, resource_intervals in zip( - matrix.resource_ids, matrix.resource_matrix.T - ): - # ensure only continuous intervals are selected - indexes = self._find_indexes(resource_intervals.data) - if indexes is not None: - start_index, end_index = indexes - resource_windows_dict[resource_id] = ( - ceil(round(matrix.intervals[start_index], 1)), - ceil(round(matrix.intervals[end_index], 1)), - ) - return resource_windows_dict + resource_windows_output = {} + + # Iterate over each resource and its corresponding matrix intervals + for resource_id, resource_intervals in zip(matrix.resource_ids, matrix.resource_matrix.T): + + + # Get all relevant indexes + indexes = self._find_indexes(resource_intervals) + + # Pair the indexes in groups of 2 (start, end) + intervals = [] + for start, end in zip(indexes[::2], indexes[1::2]): + # Use start and end indexes directly without skipping + # print(f"Start: {start}, End: {end}") + interval_start = matrix.intervals[start] + interval_end = matrix.intervals[end] + + # Append the interval to the list + intervals.append((int(np.round(interval_start)), int(np.round(interval_end)))) + + # Store the intervals for the current resource + resource_windows_output[resource_id] = intervals + + return resource_windows_output + + def _find_first_index(self, resource_intervals: np.ma.MaskedArray) -> int | None: + # Shift the mask by 1 to align with the 'next' element comparison + current_mask = resource_intervals.mask[:-1] + next_mask = resource_intervals.mask[1:] + next_values = resource_intervals.data[1:] + + # Vectorized condition: current is masked, next is non-masked, and next value > 0 + condition = (current_mask) & (~next_mask) & (next_values > 0) + + # Find the first index where the condition is met + indices = np.where(condition)[0] + + first_index = indices[0] if len(indices) > 0 else 0 + + next_non_zero_index = np.where( + (~resource_intervals.mask[first_index + 2:]) # Non-masked (non-zero) + & (resource_intervals.mask[first_index + 1:-1]) # Previous value masked + )[0] + + # Adjust x to align with the original array's indices + next_non_zero_index = ( + (first_index + 2 + next_non_zero_index[0]) if len(next_non_zero_index) > 0 else None + ) + + if next_non_zero_index and resource_intervals[next_non_zero_index] == resource_intervals[first_index+1]: + first_index = next_non_zero_index + + return first_index + + + def _find_indexes(self, resource_intervals: np.ma.MaskedArray) -> int | None: + """ + Finds relevant indexes in the resource intervals where the resource is used. + """ + # Mask where the data in resource_intervals is 0 + resource_intervals = np.ma.masked_where(resource_intervals == 0.0, resource_intervals) + + indexes = [] + first_index = self._find_first_index(resource_intervals) + last_index = resource_intervals.size-1 + + indexes = [first_index] # Start with the first index + is_last_window_start = True # Flag to indicate the start of a window + + # Iterate through the range between first and last index + for i in range(first_index, last_index + 1): + current = resource_intervals[i] + previous = resource_intervals[i - 1] if i > 0 else 0 + next_value = resource_intervals[i + 1] if i < last_index else 0 + + # Check if the current value is masked + is_prev_masked = resource_intervals.mask[i - 1] if i > 0 else False + is_curr_masked = resource_intervals.mask[i] + is_next_masked = resource_intervals.mask[i+1] if i < last_index else False + + # Skip if all values are the same (stable window) + if current > 0 and current == previous == next_value: + continue + + # Skip increasing trend from masked value + if current > 0 and current < next_value and is_prev_masked: + continue + + # Detect end of a window + if current > 0 and current == next_value and (is_prev_masked or previous < current) and is_last_window_start: + indexes.append(i) + is_last_window_start = False + continue + + # Detect end of window using masks + if current > 0 and is_next_masked and not is_curr_masked and is_last_window_start: + indexes.append(i) + is_last_window_start = False + continue + + # Detect start of a new window + if current > 0 and next_value > current and (is_prev_masked or previous == current) and not is_last_window_start: + indexes.append(i) + is_last_window_start = True + continue + + # Detect start of window using masks + if is_curr_masked and previous > 0 and next_value > 0 and not is_last_window_start: + indexes.append(i) + is_last_window_start = True + continue + + # Always add the last index + if i == last_index: + indexes.append(i) + + # Return the first valid index, or None if no valid index is found + return indexes def _mask_smallest_elements_except_top_k_per_row( self, array: np.ma.core.MaskedArray, k @@ -200,17 +318,19 @@ def _mask_smallest_elements_except_top_k_per_row( def _cumsum_reset_at_minus_one(self, a: np.ndarray) -> np.ndarray: """ - Computes the cumulative sum of an array but resets the sum to zero whenever a - -1 is encountered. This is a helper method used in the creation of the resource - windows matrix. + Computes the cumulative sum but resets to 0 whenever a -1 is encountered. """ - reset_at = a == -1 - a[reset_at] = 0 - without_reset = a.cumsum() - overcount = np.maximum.accumulate(without_reset * reset_at) - return without_reset - overcount + reset_mask = (a == -1) + a[reset_mask] = 0 # Replace -1 with 0 for sum calculation + cumsum_result = np.cumsum(a) + cumsum_result[reset_mask] = 0 # Reset at gaps + + return cumsum_result def _cumsum_reset_at_minus_one_2d(self, arr: np.ndarray) -> np.ndarray: + """ + Applies cumulative sum along the columns of a 2D array and resets at gaps (-1). + """ return np.apply_along_axis(self._cumsum_reset_at_minus_one, axis=0, arr=arr) def _replace_masked_values_with_nan( @@ -390,6 +510,7 @@ def _create_constraints_matrix( resource_matrix=resource_matrix, ) + def _apply_constraint_to_resource_windows_matrix( self, constraint_matrix: Matrix, resource_windows_matrix: Matrix ) -> None: @@ -448,36 +569,50 @@ def _create_assignments_matrix( return assignments_matrix - def _find_indexes(self, arr: np.array) -> tuple[int, int] | None: - """ - Find the start and end indexes from the last zero to the last number with no increase in a NumPy array. - """ - # if last element is zero return None - if arr[-1] == 0: - return None - # Find the index of the last zero - zero_indexes = np.nonzero(arr == 0)[0] - if zero_indexes.size > 0: - start_index = zero_indexes[-1] - else: - return None + # def _find_indexes(self, arr: np.array) -> tuple[int, int] | None: + # """ + # Find the start and end indexes for a valid segment of resource availability. + # This version avoids explicit loops and ensures the start index is correctly identified. + # """ + # # If the input is a MaskedArray, handle it accordingly + # if isinstance(arr, np.ma.MaskedArray): + # arr_data = arr.data + # mask = arr.mask + # # Find valid (unmasked and positive) indices + # valid_indices = np.where((~mask) & (arr_data >= 0))[0] + # else: + # valid_indices = np.where(arr >= 0)[0] - # Use np.diff to find where the array stops increasing - diffs = np.diff(arr[start_index:]) + # # If no valid indices are found, return None (no available resources) + # if valid_indices.size == 0: + # return None - # Find where the difference is less than or equal to zero (non-increasing sequence) - non_increasing = np.where(diffs == 0)[0] + # # Identify if the start of the array is valid + # start_index = 0 if arr[0] > 0 else valid_indices[0] + + # # Calculate differences between consecutive indices + # diffs = np.diff(valid_indices) + + # # Identify segment boundaries where there is a gap greater than 1 + # gaps = diffs > 1 + # segment_boundaries = np.where(gaps)[0] + + # # Insert the start index explicitly to ensure it is considered + # segment_starts = np.insert(segment_boundaries + 1, 0, 0) + # segment_ends = np.append(segment_starts[1:], len(valid_indices)) + + # # Always take the first segment (which starts at the earliest valid index) + # start_pos = segment_starts[0] + # end_pos = segment_ends[0] - 1 + + # # Convert these segment positions to the actual start and end indices + # start_index = valid_indices[start_pos] + # end_index = valid_indices[end_pos] + + # return start_index, end_index - if non_increasing.size > 0: - # The end index is the last non-increasing index + 1 to account for the difference in np.diff indexing - end_index = non_increasing[0] + start_index - else: - end_index = ( - arr.size - 1 - ) # If the array always increases, end at the last index - return start_index, end_index def _linear_interpolate_nan(self, y: np.ndarray, x: np.ndarray) -> np.ndarray: """ diff --git a/src/factryengine/scheduler/heuristic_solver/window_manager.py b/src/factryengine/scheduler/heuristic_solver/window_manager.py index 6cb8324..32cb63e 100644 --- a/src/factryengine/scheduler/heuristic_solver/window_manager.py +++ b/src/factryengine/scheduler/heuristic_solver/window_manager.py @@ -45,14 +45,29 @@ def update_resource_windows( self, allocated_resource_windows_dict: dict[int, list[tuple[int, int]]] ) -> None: """ - Removes the task interaval from the resource windows + Removes the allocated intervals from the resource windows. """ - for resource_id, trim_interval in allocated_resource_windows_dict.items(): + for resource_id, trim_intervals in allocated_resource_windows_dict.items(): + if not trim_intervals: + continue + + # Get the earliest start and latest end of the intervals + combined_start = trim_intervals[0][0] + combined_end = trim_intervals[-1][1] + + # Create a single trim interval + combined_trim_interval = (combined_start, combined_end) + + # Get the window to trim window = self.resource_windows_dict[resource_id] + + # Trim the window using the combined interval self.resource_windows_dict[resource_id] = self._trim_window( - window, trim_interval + window, combined_trim_interval ) + + def _create_resource_windows_dict(self) -> dict[int, np.ndarray]: """ Creates a dictionary mapping resource IDs to numpy arrays representing windows. diff --git a/src/factryengine/scheduler/scheduler_result.py b/src/factryengine/scheduler/scheduler_result.py index fdf3737..19b19d4 100644 --- a/src/factryengine/scheduler/scheduler_result.py +++ b/src/factryengine/scheduler/scheduler_result.py @@ -106,14 +106,21 @@ def get_resource_intervals_df(self) -> pd.DataFrame: # Drop any rows with missing values cleaned_df = exploded_df.dropna() + exploded_intervals_df = cleaned_df.explode("resource_intervals") + exploded_intervals_df = exploded_intervals_df.reset_index(drop=True) + # Extract the start and end of the interval from the 'resource_intervals' column - cleaned_df["interval_start"] = cleaned_df.resource_intervals.apply( + exploded_intervals_df["interval_start"] = exploded_intervals_df.resource_intervals.apply( lambda x: x[0] ) - cleaned_df["interval_end"] = cleaned_df.resource_intervals.apply(lambda x: x[1]) + + print('PASS INTERVAL START') + exploded_intervals_df["interval_end"] = exploded_intervals_df.resource_intervals.apply(lambda x: x[1]) + + print('PASS INTERVAL END') # Rename the 'assigned_resource_ids' column to 'resource_id' - renamed_df = cleaned_df.rename(columns={"assigned_resource_ids": "resource_id"}) + renamed_df = exploded_intervals_df.rename(columns={"assigned_resource_ids": "resource_id"}) # Select only the columns we're interested in selected_columns_df = renamed_df[ diff --git a/src/factryengine/scheduler/task_batch_processor.py b/src/factryengine/scheduler/task_batch_processor.py new file mode 100644 index 0000000..b4e4083 --- /dev/null +++ b/src/factryengine/scheduler/task_batch_processor.py @@ -0,0 +1,33 @@ +from ..models import Resource, Task + +class TaskSplitter: + """ + The TaskSplitter class is responsible for splitting tasks into batches. + """ + + def __init__(self, task: Task, batch_size: int): + self.task = task + self.batch_size = batch_size + + def split_into_batches(self) -> list[Task]: + """ + Splits a task into batches. + """ + num_batches, remaining = divmod(self.task.quantity, self.batch_size) + batches = [ + self._create_new_task(i + 1, self.batch_size) + for i in range(num_batches) + ] + + if remaining > 0: + batches.append(self._create_new_task(num_batches + 1, remaining)) + + return batches + + def _create_new_task(self, batch_id: int, quantity: int) -> Task: + """Creates a new task with the given batch_id and quantity.""" + new_task = self.task.model_copy(deep=True) + new_task.quantity = quantity + new_task.duration = (quantity / self.task.quantity) * self.task.duration + new_task.set_batch_id(batch_id) + return new_task diff --git a/tests/scheduler/test_matrix.py b/tests/scheduler/test_matrix.py index 07a72b4..4348485 100644 --- a/tests/scheduler/test_matrix.py +++ b/tests/scheduler/test_matrix.py @@ -81,7 +81,6 @@ def test_can_compare_update_mask_and_merge(matrix_data_dict): [True, True, True, False, False, False], ] ) - print(merged_matrix.resource_matrix.mask) assert np.array_equal(merged_matrix.resource_matrix.mask, expected_mask) diff --git a/tests/scheduler/test_task_allocator.py b/tests/scheduler/test_task_allocator.py index 772b979..d6336b3 100644 --- a/tests/scheduler/test_task_allocator.py +++ b/tests/scheduler/test_task_allocator.py @@ -28,17 +28,32 @@ def test_solve_task_end(task_allocator): assert np.array_equal(result_y, np.array([5, 5])) -def test_get_resource_intervals(task_allocator): - solution_resource_ids = np.array([1, 2, 3]) - solution_intervals = np.array([0, 1, 2]) - resource_matrix = np.ma.array([[0, 0, 0], [1, 0, 0], [2, 1, 0]]) +def test_get_resource_intervals_continuous(task_allocator): + # Test case continuous values 1 task 2 resources + solution_resource_ids = np.array([1, 2]) + solution_intervals = np.array([0, 1]) + resource_matrix = np.ma.array([[0, 0], [1, 1]], mask=[[False, False], [False, False]],) solution_matrix = Matrix( resource_ids=solution_resource_ids, intervals=solution_intervals, resource_matrix=resource_matrix, ) result = task_allocator._get_resource_intervals(solution_matrix) - expeceted = {1: (0, 2), 2: (1, 2)} + expeceted = {1: [(0, 1)], 2: [(0, 1)]} + assert result == expeceted + +def test_get_resource_intervals_windowed(task_allocator): + # Test case windowed values 1 task 1 resource + solution_resource_ids = np.array([1]) + solution_intervals = np.array([0, 2, 3, 4]) + resource_matrix = np.ma.array([[0], [2], [2], [3]], mask=[[False], [False], [False], [False]],) + solution_matrix = Matrix( + resource_ids=solution_resource_ids, + intervals=solution_intervals, + resource_matrix=resource_matrix, + ) + result = task_allocator._get_resource_intervals(solution_matrix) + expeceted = {1: [(0, 2), (3, 4)]} assert result == expeceted @@ -111,14 +126,15 @@ def test_mask_smallest_elements_except_top_k_per_row( @pytest.mark.parametrize( "array, expected", [ - (np.array([0, 1, 5, -1, 10]), [0, 1, 6, 0, 10]), + (np.array([0, 1, 5, -1, 10]), [0, 1, 6, 0, 16]), (np.array([-1, 2, 3, 0, 4]), [0, 2, 5, 5, 9]), (np.array([0, -1, 2, 4, -1]), [0, 0, 2, 6, 0]), ], ) def test_cumsum_reset_at_minus_one(task_allocator, array, expected): result = task_allocator._cumsum_reset_at_minus_one(array) - assert np.array_equal(result, expected) + print(f"Input: {array}, Result: {result}, Expected: {expected}") + np.testing.assert_array_equal(result, expected) @pytest.mark.parametrize( @@ -201,20 +217,11 @@ def test_diff_and_zero_negatives(array, expected): @pytest.mark.parametrize( - "array , expected", + "array, expected", [ - ( - np.array([0, 1, 2, 3, 4]), - (0, 4), - ), - ( - np.array([0, 3, 3, 3, 3]), - (0, 1), - ), - ( - np.array([0, 0, 1, 2, 0]), - None, - ), + # Full valid sequence without gaps + (np.ma.array([0, 2, 2, 3], mask=[False, False, False, False]), ([0, 1, 2, 3])), + (np.ma.array([0, 3], mask=[False, False]), ([0, 1])), ], ) def test_find_indexes(array, expected): @@ -222,4 +229,4 @@ def test_find_indexes(array, expected): result = task_allocator._find_indexes(array) print("result :", result) print("expected:", expected) - np.testing.assert_array_equal(result, expected) + assert result == expected