Add support for custom weight specification during cross partition combining #472

Merged
merged 10 commits on Jul 31, 2023
88 changes: 57 additions & 31 deletions analysis/cross_partition_combiners.py
@@ -17,7 +17,7 @@
import pipeline_dp
from analysis import metrics
import dataclasses
from typing import Iterable, List, Optional, Tuple
from typing import List, Tuple, Callable
import math


@@ -63,7 +63,8 @@ def _create_contribution_bounding_errors(


def _sum_metrics_to_value_error(sum_metrics: metrics.SumMetrics,
keep_prob: float) -> metrics.ValueErrors:
keep_prob: float,
weight: float) -> metrics.ValueErrors:
"""Creates ValueErrors from per-partition metrics."""
value = sum_metrics.sum
bounding_errors = _create_contribution_bounding_errors(sum_metrics)
@@ -83,17 +84,18 @@ def _sum_metrics_to_value_error(sum_metrics: metrics.SumMetrics,
l1=l1,
rmse_with_dropped_partitions=rmse_with_dropped_partitions,
l1_with_dropped_partitions=l1_with_dropped_partitions)
if keep_prob != 1:
# Weight per-partition result with keep_prob for computing average.
if weight != 1:
# Weight per-partition result for computing weighted sum.
_multiply_float_dataclasses_field(result,
keep_prob,
weight,
fields_to_ignore=["noise_std"])
return result
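
For reference, the weighting above is applied by scaling every float field of the ValueErrors dataclass in place. A minimal sketch of how such a helper can work follows; the name _scale_float_fields and the recursion into nested dataclasses are assumptions for illustration, while the actual _multiply_float_dataclasses_field is defined further down in this file.

import dataclasses

def _scale_float_fields(obj, factor: float, fields_to_ignore=()) -> None:
    # Illustrative sketch, not part of this PR.
    for field in dataclasses.fields(obj):
        if field.name in fields_to_ignore:
            continue
        value = getattr(obj, field.name)
        if isinstance(value, float):
            # Scale plain float fields (e.g. rmse, l1) by the weight.
            setattr(obj, field.name, value * factor)
        elif dataclasses.is_dataclass(value):
            # Recurse into nested dataclasses such as the bounding errors.
            _scale_float_fields(value, factor, fields_to_ignore)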


def _sum_metrics_to_metric_utility(
sum_metrics: metrics.SumMetrics, dp_metric: pipeline_dp.Metric,
partition_keep_probability: float) -> metrics.MetricUtility:
partition_keep_probability: float,
partition_weight: float) -> metrics.MetricUtility:
"""Creates cross-partition MetricUtility from 1 partition utility.

Attributes:
@@ -104,8 +106,9 @@
data_dropped = _sum_metrics_to_data_dropped(sum_metrics,
partition_keep_probability,
dp_metric)
absolute_error = _sum_metrics_to_value_error(
sum_metrics, keep_prob=partition_keep_probability)
absolute_error = _sum_metrics_to_value_error(sum_metrics,
partition_keep_probability,
partition_weight)
relative_error = absolute_error.to_relative(sum_metrics.sum)

return metrics.MetricUtility(metric=dp_metric,
@@ -192,8 +195,8 @@ def _multiply_float_dataclasses_field(dataclass,

def _per_partition_to_utility_report(
per_partition_utility: metrics.PerPartitionMetrics,
dp_metrics: List[pipeline_dp.Metric],
public_partitions: bool) -> metrics.UtilityReport:
dp_metrics: List[pipeline_dp.Metric], public_partitions: bool,
partition_weight: float) -> metrics.UtilityReport:
"""Converts per-partition metrics to cross-partition utility report."""
# Fill partition selection metrics.
if public_partitions:
@@ -213,7 +216,7 @@
dp_metrics):
metric_errors.append(
_sum_metrics_to_metric_utility(metric_error, dp_metric,
prob_to_keep))
prob_to_keep, partition_weight))

return metrics.UtilityReport(configuration_index=-1,
partitions_info=partition_metrics,
@@ -254,61 +257,84 @@ def _merge_utility_reports(report1: metrics.UtilityReport,
_merge_metric_utility(utility1, utility2)


def _average_utility_report(report: metrics.UtilityReport,
public_partitions: bool,
sums_actual: Tuple) -> None:
def _average_utility_report(report: metrics.UtilityReport, sums_actual: Tuple,
total_weight: float) -> None:
"""Averages fields of the 'report' across partitions."""
if not report.metric_errors:
return
partitions = report.partitions_info
if public_partitions:
num_output_partitions = partitions.num_dataset_partitions + partitions.num_empty_partitions
else:
num_output_partitions = partitions.kept_partitions.mean

for sum_actual, metric_error in zip(sums_actual, report.metric_errors):
_multiply_float_dataclasses_field(
metric_error,
1.0 / num_output_partitions,
1.0 / total_weight,
fields_to_ignore=["noise_std", "ratio_data_dropped"])
scaling_factor = 1 if sum_actual == 0 else 1.0 / sum_actual
_multiply_float_dataclasses_field(metric_error.ratio_data_dropped,
scaling_factor)
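
In other words, after accumulation each error field holds a weighted sum, and this function turns it into a weighted average by dividing by the accumulated total weight; dropped-data ratios are normalized by the true sum instead. A small numeric sketch with illustrative values:

# Illustrative sketch, not part of this PR.
weighted_error_sum = 7.0  # accumulated weighted error across partitions
total_weight = 2.0        # accumulated weight across partitions
avg_error = weighted_error_sum * (1.0 / total_weight)  # 3.5

sum_actual = 200.0  # true (non-DP) sum, used for ratio_data_dropped
dropped = 30.0
ratio = dropped * (1.0 if sum_actual == 0 else 1.0 / sum_actual)  # 0.15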


def partition_size_weight_fn(
per_partition_metrics: metrics.PerPartitionMetrics) -> float:
"""Weights partitions according to their size."""
# Only one metric is computed for now.
return per_partition_metrics.metric_errors[0].sum


def equal_weight_fn(
per_partition_metrics: metrics.PerPartitionMetrics) -> float:
"""Weights partitions according to their probability to be kept."""
# For public partitions the weights are all 1, so this reduces to plain
# averaging: the total weight equals the total number of partitions. This
# assumes that partition_selection_probability_to_keep is 1 for public
# partitions and that all public partitions, including empty ones, are
# processed in CrossPartitionCombiner.
# For private partitions this yields a weighted average whose total weight
# equals the mean number of kept partitions
# (`partitions.kept_partitions.mean`).
return per_partition_metrics.partition_selection_probability_to_keep
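
A quick comparison of the two weight functions on the same partition, using illustrative numbers and the _get_sum_metrics helper from the tests below:

# Illustrative sketch, not part of this PR.
pm = metrics.PerPartitionMetrics(
    partition_selection_probability_to_keep=0.5,
    raw_statistics=metrics.RawStatistics(privacy_id_count=3, count=9),
    metric_errors=[_get_sum_metrics(sum=40.0)])

w_equal = equal_weight_fn(pm)          # 0.5, the partition's keep probability
w_size = partition_size_weight_fn(pm)  # 40.0, the partition's actual sum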


class CrossPartitionCombiner(pipeline_dp.combiners.Combiner):
"""A combiner for aggregating error metrics across partitions"""
# Accumulator is a tuple of
# 1. The sum of non-DP metrics, which is used for averaging the error
# metrics.
# 2. metrics.UtilityReport, which contains the error metrics.
AccumulatorType = Tuple[Tuple, metrics.UtilityReport]

def __init__(self, dp_metrics: List[pipeline_dp.Metrics],
public_partitions: bool):
# 3. The accumulated weight, used to calculate the total weight after
# accumulation. When an accumulator is created in `create_accumulator`, the
# initial weight is applied to the partition's metric errors.
AccumulatorType = Tuple[Tuple, metrics.UtilityReport, float]

def __init__(self,
dp_metrics: List[pipeline_dp.Metric],
public_partitions: bool,
weight_fn: Callable[[metrics.PerPartitionMetrics],
float] = equal_weight_fn):
self._dp_metrics = dp_metrics
self._public_partitions = public_partitions
self._weight_fn = weight_fn

def create_accumulator(
self, metrics: metrics.PerPartitionMetrics) -> AccumulatorType:
actual_metrics = tuple(me.sum for me in metrics.metric_errors)
weight = self._weight_fn(metrics)
return actual_metrics, _per_partition_to_utility_report(
metrics, self._dp_metrics, self._public_partitions)
metrics, self._dp_metrics, self._public_partitions, weight), weight

def merge_accumulators(self, acc1: AccumulatorType,
acc2: AccumulatorType) -> AccumulatorType:
sum_actual1, report1 = acc1
sum_actual2, report2 = acc2
sum_actual1, report1, weight1 = acc1
sum_actual2, report2, weight2 = acc2
sum_actual = tuple(x + y for x, y in zip(sum_actual1, sum_actual2))
_merge_utility_reports(report1, report2)
return sum_actual, report1
weight = weight1 + weight2
return sum_actual, report1, weight

def compute_metrics(self, acc: AccumulatorType) -> metrics.UtilityReport:
"""Returns UtilityReport with final metrics."""
sum_actual, report = acc
sum_actual, report, total_weight = acc
report_copy = copy.deepcopy(report)
_average_utility_report(report_copy, self._public_partitions,
sum_actual)
_average_utility_report(report_copy, sum_actual, total_weight)
return report_copy

def metrics_names(self):
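Putting the pieces together, an illustrative end-to-end use of the combiner with the new weight_fn parameter; per_partition_metrics here is an assumed list of metrics.PerPartitionMetrics:

# Illustrative sketch, not part of this PR.
combiner = CrossPartitionCombiner(
    dp_metrics=[pipeline_dp.Metrics.COUNT],
    public_partitions=False,
    weight_fn=partition_size_weight_fn)  # size-weighted averaging

accumulators = [combiner.create_accumulator(m) for m in per_partition_metrics]
acc = accumulators[0]
for other in accumulators[1:]:
    acc = combiner.merge_accumulators(acc, other)
report = combiner.compute_metrics(acc)  # final metrics.UtilityReport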
2 changes: 1 addition & 1 deletion analysis/metrics.py
@@ -252,7 +252,7 @@ class UtilityReport:
Attributes:
configuration_index: the index of the input parameter configuration for
which this report was computed.
partition_metrics: utility analysis of selected partition.
partitions_info: utility analysis of the selected partitions.
metric_errors: utility analysis of metrics (e.g. COUNT, SUM,
PRIVACY_ID_COUNT).
utility_report_histogram:
83 changes: 56 additions & 27 deletions analysis/tests/cross_partition_combiners_test.py
@@ -24,9 +24,9 @@
import pipeline_dp


def _get_sum_metrics():
def _get_sum_metrics(sum=10.0):
return metrics.SumMetrics(aggregation=pipeline_dp.Metrics.SUM,
sum=10.0,
sum=sum,
clipping_to_min_error=3.0,
clipping_to_max_error=-5.0,
expected_l0_bounding_error=-2.0,
@@ -43,7 +43,8 @@ def test_metric_utility_count(self, keep_prob: float):
output: metrics.MetricUtility = cross_partition_combiners._sum_metrics_to_metric_utility(
input,
pipeline_dp.Metrics.COUNT,
partition_keep_probability=keep_prob)
partition_keep_probability=keep_prob,
partition_weight=keep_prob)

self.assertEqual(output.metric, pipeline_dp.Metrics.COUNT)
self.assertEqual(output.noise_kind, input.noise_kind)
@@ -111,7 +112,10 @@ def test_per_partition_to_cross_partition_utility(
pipeline_dp.Metrics.PRIVACY_ID_COUNT, pipeline_dp.Metrics.COUNT
]
cross_partition_combiners._per_partition_to_utility_report(
per_partition_utility, dp_metrics, public_partitions)
per_partition_utility,
dp_metrics,
public_partitions,
partition_weight=0.2)
if public_partitions:
mock_create_for_public_partitions.assert_called_once_with(False)
mock_create_for_private_partitions.assert_not_called()
@@ -136,7 +140,9 @@ def test_per_partition_to_cross_partition_utility_only_partition_selection(
raw_statistics=metrics.RawStatistics(privacy_id_count=3, count=100),
metric_errors=None)
output = cross_partition_combiners._per_partition_to_utility_report(
per_partition_utility, [], public_partitions=False)
per_partition_utility, [],
public_partitions=False,
partition_weight=0.5)

self.assertIsNone(output.metric_errors)
self.assertIsInstance(output.partitions_info, mock.MagicMock)
@@ -291,61 +297,84 @@ def test_merge_utility_reports(self):

class CrossPartitionCombinerTest(parameterized.TestCase):

def _create_combiner(self, public_partitions=False):
def _create_combiner(self,
dp_metrics=(pipeline_dp.Metrics.COUNT,),
public_partitions=False,
weight_fn=cross_partition_combiners.equal_weight_fn):
return cross_partition_combiners.CrossPartitionCombiner(
dp_metrics=[pipeline_dp.Metrics.COUNT],
public_partitions=public_partitions)
dp_metrics, public_partitions, weight_fn)

def test_create_report_wo_mocks(self):
combiner = self._create_combiner()
public_partitions = False
prob_keep = 0.2
combiner = self._create_combiner(
public_partitions=public_partitions,
weight_fn=cross_partition_combiners.equal_weight_fn)
per_partition_metrics = metrics.PerPartitionMetrics(
partition_selection_probability_to_keep=0.2,
partition_selection_probability_to_keep=prob_keep,
raw_statistics=metrics.RawStatistics(privacy_id_count=3, count=9),
metric_errors=[_get_sum_metrics()])
sum_actual, utility_report = combiner.create_accumulator(
metric_errors=[_get_sum_metrics(sum=10.0)])
sum_actual, utility_report, weight = combiner.create_accumulator(
per_partition_metrics)
self.assertEqual(sum_actual, (10.0,))
self.assertEqual(utility_report.partitions_info.num_dataset_partitions,
1)
self.assertLen(utility_report.metric_errors, 1)
self.assertEqual(weight, prob_keep)

def test_create_report_partition_size_is_used_as_weight_wo_mocks(self):
combiner = self._create_combiner(
weight_fn=cross_partition_combiners.partition_size_weight_fn)
per_partition_metrics = metrics.PerPartitionMetrics(
partition_selection_probability_to_keep=0.2,
raw_statistics=metrics.RawStatistics(privacy_id_count=3, count=9),
metric_errors=[_get_sum_metrics(sum=5.0)])
_, _, weight = combiner.create_accumulator(per_partition_metrics)
self.assertEqual(weight, 5.0)

@patch(
"analysis.cross_partition_combiners._per_partition_to_utility_report")
def test_create_report_with_mocks(self,
mock_per_partition_to_utility_report):
combiner = self._create_combiner()
dp_metrics = [pipeline_dp.Metrics.COUNT]
public_partitions = False
prob_keep = 0.2
combiner = self._create_combiner(
dp_metrics,
public_partitions,
weight_fn=cross_partition_combiners.equal_weight_fn)
per_partition_metrics = metrics.PerPartitionMetrics(
partition_selection_probability_to_keep=0.2,
partition_selection_probability_to_keep=prob_keep,
raw_statistics=metrics.RawStatistics(privacy_id_count=3, count=9),
metric_errors=[_get_sum_metrics()])
combiner.create_accumulator(per_partition_metrics)
expected_metrics = [pipeline_dp.Metrics.COUNT]
expected_public_partitions = False
mock_per_partition_to_utility_report.assert_called_once_with(
per_partition_metrics, expected_metrics, expected_public_partitions)
per_partition_metrics, dp_metrics, public_partitions, prob_keep)

def test_merge_accumulators(self):
combiner = self._create_combiner()
report1 = _get_utility_report(coef=2)
acc1 = ((1,), report1)
acc1 = ((1,), report1, 0.5)
report2 = _get_utility_report(coef=5)
acc2 = ((3,), report2)
acc2 = ((3,), report2, 0.5)
expected_report = _get_utility_report(coef=7)
sum_actual, output_report = combiner.merge_accumulators(acc1, acc2)
self.assertEqual(output_report, expected_report)
sum_actual, output_report, total_weight = combiner.merge_accumulators(
acc1, acc2)
self.assertEqual(sum_actual, (4,))
self.assertEqual(output_report, expected_report)
self.assertEqual(total_weight, 1)

@parameterized.parameters(False, True)
@patch("analysis.cross_partition_combiners._average_utility_report")
def test_compute_metrics(self, public_partitions,
mock_average_utility_report):
combiner = self._create_combiner(public_partitions)
def test_compute_metrics(self, mock_average_utility_report):
combiner = self._create_combiner()
report = _get_utility_report(coef=1)
sum_actual_metrics = (1000,)
acc = (sum_actual_metrics, report)
# The actual value does not matter in this test.
total_weight = 11
acc = (sum_actual_metrics, report, total_weight)
output = combiner.compute_metrics(acc)
mock_average_utility_report.assert_called_once_with(
output, public_partitions, sum_actual_metrics)
output, sum_actual_metrics, total_weight)
# Check that the input report was not modified.
self.assertEqual(report, _get_utility_report(coef=1))
