feat(experiments): New trends calculation methods #26256

Closed (wants to merge 6 commits)

@@ -9,6 +9,11 @@
    calculate_credible_intervals,
    calculate_probabilities,
)
from posthog.hogql_queries.experiments.experiment_trends_statistics import (
    are_results_significant as are_results_significant_v2,
    calculate_credible_intervals as calculate_credible_intervals_v2,
    calculate_probabilities as calculate_probabilities_v2,
)
from posthog.hogql_queries.insights.trends.trends_query_runner import TrendsQueryRunner
from posthog.hogql_queries.query_runner import QueryRunner
from posthog.models.experiment import Experiment
@@ -265,6 +270,9 @@ def run(query_runner: TrendsQueryRunner, result_key: str, is_parallel: bool):
        probabilities = calculate_probabilities(control_variant, test_variants)
        significance_code, p_value = are_results_significant(control_variant, test_variants, probabilities)
        credible_intervals = calculate_credible_intervals([control_variant, *test_variants])
        # probabilities = calculate_probabilities_v2(control_variant, test_variants)
        # significance_code, p_value = are_results_significant_v2(control_variant, test_variants, probabilities)
        # credible_intervals = calculate_credible_intervals_v2([control_variant, *test_variants])

        return ExperimentTrendsQueryResponse(
            kind="ExperimentTrendsQuery",
134 changes: 134 additions & 0 deletions posthog/hogql_queries/experiments/experiment_trends_statistics.py
@@ -0,0 +1,134 @@
from scipy import stats
from posthog.schema import ExperimentSignificanceCode, ExperimentVariantTrendsBaseStats

from ee.clickhouse.queries.experiments import (
    FF_DISTRIBUTION_THRESHOLD,
)

Probability = float


def posterior_poisson(total_counts, total_exposures, alpha, beta):
    """Calculates the Gamma posterior distribution of a Poisson rate given the observed counts and the Gamma prior parameters.

    Parameters
    ----------
    total_counts: int
        The total number of observed counts
    total_exposures: int
        The total number of exposed users
    alpha: float
        The prior hyper-parameter (Gamma shape)
    beta: float
        The prior hyper-parameter (Gamma rate)
    """
    prior = stats.gamma(a=alpha, scale=1 / beta)
    alpha = alpha + total_counts
    beta = beta + total_exposures
    posterior = stats.gamma(a=alpha, scale=1 / beta)

    return prior, posterior

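# Reference note, not part of the PR: posterior_poisson is the standard Gamma-Poisson
# conjugate update, so with a Gamma(alpha, beta) prior on the rate the posterior is
# Gamma(alpha + total_counts, beta + total_exposures), i.e. the posterior mean of the
# rate is (alpha + total_counts) / (beta + total_exposures).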

def calculate_probabilities(
    control_variant: ExperimentVariantTrendsBaseStats,
    test_variants: list[ExperimentVariantTrendsBaseStats],
    simulations: int = 100_000,
Contributor review comment (on the simulations default):

I guess this is an arbitrary large number for now, which is fine. We should do some quick testing to see if we really need such a large number.

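One quick way to test that would be to compare the winning probabilities at increasing simulation counts and see where they stabilize. A minimal sketch, not part of the PR, assuming the module path above and the stats class used in the tests below:

from posthog.hogql_queries.experiments.experiment_trends_statistics import calculate_probabilities
from posthog.schema import ExperimentVariantTrendsBaseStats

control = ExperimentVariantTrendsBaseStats(key="control", count=100, exposure=1.0, absolute_exposure=1000)
test = ExperimentVariantTrendsBaseStats(key="test", count=150, exposure=1.0, absolute_exposure=1000)

# If the estimates already agree to ~2 decimal places at 10_000 draws,
# the 100_000 default is probably larger than needed for this effect size.
for n in (1_000, 10_000, 100_000):
    probs = calculate_probabilities(control, [test], simulations=n)
    print(n, [round(p, 3) for p in probs])
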
) -> list[Probability]:
    """
    Calculates probability that each variant is the best using Bayesian inference.
    Uses a Poisson likelihood with Gamma prior for modeling count data.
    """
    if not control_variant:
        raise ValueError("No control variant data found")

    if len(test_variants) >= 10:
        raise ValueError("Can't calculate experiment results for more than 10 variants")

    if len(test_variants) < 1:
        raise ValueError("Can't calculate experiment results for less than 2 variants")

    variants = [control_variant, *test_variants]
    probabilities = []

    # For each variant, calculate probability it's the best
    for target_variant in variants:
        other_variants = [v for v in variants if v != target_variant]

        # Get posterior distribution for target variant
        _, target_posterior = posterior_poisson(
            target_variant.count,
            target_variant.exposure,
            alpha=1,  # prior alpha
            beta=1,  # prior beta
        )

        # Get posterior distributions for other variants
        other_posteriors = [
            posterior_poisson(
                variant.count,
                variant.exposure,
                alpha=1,  # prior alpha
                beta=1,  # prior beta
            )[1]
            for variant in other_variants
        ]

        # Sample from posteriors
        target_samples = target_posterior.rvs(simulations)
        other_samples = [p.rvs(simulations) for p in other_posteriors]

        # Count how often target variant is best
        wins = sum(
            target_sample > max(other_variant_samples)
            for target_sample, other_variant_samples in zip(target_samples, zip(*other_samples))
        )
Contributor review comment (on lines +83 to +86):

Can use vectorization here, much faster than a loop. Haven't tested, but something like this perhaps (assuming we are working with numpy arrays here):

wins = (target_samples[:, None] > other_samples).sum(axis=1).sum()
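
For reference, a vectorized form that keeps the loop's semantics (count the draws where the target beats every other variant) could look like the sketch below; this is an illustration rather than part of the PR, and it assumes other_samples is a non-empty list of equal-length NumPy arrays:

import numpy as np

# Shape (n_other, simulations): one row of posterior draws per competing variant.
other_matrix = np.stack(other_samples)
# A draw counts as a win when the target sample exceeds the best competing sample.
wins = int((target_samples > other_matrix.max(axis=0)).sum())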


        probabilities.append(wins / simulations)

    return probabilities


def are_results_significant(
    control_variant: ExperimentVariantTrendsBaseStats,
    test_variants: list[ExperimentVariantTrendsBaseStats],
    probabilities: list[Probability],
    threshold: float = 0.95,
) -> tuple[ExperimentSignificanceCode, float]:
    """
    Determines if results are significant using Bayesian criteria:
    1. Check if we have enough exposure
    2. Check if any variant has high probability of being best
    """
    # Check minimum exposure
    if control_variant.absolute_exposure < FF_DISTRIBUTION_THRESHOLD:
        return ExperimentSignificanceCode.NOT_ENOUGH_EXPOSURE, 0.0

    for variant in test_variants:
        if variant.absolute_exposure < FF_DISTRIBUTION_THRESHOLD:
            return ExperimentSignificanceCode.NOT_ENOUGH_EXPOSURE, 0.0

    # Check if any variant has high probability of being best
    max_prob = max(probabilities)
    if max_prob > threshold:
        return ExperimentSignificanceCode.SIGNIFICANT, max_prob

    return ExperimentSignificanceCode.LOW_WIN_PROBABILITY, max_prob


def calculate_credible_intervals(variants: list[ExperimentVariantTrendsBaseStats], interval: float = 0.95) -> dict:
    """
    Calculate credible intervals for each variant's rate parameter
    using the Gamma posterior distribution.
    """
    alpha = (1 - interval) / 2
    intervals = {}

    for variant in variants:
        posterior = stats.gamma(a=variant.count + 1, scale=1 / variant.absolute_exposure)
Contributor review comment:

Need to add the beta to be statistically correct. Also, suggest adding PRIOR_ALPHA and PRIOR_BETA as constants in this file, to avoid hard-coding them.

Suggested change:
-        posterior = stats.gamma(a=variant.count + 1, scale=1 / variant.absolute_exposure)
+        posterior = stats.gamma(a=variant.count + 1, scale=1 / (variant.absolute_exposure + 1))
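
A minimal sketch of the constants-based version, assuming the prior stays at alpha=1, beta=1 as hard-coded in posterior_poisson (the PRIOR_ALPHA / PRIOR_BETA names come from this review comment, not from the PR):

PRIOR_ALPHA = 1  # Gamma prior shape (assumed value, matching the hard-coded alpha=1 above)
PRIOR_BETA = 1  # Gamma prior rate (assumed value, matching the hard-coded beta=1 above)

# Inside calculate_credible_intervals, the posterior for each variant's rate becomes:
posterior = stats.gamma(a=PRIOR_ALPHA + variant.count, scale=1 / (PRIOR_BETA + variant.absolute_exposure))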

Contributor review comment:

Also, here we use variant.absolute_exposure, whereas in calculate_probabilities we use variant.exposure. What's the difference between them, and why is the other used here?


        lower, upper = posterior.ppf([alpha, 1 - alpha])
        intervals[variant.key] = (lower, upper)

    return intervals
157 changes: 157 additions & 0 deletions
@@ -0,0 +1,157 @@
from posthog.hogql_queries.experiments.experiment_trends_statistics import (
    calculate_probabilities,
    are_results_significant,
    calculate_credible_intervals,
)
from posthog.schema import ExperimentVariantTrendsBaseStats, ExperimentSignificanceCode
from posthog.test.base import BaseTest


class TestExperimentTrendsStatistics(BaseTest):
    def test_calculate_probabilities(self):
        # Test error cases
        control = ExperimentVariantTrendsBaseStats(key="control", count=100, exposure=1.0, absolute_exposure=1000)

        # Test with no test variants
        with self.assertRaises(ValueError) as e:
            calculate_probabilities(control, [])
        self.assertEqual(str(e.exception), "Can't calculate experiment results for less than 2 variants")

        # Test with too many variants
        too_many_variants = [
            ExperimentVariantTrendsBaseStats(key=f"test_{i}", count=100, exposure=1.0, absolute_exposure=1000)
            for i in range(10)
        ]
        with self.assertRaises(ValueError) as e:
            calculate_probabilities(control, too_many_variants)
        self.assertEqual(str(e.exception), "Can't calculate experiment results for more than 10 variants")

        # Test probability calculations
        test = ExperimentVariantTrendsBaseStats(
            key="test",
            count=150,  # 50% more events than control
            exposure=1.0,
            absolute_exposure=1000,
        )

        probabilities = calculate_probabilities(control, [test])

        # Should return probabilities for both variants
        self.assertEqual(len(probabilities), 2)

        # Probabilities should sum to 1
        self.assertAlmostEqual(sum(probabilities), 1.0, places=2)

        # Test variant should have higher probability
        self.assertGreater(probabilities[1], probabilities[0])

    def test_analysis_clear_winner(self):
        # Test case where there's a clear winning variant
        control = ExperimentVariantTrendsBaseStats(key="control", count=100, exposure=1.0, absolute_exposure=1000)
        test = ExperimentVariantTrendsBaseStats(key="test", count=150, exposure=1.0, absolute_exposure=1000)

        # Calculate probabilities
        probabilities = calculate_probabilities(control, [test])

        # Test should have high probability of being better
        self.assertGreater(probabilities[1], 0.95)

        # Results should be significant
        significance, prob = are_results_significant(control, [test], probabilities)
        self.assertEqual(significance, ExperimentSignificanceCode.SIGNIFICANT)

        # Check credible intervals
        intervals = calculate_credible_intervals([control, test])
        self.assertIn("control", intervals)
        self.assertIn("test", intervals)

        # Test interval should be higher than control
        self.assertGreater(intervals["test"][0], intervals["control"][0])

    def test_analysis_no_clear_winner(self):
        # Test case where variants are very similar
        control = ExperimentVariantTrendsBaseStats(key="control", count=100, exposure=1.0, absolute_exposure=1000)
        test = ExperimentVariantTrendsBaseStats(key="test", count=102, exposure=1.0, absolute_exposure=1000)

        probabilities = calculate_probabilities(control, [test])

        # Neither variant should have high probability of being best
        self.assertLess(max(probabilities), 0.95)

        significance, _ = are_results_significant(control, [test], probabilities)
        self.assertEqual(significance, ExperimentSignificanceCode.LOW_WIN_PROBABILITY)

    def test_analysis_not_enough_exposure(self):
        # Test case where there's not enough exposure
        control = ExperimentVariantTrendsBaseStats(
            key="control",
            count=10,
            exposure=1.0,
            absolute_exposure=50,  # Below FF_DISTRIBUTION_THRESHOLD
        )
        test = ExperimentVariantTrendsBaseStats(
            key="test",
            count=15,
            exposure=1.0,
            absolute_exposure=50,  # Below FF_DISTRIBUTION_THRESHOLD
        )

        # Calculate probabilities
        probabilities = calculate_probabilities(control, [test])

        # Results should show not enough exposure
        significance, prob = are_results_significant(control, [test], probabilities)
        self.assertEqual(significance, ExperimentSignificanceCode.NOT_ENOUGH_EXPOSURE)
        self.assertEqual(prob, 0.0)

        # Test when only control has low exposure
        test.absolute_exposure = 1000
        significance, prob = are_results_significant(control, [test], probabilities)
        self.assertEqual(significance, ExperimentSignificanceCode.NOT_ENOUGH_EXPOSURE)
        self.assertEqual(prob, 0.0)

        # Test when only test variant has low exposure
        control.absolute_exposure = 1000
        test.absolute_exposure = 50
        significance, prob = are_results_significant(control, [test], probabilities)
        self.assertEqual(significance, ExperimentSignificanceCode.NOT_ENOUGH_EXPOSURE)
        self.assertEqual(prob, 0.0)

    def test_credible_intervals(self):
        # Test case with known values
        control = ExperimentVariantTrendsBaseStats(
            key="control",
            count=100,  # 100 events
            exposure=1.0,
            absolute_exposure=1000,  # 1000 users
        )
        test = ExperimentVariantTrendsBaseStats(
            key="test",
            count=150,  # 150 events
            exposure=1.0,
            absolute_exposure=1000,  # 1000 users
        )

        intervals = calculate_credible_intervals([control, test])

        # Check control interval
        self.assertIn("control", intervals)
        control_lower, control_upper = intervals["control"]
        # With count=100 and exposure=1000, rate should be around 0.1
        self.assertGreater(control_upper, 0.08)  # Upper bound should be above 0.08
        self.assertLess(control_lower, 0.12)  # Lower bound should be below 0.12

        # Check test interval
        self.assertIn("test", intervals)
        test_lower, test_upper = intervals["test"]
        # With count=150 and exposure=1000, rate should be around 0.15
        self.assertGreater(test_upper, 0.13)  # Upper bound should be above 0.13
        self.assertLess(test_lower, 0.17)  # Lower bound should be below 0.17

        # Test with custom interval width
        narrow_intervals = calculate_credible_intervals([control, test], interval=0.5)
        # 50% interval should be narrower than 95% interval
        self.assertLess(
            narrow_intervals["control"][1] - narrow_intervals["control"][0],
            intervals["control"][1] - intervals["control"][0],
        )