From e7dab0cc918ecf2927dc8547190563726b0f033c Mon Sep 17 00:00:00 2001
From: Vadym Doroshenko
Date: Wed, 11 Oct 2023 17:53:30 +0200
Subject: [PATCH] Spark impl part 1

---
 .../restaurant_visits/run_on_dataframes.py | 33 ++++++-
 pipeline_dp/dataframes.py                  | 91 ++++++++++++++-----
 2 files changed, 96 insertions(+), 28 deletions(-)

diff --git a/examples/restaurant_visits/run_on_dataframes.py b/examples/restaurant_visits/run_on_dataframes.py
index a57a9bba..00157e83 100644
--- a/examples/restaurant_visits/run_on_dataframes.py
+++ b/examples/restaurant_visits/run_on_dataframes.py
@@ -22,6 +22,8 @@
 import pipeline_dp
 from pipeline_dp import dataframes
 import pandas as pd
+from pyspark.sql import SparkSession
+import pyspark
 
 FLAGS = flags.FLAGS
 flags.DEFINE_string('input_file', 'restaurants_week_data.csv',
@@ -31,7 +33,7 @@
                     'Which dataframes to use.')
 
 
-def load_data_in_dataframe() -> pd.DataFrame:
+def load_data_in_pandas_dataframe() -> pd.DataFrame:
     df = pd.read_csv(FLAGS.input_file)
     df.rename(inplace=True,
               columns={
@@ -44,6 +46,16 @@ def load_data_in_dataframe() -> pd.DataFrame:
     return df
 
 
+def load_data_in_spark_dataframe(
+        spark: SparkSession) -> pyspark.sql.dataframe.DataFrame:
+    df = spark.read.csv(FLAGS.input_file, header=True)
+    return df.withColumnRenamed('VisitorId', 'visitor_id').withColumnRenamed(
+        'Time entered', 'enter_time').withColumnRenamed(
+            'Time spent (minutes)', 'spent_minutes').withColumnRenamed(
+                'Money spent (euros)',
+                'spent_money').withColumnRenamed('Day', 'day')
+
+
 def compute_private_result(df):
     dp_query_builder = dataframes.QueryBuilder(df, 'visitor_id')
     query = dp_query_builder.groupby('day', 3, 1).count().sum(
@@ -54,15 +66,28 @@ def compute_private_result(df):
     return result_df
 
 
-def compute_on_pandas_dataframes(df: pd.DataFrame) -> None:
+def compute_on_pandas_dataframes() -> None:
+    df = load_data_in_pandas_dataframe()
     result_df = compute_private_result(df)
     result_df.to_csv(FLAGS.output_file)
 
 
+def compute_on_spark_dataframes() -> None:
+    spark = SparkSession.builder \
+        .master("local[1]") \
+        .appName("SparkByExamples.com") \
+        .getOrCreate()
+    df = load_data_in_spark_dataframe(spark)
+    df.printSchema()
+    result_df = compute_private_result(df)
+    result_df.write.format("csv").save(FLAGS.output_file)
+
+
 def main(unused_argv):
-    df = load_data_in_dataframe()
     if FLAGS.dataframes == 'pandas':
-        compute_on_pandas_dataframes(df)
+        compute_on_pandas_dataframes()
+    elif FLAGS.dataframes == 'spark':
+        compute_on_spark_dataframes()
     return 0
 
diff --git a/pipeline_dp/dataframes.py b/pipeline_dp/dataframes.py
index 472a663d..2a54765f 100644
--- a/pipeline_dp/dataframes.py
+++ b/pipeline_dp/dataframes.py
@@ -1,10 +1,11 @@
-import typing
+import abc
 from dataclasses import dataclass
 
 import pandas as pd
 import pipeline_dp
 
 from typing import Any, Sequence, Callable, Optional, List, Dict
+import pyspark
 
 
 @dataclass
@@ -22,36 +23,76 @@ class _ContributionBounds:
     max_value: Optional[float] = None
 
 
-def dataframe_to_collection(df, columns: _Columns):
-    assert isinstance(df, pd.DataFrame), "Only Pandas dataframes are supported"
-    columns_to_keep = [columns.privacy_key, columns.partition_key]
-    if columns.value is not None:
-        columns_to_keep.append(columns.value)
-    df = df[columns_to_keep]  # leave only needed columns.
-    if columns.value is None:
-        # For count value is not needed, but for simplicity always provide
-        # value.
-        df['value'] = 0
+class DataFrameConvertor(abc.ABC):
 
-    # name=None makes that tuples instead of name tuple are returned.
-    return list(df.itertuples(index=False, name=None))
+    @abc.abstractmethod
+    def dataframe_to_collection(self, df, columns: _Columns):
+        pass
 
+    @abc.abstractmethod
+    def collection_to_dataframe(self, col, partition_key_column: str):
+        pass
 
-def collection_to_dataframe(col, partition_key_column):
-    assert isinstance(col, list), "Only local run is supported for now"
-    partition_keys, data = list(zip(*col))
-    df = pd.DataFrame(data=data)
-    df[partition_key_column] = partition_keys
-    columns = list(df.columns)
-    columns = [columns[-1]] + columns[:-1]
-    df = df.reindex(columns=columns).set_index(partition_key_column)
-    return df
+
+class PandasConverter(DataFrameConvertor):
+
+    def dataframe_to_collection(self, df: pd.DataFrame,
+                                columns: _Columns) -> list:
+        assert isinstance(df,
+                          pd.DataFrame), "Only Pandas dataframes are supported"
+        columns_to_keep = [columns.privacy_key, columns.partition_key]
+        if columns.value is not None:
+            columns_to_keep.append(columns.value)
+        df = df[columns_to_keep]  # leave only needed columns.
+        if columns.value is None:
+            # For count the value is not needed, but for simplicity always
+            # provide a value.
+            df['value'] = 0
+
+        # name=None means plain tuples instead of namedtuples are returned.
+        return list(df.itertuples(index=False, name=None))
+
+    def collection_to_dataframe(self, col: list,
+                                partition_key_column: str) -> pd.DataFrame:
+        assert isinstance(col, list), "Only local run is supported for now"
+        partition_keys, data = list(zip(*col))
+        df = pd.DataFrame(data=data)
+        df[partition_key_column] = partition_keys
+        columns = list(df.columns)
+        columns = [columns[-1]] + columns[:-1]
+        df = df.reindex(columns=columns).set_index(partition_key_column)
+        return df
+
+
+class SparkConverter(DataFrameConvertor):
+
+    def dataframe_to_collection(self, df, columns: _Columns) -> pyspark.RDD:
+        columns_to_keep = [columns.privacy_key, columns.partition_key]
+        if columns.value is not None:
+            columns_to_keep.append(columns.value)
+        df = df[columns_to_keep]  # leave only needed columns.
+        return []
+
+    def collection_to_dataframe(self, col: pyspark.RDD,
+                                partition_key_column: str):
+        pass
 
 
 def create_backend_for_dataframe(
         df) -> pipeline_dp.pipeline_backend.PipelineBackend:
     if isinstance(df, pd.DataFrame):
         return pipeline_dp.LocalBackend()
+    if isinstance(df, pyspark.sql.DataFrame):
+        return pipeline_dp.SparkRDDBackend()
+    raise NotImplementedError(
+        f"Dataframes of type {type(df)} not yet supported")
+
+
+def create_dataframe_converter(df) -> DataFrameConvertor:
+    if isinstance(df, pd.DataFrame):
+        return PandasConverter()
+    if isinstance(df, pyspark.sql.DataFrame):
+        return SparkConverter()
     raise NotImplementedError(
         f"Dataframes of type {type(df)} not yet supported")
 
 
@@ -79,7 +120,8 @@ def __init__(self, df, columns: _Columns, metrics: Dict[pipeline_dp.Metric,
     def run_query(self,
                   budget: Budget,
                   noise_kind: Optional[pipeline_dp.NoiseKind] = None):
-        col = dataframe_to_collection(self._df, self._columns)
+        converter = create_dataframe_converter(self._df)
+        col = converter.dataframe_to_collection(self._df, self._columns)
         backend = create_backend_for_dataframe(self._df)
         budget_accountant = pipeline_dp.NaiveBudgetAccountant(
             total_epsilon=budget.epsilon, total_delta=budget.delta)
@@ -111,7 +153,8 @@ def run_query(self,
         budget_accountant.compute_budgets()
         dp_result = list(dp_result)
         self._expain_computation_report = explain_computation_report.text()
-        return collection_to_dataframe(dp_result, self._columns.partition_key)
+        return converter.collection_to_dataframe(dp_result,
+                                                 self._columns.partition_key)
 
     def explain_computations(self):
         if self._expain_computation_report is None:
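
The SparkConverter methods are intentionally left as placeholders in this part 1. Below is a minimal sketch, not part of the patch, of how they might later be filled in. It assumes the same (privacy_key, partition_key, value) tuple layout that PandasConverter emits and that DP results arrive as (partition_key, metrics) pairs; the helper names and the explicit SparkSession parameter are illustrative only.

# Illustrative sketch only -- not part of the patch.
from typing import Optional

import pyspark
from pyspark.sql import SparkSession


def spark_dataframe_to_collection(df: pyspark.sql.DataFrame, privacy_key: str,
                                  partition_key: str,
                                  value: Optional[str]) -> pyspark.RDD:
    """Converts a Spark DataFrame into an RDD of per-row tuples."""
    columns_to_keep = [privacy_key, partition_key]
    if value is not None:
        columns_to_keep.append(value)
    df = df.select(columns_to_keep)  # leave only needed columns.
    if value is None:
        # As in the Pandas path, emit a dummy value for count-only queries.
        return df.rdd.map(lambda row: (row[0], row[1], 0))
    return df.rdd.map(lambda row: (row[0], row[1], row[2]))


def spark_collection_to_dataframe(col: pyspark.RDD, partition_key_column: str,
                                  spark: SparkSession) -> pyspark.sql.DataFrame:
    """Converts an RDD of (partition_key, metrics) pairs back to a DataFrame."""
    # Flatten each (partition_key, metrics) pair into one flat tuple per row.
    rows = col.map(lambda kv: (kv[0],) + tuple(kv[1]))
    # createDataFrame names unnamed tuple fields _1, _2, ...; rename the key.
    return spark.createDataFrame(rows).withColumnRenamed(
        '_1', partition_key_column)

Note also that create_backend_for_dataframe constructs SparkRDDBackend with no arguments; in current PipelineDP releases it appears to require a SparkContext (for example df.rdd.context), which is worth double-checking before part 2.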