Spark impl part 1
dvadym committed Oct 11, 2023
1 parent a4f928c commit e7dab0c
Showing 2 changed files with 96 additions and 28 deletions.
33 changes: 29 additions & 4 deletions examples/restaurant_visits/run_on_dataframes.py
@@ -22,6 +22,8 @@
import pipeline_dp
from pipeline_dp import dataframes
import pandas as pd
from pyspark.sql import SparkSession
import pyspark

FLAGS = flags.FLAGS
flags.DEFINE_string('input_file', 'restaurants_week_data.csv',
@@ -31,7 +33,7 @@
'Which dataframes to use.')


def load_data_in_dataframe() -> pd.DataFrame:
def load_data_in_pandas_dataframe() -> pd.DataFrame:
df = pd.read_csv(FLAGS.input_file)
df.rename(inplace=True,
columns={
@@ -44,6 +46,16 @@ def load_data_in_dataframe() -> pd.DataFrame:
return df


def load_data_in_spark_dataframe(
spark: SparkSession) -> pyspark.sql.dataframe.DataFrame:
df = spark.read.csv(FLAGS.input_file, header=True)
return df.withColumnRenamed('VisitorId', 'visitor_id').withColumnRenamed(
'Time entered', 'enter_time').withColumnRenamed(
'Time spent (minutes)', 'spent_minutes').withColumnRenamed(
'Money spent (euros)',
'spent_money').withColumnRenamed('Day', 'day')


def compute_private_result(df):
dp_query_builder = dataframes.QueryBuilder(df, 'visitor_id')
query = dp_query_builder.groupby('day', 3, 1).count().sum(
@@ -54,15 +66,28 @@ def compute_private_result(df):
return result_df


def compute_on_pandas_dataframes(df: pd.DataFrame) -> None:
def compute_on_pandas_dataframes() -> None:
df = load_data_in_pandas_dataframe()
result_df = compute_private_result(df)
result_df.to_csv(FLAGS.output_file)


def compute_on_spark_dataframes() -> None:
spark = SparkSession.builder \
.master("local[1]") \
.appName("SparkByExamples.com") \
.getOrCreate()
df = load_data_in_spark_dataframe(spark)
df.printSchema()
result_df = compute_private_result(df)
result_df.write.format("csv").save(FLAGS.output_file)


def main(unused_argv):
df = load_data_in_dataframe()
if FLAGS.dataframes == 'pandas':
compute_on_pandas_dataframes(df)
compute_on_pandas_dataframes()
elif FLAGS.dataframes == 'spark':
compute_on_spark_dataframes()

return 0

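For anyone exercising the new Spark path, the long chain of withColumnRenamed calls in load_data_in_spark_dataframe can equivalently be written as a loop over a rename mapping. The sketch below is illustrative only and not part of the commit: the column names are taken from the renames above, the CSV path is the default of --input_file, and the appName is arbitrary.

from pyspark.sql import SparkSession

COLUMN_RENAMES = {
    'VisitorId': 'visitor_id',
    'Time entered': 'enter_time',
    'Time spent (minutes)': 'spent_minutes',
    'Money spent (euros)': 'spent_money',
    'Day': 'day',
}

# Local SparkSession, mirroring the one built in compute_on_spark_dataframes.
spark = SparkSession.builder.master('local[1]').appName(
    'restaurant_visits').getOrCreate()
df = spark.read.csv('restaurants_week_data.csv', header=True)
for old_name, new_name in COLUMN_RENAMES.items():
    # withColumnRenamed returns a new DataFrame; missing columns are a no-op.
    df = df.withColumnRenamed(old_name, new_name)

Either form yields the same schema; the loop simply avoids the deeply nested call chain.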
91 changes: 67 additions & 24 deletions pipeline_dp/dataframes.py
@@ -1,10 +1,11 @@
import typing
import abc
from dataclasses import dataclass

import pandas as pd

import pipeline_dp
from typing import Any, Sequence, Callable, Optional, List, Dict
import pyspark
import pyspark.sql


@dataclass
@@ -22,36 +23,76 @@ class _ContributionBounds:
max_value: Optional[float] = None


def dataframe_to_collection(df, columns: _Columns):
assert isinstance(df, pd.DataFrame), "Only Pandas dataframes are supported"
columns_to_keep = [columns.privacy_key, columns.partition_key]
if columns.value is not None:
columns_to_keep.append(columns.value)
df = df[columns_to_keep] # leave only needed columns.
if columns.value is None:
        # The value is not needed for count, but for simplicity we always
        # provide one.
df['value'] = 0
class DataFrameConvertor(abc.ABC):

    # name=None makes itertuples return plain tuples instead of named tuples.
return list(df.itertuples(index=False, name=None))
@abc.abstractmethod
    def dataframe_to_collection(self, df, columns: _Columns):
pass

@abc.abstractmethod
    def collection_to_dataframe(self, col, partition_key_column: str):
pass

def collection_to_dataframe(col, partition_key_column):
assert isinstance(col, list), "Only local run is supported for now"
partition_keys, data = list(zip(*col))
df = pd.DataFrame(data=data)
df[partition_key_column] = partition_keys
columns = list(df.columns)
columns = [columns[-1]] + columns[:-1]
df = df.reindex(columns=columns).set_index(partition_key_column)
return df

class PandasConverter(DataFrameConvertor):

def dataframe_to_collection(self, df: pd.DataFrame,
columns: _Columns) -> list:
assert isinstance(df,
pd.DataFrame), "Only Pandas dataframes are supported"
columns_to_keep = [columns.privacy_key, columns.partition_key]
if columns.value is not None:
columns_to_keep.append(columns.value)
df = df[columns_to_keep] # leave only needed columns.
if columns.value is None:
            # The value is not needed for count, but for simplicity we always
            # provide one.
df['value'] = 0

        # name=None makes itertuples return plain tuples instead of named tuples.
return list(df.itertuples(index=False, name=None))

def collection_to_dataframe(self, col: list,
partition_key_column: str) -> pd.DataFrame:
assert isinstance(col, list), "Only local run is supported for now"
partition_keys, data = list(zip(*col))
df = pd.DataFrame(data=data)
df[partition_key_column] = partition_keys
columns = list(df.columns)
columns = [columns[-1]] + columns[:-1]
df = df.reindex(columns=columns).set_index(partition_key_column)
return df


class SparkConverter(DataFrameConvertor):

def dataframe_to_collection(self, df, columns: _Columns) -> pyspark.RDD:
columns_to_keep = [columns.privacy_key, columns.partition_key]
if columns.value is not None:
columns_to_keep.append(columns.value)
df = df[columns_to_keep] # leave only needed columns.
return []

def collection_to_dataframe(self, col: pyspark.RDD,
partition_key_column: str):
pass


def create_backend_for_dataframe(
df) -> pipeline_dp.pipeline_backend.PipelineBackend:
if isinstance(df, pd.DataFrame):
return pipeline_dp.LocalBackend()
    if isinstance(df, pyspark.sql.DataFrame):
return pipeline_dp.SparkRDDBackend()
raise NotImplementedError(
f"Dataframes of type {type(df)} not yet supported")


def create_dataframe_converter(df) -> DataFrameConvertor:
if isinstance(df, pd.DataFrame):
return PandasConverter()
    if isinstance(df, pyspark.sql.DataFrame):
return SparkConverter()
raise NotImplementedError(
f"Dataframes of type {type(df)} not yet supported")

@@ -79,7 +120,8 @@ def __init__(self, df, columns: _Columns, metrics: Dict[pipeline_dp.Metric,
def run_query(self,
budget: Budget,
noise_kind: Optional[pipeline_dp.NoiseKind] = None):
col = dataframe_to_collection(self._df, self._columns)
converter = create_dataframe_converter(self._df)
col = converter.dataframe_to_collection(self._df, self._columns)
backend = create_backend_for_dataframe(self._df)
budget_accountant = pipeline_dp.NaiveBudgetAccountant(
total_epsilon=budget.epsilon, total_delta=budget.delta)
@@ -111,7 +153,8 @@ def run_query(self,
budget_accountant.compute_budgets()
dp_result = list(dp_result)
self._expain_computation_report = explain_computation_report.text()
return collection_to_dataframe(dp_result, self._columns.partition_key)
return converter.collection_to_dataframe(dp_result,
self._columns.partition_key)

def explain_computations(self):
if self._expain_computation_report is None:
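SparkConverter is deliberately left as a stub in this part: dataframe_to_collection drops unused columns and returns an empty list, and collection_to_dataframe does nothing yet. A hypothetical completion of the first half, assuming the same (privacy_key, partition_key, value) tuple contract that PandasConverter produces, could look like the sketch below; the helper name and its exact signature are illustrative and not part of this commit.

from typing import Optional

import pyspark
from pyspark.sql import DataFrame


def spark_dataframe_to_collection(df: DataFrame,
                                  privacy_key: str,
                                  partition_key: str,
                                  value: Optional[str] = None) -> pyspark.RDD:
    # Keep only the columns needed for the DP computation.
    columns_to_keep = [privacy_key, partition_key]
    if value is not None:
        columns_to_keep.append(value)
    rdd = df.select(columns_to_keep).rdd.map(tuple)
    if value is None:
        # No value is needed for count; add a constant so the element shape
        # matches what PandasConverter returns.
        rdd = rdd.map(lambda row: row + (0,))
    return rdd

collection_to_dataframe would then be roughly the inverse (for example, spark.createDataFrame over the anonymized tuples followed by a rename for the partition key column), but that half is equally speculative at this stage.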
