Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch-exports): Add HTTP Batch Export destination #20318

Merged
merged 7 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0015_add_verified_properties
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0388_add_schema_to_batch_exports
posthog: 0389_alter_batchexportdestination_type
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
27 changes: 27 additions & 0 deletions posthog/api/test/dashboards/__snapshots__/test_dashboard.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -1235,6 +1235,33 @@
5 /* ... */)) /*controller='project_dashboards-detail',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/dashboards/%28%3FP%3Cpk%3E%5B%5E/.%5D%2B%29/%3F%24'*/
'''
# ---
# name: TestDashboard.test_adding_insights_is_not_nplus1_for_gets.32
'''
SELECT "posthog_dashboard"."id",
"posthog_dashboard"."name",
"posthog_dashboard"."description",
"posthog_dashboard"."team_id",
"posthog_dashboard"."pinned",
"posthog_dashboard"."created_at",
"posthog_dashboard"."created_by_id",
"posthog_dashboard"."deleted",
"posthog_dashboard"."last_accessed_at",
"posthog_dashboard"."filters",
"posthog_dashboard"."creation_mode",
"posthog_dashboard"."restriction_level",
"posthog_dashboard"."deprecated_tags",
"posthog_dashboard"."tags",
"posthog_dashboard"."share_token",
"posthog_dashboard"."is_shared"
FROM "posthog_dashboard"
WHERE (NOT ("posthog_dashboard"."deleted")
AND "posthog_dashboard"."id" IN (1,
2,
3,
4,
5 /* ... */)) /*controller='project_dashboards-detail',route='api/projects/%28%3FP%3Cparent_lookup_team_id%3E%5B%5E/.%5D%2B%29/dashboards/%28%3FP%3Cpk%3E%5B%5E/.%5D%2B%29/%3F%24'*/
'''
# ---
# name: TestDashboard.test_adding_insights_is_not_nplus1_for_gets.4
'''
SELECT "posthog_dashboard"."id",
Expand Down
2 changes: 2 additions & 0 deletions posthog/batch_exports/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class Destination(models.TextChoices):
POSTGRES = "Postgres"
REDSHIFT = "Redshift"
BIGQUERY = "BigQuery"
HTTP = "HTTP"
NOOP = "NoOp"

secret_fields = {
Expand All @@ -36,6 +37,7 @@ class Destination(models.TextChoices):
"Postgres": set("password"),
"Redshift": set("password"),
"BigQuery": {"private_key", "private_key_id", "client_email", "token_uri"},
"HTTP": set("token"),
"NoOp": set(),
}

Expand Down
16 changes: 16 additions & 0 deletions posthog/batch_exports/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,21 @@ class BigQueryBatchExportInputs:
batch_export_schema: BatchExportSchema | None = None


@dataclass
class HttpBatchExportInputs:
"""Inputs for Http export workflow."""

batch_export_id: str
team_id: int
url: str
token: str
interval: str = "hour"
data_interval_end: str | None = None
exclude_events: list[str] | None = None
include_events: list[str] | None = None
batch_export_schema: BatchExportSchema | None = None


@dataclass
class NoOpInputs:
"""NoOp Workflow is used for testing, it takes a single argument to echo back."""
Expand All @@ -174,6 +189,7 @@ class NoOpInputs:
"Postgres": ("postgres-export", PostgresBatchExportInputs),
"Redshift": ("redshift-export", RedshiftBatchExportInputs),
"BigQuery": ("bigquery-export", BigQueryBatchExportInputs),
"HTTP": ("http-export", HttpBatchExportInputs),
"NoOp": ("no-op", NoOpInputs),
}

Expand Down
29 changes: 29 additions & 0 deletions posthog/migrations/0389_alter_batchexportdestination_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Generated by Django 4.1.13 on 2024-02-13 18:47

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("posthog", "0388_add_schema_to_batch_exports"),
]

operations = [
migrations.AlterField(
model_name="batchexportdestination",
name="type",
field=models.CharField(
choices=[
("S3", "S3"),
("Snowflake", "Snowflake"),
("Postgres", "Postgres"),
("Redshift", "Redshift"),
("BigQuery", "Bigquery"),
("HTTP", "Http"),
("NoOp", "Noop"),
],
help_text="A choice of supported BatchExportDestination types.",
max_length=64,
),
),
]
3 changes: 2 additions & 1 deletion posthog/settings/temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
BATCH_EXPORT_SNOWFLAKE_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 100 # 100MB
BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 50 # 50MB
BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 100 # 100MB

BATCH_EXPORT_HTTP_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 10 # 10MB
BATCH_EXPORT_HTTP_BATCH_SIZE = 1000
bretthoerner marked this conversation as resolved.
Show resolved Hide resolved

UNCONSTRAINED_TIMESTAMP_TEAM_IDS = get_list(os.getenv("UNCONSTRAINED_TIMESTAMP_TEAM_IDS", ""))
6 changes: 6 additions & 0 deletions posthog/temporal/batch_exports/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
SnowflakeBatchExportWorkflow,
insert_into_snowflake_activity,
)
from posthog.temporal.batch_exports.http_batch_export import (
HttpBatchExportWorkflow,
insert_into_http_activity,
)
from posthog.temporal.batch_exports.squash_person_overrides import *

WORKFLOWS = [
Expand All @@ -42,6 +46,7 @@
RedshiftBatchExportWorkflow,
S3BatchExportWorkflow,
SnowflakeBatchExportWorkflow,
HttpBatchExportWorkflow,
SquashPersonOverridesWorkflow,
]

Expand All @@ -58,6 +63,7 @@
insert_into_redshift_activity,
insert_into_s3_activity,
insert_into_snowflake_activity,
insert_into_http_activity,
noop_activity,
prepare_dictionary,
prepare_person_overrides,
Expand Down
8 changes: 8 additions & 0 deletions posthog/temporal/batch_exports/batch_exports.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,14 @@ def write(self, content: bytes | str):

return result

def write_record_as_bytes(self, record: bytes):
result = self.write(record)

self.records_total += 1
self.records_since_last_reset += 1

return result

def write_records_to_jsonl(self, records):
"""Write records to a temporary file as JSONL."""
if len(records) == 1:
Expand Down
Loading
Loading