From 4b178bcca644983492c0219712d0d44c9f46b937 Mon Sep 17 00:00:00 2001 From: Masa Ono Date: Tue, 10 Dec 2024 10:54:03 -0800 Subject: [PATCH] Support Hindsight PR in TorchRec Summary: Pull request TDB after approvals. ### Overview This diff implements HindsightTargetPR metric into TorchRec. This will also include a bucketized version. Thrift changes submitted ahead in D66216486. ### Implementation 1) Create X-wide granular array to store metric states where each index represents the threshold. For bucketization, each bucket will be stacked in the next dimension within the state tensor. 2) Calculate minimum threshold that meets target_precision. 3) Calculate precision and recall points with target threshold. ### Metrics This metric will return the following curves: * hindsight_target_pr: this is the calculated threshold for the window state to maximize recall while achieving the target precision. * hindsight_target_precision: this is the achieved precision with hindsight_target_pr. * hindsight_target_recall: this is the achieved recall with hindsight_target_pr. ### Usage Hindsight PR metrics are primarily useful to mimic the calibration system within identity team. Please adjust the bucketization and window size accordingly to best approximate this. Note: since the states are stored as a dimensional tensor, multiple tasks will not be supported for this metric. Differential Revision: D65867461 --- torchrec/metrics/hindsight_target_pr.py | 235 ++++++++++++++++++ torchrec/metrics/metric_module.py | 2 + torchrec/metrics/metrics_config.py | 1 + torchrec/metrics/metrics_namespace.py | 6 + .../metrics/tests/test_hindsight_target_pr.py | 169 +++++++++++++ 5 files changed, 413 insertions(+) create mode 100644 torchrec/metrics/hindsight_target_pr.py create mode 100644 torchrec/metrics/tests/test_hindsight_target_pr.py diff --git a/torchrec/metrics/hindsight_target_pr.py b/torchrec/metrics/hindsight_target_pr.py new file mode 100644 index 000000000..800052ecf --- /dev/null +++ b/torchrec/metrics/hindsight_target_pr.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +# pyre-strict + +from typing import Any, cast, Dict, List, Optional, Type + +import torch +from torchrec.metrics.metrics_namespace import MetricName, MetricNamespace, MetricPrefix +from torchrec.metrics.rec_metric import ( + MetricComputationReport, + RecMetric, + RecMetricComputation, + RecMetricException, +) + + +TARGET_PRECISION = "target_precision" +THRESHOLD_GRANULARITY = 1000 + + +def compute_precision( + num_true_positives: torch.Tensor, num_false_positives: torch.Tensor +) -> torch.Tensor: + return torch.where( + num_true_positives + num_false_positives == 0.0, + 0.0, + num_true_positives / (num_true_positives + num_false_positives).double(), + ) + + +def compute_recall( + num_true_positives: torch.Tensor, num_false_negitives: torch.Tensor +) -> torch.Tensor: + return torch.where( + num_true_positives + num_false_negitives == 0.0, + 0.0, + num_true_positives / (num_true_positives + num_false_negitives), + ) + + +def compute_threshold_idx( + num_true_positives: torch.Tensor, + num_false_positives: torch.Tensor, + target_precision: float, +) -> int: + for i in range(THRESHOLD_GRANULARITY): + if ( + compute_precision(num_true_positives[i], num_false_positives[i]) + >= target_precision + ): + return i + + return THRESHOLD_GRANULARITY - 1 + + +def compute_true_pos_sum( + labels: torch.Tensor, + predictions: torch.Tensor, + weights: torch.Tensor, +) -> torch.Tensor: + predictions = predictions.double() + tp_sum = torch.zeros(THRESHOLD_GRANULARITY, dtype=torch.double) + thresholds = torch.linspace(0, 1, steps=THRESHOLD_GRANULARITY) + for i, threshold in enumerate(thresholds): + tp_sum[i] = torch.sum(weights * ((predictions >= threshold) * labels), -1) + return tp_sum + + +def compute_false_pos_sum( + labels: torch.Tensor, + predictions: torch.Tensor, + weights: torch.Tensor, +) -> torch.Tensor: + predictions = predictions.double() + fp_sum = torch.zeros(THRESHOLD_GRANULARITY, dtype=torch.double) + thresholds = torch.linspace(0, 1, steps=THRESHOLD_GRANULARITY) + for i, threshold in enumerate(thresholds): + fp_sum[i] = torch.sum(weights * ((predictions >= threshold) * (1 - labels)), -1) + return fp_sum + + +def compute_false_neg_sum( + labels: torch.Tensor, + predictions: torch.Tensor, + weights: torch.Tensor, +) -> torch.Tensor: + predictions = predictions.double() + fn_sum = torch.zeros(THRESHOLD_GRANULARITY, dtype=torch.double) + thresholds = torch.linspace(0, 1, steps=THRESHOLD_GRANULARITY) + for i, threshold in enumerate(thresholds): + fn_sum[i] = torch.sum(weights * ((predictions <= threshold) * labels), -1) + return fn_sum + + +def get_pr_states( + labels: torch.Tensor, + predictions: torch.Tensor, + weights: Optional[torch.Tensor], +) -> Dict[str, torch.Tensor]: + if weights is None: + weights = torch.ones_like(predictions) + return { + "true_pos_sum": compute_true_pos_sum(labels, predictions, weights), + "false_pos_sum": compute_false_pos_sum(labels, predictions, weights), + "false_neg_sum": compute_false_neg_sum(labels, predictions, weights), + } + + +class HindsightTargetPRMetricComputation(RecMetricComputation): + r""" + This class implements the RecMetricComputation for Hingsight Target PR. + + The constructor arguments are defined in RecMetricComputation. + See the docstring of RecMetricComputation for more detail. + + Args: + target_precision (float): If provided, computes the minimum threshold to achieve the target precision. + """ + + def __init__( + self, *args: Any, target_precision: float = 0.5, **kwargs: Any + ) -> None: + super().__init__(*args, **kwargs) + self._add_state( + "true_pos_sum", + torch.zeros(THRESHOLD_GRANULARITY, dtype=torch.double), + add_window_state=True, + dist_reduce_fx="sum", + persistent=True, + ) + self._add_state( + "false_pos_sum", + torch.zeros(THRESHOLD_GRANULARITY, dtype=torch.double), + add_window_state=True, + dist_reduce_fx="sum", + persistent=True, + ) + self._add_state( + "false_neg_sum", + torch.zeros(THRESHOLD_GRANULARITY, dtype=torch.double), + add_window_state=True, + dist_reduce_fx="sum", + persistent=True, + ) + self._target_precision: float = target_precision + + def update( + self, + *, + predictions: Optional[torch.Tensor], + labels: torch.Tensor, + weights: Optional[torch.Tensor], + **kwargs: Dict[str, Any], + ) -> None: + if predictions is None: + raise RecMetricException( + "Inputs 'predictions' should not be None for HindsightTargetPRMetricComputation update" + ) + states = get_pr_states(labels, predictions, weights) + num_samples = predictions.shape[-1] + + for state_name, state_value in states.items(): + state = getattr(self, state_name) + state += state_value + self._aggregate_window_state(state_name, state_value, num_samples) + + def _compute(self) -> List[MetricComputationReport]: + true_pos_sum = cast(torch.Tensor, self.true_pos_sum) + false_pos_sum = cast(torch.Tensor, self.false_pos_sum) + false_neg_sum = cast(torch.Tensor, self.false_neg_sum) + threshold_idx = compute_threshold_idx( + true_pos_sum, + false_pos_sum, + self._target_precision, + ) + window_threshold_idx = compute_threshold_idx( + self.get_window_state("true_pos_sum"), + self.get_window_state("false_pos_sum"), + self._target_precision, + ) + reports = [ + MetricComputationReport( + name=MetricName.HINDSIGHT_TARGET_PR, + metric_prefix=MetricPrefix.LIFETIME, + value=torch.Tensor(threshold_idx), + ), + MetricComputationReport( + name=MetricName.HINDSIGHT_TARGET_PR, + metric_prefix=MetricPrefix.WINDOW, + value=torch.Tensor(window_threshold_idx), + ), + MetricComputationReport( + name=MetricName.HINDSIGHT_TARGET_PRECISION, + metric_prefix=MetricPrefix.LIFETIME, + value=compute_precision( + true_pos_sum[threshold_idx], + false_pos_sum[threshold_idx], + ), + ), + MetricComputationReport( + name=MetricName.HINDSIGHT_TARGET_PRECISION, + metric_prefix=MetricPrefix.WINDOW, + value=compute_precision( + self.get_window_state("true_pos_sum")[window_threshold_idx], + self.get_window_state("false_pos_sum")[window_threshold_idx], + ), + ), + MetricComputationReport( + name=MetricName.HINDSIGHT_TARGET_RECALL, + metric_prefix=MetricPrefix.LIFETIME, + value=compute_recall( + true_pos_sum[threshold_idx], + false_neg_sum[threshold_idx], + ), + ), + MetricComputationReport( + name=MetricName.HINDSIGHT_TARGET_RECALL, + metric_prefix=MetricPrefix.WINDOW, + value=compute_recall( + self.get_window_state("true_pos_sum")[window_threshold_idx], + self.get_window_state("false_neg_sum")[window_threshold_idx], + ), + ), + ] + return reports + + +class HindsightTargetPRMetric(RecMetric): + _namespace: MetricNamespace = MetricNamespace.HINDSIGHT_TARGET_PR + _computation_class: Type[RecMetricComputation] = HindsightTargetPRMetricComputation diff --git a/torchrec/metrics/metric_module.py b/torchrec/metrics/metric_module.py index b7228acbe..0be8329f1 100644 --- a/torchrec/metrics/metric_module.py +++ b/torchrec/metrics/metric_module.py @@ -24,6 +24,7 @@ from torchrec.metrics.cali_free_ne import CaliFreeNEMetric from torchrec.metrics.calibration import CalibrationMetric from torchrec.metrics.ctr import CTRMetric +from torchrec.metrics.hindsight_target_pr import HindsightTargetPRMetric from torchrec.metrics.mae import MAEMetric from torchrec.metrics.metrics_config import ( BatchSizeStage, @@ -94,6 +95,7 @@ RecMetricEnum.TENSOR_WEIGHTED_AVG: TensorWeightedAvgMetric, RecMetricEnum.CALI_FREE_NE: CaliFreeNEMetric, RecMetricEnum.UNWEIGHTED_NE: UnweightedNEMetric, + RecMetricEnum.HINDSIGHT_TARGET_PR: HindsightTargetPRMetric, } diff --git a/torchrec/metrics/metrics_config.py b/torchrec/metrics/metrics_config.py index ac9edf440..e85867862 100644 --- a/torchrec/metrics/metrics_config.py +++ b/torchrec/metrics/metrics_config.py @@ -47,6 +47,7 @@ class RecMetricEnum(RecMetricEnumBase): TENSOR_WEIGHTED_AVG = "tensor_weighted_avg" CALI_FREE_NE = "cali_free_ne" UNWEIGHTED_NE = "unweighted_ne" + HINDSIGHT_TARGET_PR = "hindsight_target_pr" @dataclass(unsafe_hash=True, eq=True) diff --git a/torchrec/metrics/metrics_namespace.py b/torchrec/metrics/metrics_namespace.py index 55dbd72e2..1afd83e60 100644 --- a/torchrec/metrics/metrics_namespace.py +++ b/torchrec/metrics/metrics_namespace.py @@ -82,6 +82,10 @@ class MetricName(MetricNameBase): CALI_FREE_NE = "cali_free_ne" UNWEIGHTED_NE = "unweighted_ne" + HINDSIGHT_TARGET_PR = "hindsight_target_pr" + HINDSIGHT_TARGET_PRECISION = "hindsight_target_precision" + HINDSIGHT_TARGET_RECALL = "hindsight_target_recall" + class MetricNamespaceBase(StrValueMixin, Enum): pass @@ -131,6 +135,8 @@ class MetricNamespace(MetricNamespaceBase): CALI_FREE_NE = "cali_free_ne" UNWEIGHTED_NE = "unweighted_ne" + HINDSIGHT_TARGET_PR = "hindsight_target_pr" + class MetricPrefix(StrValueMixin, Enum): DEFAULT = "" diff --git a/torchrec/metrics/tests/test_hindsight_target_pr.py b/torchrec/metrics/tests/test_hindsight_target_pr.py new file mode 100644 index 000000000..054e209d8 --- /dev/null +++ b/torchrec/metrics/tests/test_hindsight_target_pr.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +# pyre-strict + +import unittest +from typing import Dict, Type + +import torch +from torchrec.metrics.hindsight_target_pr import ( + compute_precision, + compute_recall, + compute_threshold_idx, + HindsightTargetPRMetric, +) +from torchrec.metrics.rec_metric import RecComputeMode, RecMetric +from torchrec.metrics.test_utils import ( + metric_test_helper, + rec_metric_value_test_launcher, + TestMetric, +) + + +WORLD_SIZE = 4 +THRESHOLD_GRANULARITY = 1000 + + +class TestHindsightTargetPRMetric(TestMetric): + @staticmethod + def _get_states( + labels: torch.Tensor, predictions: torch.Tensor, weights: torch.Tensor + ) -> Dict[str, torch.Tensor]: + predictions = predictions.double() + tp_sum = torch.zeros(THRESHOLD_GRANULARITY, dtype=torch.double) + fp_sum = torch.zeros(THRESHOLD_GRANULARITY, dtype=torch.double) + thresholds = torch.linspace(0, 1, steps=THRESHOLD_GRANULARITY) + for i, threshold in enumerate(thresholds): + tp_sum[i] = torch.sum(weights * ((predictions >= threshold) * labels), -1) + fp_sum[i] = torch.sum( + weights * ((predictions >= threshold) * (1 - labels)), -1 + ) + return { + "true_pos_sum": tp_sum, + "false_pos_sum": fp_sum, + } + + @staticmethod + def _compute(states: Dict[str, torch.Tensor]) -> torch.Tensor: + threshold_idx = compute_threshold_idx( + states["true_pos_sum"], states["false_pos_sum"], 0.5 + ) + return torch.Tensor(threshold_idx) + + +class TestHindsightTargetPrecisionMetric(TestMetric): + @staticmethod + def _get_states( + labels: torch.Tensor, predictions: torch.Tensor, weights: torch.Tensor + ) -> Dict[str, torch.Tensor]: + predictions = predictions.double() + tp_sum = torch.zeros(THRESHOLD_GRANULARITY, dtype=torch.double) + fp_sum = torch.zeros(THRESHOLD_GRANULARITY, dtype=torch.double) + thresholds = torch.linspace(0, 1, steps=THRESHOLD_GRANULARITY) + for i, threshold in enumerate(thresholds): + tp_sum[i] = torch.sum(weights * ((predictions >= threshold) * labels), -1) + fp_sum[i] = torch.sum( + weights * ((predictions >= threshold) * (1 - labels)), -1 + ) + return { + "true_pos_sum": tp_sum, + "false_pos_sum": fp_sum, + } + + @staticmethod + def _compute(states: Dict[str, torch.Tensor]) -> torch.Tensor: + threshold_idx = compute_threshold_idx( + states["true_pos_sum"], states["false_pos_sum"], 0.5 + ) + return compute_precision( + states["true_pos_sum"][threshold_idx], + states["false_pos_sum"][threshold_idx], + ) + + +class TestHindsightTargetRecallMetric(TestMetric): + @staticmethod + def _get_states( + labels: torch.Tensor, predictions: torch.Tensor, weights: torch.Tensor + ) -> Dict[str, torch.Tensor]: + predictions = predictions.double() + tp_sum = torch.zeros(THRESHOLD_GRANULARITY, dtype=torch.double) + fp_sum = torch.zeros(THRESHOLD_GRANULARITY, dtype=torch.double) + fn_sum = torch.zeros(THRESHOLD_GRANULARITY, dtype=torch.double) + thresholds = torch.linspace(0, 1, steps=THRESHOLD_GRANULARITY) + for i, threshold in enumerate(thresholds): + tp_sum[i] = torch.sum(weights * ((predictions >= threshold) * labels), -1) + fp_sum[i] = torch.sum( + weights * ((predictions >= threshold) * (1 - labels)), -1 + ) + fn_sum[i] = torch.sum(weights * ((predictions <= threshold) * labels), -1) + return { + "true_pos_sum": tp_sum, + "false_pos_sum": fp_sum, + "false_neg_sum": fn_sum, + } + + @staticmethod + def _compute(states: Dict[str, torch.Tensor]) -> torch.Tensor: + threshold_idx = compute_threshold_idx( + states["true_pos_sum"], states["false_pos_sum"], 0.5 + ) + return compute_recall( + states["true_pos_sum"][threshold_idx], + states["false_neg_sum"][threshold_idx], + ) + + +# Fused tests are not supported for this metric. +class TestHindsightTargetPRMetricTest(unittest.TestCase): + target_clazz: Type[RecMetric] = HindsightTargetPRMetric + pr_task_name: str = "hindsight_target_pr" + precision_task_name: str = "hindsight_target_precision" + recall_task_name: str = "hindsight_target_recall" + + # def test_unfused_hindsight_target_pr(self) -> None: + # rec_metric_value_test_launcher( + # target_clazz=HindsightTargetPRMetric, + # target_compute_mode=RecComputeMode.UNFUSED_TASKS_COMPUTATION, + # test_clazz=TestHindsightTargetPRMetric, + # metric_name=TestHindsightTargetPRMetricTest.pr_task_name, + # task_names=["t1", "t2", "t3"], + # fused_update_limit=0, + # compute_on_all_ranks=False, + # should_validate_update=False, + # world_size=WORLD_SIZE, + # entry_point=metric_test_helper, + # ) + + def test_unfused_hindsight_target_precision(self) -> None: + rec_metric_value_test_launcher( + target_clazz=HindsightTargetPRMetric, + target_compute_mode=RecComputeMode.UNFUSED_TASKS_COMPUTATION, + test_clazz=TestHindsightTargetPrecisionMetric, + metric_name=TestHindsightTargetPRMetricTest.precision_task_name, + task_names=["t1", "t2", "t3"], + fused_update_limit=0, + compute_on_all_ranks=False, + should_validate_update=False, + world_size=WORLD_SIZE, + entry_point=metric_test_helper, + ) + + def test_unfused_hindsight_target_recall(self) -> None: + rec_metric_value_test_launcher( + target_clazz=HindsightTargetPRMetric, + target_compute_mode=RecComputeMode.UNFUSED_TASKS_COMPUTATION, + test_clazz=TestHindsightTargetRecallMetric, + metric_name=TestHindsightTargetPRMetricTest.recall_task_name, + task_names=["t1", "t2", "t3"], + fused_update_limit=0, + compute_on_all_ranks=False, + should_validate_update=False, + world_size=WORLD_SIZE, + entry_point=metric_test_helper, + )