Add tera sort + tests, support custom kwargs in algorithm steps
kowaalczyk committed Jun 6, 2020
1 parent 52ebaf2 commit 9147400
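
With this change, any keyword arguments passed when a Step is invoked are forwarded to its group, emit_by_group, broadcast and step methods (and an Algorithm's run can pass them on to its steps, as TeraSort does with p and n_dim below). A minimal sketch of a custom step reading a forwarded kwarg; the ThresholdFilter class and the usage lines are illustrative only, not part of this commit:

from typing import Any, Iterable

from pyspark import Broadcast

from spark_minimal_algorithms.algorithm import Step


class ThresholdFilter(Step):
    """Keeps only values greater than or equal to the threshold passed as a kwarg."""

    @staticmethod
    def step(
        group_key: Any, group_items: Iterable[Any], broadcast: Broadcast, **kwargs: Any
    ) -> Iterable[Any]:
        threshold = kwargs.get("threshold", 0)  # custom kwarg forwarded by Step.__call__
        for item in group_items:
            if item >= threshold:
                yield group_key, item


# usage sketch (assumes an existing SparkContext named sc):
# rdd = sc.parallelize([("a", 1), ("a", 5), ("b", 3)])
# filtered = ThresholdFilter(sc, n_partitions=2)(rdd, threshold=3).collect()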
Showing 4 changed files with 153 additions and 15 deletions.
29 changes: 17 additions & 12 deletions spark_minimal_algorithms/algorithm.py
@@ -12,24 +12,26 @@ def __init__(self, sc: SparkContext, n_partitions: int):
super().__init__()

@staticmethod
def group(rdd: RDD) -> RDD:
def group(rdd: RDD, **kwargs: Any) -> RDD:
return rdd.groupByKey()

@staticmethod
def emit_by_group(group_key: Any, group_items: Iterable[Any]) -> Optional[Any]:
def emit_by_group(
group_key: Any, group_items: Iterable[Any], **kwargs: Any
) -> Optional[Any]:
return None

@staticmethod
def broadcast(emitted_items: List[Any]) -> Optional[Any]:
def broadcast(emitted_items: List[Any], **kwargs: Any) -> Optional[Any]:
return None

@abstractstaticmethod
def step(
group_key: Any, group_items: Iterable[Any], broadcast: Broadcast
group_key: Any, group_items: Iterable[Any], broadcast: Broadcast, **kwargs: Any
) -> Iterable[Any]:
pass

def __call__(self, rdd: RDD) -> RDD:
def __call__(self, rdd: RDD, **kwargs: Any) -> RDD:
"""
Performs a single step of an algorithm.
"""
@@ -38,21 +40,21 @@ def __call__(self, rdd: RDD) -> RDD:

step_cls: Type[Step] = self.__class__
rdd = step_cls.group(
rdd
rdd, **kwargs
).cache() # cache because we use it twice (emit and step)

def unwrap_emit(kv: Tuple[Any, Iterable[Any]]) -> Optional[Tuple[Any, Any]]:
k, v = kv
new_v = step_cls.emit_by_group(k, v)
new_v = step_cls.emit_by_group(k, v, **kwargs)
return new_v

emitted = list(rdd.map(unwrap_emit).collect())
to_broadcast = step_cls.broadcast(emitted)
to_broadcast = step_cls.broadcast(emitted, **kwargs)
broadcast: Broadcast = self._sc.broadcast(to_broadcast)

def unwrap_step(kv: Tuple[Any, Iterable[Any]]) -> Iterable[Any]:
k, v = kv
for new_v in step_cls.step(k, v, broadcast):
for new_v in step_cls.step(k, v, broadcast, **kwargs):
yield new_v

rdd = rdd.flatMap(unwrap_step)
@@ -72,12 +74,15 @@ def __init__(self, sc: SparkContext, n_partitions: int):

super().__init__()

@property
def n_partitions(self) -> int:
return self._n_partitions

@abstractmethod
def run(self, **kwargs: Dict[str, Any]) -> RDD:
def run(self, **kwargs: Any) -> RDD:
pass

def __call__(self, **kwargs: Dict[str, Any]) -> RDD:
# todo: add support for positional arguments
def __call__(self, **kwargs: Any) -> RDD:
for arg_name, arg in kwargs.items():
if isinstance(arg, RDD) and arg.getNumPartitions() != self._n_partitions:
kwargs[arg_name] = arg.repartition(self._n_partitions)
6 changes: 3 additions & 3 deletions spark_minimal_algorithms/examples/countifs.py
@@ -52,19 +52,19 @@ def extract_partition_idx(
yield idx, point

@staticmethod
def group(rdd: RDD) -> RDD:
def group(rdd: RDD) -> RDD: # type: ignore
# sort by values - todo: consider using custom terasort implementation
cls = CountifsInitialStep
rdd = rdd.map(cls.select_key).sortByKey().map(cls.unselect_key)
rdd = rdd.mapPartitionsWithIndex(cls.extract_partition_idx).groupByKey()
return rdd

@staticmethod
def emit_by_group(group_key: int, group_items: Iterable[Any]) -> Optional[Any]:
def emit_by_group(group_key: int, group_items: Iterable[Any]) -> Optional[Any]: # type: ignore
return group_key, len(list(group_items))

@staticmethod
def broadcast(
def broadcast( # type: ignore
emitted_items: List[Tuple[int, int]]
) -> Dict[str, Union[str, List[int]]]:
parition_counts = [
86 changes: 86 additions & 0 deletions spark_minimal_algorithms/examples/tera_sort.py
@@ -0,0 +1,86 @@
from typing import Iterable, Tuple, Any, List
from bisect import bisect_left
import random
import math

from spark_minimal_algorithms.algorithm import Step, Algorithm

from pyspark import RDD, Broadcast


class TeraSortFirstRound(Step):
p = 0.1
""" Default value for probability of sampling a point to be a bucket key """

@staticmethod
def extract_idx(
partition_idx: int, partition_points: Iterable[Any]
) -> Iterable[Tuple[int, Any]]:
for point in partition_points:
yield partition_idx, point

@staticmethod
def group(rdd: RDD, **kwargs: Any) -> RDD:
rdd = rdd.mapPartitionsWithIndex(TeraSortFirstRound.extract_idx).groupByKey()
return rdd

@staticmethod
def emit_by_group(group_key: int, group_items: Iterable[Any], **kwargs: Any) -> Any:
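        # each group emits a random ~p fraction of its points as bucket boundary candidates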
samples = list()
p: float = kwargs.get("p", TeraSortFirstRound.p)
for point in group_items:
if random.random() < p:
samples.append(point)

return samples

@staticmethod
def broadcast(emitted_items: List[List[Any]], **kwargs: Any) -> List[Any]:
n_dim = kwargs["n_dim"]
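        # bucket boundaries: the all-zeros point plus every sampled point, sorted ascending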
zero_point = tuple(0 for _ in range(n_dim))
buckets = [zero_point] + [
point for samples in emitted_items for point in samples
]
return sorted(buckets)

@staticmethod
def step( # type: ignore
group_key: int, group_items: Iterable[Any], broadcast: Broadcast, **kwargs: Any
) -> Iterable[Tuple[int, Any]]:
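        # broadcast.value holds the sorted boundaries; binary search assigns each point its bucket index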
for point in group_items:
point_bucket = bisect_left(broadcast.value, point)
yield point_bucket, point


class TeraSortFinalRound(Step):
@staticmethod
def group(rdd: RDD) -> RDD: # type: ignore
rdd = rdd.groupByKey().sortByKey()
return rdd

@staticmethod
def step( # type: ignore
group_key: int, group_items: Iterable[Any], broadcast: Broadcast
) -> Iterable[Any]:
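        # bucket keys are already range-ordered, so an in-memory sort per bucket yields globally sorted output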
sorted_points = sorted(group_items)
for point in sorted_points:
yield point


class TeraSort(Algorithm):
__steps__ = {
"assign_buckets": TeraSortFirstRound,
"sort": TeraSortFinalRound,
}

def run(self, rdd: RDD, n_dim: int) -> RDD: # type: ignore
rdd = rdd.cache()

n_points = rdd.count()
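        # m = expected number of points per partition; sampling with p ~ ln(n * t) / m
        # (the minimal algorithms TeraSort analysis) keeps each bucket's size close to m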
m = n_points / self.n_partitions
optimal_p = math.log(n_points * self.n_partitions) / m

rdd = self.assign_buckets(rdd, p=optimal_p, n_dim=n_dim) # type: ignore
rdd = self.sort(rdd) # type: ignore

return rdd
47 changes: 47 additions & 0 deletions tests/examples/test_tera_sort.py
@@ -0,0 +1,47 @@
from typing import List, Tuple, Any
import random

import pytest

from spark_minimal_algorithms.examples.tera_sort import TeraSort

random.seed(42)


def create_test_case(n_points: int, n_dim: int) -> Tuple[List[Tuple[int, ...]], List[Tuple[int, ...]]]:
max_point = 100 * n_points
points = [
tuple(random.randint(1, max_point) for _ in range(n_dim))
for _ in range(n_points)
]
return points, sorted(points)


TESTS = [
create_test_case(5, 1),
create_test_case(10, 1),
create_test_case(100, 1),
create_test_case(1_000, 1),
create_test_case(5, 2),
create_test_case(10, 2),
create_test_case(100, 2),
create_test_case(1_000, 2),
create_test_case(5, 3),
create_test_case(10, 3),
create_test_case(100, 3),
create_test_case(1_000, 3),
]


@pytest.mark.parametrize("test_case", TESTS)
@pytest.mark.parametrize("n_partitions", [1, 2, 4])
def test_tera_sort(spark_context, n_partitions, test_case):
points, sorted_points = test_case
n_dim = len(points[0])
rdd = spark_context.parallelize(points)

tera_sort = TeraSort(spark_context, n_partitions)
result = tera_sort(rdd=rdd, n_dim=n_dim).collect()

assert len(result) == len(sorted_points)
assert result == sorted_points
