Separate tera_sort tests, refactor examples to have nicer names
kowaalczyk committed Jun 6, 2020
1 parent 9147400 commit 28b9ace
Showing 4 changed files with 65 additions and 43 deletions.
28 changes: 14 additions & 14 deletions spark_minimal_algorithms/examples/countifs.py
@@ -17,7 +17,7 @@ def _get_format_str(n_elements: int) -> str:
return binary_format_str


-class CountifsInitialStep(Step):
+class SortAndAssignLabel(Step):
"""
IN: (point coords, point type info) where:
@@ -54,7 +54,7 @@ def extract_partition_idx(
@staticmethod
def group(rdd: RDD) -> RDD: # type: ignore
# sort by values - todo: consider using custom terasort implementation
-cls = CountifsInitialStep
+cls = SortAndAssignLabel
rdd = rdd.map(cls.select_key).sortByKey().map(cls.unselect_key)
rdd = rdd.mapPartitionsWithIndex(cls.extract_partition_idx).groupByKey()
return rdd
@@ -114,7 +114,7 @@ def step( # type: ignore
yield label[:prefix_len], type_info


-class CountifsNextStep(Step):
+class AssignNestedLabel(Step):
"""
IN: (label, collection of points with label)
@@ -135,7 +135,7 @@ def step( # type: ignore
group_items: Iterable[Any],
broadcast: Broadcast,
) -> Iterable[Any]:
-points = sorted(group_items, key=CountifsNextStep.first_coord_and_point_type)
+points = sorted(group_items, key=AssignNestedLabel.first_coord_and_point_type)
label_format_str = _get_format_str(len(points))
old_label = group_key

@@ -166,7 +166,7 @@ def step( # type: ignore
yield (old_label, new_label[:prefix_len]), type_info


-class CountifsResultsForLabel(Step):
+class GetResultsByLabel(Step):
"""
IN: (label, points with this label)
@@ -190,7 +190,7 @@ def step( # type: ignore
yield query_point_idx, n_data_points


-class CountifsResultsForQuery(Step):
+class AggregateResultsByQuery(Step):
"""
IN: (query point index, collection of results for this query point for various labels)
@@ -221,10 +221,10 @@ class Countifs(Algorithm):
"""

__steps__ = {
"first_step": CountifsInitialStep,
"next_step": CountifsNextStep,
"results_for_label": CountifsResultsForLabel,
"results_for_query": CountifsResultsForQuery,
"sort_and_assign_label": SortAndAssignLabel,
"assign_nested_label": AssignNestedLabel,
"get_results_by_label": GetResultsByLabel,
"aggregate_results_by_query": AggregateResultsByQuery,
}

def run(self, data_rdd: RDD, query_rdd: RDD, n_dim: int) -> RDD: # type: ignore
@@ -238,10 +238,10 @@ def run(self, data_rdd: RDD, query_rdd: RDD, n_dim: int) -> RDD: # type: ignore
)
rdd = data_rdd.union(query_rdd)

-rdd = self.first_step(rdd) # type: ignore
+rdd = self.sort_and_assign_label(rdd) # type: ignore
for _ in range(n_dim - 1):
-rdd = self.next_step(rdd) # type: ignore
+rdd = self.assign_nested_label(rdd) # type: ignore

-rdd = empty_result_rdd.union(self.results_for_label(rdd)) # type: ignore
-rdd = self.results_for_query(rdd).sortByKey() # type: ignore
+rdd = empty_result_rdd.union(self.get_results_by_label(rdd)) # type: ignore
+rdd = self.aggregate_results_by_query(rdd).sortByKey() # type: ignore
return rdd
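The renamed keys matter because Algorithm exposes every entry of __steps__ as an instance attribute named after its key (the tests below assert hasattr(instance, "sort_and_assign_label") and check its type), which is why run() now calls self.sort_and_assign_label(rdd) instead of self.first_step(rdd). A minimal sketch of that wiring, with the constructor signature assumed from the tests rather than copied from the real spark_minimal_algorithms/algorithm.py:

class Algorithm:
    __steps__: dict = {}

    def __init__(self, spark_context, n_partitions):
        # Sketch only: the actual base class lives in spark_minimal_algorithms/algorithm.py
        # and is not part of this diff; the (spark_context, n_partitions) signature is an
        # assumption based on Countifs(spark_context, n_partitions) in the tests.
        self._n_partitions = n_partitions
        for name, step_cls in self.__steps__.items():
            # Each __steps__ key becomes an attribute holding a Step instance,
            # so renaming a key renames the attribute used inside run().
            setattr(self, name, step_cls(spark_context, n_partitions))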
12 changes: 6 additions & 6 deletions spark_minimal_algorithms/examples/tera_sort.py
@@ -8,7 +8,7 @@
from pyspark import RDD, Broadcast


-class TeraSortFirstRound(Step):
+class SampleAndAssignBuckets(Step):
p = 0.1
""" Default value for probability of sampling a point to be a bucket key """

@@ -21,13 +21,13 @@ def extract_idx(

@staticmethod
def group(rdd: RDD, **kwargs: Any) -> RDD:
-rdd = rdd.mapPartitionsWithIndex(TeraSortFirstRound.extract_idx).groupByKey()
+rdd = rdd.mapPartitionsWithIndex(SampleAndAssignBuckets.extract_idx).groupByKey()
return rdd

@staticmethod
def emit_by_group(group_key: int, group_items: Iterable[Any], **kwargs: Any) -> Any:
samples = list()
-p: float = kwargs.get("p", TeraSortFirstRound.p)
+p: float = kwargs.get("p", SampleAndAssignBuckets.p)
for point in group_items:
if random.random() < p:
samples.append(point)
@@ -52,7 +52,7 @@ def step( # type: ignore
yield point_bucket, point


-class TeraSortFinalRound(Step):
+class SortByKeyAndValue(Step):
@staticmethod
def group(rdd: RDD) -> RDD: # type: ignore
rdd = rdd.groupByKey().sortByKey()
@@ -69,8 +69,8 @@ def step( # type: ignore

class TeraSort(Algorithm):
__steps__ = {
"assign_buckets": TeraSortFirstRound,
"sort": TeraSortFinalRound,
"assign_buckets": SampleAndAssignBuckets,
"sort": SortByKeyAndValue,
}

def run(self, rdd: RDD, n_dim: int) -> RDD: # type: ignore
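For context on the renamed steps (their bodies are partially collapsed above): SampleAndAssignBuckets samples each record with probability p and, in the collapsed part of the file, uses the samples to derive bucket keys, so that step() can yield (point_bucket, point) pairs; SortByKeyAndValue then groups by bucket key, visits buckets in key order, and sorts within each bucket. As exercised by the tests below, the Spark version is driven via TeraSort(spark_context, n_partitions)(rdd=rdd, n_dim=n_dim). A plain-Python sketch of the underlying idea (simplified, not the RDD-based implementation in this file):

import random
from bisect import bisect_left


def sample_and_assign_buckets(points, p=0.1):
    # Round 1 (cf. SampleAndAssignBuckets): sampled points become bucket
    # boundaries; every point is keyed by the index of the bucket it falls into.
    boundaries = sorted(pt for pt in points if random.random() < p)
    return [(bisect_left(boundaries, pt), pt) for pt in points]


def sort_by_key_and_value(keyed_points):
    # Round 2 (cf. SortByKeyAndValue): group by bucket key, visit buckets in
    # key order, sort within each bucket; the concatenation is globally sorted.
    buckets = {}
    for key, pt in keyed_points:
        buckets.setdefault(key, []).append(pt)
    return [pt for key in sorted(buckets) for pt in sorted(buckets[key])]


points = [(3,), (1,), (4,), (1,), (5,)]
assert sort_by_key_and_value(sample_and_assign_buckets(points)) == sorted(points)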
40 changes: 20 additions & 20 deletions tests/examples/test_countifs.py
@@ -6,10 +6,10 @@

from spark_minimal_algorithms.algorithm import Step, Algorithm
from spark_minimal_algorithms.examples.countifs import (
-CountifsInitialStep,
-CountifsNextStep,
-CountifsResultsForLabel,
-CountifsResultsForQuery,
+SortAndAssignLabel,
+AssignNestedLabel,
+GetResultsByLabel,
+AggregateResultsByQuery,
Countifs,
)

@@ -19,10 +19,10 @@
@pytest.mark.parametrize(
"cls",
[
-CountifsInitialStep,
-CountifsNextStep,
-CountifsResultsForLabel,
-CountifsResultsForQuery,
+SortAndAssignLabel,
+AssignNestedLabel,
+GetResultsByLabel,
+AggregateResultsByQuery,
],
)
@pytest.mark.parametrize("n_partitions", [1])
@@ -40,21 +40,21 @@ def test_algorithm_creation(spark_context, n_partitions):
assert isinstance(instance, Algorithm)
assert instance._n_partitions == n_partitions

assert hasattr(instance, "first_step")
assert type(instance.first_step) == CountifsInitialStep
assert instance.first_step._n_partitions == n_partitions
assert hasattr(instance, "sort_and_assign_label")
assert type(instance.sort_and_assign_label) == SortAndAssignLabel
assert instance.sort_and_assign_label._n_partitions == n_partitions

assert hasattr(instance, "next_step")
assert type(instance.next_step) == CountifsNextStep
assert instance.next_step._n_partitions == n_partitions
assert hasattr(instance, "assign_nested_label")
assert type(instance.assign_nested_label) == AssignNestedLabel
assert instance.assign_nested_label._n_partitions == n_partitions

assert hasattr(instance, "results_for_label")
assert type(instance.results_for_label) == CountifsResultsForLabel
assert instance.results_for_label._n_partitions == n_partitions
assert hasattr(instance, "get_results_by_label")
assert type(instance.get_results_by_label) == GetResultsByLabel
assert instance.get_results_by_label._n_partitions == n_partitions

assert hasattr(instance, "results_for_query")
assert type(instance.results_for_query) == CountifsResultsForQuery
assert instance.results_for_query._n_partitions == n_partitions
assert hasattr(instance, "aggregate_results_by_query")
assert type(instance.aggregate_results_by_query) == AggregateResultsByQuery
assert instance.aggregate_results_by_query._n_partitions == n_partitions


TESTS_1D = [
28 changes: 25 additions & 3 deletions tests/examples/test_tera_sort.py
@@ -21,15 +21,12 @@ def create_test_case(n_points: int, n_dim: int) -> List[Tuple[Any]]:
create_test_case(5, 1),
create_test_case(10, 1),
create_test_case(100, 1),
-create_test_case(1_000, 1),
create_test_case(5, 2),
create_test_case(10, 2),
create_test_case(100, 2),
-create_test_case(1_000, 2),
create_test_case(5, 3),
create_test_case(10, 3),
create_test_case(100, 3),
-create_test_case(1_000, 3),
]


@@ -45,3 +45,28 @@ def test_tera_sort(spark_context, n_partitions, test_case):

assert len(result) == len(sorted_points)
assert result == sorted_points


+LONG_TESTS = [
+create_test_case(100, 1),
+create_test_case(1_000, 1),
+create_test_case(100, 2),
+create_test_case(1_000, 2),
+create_test_case(100, 3),
+create_test_case(1_000, 3),
+]
+
+
+@pytest.mark.long
+@pytest.mark.parametrize("test_case", LONG_TESTS)
+@pytest.mark.parametrize("n_partitions", [1, 2, 3, 4, 8, 16])
+def test_tera_sort_performance(spark_context, n_partitions, test_case):
+points, sorted_points = test_case
+n_dim = len(points[0])
+rdd = spark_context.parallelize(points)
+
+tera_sort = TeraSort(spark_context, n_partitions)
+result = tera_sort(rdd=rdd, n_dim=n_dim).collect()
+
+assert len(result) == len(sorted_points)
+assert result == sorted_points
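The new @pytest.mark.long marker lets the slow 1_000-point cases be skipped during regular runs (pytest -m "not long") or run on their own (pytest -m long). This commit does not show the marker being registered; a hypothetical conftest.py addition that would register it, so pytest does not warn about an unknown mark, could look like this:

def pytest_configure(config):
    # Hypothetical snippet, not part of this commit: registers the "long" marker
    # used by test_tera_sort_performance so it can be selected with `-m long`
    # or deselected with `-m "not long"`.
    config.addinivalue_line("markers", "long: slow TeraSort performance test cases")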
