diff --git a/spark_minimal_algorithms/examples/countifs.py b/spark_minimal_algorithms/examples/countifs.py
index b491768..81b4811 100644
--- a/spark_minimal_algorithms/examples/countifs.py
+++ b/spark_minimal_algorithms/examples/countifs.py
@@ -17,7 +17,7 @@ def _get_format_str(n_elements: int) -> str:
     return binary_format_str
 
 
-class CountifsInitialStep(Step):
+class SortAndAssignLabel(Step):
     """
     IN: (point coords, point type info) where:
 
@@ -54,7 +54,7 @@ def extract_partition_idx(
     @staticmethod
     def group(rdd: RDD) -> RDD:  # type: ignore
         # sort by values - todo: consider using custom terasort implementation
-        cls = CountifsInitialStep
+        cls = SortAndAssignLabel
         rdd = rdd.map(cls.select_key).sortByKey().map(cls.unselect_key)
         rdd = rdd.mapPartitionsWithIndex(cls.extract_partition_idx).groupByKey()
         return rdd
@@ -114,7 +114,7 @@ def step(  # type: ignore
             yield label[:prefix_len], type_info
 
 
-class CountifsNextStep(Step):
+class AssignNestedLabel(Step):
     """
     IN: (label, collection of points with label)
 
@@ -135,7 +135,7 @@ def step(  # type: ignore
         group_items: Iterable[Any],
         broadcast: Broadcast,
     ) -> Iterable[Any]:
-        points = sorted(group_items, key=CountifsNextStep.first_coord_and_point_type)
+        points = sorted(group_items, key=AssignNestedLabel.first_coord_and_point_type)
         label_format_str = _get_format_str(len(points))
 
         old_label = group_key
@@ -166,7 +166,7 @@ def step(  # type: ignore
                 yield (old_label, new_label[:prefix_len]), type_info
 
 
-class CountifsResultsForLabel(Step):
+class GetResultsByLabel(Step):
     """
     IN: (label, points with this label)
 
@@ -190,7 +190,7 @@ def step(  # type: ignore
             yield query_point_idx, n_data_points
 
 
-class CountifsResultsForQuery(Step):
+class AggregateResultsByQuery(Step):
     """
     IN: (query point index, collection of results for this query point for various labels)
 
@@ -221,10 +221,10 @@ class Countifs(Algorithm):
     """
 
     __steps__ = {
-        "first_step": CountifsInitialStep,
-        "next_step": CountifsNextStep,
-        "results_for_label": CountifsResultsForLabel,
-        "results_for_query": CountifsResultsForQuery,
+        "sort_and_assign_label": SortAndAssignLabel,
+        "assign_nested_label": AssignNestedLabel,
+        "get_results_by_label": GetResultsByLabel,
+        "aggregate_results_by_query": AggregateResultsByQuery,
     }
 
     def run(self, data_rdd: RDD, query_rdd: RDD, n_dim: int) -> RDD:  # type: ignore
@@ -238,10 +238,10 @@ def run(self, data_rdd: RDD, query_rdd: RDD, n_dim: int) -> RDD:  # type: ignore
         )
         rdd = data_rdd.union(query_rdd)
 
-        rdd = self.first_step(rdd)  # type: ignore
+        rdd = self.sort_and_assign_label(rdd)  # type: ignore
         for _ in range(n_dim - 1):
-            rdd = self.next_step(rdd)  # type: ignore
+            rdd = self.assign_nested_label(rdd)  # type: ignore
 
-        rdd = empty_result_rdd.union(self.results_for_label(rdd))  # type: ignore
-        rdd = self.results_for_query(rdd).sortByKey()  # type: ignore
+        rdd = empty_result_rdd.union(self.get_results_by_label(rdd))  # type: ignore
+        rdd = self.aggregate_results_by_query(rdd).sortByKey()  # type: ignore
         return rdd
diff --git a/spark_minimal_algorithms/examples/tera_sort.py b/spark_minimal_algorithms/examples/tera_sort.py
index cccc8b3..95d3f5d 100644
--- a/spark_minimal_algorithms/examples/tera_sort.py
+++ b/spark_minimal_algorithms/examples/tera_sort.py
@@ -8,7 +8,7 @@
 from pyspark import RDD, Broadcast
 
 
-class TeraSortFirstRound(Step):
+class SampleAndAssignBuckets(Step):
     p = 0.1
     """ Default value for probability of sampling a point to be a bucket key """
 
@@ -21,13 +21,13 @@ def extract_idx(
 
     @staticmethod
     def group(rdd: RDD, **kwargs: Any) -> RDD:
-        rdd = rdd.mapPartitionsWithIndex(TeraSortFirstRound.extract_idx).groupByKey()
+        rdd = rdd.mapPartitionsWithIndex(SampleAndAssignBuckets.extract_idx).groupByKey()
         return rdd
 
     @staticmethod
     def emit_by_group(group_key: int, group_items: Iterable[Any], **kwargs: Any) -> Any:
         samples = list()
-        p: float = kwargs.get("p", TeraSortFirstRound.p)
+        p: float = kwargs.get("p", SampleAndAssignBuckets.p)
         for point in group_items:
             if random.random() < p:
                 samples.append(point)
@@ -52,7 +52,7 @@ def step(  # type: ignore
             yield point_bucket, point
 
 
-class TeraSortFinalRound(Step):
+class SortByKeyAndValue(Step):
     @staticmethod
     def group(rdd: RDD) -> RDD:  # type: ignore
         rdd = rdd.groupByKey().sortByKey()
@@ -69,8 +69,8 @@ def step(  # type: ignore
 
 class TeraSort(Algorithm):
     __steps__ = {
-        "assign_buckets": TeraSortFirstRound,
-        "sort": TeraSortFinalRound,
+        "assign_buckets": SampleAndAssignBuckets,
+        "sort": SortByKeyAndValue,
     }
 
     def run(self, rdd: RDD, n_dim: int) -> RDD:  # type: ignore
diff --git a/tests/examples/test_countifs.py b/tests/examples/test_countifs.py
index 64d2444..577c9ae 100644
--- a/tests/examples/test_countifs.py
+++ b/tests/examples/test_countifs.py
@@ -6,10 +6,10 @@
 from spark_minimal_algorithms.algorithm import Step, Algorithm
 
 from spark_minimal_algorithms.examples.countifs import (
-    CountifsInitialStep,
-    CountifsNextStep,
-    CountifsResultsForLabel,
-    CountifsResultsForQuery,
+    SortAndAssignLabel,
+    AssignNestedLabel,
+    GetResultsByLabel,
+    AggregateResultsByQuery,
     Countifs,
 )
 
@@ -19,10 +19,10 @@
 @pytest.mark.parametrize(
     "cls",
     [
-        CountifsInitialStep,
-        CountifsNextStep,
-        CountifsResultsForLabel,
-        CountifsResultsForQuery,
+        SortAndAssignLabel,
+        AssignNestedLabel,
+        GetResultsByLabel,
+        AggregateResultsByQuery,
     ],
 )
 @pytest.mark.parametrize("n_partitions", [1])
@@ -40,21 +40,21 @@ def test_algorithm_creation(spark_context, n_partitions):
     assert isinstance(instance, Algorithm)
     assert instance._n_partitions == n_partitions
 
-    assert hasattr(instance, "first_step")
-    assert type(instance.first_step) == CountifsInitialStep
-    assert instance.first_step._n_partitions == n_partitions
+    assert hasattr(instance, "sort_and_assign_label")
+    assert type(instance.sort_and_assign_label) == SortAndAssignLabel
+    assert instance.sort_and_assign_label._n_partitions == n_partitions
 
-    assert hasattr(instance, "next_step")
-    assert type(instance.next_step) == CountifsNextStep
-    assert instance.next_step._n_partitions == n_partitions
+    assert hasattr(instance, "assign_nested_label")
+    assert type(instance.assign_nested_label) == AssignNestedLabel
+    assert instance.assign_nested_label._n_partitions == n_partitions
 
-    assert hasattr(instance, "results_for_label")
-    assert type(instance.results_for_label) == CountifsResultsForLabel
-    assert instance.results_for_label._n_partitions == n_partitions
+    assert hasattr(instance, "get_results_by_label")
+    assert type(instance.get_results_by_label) == GetResultsByLabel
+    assert instance.get_results_by_label._n_partitions == n_partitions
 
-    assert hasattr(instance, "results_for_query")
-    assert type(instance.results_for_query) == CountifsResultsForQuery
-    assert instance.results_for_query._n_partitions == n_partitions
+    assert hasattr(instance, "aggregate_results_by_query")
+    assert type(instance.aggregate_results_by_query) == AggregateResultsByQuery
+    assert instance.aggregate_results_by_query._n_partitions == n_partitions
 
 
 TESTS_1D = [
diff --git a/tests/examples/test_tera_sort.py b/tests/examples/test_tera_sort.py
index ddddd4b..b1d2bb5 100644
--- a/tests/examples/test_tera_sort.py
+++ b/tests/examples/test_tera_sort.py
@@ -21,15 +21,12 @@ def create_test_case(n_points: int, n_dim: int) -> List[Tuple[Any]]:
     create_test_case(5, 1),
     create_test_case(10, 1),
     create_test_case(100, 1),
-    create_test_case(1_000, 1),
     create_test_case(5, 2),
     create_test_case(10, 2),
     create_test_case(100, 2),
-    create_test_case(1_000, 2),
     create_test_case(5, 3),
     create_test_case(10, 3),
     create_test_case(100, 3),
-    create_test_case(1_000, 3),
 ]
 
 
@@ -45,3 +42,28 @@ def test_tera_sort(spark_context, n_partitions, test_case):
 
     assert len(result) == len(sorted_points)
     assert result == sorted_points
+
+
+LONG_TESTS = [
+    create_test_case(100, 1),
+    create_test_case(1_000, 1),
+    create_test_case(100, 2),
+    create_test_case(1_000, 2),
+    create_test_case(100, 3),
+    create_test_case(1_000, 3),
+]
+
+
+@pytest.mark.long
+@pytest.mark.parametrize("test_case", LONG_TESTS)
+@pytest.mark.parametrize("n_partitions", [1, 2, 3, 4, 8, 16])
+def test_tera_sort_performance(spark_context, n_partitions, test_case):
+    points, sorted_points = test_case
+    n_dim = len(points[0])
+    rdd = spark_context.parallelize(points)
+
+    tera_sort = TeraSort(spark_context, n_partitions)
+    result = tera_sort(rdd=rdd, n_dim=n_dim).collect()
+
+    assert len(result) == len(sorted_points)
+    assert result == sorted_points
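
Note: the new test_tera_sort_performance is decorated with @pytest.mark.long, and pytest emits PytestUnknownMarkWarning for markers that are not registered. If the project does not already declare this marker in its pytest configuration, one common way to register it is a conftest.py hook; the sketch below is illustrative and not part of the patch above:

    # conftest.py (illustrative sketch, not included in this patch)
    # Register the custom "long" marker so pytest recognizes @pytest.mark.long
    # instead of warning about an unknown mark.
    def pytest_configure(config):
        config.addinivalue_line(
            "markers",
            "long: long-running TeraSort cases; deselect with -m 'not long'",
        )

With the marker registered, the slow cases can be skipped locally via pytest -m "not long" and run explicitly via pytest -m long.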