Skip to content

Commit

Permalink
Adding input params validation for PLD accounting (#476)
Browse files Browse the repository at this point in the history
  • Loading branch information
dvadym authored Aug 23, 2023
1 parent bdb91a3 commit d944bf7
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 16 deletions.
34 changes: 25 additions & 9 deletions examples/movie_view_ratings/run_without_frameworks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
# Command-line flags for this example: input/output file paths and the
# budget-accounting mode.
FLAGS = flags.FLAGS
flags.DEFINE_string('input_file', None, 'The file with the movie view data')
flags.DEFINE_string('output_file', None, 'Output file')
# pld_accounting defaults to False. main() reads it to choose between
# pipeline_dp.PLDBudgetAccountant (True) and pipeline_dp.NaiveBudgetAccountant
# (False), and to decide whether PERCENTILE metrics are requested.
flags.DEFINE_bool(
'pld_accounting', False, 'If false Naive budget accounting '
'is used, if true PLD accounting')


def main(unused_argv):
Expand All @@ -33,25 +36,37 @@ def main(unused_argv):
backend = pipeline_dp.LocalBackend()

# Define the privacy budget available for our computation.
budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1,
total_delta=1e-6)
if FLAGS.pld_accounting:
budget_accountant = pipeline_dp.PLDBudgetAccountant(total_epsilon=1,
total_delta=1e-6)
else:
budget_accountant = pipeline_dp.NaiveBudgetAccountant(total_epsilon=1,
total_delta=1e-6)

# Load and parse input data
movie_views = parse_file(FLAGS.input_file)

# Create a DPEngine instance.
dp_engine = pipeline_dp.DPEngine(budget_accountant, backend)

params = pipeline_dp.AggregateParams(
metrics=[
# we can compute multiple metrics at once.
pipeline_dp.Metrics.COUNT,
pipeline_dp.Metrics.SUM,
pipeline_dp.Metrics.PRIVACY_ID_COUNT,
# Define metrics to compute. We can compute multiple metrics at once.
metrics = [
# We can compute multiple metrics at once.
pipeline_dp.Metrics.COUNT,
pipeline_dp.Metrics.SUM,
pipeline_dp.Metrics.PRIVACY_ID_COUNT
]
if not FLAGS.pld_accounting:
# PLD accounting does not yet support PERCENTILE computations.
metrics.extend([
pipeline_dp.Metrics.PERCENTILE(50),
pipeline_dp.Metrics.PERCENTILE(90),
pipeline_dp.Metrics.PERCENTILE(99)
],
])
params = pipeline_dp.AggregateParams(
metrics=metrics,
# Add Gaussian noise to anonymize values.
noise_kind=pipeline_dp.NoiseKind.GAUSSIAN,
# Limits to how much one user can contribute:
# .. at most two movies rated per user
max_partitions_contributed=2,
Expand Down Expand Up @@ -82,6 +97,7 @@ def main(unused_argv):
movie_views,
params,
data_extractors,
public_partitions=list(range(1, 100)),
out_explain_computation_report=explain_computation_report)

budget_accountant.compute_budgets()
Expand Down
1 change: 1 addition & 0 deletions pipeline_dp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from pipeline_dp.aggregate_params import SumParams
from pipeline_dp.budget_accounting import BudgetAccountant
from pipeline_dp.budget_accounting import NaiveBudgetAccountant
from pipeline_dp.budget_accounting import PLDBudgetAccountant
from pipeline_dp.combiners import Combiner
from pipeline_dp.combiners import CustomCombiner
from pipeline_dp.data_extractors import DataExtractors
Expand Down
30 changes: 28 additions & 2 deletions pipeline_dp/dp_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.
"""DP aggregations."""
import functools
from typing import Any, Callable, Optional, Tuple
from typing import Any, Callable, Optional, Sequence, Tuple

import pipeline_dp
from pipeline_dp import budget_accounting
Expand All @@ -24,7 +24,6 @@
from pipeline_dp import report_generator
from pipeline_dp import sampling_utils
from pipeline_dp.dataset_histograms import computing_histograms
from pipeline_dp import pipeline_functions
from pipeline_dp.private_contribution_bounds import PrivateL0Calculator


Expand Down Expand Up @@ -82,6 +81,9 @@ def aggregate(self,
'count' for COUNT metrics etc.
"""
self._check_aggregate_params(col, params, data_extractors)
self._check_budget_accountant_compatibility(
public_partitions is not None, params.metrics,
params.custom_combiners is not None)

with self._budget_accountant.scope(weight=params.budget_weight):
self._report_generators.append(
Expand Down Expand Up @@ -208,6 +210,7 @@ def select_partitions(self, col, params: pipeline_dp.SelectPartitionsParams,
`value_extractor` is not required.
"""
self._check_select_private_partitions(col, params, data_extractors)
self._check_budget_accountant_compatibility(False, [], False)

with self._budget_accountant.scope(weight=params.budget_weight):
self._report_generators.append(
Expand Down Expand Up @@ -495,6 +498,29 @@ def _check_calculate_private_contribution_bounds_params(
if check_data_extractors:
_check_data_extractors(data_extractors)

def _check_budget_accountant_compatibility(
        self, is_public_partition: bool,
        metrics: Optional[Sequence[pipeline_dp.Metric]],
        custom_combiner: bool) -> None:
    """Checks that the requested computation is supported by the accountant.

    NaiveBudgetAccountant accepts every computation. Any other accountant
    is subject to the restrictions below; the error messages refer to it
    as PLD budget accounting.

    Args:
        is_public_partition: whether public partitions were provided.
        metrics: the metrics requested by the caller. None is treated the
            same as an empty sequence.
        custom_combiner: whether custom combiners were requested.

    Raises:
        NotImplementedError: if private partition selection is requested,
            or if any metric other than COUNT, PRIVACY_ID_COUNT, SUM or
            MEAN is requested.
        ValueError: if custom combiners are requested.
    """
    if isinstance(self._budget_accountant,
                  pipeline_dp.NaiveBudgetAccountant):
        # All aggregations support NaiveBudgetAccountant.
        return
    if not is_public_partition:
        raise NotImplementedError("PLD budget accounting does not support "
                                  "private partition selection")
    supported_metrics = {
        pipeline_dp.Metrics.COUNT,
        pipeline_dp.Metrics.PRIVACY_ID_COUNT,
        pipeline_dp.Metrics.SUM,
        pipeline_dp.Metrics.MEAN,
    }
    # `metrics` may be None (presumably when only custom combiners are
    # configured - confirm against AggregateParams). Guard it so that the
    # custom-combiner check below is reached instead of failing with a
    # TypeError from set(None).
    non_supported_metrics = set(metrics or ()) - supported_metrics
    if non_supported_metrics:
        raise NotImplementedError(f"Metrics {non_supported_metrics} do not "
                                  f"support PLD budget accounting")
    if custom_combiner:
        raise ValueError("PLD budget accounting does not support custom "
                         "combiners")

def _annotate(self, col, params: pipeline_dp.SelectPartitionsParams,
budget: budget_accounting.Budget):
return self._backend.annotate(col,
Expand Down
55 changes: 50 additions & 5 deletions tests/dp_engine_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1080,18 +1080,38 @@ def run_e2e_private_partition_selection_large_budget(self, col, backend):

return col

def test_run_e2e_local(self):
@parameterized.parameters(False, True)
def test_run_e2e_count_public_partition_local(self, pld_accounting):
    """Runs aggregate() end to end with one public partition.

    Parameterized over the accountant type: NaiveBudgetAccountant when
    pld_accounting is False, PLDBudgetAccountant when it is True. Uses the
    default aggregate params and extractors of this test class and expects
    exactly one output element.
    """
    if pld_accounting:
        accountant_cls = pipeline_dp.PLDBudgetAccountant
    else:
        accountant_cls = pipeline_dp.NaiveBudgetAccountant
    budget_accountant = accountant_cls(total_epsilon=1,
                                       total_delta=1e-3,
                                       num_aggregations=1)
    engine = self._create_dp_engine_default(accountant=budget_accountant)
    params, _ = self._create_params_default()

    rows = [1, 2, 3]
    partitions = [2]
    report = pipeline_dp.ExplainComputationReport()
    result = engine.aggregate(rows, params, self._get_default_extractors(),
                              partitions, report)

    # Budgets must be computed before the lazy output is materialized.
    budget_accountant.compute_budgets()
    self.assertLen(list(result), 1)

def test_run_e2e_partition_selection_local(self):
input = list(range(10))

output = self.run_e2e_private_partition_selection_large_budget(
input, pipeline_dp.LocalBackend())

self.assertEqual(5, len(list(output)))
self.assertLen(list(output), 5)

@unittest.skip("There are some problems with serialization in this test. "
"Tests in private_spark_test.py work normaly so probably it"
" is because of some missing setup.")
def test_run_e2e_spark(self):
def test_run_e2e_partition_selection_spark(self):
import pyspark
conf = pyspark.SparkConf()
sc = pyspark.SparkContext.getOrCreate(conf=conf)
Expand All @@ -1100,9 +1120,9 @@ def test_run_e2e_spark(self):
output = self.run_e2e_private_partition_selection_large_budget(
input, pipeline_dp.SparkRDDBackend(sc))

self.assertEqual(5, len(collect_to_container()))
self.assertLen(collect_to_container(), 5)

def test_run_e2e_beam(self):
def test_run_e2e_partition_selection_beam(self):
with test_pipeline.TestPipeline() as p:
input = p | "Create input" >> beam.Create(list(range(10)))

Expand Down Expand Up @@ -1164,6 +1184,31 @@ def test_annotate_call(self, mock_annotate_fn):
self.assertEqual(total_epsilon / 3, budget.epsilon)
self.assertEqual(total_delta / 3, budget.delta)

def test_pld_not_supported_metrics(self):
    """aggregate() with a PLD accountant rejects the VARIANCE metric."""
    budget_accountant = pipeline_dp.PLDBudgetAccountant(total_epsilon=1,
                                                        total_delta=1e-10)
    engine = pipeline_dp.DPEngine(budget_accountant=budget_accountant,
                                  backend=pipeline_dp.LocalBackend())
    aggregate_params, public_partitions = self._create_params_default()
    aggregate_params.metrics = [pipeline_dp.Metrics.VARIANCE]

    # Keep only the call under test inside the assertRaises context so that
    # an unexpected error during setup is reported as such.
    with self.assertRaisesRegex(
            NotImplementedError,
            "Metrics {VARIANCE} do not support PLD budget accounting"):
        engine.aggregate([1], aggregate_params,
                         self._get_default_extractors(), public_partitions)

def test_pld_not_support_private_partition_selection(self):
    """aggregate() with a PLD accountant requires public partitions."""
    budget_accountant = pipeline_dp.PLDBudgetAccountant(total_epsilon=1,
                                                        total_delta=1e-10)
    engine = pipeline_dp.DPEngine(budget_accountant=budget_accountant,
                                  backend=pipeline_dp.LocalBackend())
    aggregate_params, _ = self._create_params_default()

    # Keep only the call under test inside the assertRaises context so that
    # an unexpected error during setup is reported as such. No public
    # partitions are passed, which is what should trigger the error.
    with self.assertRaisesRegex(
            NotImplementedError,
            "PLD budget accounting does not support private partition"):
        engine.aggregate([1], aggregate_params,
                         self._get_default_extractors())


if __name__ == '__main__':
absltest.main()

0 comments on commit d944bf7

Please sign in to comment.