Skip to content

Commit

Permalink
Computing privacy_id_count and count in PerPartitionUtilityResults (#474)
Browse files Browse the repository at this point in the history
  • Loading branch information
dvadym authored Jul 25, 2023
1 parent 744178e commit d618eea
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 36 deletions.
2 changes: 1 addition & 1 deletion analysis/cross_partition_combiners.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def _per_partition_to_utility_report(
# Fill partition selection metrics.
if public_partitions:
prob_to_keep = 1
is_empty_partition = False # TODO(dvadym): compute this
is_empty_partition = per_partition_utility.raw_statistics.count == 0
partition_metrics = _partition_metrics_public_partitions(
is_empty_partition)
else:
Expand Down
7 changes: 7 additions & 0 deletions analysis/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,16 @@ class SumMetrics:
noise_kind: pipeline_dp.NoiseKind


@dataclass
class RawStatistics:
    """Raw (non-anonymized) statistics of a single partition."""
    # Number of distinct privacy ids contributing to the partition.
    privacy_id_count: int
    # Total number of contributed rows in the partition.
    count: int


@dataclass
class PerPartitionMetrics:
    """Utility-analysis metrics computed for a single partition."""
    # Probability that DP partition selection keeps this partition.
    partition_selection_probability_to_keep: float
    # Raw (pre-DP) statistics of the partition.
    raw_statistics: RawStatistics
    # Per-aggregation error estimates; None when only partition selection
    # is analyzed (no aggregation metrics requested).
    metric_errors: Optional[List[SumMetrics]] = None


Expand Down
23 changes: 18 additions & 5 deletions analysis/combiners.py → analysis/per_partition_combiners.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,19 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility Analysis Combiners."""
"""Utility Analysis per-partition Combiners."""

import abc
import copy
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple
import numpy as np
import math
import scipy

import pipeline_dp
from pipeline_dp import dp_computations
from pipeline_dp import combiners
from analysis import metrics
from analysis import poisson_binomial
from analysis import probability_computations
from pipeline_dp import partition_selection

MAX_PROBABILITIES_IN_ACCUMULATOR = 100
Expand Down Expand Up @@ -320,6 +317,22 @@ def create_accumulator(
return super().create_accumulator(data)


class RawStatisticsCombiner(UtilityAnalysisCombiner):
    """Computes raw (non-DP) per-partition statistics, e.g. count.

    The accumulator is the pair (privacy_id_count, count).
    """
    AccumulatorType = Tuple[int, int]

    def create_accumulator(
            self, sparse_acc: Tuple[np.ndarray, np.ndarray,
                                    np.ndarray]) -> AccumulatorType:
        """Creates an accumulator from per-privacy-id sparse data.

        Args:
            sparse_acc: (counts, sums, n_partitions) arrays with one element
              per privacy id contributing to this partition.
        """
        per_id_counts = sparse_acc[0]
        # One array element per privacy id; their sum is the total row count.
        total_rows = int(np.sum(per_id_counts))
        return len(per_id_counts), total_rows

    def compute_metrics(self, acc: AccumulatorType):
        """Converts the accumulator into a RawStatistics metrics object."""
        n_privacy_ids, n_rows = acc
        return metrics.RawStatistics(n_privacy_ids, n_rows)


class CompoundCombiner(pipeline_dp.combiners.CompoundCombiner):
"""Compound combiner for Utility analysis per partition metrics."""

Expand All @@ -337,7 +350,7 @@ class CompoundCombiner(pipeline_dp.combiners.CompoundCombiner):
# In Sparse mode, data (which contains counts, sums, n_partitions) are kept
# in lists and merge is merging of those lists. For further performance
# improvements, on converting from sparse to dense mode, the data are
# are converted to NumPy arrays. And internal combiners perform NumPy vector
# converted to NumPy arrays. And internal combiners perform NumPy vector
# aggregations.
SparseAccumulatorType = Tuple[List[int], List[float], List[int]]
DenseAccumulatorType = List[Any]
Expand Down
20 changes: 14 additions & 6 deletions analysis/tests/cross_partition_combiners_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,10 @@ def test_per_partition_to_cross_partition_utility(
mock_create_for_private_partitions,
mock_create_for_public_partitions):
per_partition_utility = metrics.PerPartitionMetrics(
0.2, metric_errors=[_get_sum_metrics(),
_get_sum_metrics()])
partition_selection_probability_to_keep=0.2,
raw_statistics=metrics.RawStatistics(privacy_id_count=10, count=15),
metric_errors=[_get_sum_metrics(),
_get_sum_metrics()])
dp_metrics = [
pipeline_dp.Metrics.PRIVACY_ID_COUNT, pipeline_dp.Metrics.COUNT
]
Expand All @@ -129,8 +131,10 @@ def test_per_partition_to_cross_partition_utility(
def test_per_partition_to_cross_partition_utility_only_partition_selection(
self, mock_to_metric_utility, mock_create_for_private_partitions,
mock_create_for_public_partitions):
per_partition_utility = metrics.PerPartitionMetrics(0.5,
metric_errors=None)
per_partition_utility = metrics.PerPartitionMetrics(
partition_selection_probability_to_keep=0.5,
raw_statistics=metrics.RawStatistics(privacy_id_count=3, count=100),
metric_errors=None)
output = cross_partition_combiners._per_partition_to_utility_report(
per_partition_utility, [], public_partitions=False)

Expand Down Expand Up @@ -295,7 +299,9 @@ def _create_combiner(self, public_partitions=False):
def test_create_report_wo_mocks(self):
combiner = self._create_combiner()
per_partition_metrics = metrics.PerPartitionMetrics(
0.2, metric_errors=[_get_sum_metrics()])
partition_selection_probability_to_keep=0.2,
raw_statistics=metrics.RawStatistics(privacy_id_count=3, count=9),
metric_errors=[_get_sum_metrics()])
sum_actual, utility_report = combiner.create_accumulator(
per_partition_metrics)
self.assertEqual(sum_actual, (10.0,))
Expand All @@ -309,7 +315,9 @@ def test_create_report_with_mocks(self,
mock_per_partition_to_utility_report):
combiner = self._create_combiner()
per_partition_metrics = metrics.PerPartitionMetrics(
0.2, metric_errors=[_get_sum_metrics()])
partition_selection_probability_to_keep=0.2,
raw_statistics=metrics.RawStatistics(privacy_id_count=3, count=9),
metric_errors=[_get_sum_metrics()])
combiner.create_accumulator(per_partition_metrics)
expected_metrics = [pipeline_dp.Metrics.COUNT]
expected_public_partitions = False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from typing import Tuple

import pipeline_dp
from analysis import combiners
from analysis import per_partition_combiners as combiners
from analysis import metrics
from analysis.tests import common

Expand Down Expand Up @@ -227,7 +227,7 @@ def test_partition_selection_accumulator_compute_probability(
delta=1e-10)

@patch(
'analysis.combiners.PartitionSelectionCalculator.compute_probability_to_keep'
'analysis.per_partition_combiners.PartitionSelectionCalculator.compute_probability_to_keep'
)
def test_partition_selection_combiner(self,
mock_compute_probability_to_keep):
Expand Down Expand Up @@ -628,5 +628,20 @@ def test_merge_list_right_bigger(self):
self.assertIs(result, b)


class RawStatisticsTest(parameterized.TestCase):
    """Tests for RawStatisticsCombiner."""

    def test_create_accumulator(self):
        combiner = combiners.RawStatisticsCombiner()
        # 2 privacy ids with counts 1 and 2 -> (privacy_id_count=2, count=3).
        sparse_acc = (np.array([1, 2]), np.array([1]), np.array([2]))
        self.assertEqual(combiner.create_accumulator(sparse_acc), (2, 3))

    def test_compute_metrics(self):
        combiner = combiners.RawStatisticsCombiner()
        self.assertEqual(combiner.compute_metrics((3, 10)),
                         metrics.RawStatistics(privacy_id_count=3, count=10))


if __name__ == '__main__':
absltest.main()
10 changes: 6 additions & 4 deletions analysis/tests/utility_analysis_engine_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,19 +203,19 @@ def test_per_partition_error_metrics(self, pre_aggregated: bool):
# Assert
self.assertLen(output, 10)
# Assert count metrics are correct.
[self.assertEqual(v[1][1].clipping_to_max_error, -10) for v in output]
[self.assertEqual(v[1][2].clipping_to_max_error, -10) for v in output]
[
self.assertAlmostEqual(v[1][1].expected_l0_bounding_error,
self.assertAlmostEqual(v[1][2].expected_l0_bounding_error,
-18.0,
delta=1e-5) for v in output
]
[
self.assertAlmostEqual(v[1][1].std_l0_bounding_error,
self.assertAlmostEqual(v[1][2].std_l0_bounding_error,
1.89736,
delta=1e-5) for v in output
]
[
self.assertAlmostEqual(v[1][1].std_noise, 11.95312, delta=1e-5)
self.assertAlmostEqual(v[1][2].std_noise, 11.95312, delta=1e-5)
for v in output
]

Expand Down Expand Up @@ -266,6 +266,7 @@ def test_multi_parameters(self):
[self.assertLen(partition_metrics, 2) for partition_metrics in output]

expected_pk0 = [
metrics.RawStatistics(privacy_id_count=2, count=1),
metrics.SumMetrics(aggregation=pipeline_dp.Metrics.COUNT,
sum=1.0,
clipping_to_min_error=0.0,
Expand All @@ -284,6 +285,7 @@ def test_multi_parameters(self):
noise_kind=pipeline_dp.NoiseKind.GAUSSIAN)
]
expected_pk1 = [
metrics.RawStatistics(privacy_id_count=2, count=2),
metrics.SumMetrics(aggregation=pipeline_dp.Metrics.COUNT,
sum=2.0,
clipping_to_min_error=0.0,
Expand Down
9 changes: 6 additions & 3 deletions analysis/tests/utility_analysis_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ def _get_per_partition_metrics(self, n_configurations=3):
for i in range(n_configurations):
result.append(
metrics.PerPartitionMetrics(
0.1, metric_errors=[self._get_sum_metrics(150)]))
partition_selection_probability_to_keep=0.1,
raw_statistics=metrics.RawStatistics(privacy_id_count=5,
count=10),
metric_errors=[self._get_sum_metrics(150)]))
return result

@parameterized.parameters(False, True)
Expand Down Expand Up @@ -314,8 +317,8 @@ def test_unnest_metrics(self):
output = list(utility_analysis._unnest_metrics(input_data))
self.assertLen(output, 4)
self.assertEqual(output[0], ((0, None), input_data[0]))
self.assertEqual(output[1], ((1, None), input_data[1]))
self.assertEqual(output[2], ((0, 100), input_data[0]))
self.assertEqual(output[1], ((0, 100), input_data[0]))
self.assertEqual(output[2], ((1, None), input_data[1]))
self.assertEqual(output[3], ((1, 100), input_data[1]))


Expand Down
20 changes: 12 additions & 8 deletions analysis/utility_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,14 @@ def _pack_per_partition_metrics(
"""
n_metrics = len(utility_result) // n_configurations

raw_statistics = utility_result[0]
# Create 'result' with empty elements.
empty_partition_metric = lambda: metrics.PerPartitionMetrics(1, [])
empty_partition_metric = lambda: metrics.PerPartitionMetrics(
1, raw_statistics, [])
result = tuple(empty_partition_metric() for _ in range(n_configurations))

# Fill 'result' from 'utility_metrics'.
for i, metric in enumerate(utility_result):
for i, metric in enumerate(utility_result[1:]):
i_configuration = i // n_metrics
ith_result = result[i_configuration]
if isinstance(metric, float): # partition selection
Expand Down Expand Up @@ -179,15 +181,17 @@ def _get_upper_bound(n: int) -> int:
def _unnest_metrics(
    metrics: List[metrics.PerPartitionMetrics]
) -> Iterable[Tuple[Any, metrics.PerPartitionMetrics]]:
    """Unnests metrics from different configurations.

    For each parameter configuration i, two keyed records are emitted:
      ((i, None), metric)   - for overall (all-partition) aggregation;
      ((i, bucket), metric) - for per-partition-size histogram aggregation,
        where 'bucket' is the lower bound of the partition-size bucket.

    Args:
        metrics: per-partition metrics, one element per parameter
          configuration.

    Yields:
        ((configuration_index, bucket_or_None), per_partition_metrics) pairs.
    """
    # NOTE: the source span is garbled diff residue (old and new lines
    # interleaved); this is the reconstructed post-change version, consistent
    # with the expected interleaved output order in the updated unit test.
    for i, metric in enumerate(metrics):
        yield ((i, None), metric)
        # Partition size proxy: the actual sum when aggregation errors are
        # present, otherwise (select-partitions case) the raw privacy id
        # count.
        if metrics[0].metric_errors:
            partition_size = metrics[0].metric_errors[0].sum
        else:
            # Select partitions case.
            partition_size = metrics[0].raw_statistics.privacy_id_count
        # Emits metrics for computing histogram by partition size.
        bucket = _get_lower_bound(partition_size)
        yield ((i, bucket), metric)


def _group_utility_reports(
Expand Down
14 changes: 7 additions & 7 deletions analysis/utility_analysis_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from pipeline_dp import pipeline_backend
import analysis
import analysis.contribution_bounders as utility_contribution_bounders
import analysis.combiners as utility_analysis_combiners
from analysis import per_partition_combiners
from analysis import data_structures


Expand Down Expand Up @@ -111,34 +111,34 @@ def _create_compound_combiner(
mechanism_type, weight=aggregate_params.budget_weight)

# Create Utility analysis combiners.
internal_combiners = []
internal_combiners = [per_partition_combiners.RawStatisticsCombiner()]
for params in data_structures.get_aggregate_params(self._options):
# WARNING: Do not change the order here,
# _create_aggregate_error_compound_combiner() in utility_analysis.py
# depends on it.
if not self._is_public_partitions:
internal_combiners.append(
utility_analysis_combiners.PartitionSelectionCombiner(
per_partition_combiners.PartitionSelectionCombiner(
combiners.CombinerParams(
private_partition_selection_budget, params)))
if pipeline_dp.Metrics.SUM in aggregate_params.metrics:
internal_combiners.append(
utility_analysis_combiners.SumCombiner(
per_partition_combiners.SumCombiner(
combiners.CombinerParams(
budgets[pipeline_dp.Metrics.SUM], params)))
if pipeline_dp.Metrics.COUNT in aggregate_params.metrics:
internal_combiners.append(
utility_analysis_combiners.CountCombiner(
per_partition_combiners.CountCombiner(
combiners.CombinerParams(
budgets[pipeline_dp.Metrics.COUNT], params)))
if pipeline_dp.Metrics.PRIVACY_ID_COUNT in aggregate_params.metrics:
internal_combiners.append(
utility_analysis_combiners.PrivacyIdCountCombiner(
per_partition_combiners.PrivacyIdCountCombiner(
combiners.CombinerParams(
budgets[pipeline_dp.Metrics.PRIVACY_ID_COUNT],
params)))

return utility_analysis_combiners.CompoundCombiner(
return per_partition_combiners.CompoundCombiner(
internal_combiners, return_named_tuple=False)

def _select_private_partitions_internal(
Expand Down

0 comments on commit d618eea

Please sign in to comment.