diff --git a/examples/movie_view_ratings/run_without_frameworks.py b/examples/movie_view_ratings/run_without_frameworks.py index 145b0ad6..684d985e 100644 --- a/examples/movie_view_ratings/run_without_frameworks.py +++ b/examples/movie_view_ratings/run_without_frameworks.py @@ -22,6 +22,9 @@ FLAGS = flags.FLAGS flags.DEFINE_string('input_file', None, 'The file with the movie view data') flags.DEFINE_string('output_file', None, 'Output file') +flags.DEFINE_bool( + 'pld_accounting', False, 'If false Naive budget accounting ' + 'is used, if true PLD accounting') def main(unused_argv): @@ -33,8 +36,12 @@ def main(unused_argv): backend = pipeline_dp.LocalBackend() # Define the privacy budget available for our computation. - budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1, - total_delta=1e-6) + if FLAGS.pld_accounting: + budget_accountant = pipeline_dp.PLDBudgetAccountant(total_epsilon=1, + total_delta=1e-6) + else: + budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1, + total_delta=1e-6) # Load and parse input data movie_views = parse_file(FLAGS.input_file) @@ -42,16 +49,24 @@ def main(unused_argv): # Create a DPEngine instance. dp_engine = pipeline_dp.DPEngine(budget_accountant, backend) - params = pipeline_dp.AggregateParams( - metrics=[ - # we can compute multiple metrics at once. - pipeline_dp.Metrics.COUNT, - pipeline_dp.Metrics.SUM, - pipeline_dp.Metrics.PRIVACY_ID_COUNT, + # Define metrics to compute. We can compute multiple metrics at once. + metrics = [ + # We can compute multiple metrics at once. + pipeline_dp.Metrics.COUNT, + pipeline_dp.Metrics.SUM, + pipeline_dp.Metrics.PRIVACY_ID_COUNT + ] + if not FLAGS.pld_accounting: + # PLD accounting does not yet support PERCENTILE computations. 
+ metrics.extend([ pipeline_dp.Metrics.PERCENTILE(50), pipeline_dp.Metrics.PERCENTILE(90), pipeline_dp.Metrics.PERCENTILE(99) - ], + ]) + params = pipeline_dp.AggregateParams( + metrics=metrics, + # Add Gaussian noise to anonymize values. + noise_kind=pipeline_dp.NoiseKind.GAUSSIAN, # Limits to how much one user can contribute: # .. at most two movies rated per user max_partitions_contributed=2, @@ -82,6 +97,7 @@ def main(unused_argv): movie_views, params, data_extractors, + public_partitions=list(range(1, 100)), out_explain_computation_report=explain_computation_report) budget_accountant.compute_budgets() diff --git a/pipeline_dp/__init__.py b/pipeline_dp/__init__.py index 44a928f3..4d9eda5e 100644 --- a/pipeline_dp/__init__.py +++ b/pipeline_dp/__init__.py @@ -27,6 +27,7 @@ from pipeline_dp.aggregate_params import SumParams from pipeline_dp.budget_accounting import BudgetAccountant from pipeline_dp.budget_accounting import NaiveBudgetAccountant +from pipeline_dp.budget_accounting import PLDBudgetAccountant from pipeline_dp.combiners import Combiner from pipeline_dp.combiners import CustomCombiner from pipeline_dp.data_extractors import DataExtractors diff --git a/pipeline_dp/dp_engine.py b/pipeline_dp/dp_engine.py index 157dae7b..b2827b8c 100644 --- a/pipeline_dp/dp_engine.py +++ b/pipeline_dp/dp_engine.py @@ -13,7 +13,7 @@ # limitations under the License. """DP aggregations.""" import functools -from typing import Any, Callable, Optional, Tuple +from typing import Any, Callable, Optional, Sequence, Tuple import pipeline_dp from pipeline_dp import budget_accounting @@ -24,7 +24,6 @@ from pipeline_dp import report_generator from pipeline_dp import sampling_utils from pipeline_dp.dataset_histograms import computing_histograms -from pipeline_dp import pipeline_functions from pipeline_dp.private_contribution_bounds import PrivateL0Calculator @@ -82,6 +81,9 @@ def aggregate(self, 'count' for COUNT metrics etc. 
""" self._check_aggregate_params(col, params, data_extractors) + self._check_budget_accountant_compatibility( + public_partitions is not None, params.metrics, + params.custom_combiners is not None) with self._budget_accountant.scope(weight=params.budget_weight): self._report_generators.append( @@ -208,6 +210,7 @@ def select_partitions(self, col, params: pipeline_dp.SelectPartitionsParams, `value_extractor` is not required. """ self._check_select_private_partitions(col, params, data_extractors) + self._check_budget_accountant_compatibility(False, [], False) with self._budget_accountant.scope(weight=params.budget_weight): self._report_generators.append( @@ -495,6 +498,29 @@ def _check_calculate_private_contribution_bounds_params( if check_data_extractors: _check_data_extractors(data_extractors) + def _check_budget_accountant_compatibility(self, is_public_partition: bool, + metrics: Sequence[ + pipeline_dp.Metric], + custom_combiner: bool) -> None: + if isinstance(self._budget_accountant, + pipeline_dp.NaiveBudgetAccountant): + # All aggregations support NaiveBudgetAccountant. 
+ return + if not is_public_partition: + raise NotImplementedError("PLD budget accounting does not support " + "private partition selection") + supported_metrics = [ + pipeline_dp.Metrics.COUNT, pipeline_dp.Metrics.PRIVACY_ID_COUNT, + pipeline_dp.Metrics.SUM, pipeline_dp.Metrics.MEAN + ] + non_supported_metrics = set(metrics) - set(supported_metrics) + if non_supported_metrics: + raise NotImplementedError(f"Metrics {non_supported_metrics} do not " + f"support PLD budget accounting") + if custom_combiner: + raise ValueError(f"PLD budget accounting does not support custom " + f"combiners") + def _annotate(self, col, params: pipeline_dp.SelectPartitionsParams, budget: budget_accounting.Budget): return self._backend.annotate(col, diff --git a/tests/dp_engine_test.py b/tests/dp_engine_test.py index 7b35076f..70261c1c 100644 --- a/tests/dp_engine_test.py +++ b/tests/dp_engine_test.py @@ -1080,18 +1080,38 @@ def run_e2e_private_partition_selection_large_budget(self, col, backend): return col - def test_run_e2e_local(self): + @parameterized.parameters(False, True) + def test_run_e2e_count_public_partition_local(self, pld_accounting): + Accountant = pipeline_dp.PLDBudgetAccountant if pld_accounting else pipeline_dp.NaiveBudgetAccountant + accountant = Accountant(total_epsilon=1, + total_delta=1e-3, + num_aggregations=1) + dp_engine = self._create_dp_engine_default(accountant=accountant) + aggregate_params, _ = self._create_params_default() + + input = [1, 2, 3] + public_partitions = [2] + explain_computation_report = pipeline_dp.ExplainComputationReport() + output = dp_engine.aggregate(input, aggregate_params, + self._get_default_extractors(), + public_partitions, + explain_computation_report) + + accountant.compute_budgets() + self.assertLen(list(output), 1) + + def test_run_e2e_partition_selection_local(self): input = list(range(10)) output = self.run_e2e_private_partition_selection_large_budget( input, pipeline_dp.LocalBackend()) - self.assertEqual(5, len(list(output))) 
def test_pld_not_supported_metrics(self):
    """Aggregating VARIANCE with PLDBudgetAccountant is rejected."""
    budget_accountant = pipeline_dp.PLDBudgetAccountant(total_epsilon=1,
                                                        total_delta=1e-10)
    engine = pipeline_dp.DPEngine(budget_accountant=budget_accountant,
                                  backend=pipeline_dp.LocalBackend())
    aggregate_params, public_partitions = self._create_params_default()
    aggregate_params.metrics = [pipeline_dp.Metrics.VARIANCE]

    # Only the call under test sits inside the context manager, so an error
    # raised by the setup above cannot be mistaken for the expected one.
    # The braces are escaped so they are matched literally.
    with self.assertRaisesRegex(
            NotImplementedError,
            r"Metrics \{VARIANCE\} do not support PLD budget accounting"):
        engine.aggregate([1], aggregate_params,
                         self._get_default_extractors(), public_partitions)

def test_pld_not_support_private_partition_selection(self):
    """Aggregating without public partitions is rejected under PLD."""
    budget_accountant = pipeline_dp.PLDBudgetAccountant(total_epsilon=1,
                                                        total_delta=1e-10)
    engine = pipeline_dp.DPEngine(budget_accountant=budget_accountant,
                                  backend=pipeline_dp.LocalBackend())
    aggregate_params, _ = self._create_params_default()

    # No public partitions are passed, so private partition selection would
    # be needed; only this call is expected to raise.
    with self.assertRaisesRegex(
            NotImplementedError,
            "PLD budget accounting does not support private partition"):
        engine.aggregate([1], aggregate_params,
                         self._get_default_extractors())