Skip to content

Commit

Permalink
Merge branch 'master' into persons-query-runners
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusandra committed Oct 3, 2023
2 parents 189ba49 + b9e358c commit d593e0a
Show file tree
Hide file tree
Showing 193 changed files with 4,345 additions and 4,048 deletions.
7 changes: 0 additions & 7 deletions bin/docker-server
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,6 @@ trap 'rm -rf "$PROMETHEUS_MULTIPROC_DIR"' EXIT
export PROMETHEUS_METRICS_EXPORT_PORT=8001
export STATSD_PORT=${STATSD_PORT:-8125}

if [[ -n $INJECT_EC2_CLIENT_RACK ]]; then
# To avoid cross-AZ Kafka traffic, set KAFKA_CLIENT_RACK from the EC2 metadata endpoint.
# TODO: switch to the downwards API when https://github.com/kubernetes/kubernetes/issues/40610 is released
TOKEN=$(curl --max-time 0.1 -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600")
export KAFKA_CLIENT_RACK=$(curl --max-time 0.1 -H "X-aws-ec2-metadata-token: $TOKEN" -v http://169.254.169.254/latest/meta-data/placement/availability-zone-id)
fi

exec gunicorn posthog.wsgi \
--config gunicorn.config.py \
--bind 0.0.0.0:8000 \
Expand Down
11 changes: 3 additions & 8 deletions ee/clickhouse/queries/experiments/secondary_experiment_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
from zoneinfo import ZoneInfo

from rest_framework.exceptions import ValidationError
from ee.clickhouse.queries.experiments.trend_experiment_result import (
uses_count_per_property_value_aggregation,
uses_count_per_user_aggregation,
)
from ee.clickhouse.queries.experiments.trend_experiment_result import uses_math_aggregation_by_user_or_property_value

from posthog.constants import INSIGHT_FUNNELS, INSIGHT_TRENDS, TRENDS_CUMULATIVE
from posthog.models.feature_flag import FeatureFlag
Expand Down Expand Up @@ -59,7 +56,7 @@ def __init__(

self.team = team
if query_filter.insight == INSIGHT_TRENDS and not (
uses_count_per_user_aggregation(query_filter) or uses_count_per_property_value_aggregation(query_filter)
uses_math_aggregation_by_user_or_property_value(query_filter)
):
query_filter = query_filter.shallow_clone({"display": TRENDS_CUMULATIVE})

Expand Down Expand Up @@ -99,9 +96,7 @@ def get_trend_count_data_for_variants(self, insight_results) -> Dict[str, float]
count = result["count"]
breakdown_value = result["breakdown_value"]

if uses_count_per_user_aggregation(self.query_filter) or uses_count_per_property_value_aggregation(
self.query_filter
):
if uses_math_aggregation_by_user_or_property_value(self.query_filter):
count = result["count"] / len(result.get("data", [0]))

if breakdown_value in self.variants:
Expand Down
31 changes: 14 additions & 17 deletions ee/clickhouse/queries/experiments/trend_experiment_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from posthog.models.filters.filter import Filter
from posthog.models.team import Team
from posthog.queries.trends.trends import Trends
from posthog.queries.trends.util import COUNT_PER_ACTOR_MATH_FUNCTIONS, PROPERTY_MATH_FUNCTIONS
from posthog.queries.trends.util import ALL_SUPPORTED_MATH_FUNCTIONS

Probability = float

Expand All @@ -41,16 +41,18 @@ class Variant:
absolute_exposure: int


def uses_math_aggregation_by_user_or_property_value(filter: Filter):
    """Return True if any entity on `filter` uses a count-per-user or
    count-per-property-value math aggregation that needs special
    experiment handling (linear display + custom exposure logic).

    'sum' is deliberately excluded: sum filters don't need special
    handling because experiments can use a custom exposure for them.
    """
    # sync with frontend: https://github.com/PostHog/posthog/blob/master/frontend/src/scenes/experiments/experimentLogic.tsx#L662
    # the selector experimentCountPerUserMath
    entities = filter.entities

    # Build a local copy instead of calling .remove("sum") on the imported
    # object: mutating ALL_SUPPORTED_MATH_FUNCTIONS in place would silently
    # drop "sum" from the shared module-level constant for every other
    # consumer for the lifetime of the process. A set also makes the
    # per-entity membership test O(1).
    math_keys = set(ALL_SUPPORTED_MATH_FUNCTIONS) - {"sum"}

    return any(entity.math in math_keys for entity in entities)


class ClickhouseTrendExperimentResult:
Expand Down Expand Up @@ -89,14 +91,11 @@ def __init__(
experiment_end_date.astimezone(ZoneInfo(team.timezone)) if experiment_end_date else None
)

count_per_user_aggregation = uses_count_per_user_aggregation(filter)
count_per_property_value_aggregation = uses_count_per_property_value_aggregation(filter)
uses_math_aggregation = uses_math_aggregation_by_user_or_property_value(filter)

query_filter = filter.shallow_clone(
{
"display": TRENDS_CUMULATIVE
if not (count_per_user_aggregation or count_per_property_value_aggregation)
else TRENDS_LINEAR,
"display": TRENDS_CUMULATIVE if not uses_math_aggregation else TRENDS_LINEAR,
"date_from": start_date_in_project_timezone,
"date_to": end_date_in_project_timezone,
"explicit_date": True,
Expand All @@ -107,7 +106,7 @@ def __init__(
}
)

if count_per_user_aggregation or count_per_property_value_aggregation:
if uses_math_aggregation:
# A trend experiment can have only one metric, so take the first one to calculate exposure
# We copy the entity to avoid mutating the original filter
entity = query_filter.shallow_clone({}).entities[0]
Expand Down Expand Up @@ -213,9 +212,7 @@ def get_variants(self, insight_results, exposure_results):
exposure_counts = {}
exposure_ratios = {}

if uses_count_per_user_aggregation(self.query_filter) or uses_count_per_property_value_aggregation(
self.query_filter
):
if uses_math_aggregation_by_user_or_property_value(self.query_filter):
filtered_exposure_results = [
result for result in exposure_results if result["action"]["math"] == UNIQUE_USERS
]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# name: ClickhouseTestExperimentSecondaryResults.test_basic_secondary_metric_results
'
/* user_id:128 celery:posthog.celery.sync_insight_caching_state */
/* user_id:125 celery:posthog.celery.sync_insight_caching_state */
SELECT team_id,
date_diff('second', max(timestamp), now()) AS age
FROM events
Expand Down
127 changes: 126 additions & 1 deletion ee/clickhouse/views/test/test_clickhouse_experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -1505,7 +1505,7 @@ def test_experiment_flow_with_event_results_for_three_test_variants(self):
self.assertAlmostEqual(response_data["expected_loss"], 1, places=2)


@flaky(max_runs=10, min_passes=1)
# @flaky(max_runs=10, min_passes=1)
class ClickhouseTestTrendExperimentResults(ClickhouseTestMixin, APILicensedTest):
@snapshot_clickhouse_queries
def test_experiment_flow_with_event_results(self):
Expand Down Expand Up @@ -2440,3 +2440,128 @@ def test_experiment_flow_with_avg_count_per_property_value_results(self):
# The variant has high probability of being better. (effectively Gamma(10,1))
self.assertAlmostEqual(response_data["probability"]["test"], 0.805, places=2)
self.assertFalse(response_data["significant"])

def test_experiment_flow_with_sum_count_per_property_value_results(self):
    """Trend experiment using `math: sum` over a numeric event property.

    Because 'sum' is excluded from the special math-aggregation handling,
    the insight runs with cumulative display: test variant sums to
    1+1+3+3+10 = 18 on Jan 2 (person1) plus 1+1.5 = 2.5 on Jan 5
    (person4); control sums to 1 (Jan 3), +1+2 (Jan 4), +1 (Jan 5) = 5.
    Exposure comes from a custom filter counting total pageviews.
    """
    journeys_for(
        {
            "person1": [
                # 5 counts, single person
                {
                    "event": "$pageview",
                    "timestamp": "2020-01-02",
                    "properties": {"$feature/a-b-test": "test", "mathable": 1},
                },
                {
                    "event": "$pageview",
                    "timestamp": "2020-01-02",
                    "properties": {"$feature/a-b-test": "test", "mathable": 1},
                },
                {
                    "event": "$pageview",
                    "timestamp": "2020-01-02",
                    "properties": {"$feature/a-b-test": "test", "mathable": 3},
                },
                {
                    "event": "$pageview",
                    "timestamp": "2020-01-02",
                    "properties": {"$feature/a-b-test": "test", "mathable": 3},
                },
                {
                    "event": "$pageview",
                    "timestamp": "2020-01-02",
                    "properties": {"$feature/a-b-test": "test", "mathable": 10},
                },
            ],
            "person2": [
                {
                    "event": "$pageview",
                    "timestamp": "2020-01-03",
                    "properties": {"$feature/a-b-test": "control", "mathable": 1},
                },
                {
                    "event": "$pageview",
                    "timestamp": "2020-01-04",
                    "properties": {"$feature/a-b-test": "control", "mathable": 1},
                },
                {
                    "event": "$pageview",
                    "timestamp": "2020-01-05",
                    "properties": {"$feature/a-b-test": "control", "mathable": 1},
                },
            ],
            "person3": [
                {
                    "event": "$pageview",
                    "timestamp": "2020-01-04",
                    "properties": {"$feature/a-b-test": "control", "mathable": 2},
                },
            ],
            "person4": [
                {
                    "event": "$pageview",
                    "timestamp": "2020-01-05",
                    "properties": {"$feature/a-b-test": "test", "mathable": 1},
                },
                {
                    "event": "$pageview",
                    "timestamp": "2020-01-05",
                    "properties": {"$feature/a-b-test": "test", "mathable": 1.5},
                },
            ],
            # doesn't have feature set
            "person_out_of_control": [
                {"event": "$pageview", "timestamp": "2020-01-03"},
            ],
            "person_out_of_end_date": [
                {"event": "$pageview", "timestamp": "2020-08-03", "properties": {"$feature/a-b-test": "control"}},
            ],
        },
        self.team,
    )

    ff_key = "a-b-test"
    # generates the FF which should result in the above events^
    creation_response = self.client.post(
        f"/api/projects/{self.team.id}/experiments/",
        {
            "name": "Test Experiment",
            "description": "",
            "start_date": "2020-01-01T00:00",
            "end_date": "2020-01-06T00:00",
            "feature_flag_key": ff_key,
            "parameters": {
                "custom_exposure_filter": {
                    "events": [
                        {
                            "id": "$pageview",  # exposure is total pageviews
                            "order": 0,
                        }
                    ],
                }
            },
            "filters": {
                "insight": "TRENDS",
                "events": [{"order": 0, "id": "$pageview", "math": "sum", "math_property": "mathable"}],
                "properties": [],
            },
        },
    )

    id = creation_response.json()["id"]

    response = self.client.get(f"/api/projects/{self.team.id}/experiments/{id}/results")
    self.assertEqual(200, response.status_code)

    response_data = response.json()["result"]
    # Sort by breakdown so index 0 is "control" and index 1 is "test".
    result = sorted(response_data["insight"], key=lambda x: x["breakdown_value"])

    self.assertEqual(result[0]["data"], [0.0, 0.0, 1.0, 4.0, 5.0, 5.0])
    self.assertEqual("control", result[0]["breakdown_value"])

    self.assertEqual(result[1]["data"], [0.0, 18.0, 18.0, 18.0, 20.5, 20.5])
    self.assertEqual("test", result[1]["breakdown_value"])

    # NOTE(review): the Gamma figures below look copy-pasted from the
    # avg-count test above; they don't obviously match these sums — confirm
    # against the actual posterior used for this metric.
    # Variant with test: Gamma(7, 1) and control: Gamma(4, 1) distribution
    # The variant has high probability of being better. (effectively Gamma(10,1))
    self.assertAlmostEqual(response_data["probability"]["test"], 0.9513, places=2)
    self.assertFalse(response_data["significant"])
2 changes: 2 additions & 0 deletions ee/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,5 @@

# Whether to enable the admin portal. Default false for self-hosted as if not setup properly can pose security issues.
ADMIN_PORTAL_ENABLED = get_from_env("ADMIN_PORTAL_ENABLED", DEMO or DEBUG, type_cast=str_to_bool)

ASSET_GENERATION_MAX_TIMEOUT_MINUTES = get_from_env("ASSET_GENERATION_MAX_TIMEOUT_MINUTES", 10, type_cast=int)
46 changes: 39 additions & 7 deletions ee/tasks/subscriptions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
from typing import Optional

import structlog
from prometheus_client import Counter
from sentry_sdk import capture_exception
from statshog.defaults.django import statsd

from ee.tasks.subscriptions.email_subscriptions import send_email_subscription_report
from ee.tasks.subscriptions.slack_subscriptions import send_slack_subscription_report
Expand All @@ -13,6 +13,14 @@

logger = structlog.get_logger(__name__)

SUBSCRIPTION_QUEUED = Counter(
"subscription_queued", "A subscription was queued for delivery", labelnames=["destination"]
)
SUBSCRIPTION_SUCCESS = Counter(
"subscription_send_success", "A subscription was sent successfully", labelnames=["destination"]
)
SUBSCRIPTION_FAILURE = Counter("subscription_send_failure", "A subscription failed to send", labelnames=["destination"])


def _deliver_subscription_report(
subscription_id: int, previous_value: Optional[str] = None, invite_message: Optional[str] = None
Expand All @@ -34,6 +42,8 @@ def _deliver_subscription_report(
return

if subscription.target_type == "email":
SUBSCRIPTION_QUEUED.labels(destination="email").inc()

insights, assets = generate_assets(subscription)

# Send emails
Expand All @@ -51,22 +61,38 @@ def _deliver_subscription_report(
invite_message=invite_message or "" if is_new_subscription_target else None,
total_asset_count=len(insights),
)
statsd.incr("subscription_email_send_success")
except Exception as e:
logger.error(e)
SUBSCRIPTION_FAILURE.labels(destination="email").inc()
logger.error(
"sending subscription failed",
subscription_id=subscription.id,
next_delivery_date=subscription.next_delivery_date,
destination=subscription.target_type,
exc_info=True,
)
capture_exception(e)
statsd.incr("subscription_email_send_failure")

SUBSCRIPTION_SUCCESS.labels(destination="email").inc()

elif subscription.target_type == "slack":
SUBSCRIPTION_QUEUED.labels(destination="slack").inc()

insights, assets = generate_assets(subscription)
try:
send_slack_subscription_report(
subscription, assets, total_asset_count=len(insights), is_new_subscription=is_new_subscription_target
)
statsd.incr("subscription_slack_send_success")
SUBSCRIPTION_SUCCESS.labels(destination="slack").inc()
except Exception as e:
statsd.incr("subscription_slack_send_failure")
logger.error(e)
SUBSCRIPTION_FAILURE.labels(destination="slack").inc()
logger.error(
"sending subscription failed",
subscription_id=subscription.id,
next_delivery_date=subscription.next_delivery_date,
destination=subscription.target_type,
exc_info=True,
)
capture_exception(e)
else:
raise NotImplementedError(f"{subscription.target_type} is not supported")

Expand All @@ -91,6 +117,12 @@ def schedule_all_subscriptions() -> None:
)

for subscription in subscriptions:
logger.info(
"Scheduling subscription",
subscription_id=subscription.id,
next_delivery_date=subscription.next_delivery_date,
destination=subscription.target_type,
)
deliver_subscription_report.delay(subscription.id)


Expand Down
Loading

0 comments on commit d593e0a

Please sign in to comment.