Skip to content

Commit

Permalink
feat(batch-exports): Add HTTP Batch Export destination (#20318)
Browse files Browse the repository at this point in the history
* feat(batch-exports): Add HTTP Batch Export destination

To possibly be reused in the future, but for now it only submits
payloads in the PostHog /batch format.

* add geoip_disable, don't toJSONString elements_chain, and mark some HTTP status codes as non-retryable

* Add heartbeating to HTTP batch export

* Update query snapshots

* Update query snapshots

* fix: Re-use client session

* refactor: Rename last_uploaded_part_timestamp to last_uploaded_timestamp

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Tomás Farías Santana <[email protected]>
  • Loading branch information
3 people authored Feb 19, 2024
1 parent a271ea5 commit 36d1be0
Show file tree
Hide file tree
Showing 12 changed files with 1,036 additions and 2 deletions.
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0015_add_verified_properties
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0388_add_schema_to_batch_exports
posthog: 0389_alter_batchexportdestination_type
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
27 changes: 27 additions & 0 deletions posthog/api/test/dashboards/__snapshots__/test_dashboard.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -1235,6 +1235,33 @@
5 /* ... */)) /*controller='project_dashboards-detail',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/dashboards/%28%3FP%3Cpk%3E%5B%5E/.%5D%2B%29/%3F%24'*/
'''
# ---
# name: TestDashboard.test_adding_insights_is_not_nplus1_for_gets.32
'''
SELECT "posthog_dashboard"."id",
"posthog_dashboard"."name",
"posthog_dashboard"."description",
"posthog_dashboard"."team_id",
"posthog_dashboard"."pinned",
"posthog_dashboard"."created_at",
"posthog_dashboard"."created_by_id",
"posthog_dashboard"."deleted",
"posthog_dashboard"."last_accessed_at",
"posthog_dashboard"."filters",
"posthog_dashboard"."creation_mode",
"posthog_dashboard"."restriction_level",
"posthog_dashboard"."deprecated_tags",
"posthog_dashboard"."tags",
"posthog_dashboard"."share_token",
"posthog_dashboard"."is_shared"
FROM "posthog_dashboard"
WHERE (NOT ("posthog_dashboard"."deleted")
AND "posthog_dashboard"."id" IN (1,
2,
3,
4,
5 /* ... */)) /*controller='project_dashboards-detail',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/dashboards/%28%3FP%3Cpk%3E%5B%5E/.%5D%2B%29/%3F%24'*/
'''
# ---
# name: TestDashboard.test_adding_insights_is_not_nplus1_for_gets.4
'''
SELECT "posthog_dashboard"."id",
Expand Down
2 changes: 2 additions & 0 deletions posthog/batch_exports/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class Destination(models.TextChoices):
POSTGRES = "Postgres"
REDSHIFT = "Redshift"
BIGQUERY = "BigQuery"
HTTP = "HTTP"
NOOP = "NoOp"

secret_fields = {
Expand All @@ -36,6 +37,7 @@ class Destination(models.TextChoices):
"Postgres": set("password"),
"Redshift": set("password"),
"BigQuery": {"private_key", "private_key_id", "client_email", "token_uri"},
"HTTP": set("token"),
"NoOp": set(),
}

Expand Down
16 changes: 16 additions & 0 deletions posthog/batch_exports/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,21 @@ class BigQueryBatchExportInputs:
batch_export_schema: BatchExportSchema | None = None


@dataclass
class HttpBatchExportInputs:
    """Inputs for Http export workflow.

    The first four fields are required. The remaining fields are optional:
    ``interval`` defaults to ``"hour"`` and every other optional field
    defaults to ``None``.

    Attributes:
        batch_export_id: Identifier of the batch export, as a string.
        team_id: Integer id of the team.
        url: Destination URL (presumably the endpoint payloads are
            submitted to -- confirm against the export activity).
        token: Token for the destination (presumably a credential and
            should be treated as a secret -- confirm).
        interval: Export interval name.
        data_interval_end: Optional end of the data interval, as a string
            (format not visible here).
        exclude_events: Optional list of event names (presumably events to
            leave out of the export -- confirm).
        include_events: Optional list of event names (presumably the only
            events to export -- confirm).
        batch_export_schema: Optional ``BatchExportSchema`` for the export.
    """

    # Required inputs.
    batch_export_id: str
    team_id: int
    url: str
    token: str

    # Optional inputs.
    interval: str = "hour"
    data_interval_end: str | None = None
    exclude_events: list[str] | None = None
    include_events: list[str] | None = None
    batch_export_schema: BatchExportSchema | None = None


@dataclass
class NoOpInputs:
"""NoOp Workflow is used for testing, it takes a single argument to echo back."""
Expand All @@ -174,6 +189,7 @@ class NoOpInputs:
"Postgres": ("postgres-export", PostgresBatchExportInputs),
"Redshift": ("redshift-export", RedshiftBatchExportInputs),
"BigQuery": ("bigquery-export", BigQueryBatchExportInputs),
"HTTP": ("http-export", HttpBatchExportInputs),
"NoOp": ("no-op", NoOpInputs),
}

Expand Down
29 changes: 29 additions & 0 deletions posthog/migrations/0389_alter_batchexportdestination_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Generated by Django 4.1.13 on 2024-02-13 18:47

from django.db import migrations, models


class Migration(migrations.Migration):
    """Alter ``batchexportdestination.type`` to the choice list below.

    The choice list includes the "HTTP" destination type.
    """

    # Must be applied after the previous posthog migration.
    dependencies = [("posthog", "0388_add_schema_to_batch_exports")]

    operations = [
        migrations.AlterField(
            model_name="batchexportdestination",
            name="type",
            field=models.CharField(
                max_length=64,
                help_text="A choice of supported BatchExportDestination types.",
                # (stored value, human-readable label) pairs.
                choices=[
                    ("S3", "S3"),
                    ("Snowflake", "Snowflake"),
                    ("Postgres", "Postgres"),
                    ("Redshift", "Redshift"),
                    ("BigQuery", "Bigquery"),
                    ("HTTP", "Http"),
                    ("NoOp", "Noop"),
                ],
            ),
        ),
    ]
3 changes: 2 additions & 1 deletion posthog/settings/temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
BATCH_EXPORT_SNOWFLAKE_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 100 # 100MB
BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 50 # 50MB
BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 100 # 100MB

# Settings for the HTTP batch export destination.
BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 10 # 10MB
# NOTE(review): presumably the maximum number of events per request -- confirm against the HTTP export activity.
BATCH_EXPORT_HTTP_BATCH_SIZE = 1000

UNCONSTRAINED_TIMESTAMP_TEAM_IDS = get_list(os.getenv("UNCONSTRAINED_TIMESTAMP_TEAM_IDS", ""))
6 changes: 6 additions & 0 deletions posthog/temporal/batch_exports/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
SnowflakeBatchExportWorkflow,
insert_into_snowflake_activity,
)
from posthog.temporal.batch_exports.http_batch_export import (
HttpBatchExportWorkflow,
insert_into_http_activity,
)
from posthog.temporal.batch_exports.squash_person_overrides import *

WORKFLOWS = [
Expand All @@ -42,6 +46,7 @@
RedshiftBatchExportWorkflow,
S3BatchExportWorkflow,
SnowflakeBatchExportWorkflow,
HttpBatchExportWorkflow,
SquashPersonOverridesWorkflow,
]

Expand All @@ -58,6 +63,7 @@
insert_into_redshift_activity,
insert_into_s3_activity,
insert_into_snowflake_activity,
insert_into_http_activity,
noop_activity,
prepare_dictionary,
prepare_person_overrides,
Expand Down
8 changes: 8 additions & 0 deletions posthog/temporal/batch_exports/batch_exports.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,14 @@ def write(self, content: bytes | str):

return result

def write_record_as_bytes(self, record: bytes):
    """Write one already-encoded record and count it.

    Args:
        record: The record's bytes, passed unchanged to ``self.write``.

    Returns:
        Whatever ``self.write`` returns for this record.
    """
    written = self.write(record)

    # Counters are only bumped once the write call has returned.
    self.records_since_last_reset += 1
    self.records_total += 1

    return written

def write_records_to_jsonl(self, records):
"""Write records to a temporary file as JSONL."""
if len(records) == 1:
Expand Down
Loading

0 comments on commit 36d1be0

Please sign in to comment.