Cleanup + autoformat
kowaalczyk committed Jun 8, 2020
1 parent a95811b commit 5d61975
Showing 4 changed files with 136 additions and 117 deletions.
57 changes: 32 additions & 25 deletions spark_minimal_algorithms/examples/countifs.py
@@ -33,17 +33,19 @@ class SortAndAssignLabels(Step):
"""

@staticmethod
def _sort_within_partition(bucket_and_points: Tuple[int, Iterable[Any]]) -> Tuple[int, Iterable[Any]]:
def _sort_within_partition(
bucket_and_points: Tuple[int, Iterable[Any]]
) -> Tuple[int, Iterable[Any]]:
bucket, points = bucket_and_points
points = sorted(points, key=_label_first_coord_and_type)
return bucket, points

@staticmethod
def group(rdd: RDD) -> RDD: # type: ignore
rdd = rdd.groupByKey().sortByKey()
rdd = rdd.map(SortAndAssignLabels._sort_within_partition, preservesPartitioning=True)
# for k, v in rdd.collect(): # todo: remove debug
# print(f"{k}: {list(v)}")
rdd = rdd.map(
SortAndAssignLabels._sort_within_partition, preservesPartitioning=True
)
return rdd
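A note on the `preservesPartitioning=True` flag used in `group()` above: PySpark's `RDD.map` accepts this flag to declare that the mapped function leaves keys untouched, so the partitioner established by `sortByKey` is kept and later key-based operations can avoid an extra shuffle. A minimal toy sketch of the same pattern (standalone, not part of this repository):

from pyspark import SparkContext

sc = SparkContext("local[2]", "preserves-partitioning-demo")

# Two buckets of unsorted values, as (bucket, value) pairs.
rdd = sc.parallelize([(1, 3), (0, 2), (1, 1), (0, 5)])

def sort_within_bucket(bucket_and_values):
    bucket, values = bucket_and_values
    return bucket, sorted(values)

# groupByKey + sortByKey set up a partitioning by bucket key;
# preservesPartitioning=True keeps it because the map leaves keys unchanged.
buckets = rdd.groupByKey().sortByKey()
result = buckets.map(sort_within_bucket, preservesPartitioning=True)
print(result.collect())  # [(0, [2, 5]), (1, [1, 3])]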

@staticmethod
Expand All @@ -69,21 +71,32 @@ def emit_by_group(group_key: int, group_items: Iterable[Any]) -> Optional[Any]:
last_label = label
n_points_for_last_label = 1

return bucket_idx, (first_label, n_points_for_first_label), (last_label, n_points_for_last_label)
return (
bucket_idx,
(first_label, n_points_for_first_label),
(last_label, n_points_for_last_label),
)

@staticmethod
def broadcast(emitted_items: List[List[Any]]) -> Dict[str, Any]: # type: ignore
bucket_label_counts = sorted(emitted_items, key=lambda bucket_count: bucket_count[0])

# print(f"bucket_label_counts: {bucket_label_counts}") # todo: remove debug
bucket_label_counts = sorted(
emitted_items, key=lambda bucket_count: bucket_count[0]
)

previous_label = () # empty tuple is never assigned as a label
previous_count = 0
bucket_prefix_counts = dict() # i => (last label in (i-1)-th bucket, count of points with this label in previous buckets)
total_label_counts = dict() # label => total count of points with this label (only for multi-bucket labels)
bucket_prefix_counts = (
dict()
) # i => (last label in (i-1)-th bucket, count of points with this label in previous buckets)
total_label_counts = (
dict()
) # label => total count of points with this label (only for multi-bucket labels)
for bucket_count in bucket_label_counts:
bucket_partition_idx = bucket_count[0]
bucket_prefix_counts[bucket_partition_idx] = (previous_label, previous_count)
bucket_prefix_counts[bucket_partition_idx] = (
previous_label,
previous_count,
)

first_label, first_label_count = bucket_count[1]
last_label, last_label_count = bucket_count[2]
Expand All @@ -104,13 +117,11 @@ def broadcast(emitted_items: List[List[Any]]) -> Dict[str, Any]: # type: ignore
# after iteration ends, we still need to assign total count for last label
total_label_counts[previous_label] = previous_count

# if this is not 1st round, dummy key () has value 0 which needs to be removed
keys_to_delete = {k for k in total_label_counts if total_label_counts[k] == 0}
for k in keys_to_delete:
del total_label_counts[k]

# print(f"bucket_prefix_counts: {bucket_prefix_counts}") # todo: remove debug
# print(f"total_label_counts: {total_label_counts}") # todo: remove debug

return {
"bucket_prefix_count": bucket_prefix_counts,
"total_label_count": total_label_counts,
@@ -144,13 +155,7 @@ def step( # type: ignore

# todo: label format strings for global labels can be pre-computed before broadcast
# todo: we can probably get rid of few intermediate dicts to save memory
label_count: Dict[str, int] = {
**global_label_count,
**local_label_count
}
# print(f"Caclulating label in partition: {group_key}")
# print(f"available label counts: {label_count}")
# print("")
label_count: Dict[str, int] = {**global_label_count, **local_label_count}
label_format_str = {
label: _get_format_str(n_points_for_label)
for label, n_points_for_label in label_count.items()
Expand All @@ -170,21 +175,23 @@ def step( # type: ignore
point_idx_within_label = 1
previous_label = old_label

# print(f"Point {point} (#{idx} in bucket #{bucket_idx}) got label {new_label}") # todo: remove debug

if t == DATA:
for prefix_len in range(len(new_label)):
if new_label[prefix_len] == "1":
if len(coords) > 1:
yield (old_label, new_label[:prefix_len]), coords[1:], type_info
yield (old_label, new_label[:prefix_len]), coords[
1:
], type_info
else:
yield (old_label, new_label[:prefix_len]), type_info

elif t == QUERY:
for prefix_len in range(len(new_label)):
if new_label[prefix_len] == "0":
if len(coords) > 1:
yield (old_label, new_label[:prefix_len]), coords[1:], type_info
yield (old_label, new_label[:prefix_len]), coords[
1:
], type_info
else:
yield (old_label, new_label[:prefix_len]), type_info

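To make the emission rule at the end of `SortAndAssignLabels.step` easier to follow, here is a standalone restatement of it (hypothetical labels, last-dimension case only, with the `DATA`/`QUERY` constants assumed to be 0 and 1, matching the `(0, i)` / `(1, i)` type_info tuples in the tests below): a DATA point is re-emitted under every label prefix that is followed by a '1' bit, a QUERY point under every prefix followed by a '0' bit, so a query and a data point meet under the same `(old_label, prefix)` key exactly when the query's label sorts before the data point's label.

DATA, QUERY = 0, 1  # assumed values, consistent with the test fixtures below

def emit_prefixes(old_label, new_label, type_info, point_type):
    # Standalone sketch of the prefix emission in SortAndAssignLabels.step,
    # restricted to the last dimension (no remaining coords to forward).
    wanted_bit = "1" if point_type == DATA else "0"
    for prefix_len in range(len(new_label)):
        if new_label[prefix_len] == wanted_bit:
            yield (old_label, new_label[:prefix_len]), type_info

# Hypothetical labels within one sorted bucket: query '01' precedes data point '10'.
print(list(emit_prefixes((), "01", (1, 0), QUERY)))  # [(((), ''), (1, 0))]
print(list(emit_prefixes((), "10", (0, 0), DATA)))   # [(((), ''), (0, 0))]
# Both land under the key ((), ''), so this query-data pair is matched downstream.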
9 changes: 7 additions & 2 deletions spark_minimal_algorithms/examples/tera_sort.py
@@ -58,7 +58,9 @@ def broadcast(emitted_items: List[List[Any]], **kwargs: Any) -> List[Any]:
def step( # type: ignore
group_key: int, group_items: Iterable[Any], broadcast: Broadcast, **kwargs: Any
) -> Iterable[Tuple[int, Any]]:
key_func: Callable[[Tuple[Any]], Tuple[Any]] = kwargs.get("key_func", lambda x: x)
key_func: Callable[[Tuple[Any]], Tuple[Any]] = kwargs.get(
"key_func", lambda x: x
)
for point in group_items:
point_key = key_func(point)
point_bucket = bisect_left(broadcast.value, point_key)
Expand All @@ -80,7 +82,9 @@ def group(rdd: RDD, **kwargs: Any) -> RDD: # type: ignore
def step( # type: ignore
group_key: int, group_items: Iterable[Any], broadcast: Broadcast, **kwargs: Any
) -> Iterable[Any]:
key_func: Callable[[Tuple[Any]], Tuple[Any]] = kwargs.get("key_func", lambda x: x)
key_func: Callable[[Tuple[Any]], Tuple[Any]] = kwargs.get(
"key_func", lambda x: x
)
sorted_points = sorted(group_items, key=key_func)
for point in sorted_points:
yield point
Expand All @@ -99,6 +103,7 @@ class TeraSort(Algorithm):
- `results_rdd`: sorted `rdd`
"""

__steps__ = {
"assign_buckets": SampleAndAssignBuckets,
"sort": SortByKeyAndValue,
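The bucket assignment in `SampleAndAssignBuckets.step` above boils down to a `bisect_left` over the broadcast split points: a point whose key falls below the first split point goes to bucket 0, the next range to bucket 1, and so on, giving p + 1 buckets for p split points; sorting within each bucket and reading the buckets back in key order is what yields the globally sorted result. A small standalone sketch of that idea (toy split points, no Spark required):

from bisect import bisect_left

# Hypothetical split points, standing in for the sampled, broadcast boundaries.
split_points = [(25,), (50,), (75,)]

def assign_bucket(point, key_func=lambda x: x):
    # bisect_left puts a point equal to a boundary into the lower bucket.
    return bisect_left(split_points, key_func(point))

points = [(10,), (30,), (75,), (99,)]
print([(assign_bucket(p), p) for p in points])
# [(0, (10,)), (1, (30,)), (2, (75,)), (3, (99,))]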
183 changes: 94 additions & 89 deletions tests/examples/test_countifs.py
@@ -16,12 +16,7 @@


@pytest.mark.parametrize(
"cls",
[
TeraSortWithLabels,
GetResultsByLabel,
AggregateResultsByQuery,
],
"cls", [TeraSortWithLabels, GetResultsByLabel, AggregateResultsByQuery,],
)
@pytest.mark.parametrize("n_partitions", [1])
def test_step_creation(cls, spark_context, n_partitions):
@@ -53,16 +48,18 @@ def test_algorithm_creation(spark_context, n_partitions):

@pytest.mark.parametrize("n_partitions", [1, 2, 3, 4])
def test_tera_sort_label_assignment_1d(spark_context, n_partitions):
rdd = spark_context.parallelize([
((), (1,), (0, 0)), # D0: data at x = 1 with empty label
((), (1,), (1, 0)), # Q0: query at x = 1 with empty label
((), (2,), (0, 1)), # D1: data at x = 2 with empty label
((), (2,), (1, 1)), # Q1: query at x = 2 with empty label
])
rdd = spark_context.parallelize(
[
((), (1,), (0, 0)), # D0: data at x = 1 with empty label
((), (1,), (1, 0)), # Q0: query at x = 1 with empty label
((), (2,), (0, 1)), # D1: data at x = 2 with empty label
((), (2,), (1, 1)), # Q1: query at x = 2 with empty label
]
)
# Q0, D1 is the only pair of results that matches the COUNTIF criteria
expected_result = [
(((), ''), (1, 0)),
(((), ''), (0, 1)),
(((), ""), (1, 0)),
(((), ""), (0, 1)),
]

algorithm = TeraSortWithLabels(spark_context, n_partitions)
Expand All @@ -73,18 +70,20 @@ def test_tera_sort_label_assignment_1d(spark_context, n_partitions):

@pytest.mark.parametrize("n_partitions", [1, 2, 3, 4])
def test_tera_sort_label_assignment_2d_round_1_case_1(spark_context, n_partitions):
rdd = spark_context.parallelize([
((), (3, 6), (0, 0)), # D0
((), (4, 2), (0, 1)), # D1
((), (0, 5), (1, 0)), # Q0
((), (7, 1), (1, 1)), # Q1
])
rdd = spark_context.parallelize(
[
((), (3, 6), (0, 0)), # D0
((), (4, 2), (0, 1)), # D1
((), (0, 5), (1, 0)), # Q0
((), (7, 1), (1, 1)), # Q1
]
)
# after 1st dimension, for Q0 both D1 and D2 are feasible
expected_result_1st_round = [
(((), ''), (5,), (1, 0)),
(((), '0'), (5,), (1, 0)),
(((), '0'), (6,), (0, 0)),
(((), ''), (2,), (0, 1)),
(((), ""), (5,), (1, 0)),
(((), "0"), (5,), (1, 0)),
(((), "0"), (6,), (0, 0)),
(((), ""), (2,), (0, 1)),
]

algorithm = TeraSortWithLabels(spark_context, n_partitions)
Expand All @@ -95,16 +94,18 @@ def test_tera_sort_label_assignment_2d_round_1_case_1(spark_context, n_partition

@pytest.mark.parametrize("n_partitions", [1, 2, 3, 4])
def test_tera_sort_label_assignment_2d_round_2_case_1(spark_context, n_partitions):
rdd_after_1st_round = spark_context.parallelize([
(((), ''), (5,), (1, 0)), # Q0
(((), '0'), (5,), (1, 0)), # Q0
(((), '0'), (6,), (0, 0)), # D0
(((), ''), (2,), (0, 1)), # D1
])
rdd_after_1st_round = spark_context.parallelize(
[
(((), ""), (5,), (1, 0)), # Q0
(((), "0"), (5,), (1, 0)), # Q0
(((), "0"), (6,), (0, 0)), # D0
(((), ""), (2,), (0, 1)), # D1
]
)
# after 1st dimension, for Q0 both D1 and D2 are feasible
expected_result_2nd_round = [
((((), '0'), ''), (1, 0)),
((((), '0'), ''), (0, 0)),
((((), "0"), ""), (1, 0)),
((((), "0"), ""), (0, 0)),
]

algorithm = TeraSortWithLabels(spark_context, n_partitions)
Expand All @@ -117,32 +118,34 @@ def test_tera_sort_label_assignment_2d_round_2_case_1(spark_context, n_partition
def test_tera_sort_label_assignment_2d_round_1_case_2(spark_context, n_partitions):
# 'data_points': [(103, 480), (105, 1771), (1178, 101), (1243, 107)],
# 'query_points': [(100, 100), (102, 102), (104, 104), (106, 106)]
rdd = spark_context.parallelize([
((), (1178, 101), (0, 2)),
((), (103, 480), (0, 0)),
((), (105, 1771), (0, 1)),
((), (1243, 107), (0, 3)),
((), (104, 104), (1, 2)),
((), (100, 100), (1, 0)),
((), (102, 102), (1, 1)),
((), (106, 106), (1, 3))
])
rdd = spark_context.parallelize(
[
((), (1178, 101), (0, 2)),
((), (103, 480), (0, 0)),
((), (105, 1771), (0, 1)),
((), (1243, 107), (0, 3)),
((), (104, 104), (1, 2)),
((), (100, 100), (1, 0)),
((), (102, 102), (1, 1)),
((), (106, 106), (1, 3)),
]
)
# after 1st dimension, for Q0 both D1 and D2 are feasible
expected_result_1st_round = [
(((), ''), (100,), (1, 0)),
(((), '0'), (100,), (1, 0)),
(((), '00'), (100,), (1, 0)),
(((), ''), (102,), (1, 1)),
(((), '0'), (102,), (1, 1)),
(((), '0'), (480,), (0, 0)),
(((), ''), (104,), (1, 2)),
(((), ''), (1771,), (0, 1)),
(((), '1'), (106,), (1, 3)),
(((), ''), (101,), (0, 2)),
(((), '1'), (101,), (0, 2)),
(((), ''), (107,), (0, 3)),
(((), '1'), (107,), (0, 3)),
(((), '11'), (107,), (0, 3)),
(((), ""), (100,), (1, 0)),
(((), "0"), (100,), (1, 0)),
(((), "00"), (100,), (1, 0)),
(((), ""), (102,), (1, 1)),
(((), "0"), (102,), (1, 1)),
(((), "0"), (480,), (0, 0)),
(((), ""), (104,), (1, 2)),
(((), ""), (1771,), (0, 1)),
(((), "1"), (106,), (1, 3)),
(((), ""), (101,), (0, 2)),
(((), "1"), (101,), (0, 2)),
(((), ""), (107,), (0, 3)),
(((), "1"), (107,), (0, 3)),
(((), "11"), (107,), (0, 3)),
]

algorithm = TeraSortWithLabels(spark_context, n_partitions)
Expand All @@ -153,41 +156,43 @@ def test_tera_sort_label_assignment_2d_round_1_case_2(spark_context, n_partition

@pytest.mark.parametrize("n_partitions", [1, 2, 3, 4])
def test_tera_sort_label_assignment_2d_round_2_case_2(spark_context, n_partitions):
rdd_after_1st_round = spark_context.parallelize([
(((), ''), (100,), (1, 0)),
(((), '0'), (100,), (1, 0)),
(((), '00'), (100,), (1, 0)),
(((), ''), (102,), (1, 1)),
(((), '0'), (102,), (1, 1)),
(((), '0'), (480,), (0, 0)),
(((), ''), (104,), (1, 2)),
(((), ''), (1771,), (0, 1)),
(((), '1'), (106,), (1, 3)),
(((), ''), (101,), (0, 2)),
(((), '1'), (101,), (0, 2)),
(((), ''), (107,), (0, 3)),
(((), '1'), (107,), (0, 3)),
(((), '11'), (107,), (0, 3)),
])
rdd_after_1st_round = spark_context.parallelize(
[
(((), ""), (100,), (1, 0)),
(((), "0"), (100,), (1, 0)),
(((), "00"), (100,), (1, 0)),
(((), ""), (102,), (1, 1)),
(((), "0"), (102,), (1, 1)),
(((), "0"), (480,), (0, 0)),
(((), ""), (104,), (1, 2)),
(((), ""), (1771,), (0, 1)),
(((), "1"), (106,), (1, 3)),
(((), ""), (101,), (0, 2)),
(((), "1"), (101,), (0, 2)),
(((), ""), (107,), (0, 3)),
(((), "1"), (107,), (0, 3)),
(((), "11"), (107,), (0, 3)),
]
)
# after 1st dimension, for Q0 both D1 and D2 are feasible
expected_result_2nd_round = [
((((), ''), ''), (1, 0)),
((((), ''), '0'), (1, 0)),
((((), ''), '00'), (1, 0)),
((((), ''), '00'), (0, 2)),
((((), ''), ''), (1, 1)),
((((), ''), '01'), (1, 1)),
((((), ''), ''), (1, 2)),
((((), ''), ''), (0, 3)),
((((), ''), ''), (0, 1)),
((((), ''), '10'), (0, 1)),
((((), '0'), ''), (1, 0)),
((((), '0'), '0'), (1, 0)),
((((), '0'), ''), (1, 1)),
((((), '0'), ''), (0, 0)),
((((), '00'), ''), (1, 0)),
((((), '1'), ''), (1, 3)),
((((), '1'), ''), (0, 3)),
((((), ""), ""), (1, 0)),
((((), ""), "0"), (1, 0)),
((((), ""), "00"), (1, 0)),
((((), ""), "00"), (0, 2)),
((((), ""), ""), (1, 1)),
((((), ""), "01"), (1, 1)),
((((), ""), ""), (1, 2)),
((((), ""), ""), (0, 3)),
((((), ""), ""), (0, 1)),
((((), ""), "10"), (0, 1)),
((((), "0"), ""), (1, 0)),
((((), "0"), "0"), (1, 0)),
((((), "0"), ""), (1, 1)),
((((), "0"), ""), (0, 0)),
((((), "00"), ""), (1, 0)),
((((), "1"), ""), (1, 3)),
((((), "1"), ""), (0, 3)),
]

algorithm = TeraSortWithLabels(spark_context, n_partitions)
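The expected outputs above can be sanity-checked with a brute-force reading of the COUNTIF query. Judging from the 1-D test (only Q0 at x = 1 is matched, by D1 at x = 2), the criterion appears to be that every coordinate of the data point is strictly greater than the corresponding query coordinate; the sketch below assumes that reading. For the inputs of test_tera_sort_label_assignment_2d_round_1_case_2 it gives counts 4, 3, 2 and 1, which agrees with pairing each query key with the data entries that share it in expected_result_2nd_round.

def countif_bruteforce(data_points, query_points):
    # For each query, count the data points strictly greater in every coordinate.
    return {
        q: sum(1 for d in data_points if all(dc > qc for dc, qc in zip(d, q)))
        for q in query_points
    }

data = [(103, 480), (105, 1771), (1178, 101), (1243, 107)]
queries = [(100, 100), (102, 102), (104, 104), (106, 106)]
print(countif_bruteforce(data, queries))
# {(100, 100): 4, (102, 102): 3, (104, 104): 2, (106, 106): 1}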
4 changes: 3 additions & 1 deletion tests/examples/test_tera_sort.py
@@ -9,7 +9,9 @@


def create_test_case(
n_points: int, n_dim: int, key_func: Callable[[Tuple[Any]], Tuple[Any]] = lambda x: x
n_points: int,
n_dim: int,
key_func: Callable[[Tuple[Any]], Tuple[Any]] = lambda x: x,
) -> Tuple[List[Tuple[Any]], List[Tuple[Any]], Callable[[Tuple[Any]], Tuple[Any]]]:
max_point = 100 * n_points
points = [
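The `key_func` parameter reformatted above defaults to the identity, so `create_test_case` (and TeraSort itself) compares points as whole tuples; passing a different key orders them by any projection of their coordinates. A trivial plain-Python illustration of the kind of key one might pass (hypothetical values, not from the test file):

points = [(3, 1), (1, 2), (2, 0)]

# Default key: compare whole tuples, i.e. lexicographically by the first coordinate.
print(sorted(points, key=lambda x: x))        # [(1, 2), (2, 0), (3, 1)]

# Custom key_func: order by the second coordinate instead.
print(sorted(points, key=lambda p: (p[1],)))  # [(2, 0), (3, 1), (1, 2)]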
