feat(data-warehouse): New pipeline WIP (#26341)
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Gilbert09 and github-actions[bot] authored Dec 19, 2024
1 parent 3485ba8 commit df3ea8d
Showing 42 changed files with 1,199 additions and 284 deletions.
268 changes: 135 additions & 133 deletions mypy-baseline.txt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion posthog/constants.py
@@ -302,7 +302,7 @@ class FlagRequestType(StrEnum):

ENRICHED_DASHBOARD_INSIGHT_IDENTIFIER = "Feature Viewed"
DATA_WAREHOUSE_TASK_QUEUE = "data-warehouse-task-queue"
V2_DATA_WAREHOUSE_TASK_QUEUE = "v2-data-warehouse-task-queue"
DATA_WAREHOUSE_TASK_QUEUE_V2 = "v2-data-warehouse-task-queue"
BATCH_EXPORTS_TASK_QUEUE = "batch-exports-task-queue"
SYNC_BATCH_EXPORTS_TASK_QUEUE = "no-sandbox-python-django"
GENERAL_PURPOSE_TASK_QUEUE = "general-purpose-task-queue"
20 changes: 17 additions & 3 deletions posthog/hogql/database/s3_table.py
@@ -1,12 +1,15 @@
import re
from typing import Optional
from typing import TYPE_CHECKING, Optional

from posthog.clickhouse.client.escape import substitute_params
from posthog.hogql.context import HogQLContext
from posthog.hogql.database.models import FunctionCallTable
from posthog.hogql.errors import ExposedHogQLError
from posthog.hogql.escape_sql import escape_hogql_identifier

if TYPE_CHECKING:
from posthog.warehouse.models import ExternalDataJob


def build_function_call(
url: str,
@@ -15,7 +18,10 @@ def build_function_call(
access_secret: Optional[str] = None,
structure: Optional[str] = None,
context: Optional[HogQLContext] = None,
pipeline_version: Optional["ExternalDataJob.PipelineVersion"] = None,
) -> str:
from posthog.warehouse.models import ExternalDataJob

raw_params: dict[str, str] = {}

def add_param(value: str, is_sensitive: bool = True) -> str:
@@ -36,10 +42,18 @@ def return_expr(expr: str) -> str:

# DeltaS3Wrapper format
if format == "DeltaS3Wrapper":
query_folder = "__query_v2" if pipeline_version == ExternalDataJob.PipelineVersion.V2 else "__query"

if url.endswith("/"):
escaped_url = add_param(f"{url[:len(url) - 1]}__query/*.parquet")
if pipeline_version == ExternalDataJob.PipelineVersion.V2:
escaped_url = add_param(f"{url[:-5]}{query_folder}/*.parquet")
else:
escaped_url = add_param(f"{url[:-1]}{query_folder}/*.parquet")
else:
escaped_url = add_param(f"{url}__query/*.parquet")
if pipeline_version == ExternalDataJob.PipelineVersion.V2:
escaped_url = add_param(f"{url[:-4]}{query_folder}/*.parquet")
else:
escaped_url = add_param(f"{url}{query_folder}/*.parquet")

if structure:
escaped_structure = add_param(structure, False)
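
Note: a minimal sketch (not part of the commit) of the query-folder selection that build_function_call now performs. The helper name and example URLs are illustrative, and the assumption that the V2 slice strips a trailing "dlt"/"dlt/" path segment is an inference from the -4/-5 offsets, not something stated in the diff.

def _query_glob(url: str, is_v2: bool) -> str:
    # Mirrors the branch structure added above: V2 jobs read "__query_v2",
    # everything else keeps the original "__query" folder.
    query_folder = "__query_v2" if is_v2 else "__query"
    if url.endswith("/"):
        base = url[:-5] if is_v2 else url[:-1]
    else:
        base = url[:-4] if is_v2 else url
    return f"{base}{query_folder}/*.parquet"

print(_query_glob("s3://bucket/team/schema/dlt/", is_v2=True))   # s3://bucket/team/schema__query_v2/*.parquet
print(_query_glob("s3://bucket/team/schema/", is_v2=False))      # s3://bucket/team/schema__query/*.parquet
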
6 changes: 3 additions & 3 deletions posthog/management/commands/start_temporal_worker.py
@@ -11,9 +11,9 @@
from posthog.constants import (
BATCH_EXPORTS_TASK_QUEUE,
DATA_WAREHOUSE_TASK_QUEUE,
DATA_WAREHOUSE_TASK_QUEUE_V2,
GENERAL_PURPOSE_TASK_QUEUE,
SYNC_BATCH_EXPORTS_TASK_QUEUE,
V2_DATA_WAREHOUSE_TASK_QUEUE,
)
from posthog.temporal.batch_exports import (
ACTIVITIES as BATCH_EXPORTS_ACTIVITIES,
@@ -28,14 +28,14 @@
SYNC_BATCH_EXPORTS_TASK_QUEUE: BATCH_EXPORTS_WORKFLOWS,
BATCH_EXPORTS_TASK_QUEUE: BATCH_EXPORTS_WORKFLOWS,
DATA_WAREHOUSE_TASK_QUEUE: DATA_SYNC_WORKFLOWS + DATA_MODELING_WORKFLOWS,
V2_DATA_WAREHOUSE_TASK_QUEUE: DATA_SYNC_WORKFLOWS + DATA_MODELING_WORKFLOWS,
DATA_WAREHOUSE_TASK_QUEUE_V2: DATA_SYNC_WORKFLOWS + DATA_MODELING_WORKFLOWS,
GENERAL_PURPOSE_TASK_QUEUE: PROXY_SERVICE_WORKFLOWS,
}
ACTIVITIES_DICT = {
SYNC_BATCH_EXPORTS_TASK_QUEUE: BATCH_EXPORTS_ACTIVITIES,
BATCH_EXPORTS_TASK_QUEUE: BATCH_EXPORTS_ACTIVITIES,
DATA_WAREHOUSE_TASK_QUEUE: DATA_SYNC_ACTIVITIES + DATA_MODELING_ACTIVITIES,
V2_DATA_WAREHOUSE_TASK_QUEUE: DATA_SYNC_ACTIVITIES + DATA_MODELING_ACTIVITIES,
DATA_WAREHOUSE_TASK_QUEUE_V2: DATA_SYNC_ACTIVITIES + DATA_MODELING_ACTIVITIES,
GENERAL_PURPOSE_TASK_QUEUE: PROXY_SERVICE_ACTIVITIES,
}

30 changes: 30 additions & 0 deletions posthog/migrations/0533_externaldatajob_pipeline_version.py
@@ -0,0 +1,30 @@
# Generated by Django 4.2.15 on 2024-11-23 14:49

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("posthog", "0532_taxonomy_unique_on_project"),
]

operations = [
migrations.AddField(
model_name="externaldatajob",
name="pipeline_version",
field=models.CharField(
blank=True,
choices=[("v1-dlt-sync", "v1-dlt-sync"), ("v2-non-dlt", "v2-non-dlt")],
max_length=400,
null=True,
),
),
migrations.RunSQL(
"""
UPDATE posthog_externaldatajob
SET pipeline_version = 'v1-dlt-sync'
WHERE pipeline_version is null
""",
reverse_sql=migrations.RunSQL.noop,
),
]
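
Note: a hypothetical sketch of the model-side enum implied by the migration's choices and by the ExternalDataJob.PipelineVersion.V1/.V2 references in the tests below; the actual definition lives on ExternalDataJob in posthog.warehouse.models and may differ.

from django.db import models

class PipelineVersion(models.TextChoices):
    # Values match the choices declared in the migration above.
    V1 = "v1-dlt-sync", "v1-dlt-sync"
    V2 = "v2-non-dlt", "v2-non-dlt"
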
2 changes: 1 addition & 1 deletion posthog/migrations/max_migration.txt
@@ -1 +1 @@
0532_taxonomy_unique_on_project
0533_externaldatajob_pipeline_version
74 changes: 72 additions & 2 deletions posthog/tasks/test/test_usage_report.py
@@ -1314,11 +1314,23 @@ def test_external_data_rows_synced_response(

for i in range(5):
start_time = (now() - relativedelta(hours=i)).strftime("%Y-%m-%dT%H:%M:%SZ")
ExternalDataJob.objects.create(team_id=3, created_at=start_time, rows_synced=10, pipeline=source)
ExternalDataJob.objects.create(
team_id=3,
created_at=start_time,
rows_synced=10,
pipeline=source,
pipeline_version=ExternalDataJob.PipelineVersion.V1,
)

for i in range(5):
start_time = (now() - relativedelta(hours=i)).strftime("%Y-%m-%dT%H:%M:%SZ")
ExternalDataJob.objects.create(team_id=4, created_at=start_time, rows_synced=10, pipeline=source)
ExternalDataJob.objects.create(
team_id=4,
created_at=start_time,
rows_synced=10,
pipeline=source,
pipeline_version=ExternalDataJob.PipelineVersion.V1,
)

period = get_previous_day(at=now() + relativedelta(days=1))
period_start, period_end = period
@@ -1343,6 +1355,64 @@
assert org_2_report["organization_name"] == "Org 2"
assert org_2_report["rows_synced_in_period"] == 0

@patch("posthog.tasks.usage_report.Client")
@patch("posthog.tasks.usage_report.send_report_to_billing_service")
def test_external_data_rows_synced_response_with_v2_jobs(
self, billing_task_mock: MagicMock, posthog_capture_mock: MagicMock
) -> None:
self._setup_teams()

source = ExternalDataSource.objects.create(
team=self.analytics_team,
source_id="source_id",
connection_id="connection_id",
status=ExternalDataSource.Status.COMPLETED,
source_type=ExternalDataSource.Type.STRIPE,
)

for i in range(5):
start_time = (now() - relativedelta(hours=i)).strftime("%Y-%m-%dT%H:%M:%SZ")
ExternalDataJob.objects.create(
team_id=3,
created_at=start_time,
rows_synced=10,
pipeline=source,
pipeline_version=ExternalDataJob.PipelineVersion.V1,
)

for i in range(5):
start_time = (now() - relativedelta(hours=i)).strftime("%Y-%m-%dT%H:%M:%SZ")
ExternalDataJob.objects.create(
team_id=4,
created_at=start_time,
rows_synced=10,
pipeline=source,
pipeline_version=ExternalDataJob.PipelineVersion.V2,
)

period = get_previous_day(at=now() + relativedelta(days=1))
period_start, period_end = period
all_reports = _get_all_org_reports(period_start, period_end)

assert len(all_reports) == 3

org_1_report = _get_full_org_usage_report_as_dict(
_get_full_org_usage_report(all_reports[str(self.org_1.id)], get_instance_metadata(period))
)

org_2_report = _get_full_org_usage_report_as_dict(
_get_full_org_usage_report(all_reports[str(self.org_2.id)], get_instance_metadata(period))
)

assert org_1_report["organization_name"] == "Org 1"
assert org_1_report["rows_synced_in_period"] == 50

assert org_1_report["teams"]["3"]["rows_synced_in_period"] == 50
assert org_1_report["teams"]["4"]["rows_synced_in_period"] == 0 # V2 pipelines

assert org_2_report["organization_name"] == "Org 2"
assert org_2_report["rows_synced_in_period"] == 0


@freeze_time("2022-01-10T00:01:00Z")
class TestHogFunctionUsageReports(ClickhouseDestroyTablesMixin, TestCase, ClickhouseTestMixin):
21 changes: 18 additions & 3 deletions posthog/tasks/test/test_warehouse.py
@@ -36,7 +36,12 @@ def test_capture_workspace_rows_synced_by_team_month_cutoff(self, mock_get_ph_cl

with freeze_time("2023-11-07T16:50:49Z"):
job = ExternalDataJob.objects.create(
pipeline=source, workflow_id="fake_workflow_id", team=self.team, status="Running", rows_synced=100000
pipeline=source,
workflow_id="fake_workflow_id",
team=self.team,
status="Running",
rows_synced=100000,
pipeline_version=ExternalDataJob.PipelineVersion.V1,
)

capture_workspace_rows_synced_by_team(self.team.pk)
@@ -86,12 +91,22 @@ def test_capture_workspace_rows_synced_by_team_month_cutoff_field_set(self, mock

with freeze_time("2023-10-30T18:32:41Z"):
ExternalDataJob.objects.create(
pipeline=source, workflow_id="fake_workflow_id", team=self.team, status="Completed", rows_synced=97747
pipeline=source,
workflow_id="fake_workflow_id",
team=self.team,
status="Completed",
rows_synced=97747,
pipeline_version=ExternalDataJob.PipelineVersion.V1,
)

with freeze_time("2023-11-07T16:50:49Z"):
job2 = ExternalDataJob.objects.create(
pipeline=source, workflow_id="fake_workflow_id", team=self.team, status="Completed", rows_synced=93353
pipeline=source,
workflow_id="fake_workflow_id",
team=self.team,
status="Completed",
rows_synced=93353,
pipeline_version=ExternalDataJob.PipelineVersion.V1,
)

capture_workspace_rows_synced_by_team(self.team.pk)
1 change: 1 addition & 0 deletions posthog/tasks/usage_report.py
@@ -687,6 +687,7 @@ def get_teams_with_survey_responses_count_in_period(
def get_teams_with_rows_synced_in_period(begin: datetime, end: datetime) -> list:
return list(
ExternalDataJob.objects.filter(created_at__gte=begin, created_at__lte=end)
.exclude(pipeline_version=ExternalDataJob.PipelineVersion.V2)
.values("team_id")
.annotate(total=Sum("rows_synced"))
)
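
Note: because V1 runs now fan out shadow V2 jobs for the same data, V2 rows are excluded here so usage is not double-counted for billing; the test added above expects a team with only V2 jobs to report zero rows synced. An illustrative query of the same shape, assuming begin and end are already computed datetimes:

from django.db.models import Sum
from posthog.warehouse.models import ExternalDataJob

billable_rows = (
    ExternalDataJob.objects.filter(created_at__gte=begin, created_at__lte=end)
    .exclude(pipeline_version=ExternalDataJob.PipelineVersion.V2)  # shadow V2 runs are not billed
    .values("team_id")
    .annotate(total=Sum("rows_synced"))
)
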
2 changes: 2 additions & 0 deletions posthog/temporal/data_imports/__init__.py
@@ -6,6 +6,7 @@
update_external_data_job_model,
check_billing_limits_activity,
sync_new_schemas_activity,
trigger_pipeline_v2,
)

WORKFLOWS = [ExternalDataJobWorkflow]
@@ -17,4 +18,5 @@
create_source_templates,
check_billing_limits_activity,
sync_new_schemas_activity,
trigger_pipeline_v2,
]
43 changes: 43 additions & 0 deletions posthog/temporal/data_imports/external_data_job.py
@@ -1,15 +1,22 @@
import asyncio
import dataclasses
import datetime as dt
import json
import re

from django.conf import settings
from django.db import close_old_connections
import posthoganalytics
from temporalio import activity, exceptions, workflow
from temporalio.common import RetryPolicy

from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE_V2

# TODO: remove dependency
from posthog.settings.base_variables import TEST
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.common.client import sync_connect
from posthog.temporal.data_imports.util import is_posthog_team
from posthog.temporal.data_imports.workflow_activities.check_billing_limits import (
CheckBillingLimitsActivityInputs,
check_billing_limits_activity,
@@ -131,6 +138,30 @@ def update_external_data_job_model(inputs: UpdateExternalDataJobStatusInputs) ->
)


@activity.defn
def trigger_pipeline_v2(inputs: ExternalDataWorkflowInputs):
logger = bind_temporal_worker_logger_sync(team_id=inputs.team_id)
logger.debug("Triggering V2 pipeline")

temporal = sync_connect()

asyncio.run(
temporal.start_workflow(
workflow="external-data-job",
arg=dataclasses.asdict(inputs),
id=f"{inputs.external_data_schema_id}-V2",
task_queue=str(DATA_WAREHOUSE_TASK_QUEUE_V2),
retry_policy=RetryPolicy(
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=1,
non_retryable_error_types=["NondeterminismError"],
),
)
)

logger.debug("V2 pipeline triggered")


@dataclasses.dataclass
class CreateSourceTemplateInputs:
team_id: int
@@ -154,6 +185,18 @@ def parse_inputs(inputs: list[str]) -> ExternalDataWorkflowInputs:
async def run(self, inputs: ExternalDataWorkflowInputs):
assert inputs.external_data_schema_id is not None

if (
settings.TEMPORAL_TASK_QUEUE != DATA_WAREHOUSE_TASK_QUEUE_V2
and not TEST
and is_posthog_team(inputs.team_id)
):
await workflow.execute_activity(
trigger_pipeline_v2,
inputs,
start_to_close_timeout=dt.timedelta(minutes=1),
retry_policy=RetryPolicy(maximum_attempts=1),
)

update_inputs = UpdateExternalDataJobStatusInputs(
job_id=None,
status=ExternalDataJob.Status.COMPLETED,
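
Note: in short, a V1 workflow running outside tests for a PostHog-internal team now fans a shadow run out onto the V2 queue. A minimal sketch of that guard (the helper name and inlined constant are illustrative; the real check lives in ExternalDataJobWorkflow.run above):

DATA_WAREHOUSE_TASK_QUEUE_V2 = "v2-data-warehouse-task-queue"

def should_trigger_v2(current_queue: str, is_test: bool, team_is_posthog: bool) -> bool:
    # Mirrors the condition added above: only non-test runs on the V1 queue
    # for PostHog-internal teams trigger the shadow V2 pipeline.
    return current_queue != DATA_WAREHOUSE_TASK_QUEUE_V2 and not is_test and team_is_posthog

assert should_trigger_v2("data-warehouse-task-queue", is_test=False, team_is_posthog=True)
assert not should_trigger_v2(DATA_WAREHOUSE_TASK_QUEUE_V2, is_test=False, team_is_posthog=True)
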
10 changes: 8 additions & 2 deletions posthog/temporal/data_imports/pipelines/chargebee/__init__.py
@@ -218,7 +218,13 @@ def update_request(self, request: Request) -> None:

@dlt.source(max_table_nesting=0)
def chargebee_source(
api_key: str, site_name: str, endpoint: str, team_id: int, job_id: str, is_incremental: bool = False
api_key: str,
site_name: str,
endpoint: str,
team_id: int,
job_id: str,
db_incremental_field_last_value: Optional[Any],
is_incremental: bool = False,
):
config: RESTAPIConfig = {
"client": {
@@ -242,7 +248,7 @@
"resources": [get_resource(endpoint, is_incremental)],
}

yield from rest_api_resources(config, team_id, job_id)
yield from rest_api_resources(config, team_id, job_id, db_incremental_field_last_value)


def validate_credentials(api_key: str, site_name: str) -> bool:
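
Note: a hypothetical call site (every value below is a placeholder, including the endpoint name) showing the new db_incremental_field_last_value argument being threaded through to rest_api_resources:

from posthog.temporal.data_imports.pipelines.chargebee import chargebee_source

source = chargebee_source(
    api_key="placeholder-api-key",
    site_name="example-site",
    endpoint="customers",  # placeholder endpoint name
    team_id=1,
    job_id="job-123",
    db_incremental_field_last_value=None,  # last synced cursor value, if resuming incrementally
    is_incremental=True,
)
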
(Remaining changed file diffs are not rendered.)
