Skip to content

Commit

Permalink
feat(batch-exports): Support high frequency exports (#17844)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias authored Oct 9, 2023
1 parent e3bcb76 commit 09ee6a1
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 70 deletions.
1 change: 1 addition & 0 deletions frontend/src/lib/constants.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ export const FEATURE_FLAGS = {
GENERIC_SIGNUP_BENEFITS: 'generic-signup-benefits', // experiment, owner: @raquelmsmith
SURVEYS_POSITIONS: 'surveys-positions', // owner: @liyiy
WEB_ANALYTICS: 'web-analytics', // owner @robbie-c #team-web-analytics
HIGH_FREQUENCY_BATCH_EXPORTS: 'high-frequency-batch-exports', // owner: @tomasfarias
// owner: team monitoring, only to be enabled for PostHog team testing
EXCEPTION_AUTOCAPTURE: 'exception-autocapture',
DATA_WAREHOUSE: 'data-warehouse', // owner: @EDsCODE
Expand Down
27 changes: 21 additions & 6 deletions frontend/src/scenes/batch_exports/BatchExportEditForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@ import { IconInfo } from 'lib/lemon-ui/icons'
import { BatchExportsEditLogicProps, batchExportsEditLogic } from './batchExportEditLogic'
import { Field } from 'lib/forms/Field'
import { Tooltip } from 'lib/lemon-ui/Tooltip'
import { featureFlagLogic } from 'lib/logic/featureFlagLogic'
import { FEATURE_FLAGS } from 'lib/constants'

export function BatchExportsEditForm(props: BatchExportsEditLogicProps): JSX.Element {
const logic = batchExportsEditLogic(props)
const { isNew, batchExportConfigForm, isBatchExportConfigFormSubmitting, batchExportConfigLoading } =
useValues(logic)
const { submitBatchExportConfigForm, cancelEditing } = useActions(logic)

const { featureFlags } = useValues(featureFlagLogic)
const highFrequencyBatchExports = featureFlags[FEATURE_FLAGS.HIGH_FREQUENCY_BATCH_EXPORTS]

return (
<>
{batchExportConfigLoading ? (
Expand Down Expand Up @@ -51,12 +56,22 @@ export function BatchExportsEditForm(props: BatchExportsEditLogicProps): JSX.Ele
</>
}
>
<LemonSelect
options={[
{ value: 'hour', label: 'Hourly' },
{ value: 'day', label: 'Daily' },
]}
/>
{highFrequencyBatchExports ? (
<LemonSelect
options={[
{ value: 'hour', label: 'Hourly' },
{ value: 'day', label: 'Daily' },
{ value: 'every 5 minutes', label: 'Every 5 minutes' },
]}
/>
) : (
<LemonSelect
options={[
{ value: 'hour', label: 'Hourly' },
{ value: 'day', label: 'Daily' },
]}
/>
)}
</Field>
{/* <Field
name="start_at"
Expand Down
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0015_add_verified_properties
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0352_auto_20230926_1833
posthog: 0353_add_5_minute_interval_to_batch_exports
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
66 changes: 54 additions & 12 deletions posthog/api/test/batch_exports/test_create.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import datetime as dt
import json
from unittest import mock

import pytest
from asgiref.sync import async_to_sync
Expand All @@ -12,6 +12,7 @@
from posthog.api.test.test_organization import create_organization
from posthog.api.test.test_team import create_team
from posthog.api.test.test_user import create_user
from posthog.batch_exports.models import BatchExport
from posthog.temporal.client import sync_connect
from posthog.temporal.codec import EncryptionCodec

Expand All @@ -20,7 +21,7 @@
]


@pytest.mark.parametrize("interval", ["hour", "day"])
@pytest.mark.parametrize("interval", ["hour", "day", "every 5 minutes"])
def test_create_batch_export_with_interval_schedule(client: HttpClient, interval):
"""Test creating a BatchExport.
Expand Down Expand Up @@ -54,11 +55,17 @@ def test_create_batch_export_with_interval_schedule(client: HttpClient, interval
client.force_login(user)

with start_test_worker(temporal):
response = create_batch_export(
client,
team.pk,
batch_export_data,
)
with mock.patch(
"posthog.batch_exports.http.posthoganalytics.feature_enabled", return_value=True
) as feature_enabled:
response = create_batch_export(
client,
team.pk,
batch_export_data,
)

if interval == "every 5 minutes":
feature_enabled.assert_called_once_with("high-frequency-batch-exports", str(team.uuid))

assert response.status_code == status.HTTP_201_CREATED, response.json()

Expand All @@ -80,11 +87,8 @@ def test_create_batch_export_with_interval_schedule(client: HttpClient, interval
codec = EncryptionCodec(settings=settings)
schedule = describe_schedule(temporal, data["id"])

if interval == "hour":
expected_interval = dt.timedelta(hours=1)
else:
expected_interval = dt.timedelta(days=1)
assert schedule.schedule.spec.intervals[0].every == expected_interval
batch_export = BatchExport.objects.get(id=data["id"])
assert schedule.schedule.spec.intervals[0].every == batch_export.interval_time_delta

decoded_payload = async_to_sync(codec.decode)(schedule.schedule.action.args)
args = json.loads(decoded_payload[0].data)
Expand Down Expand Up @@ -138,3 +142,41 @@ def test_cannot_create_a_batch_export_for_another_organization(client: HttpClien
)

assert response.status_code == status.HTTP_403_FORBIDDEN, response.json()


def test_cannot_create_a_batch_export_with_higher_frequencies_if_not_enabled(client: HttpClient):
    """Requesting an "every 5 minutes" export is answered with a 403 while the feature flag is off."""
    temporal = sync_connect()

    # Request payload: an S3 destination paired with the high-frequency interval.
    payload = {
        "name": "my-production-s3-bucket-destination",
        "destination": {
            "type": "S3",
            "config": {
                "bucket_name": "my-production-s3-bucket",
                "region": "us-east-1",
                "prefix": "posthog-events/",
                "aws_access_key_id": "abc123",
                "aws_secret_access_key": "secret",
            },
        },
        "interval": "every 5 minutes",
    }

    org = create_organization("Test Org")
    team = create_team(org)
    member = create_user("[email protected]", "Test User", org)

    flag_lookup_path = "posthog.batch_exports.http.posthoganalytics.feature_enabled"

    with start_test_worker(temporal):
        client.force_login(member)

        # Force the flag lookup to report "disabled" for the duration of the request.
        with mock.patch(flag_lookup_path, return_value=False) as feature_enabled:
            response = create_batch_export(client, team.pk, payload)

        assert response.status_code == status.HTTP_403_FORBIDDEN, response.json()
        feature_enabled.assert_called_once_with("high-frequency-batch-exports", str(team.uuid))
23 changes: 20 additions & 3 deletions posthog/batch_exports/http.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import datetime as dt
from typing import Any

import posthoganalytics
from django.db import transaction
from django.utils.timezone import now
from rest_framework import mixins, request, response, serializers, viewsets
from rest_framework.decorators import action
from rest_framework.exceptions import NotAuthenticated, NotFound, ValidationError
from rest_framework.exceptions import (
NotAuthenticated,
NotFound,
PermissionDenied,
ValidationError,
)
from rest_framework.pagination import CursorPagination
from rest_framework.permissions import IsAuthenticated
from rest_framework_dataclasses.serializers import DataclassSerializer
Expand All @@ -27,7 +33,13 @@
sync_batch_export,
unpause_batch_export,
)
from posthog.models import BatchExport, BatchExportDestination, BatchExportRun, User
from posthog.models import (
BatchExport,
BatchExportDestination,
BatchExportRun,
Team,
User,
)
from posthog.permissions import (
ProjectMembershipNecessaryPermissions,
TeamMemberAccessPermission,
Expand Down Expand Up @@ -158,9 +170,14 @@ def create(self, validated_data: dict) -> BatchExport:
destination_data = validated_data.pop("destination")
team_id = self.context["team_id"]

if validated_data["interval"] not in ("hour", "day", "week") and not posthoganalytics.feature_enabled(
"high-frequency-batch-exports",
str(Team.objects.get(id=team_id).uuid),
):
raise PermissionDenied("Higher frequency exports are not enabled for this team.")

destination = BatchExportDestination(**destination_data)
batch_export = BatchExport(team_id=team_id, destination=destination, **validated_data)

sync_batch_export(batch_export, created=True)

with transaction.atomic():
Expand Down
9 changes: 7 additions & 2 deletions posthog/batch_exports/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class Status(models.TextChoices):
)


# Allowed values for ``BatchExport.interval`` as (stored value, display label) pairs.
# Only one definition is kept: the earlier three-entry list was immediately
# overwritten by this one and therefore had no effect.
BATCH_EXPORT_INTERVALS = [
    ("hour", "hour"),
    ("day", "day"),
    ("week", "week"),
    ("every 5 minutes", "every 5 minutes"),
]


class BatchExport(UUIDModel):
Expand Down Expand Up @@ -145,13 +145,18 @@ def latest_runs(self):

@property
def interval_time_delta(self) -> timedelta:
    """Return a datetime.timedelta that corresponds to this BatchExport's interval.

    Recognised intervals are "hour", "day", "week" and the form
    "every <value> <unit>" (for example "every 5 minutes"), where <value> is
    an integer and <unit> is passed to ``datetime.timedelta`` as the keyword
    argument name.

    Raises:
        ValueError: If the interval does not match any recognised form.
    """
    if self.interval == "hour":
        return timedelta(hours=1)
    elif self.interval == "day":
        return timedelta(days=1)
    elif self.interval == "week":
        return timedelta(weeks=1)
    elif self.interval.startswith("every"):
        try:
            _, value, unit = self.interval.split(" ")
            return timedelta(**{unit: int(value)})
        except (TypeError, ValueError) as err:
            # A wrong number of parts or a non-integer value raises ValueError and an
            # unknown unit makes timedelta raise TypeError; report all of them the
            # same way as any other unsupported interval.
            raise ValueError(f"Invalid interval: '{self.interval}'") from err
    raise ValueError(f"Invalid interval: '{self.interval}'")


class BatchExportLogEntryLevel(str, enum.Enum):
Expand Down
23 changes: 23 additions & 0 deletions posthog/migrations/0353_add_5_minute_interval_to_batch_exports.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 3.2.19 on 2023-10-06 11:07

from django.db import migrations, models


class Migration(migrations.Migration):
    """Alter ``BatchExport.interval`` so that "every 5 minutes" is one of its declared choices."""

    # Applied on top of the preceding posthog migration.
    dependencies = [
        ("posthog", "0352_auto_20230926_1833"),
    ]

    operations = [
        migrations.AlterField(
            model_name="batchexport",
            name="interval",
            field=models.CharField(
                # Full choice list for the field, including the "every 5 minutes" entry.
                choices=[("hour", "hour"), ("day", "day"), ("week", "week"), ("every 5 minutes", "every 5 minutes")],
                default="hour",
                help_text="The interval at which to export data.",
                max_length=64,
            ),
        ),
    ]
Loading

0 comments on commit 09ee6a1

Please sign in to comment.