feat(data-warehouse): DLT + temporal (#18700)
* testing

* draft

* tests

* working rough draft

* split workflow and activity workers

* temp

* split task queues

* add schedule

* working with schedule

* add bucket stitching activity

* sync data steps for pipeline

* add comment

* update default endpoints

* workflow with all steps

* update heartbeat

* update env var, update folders so there are common and specific workflow modules

* update workflow

* update imports

* reformat

* already sync

* format

* adjust async methods

* make it readable

* remove extra

* remove unnecessary config

* add invoices to default endpoint

* change heartbeat name

* remove

* update dev packages

* update migration

* package and tests

* update test import path

* restore task queue env var and remove unnecessary ones, update github action to only trigger deploy for respective worker

* fix tests

* typing

* revert task queue change

* update github action and env vars

* poll reload

* update retry

* change activity stitching function back to sync for now

* typing

* try to import differently

* Update query snapshots

* Update query snapshots

* remap

* move

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (1)

* sort

* update import

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

* typing

* add notes on logging

* make sure datawarehousetable filtering works right

* update timeout and how datawarehouse tables are linked

* sort

* add delete

* Update UI snapshots for `chromium` (1)

* Update UI snapshots for `chromium` (1)

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
EDsCODE and github-actions[bot] authored Nov 29, 2023
1 parent 022996d commit 2a6a13c
Showing 52 changed files with 1,297 additions and 165 deletions.
19 changes: 19 additions & 0 deletions .github/workflows/container-images-cd.yml
@@ -142,3 +142,22 @@ jobs:
"image_tag": "${{ steps.build.outputs.digest }}",
"worker_name": "temporal-worker"
}
- name: Check for changes that affect data warehouse temporal worker
id: check_changes_data_warehouse_temporal_worker
run: |
echo "::set-output name=changed::$(git diff --name-only HEAD^ HEAD | grep -E '^posthog/temporal/common|^posthog/temporal/data_imports|^posthog/warehouse/|^posthog/management/commands/start_temporal_worker.py$' || true)"
- name: Trigger Data Warehouse Temporal Worker Cloud deployment
if: steps.check_changes_data_warehouse_temporal_worker.outputs.changed != ''
uses: mvasigh/dispatch-action@main
with:
token: ${{ steps.deployer.outputs.token }}
repo: charts
owner: PostHog
event_type: temporal_worker_deploy
message: |
{
"image_tag": "${{ steps.build.outputs.digest }}",
"worker_name": "temporal-worker-data-warehouse"
}
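
The new step gates the data warehouse worker deploy on which paths the last commit touched. For orientation, a rough Python equivalent of the shell pipeline above (illustrative only; the workflow runs the `git diff | grep -E` version):

import re
import subprocess

# Mirrors the grep -E pattern in the workflow step above.
DATA_WAREHOUSE_PATHS = re.compile(
    r"^posthog/temporal/common"
    r"|^posthog/temporal/data_imports"
    r"|^posthog/warehouse/"
    r"|^posthog/management/commands/start_temporal_worker\.py$"
)

def data_warehouse_worker_changed() -> bool:
    """Return True if the last commit touched data warehouse worker code."""
    changed_paths = subprocess.run(
        ["git", "diff", "--name-only", "HEAD^", "HEAD"],
        capture_output=True,
        text=True,
        check=True,
    ).stdout.splitlines()
    return any(DATA_WAREHOUSE_PATHS.search(path) for path in changed_paths)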
dataWarehouseSettingsLogic.ts
@@ -8,6 +8,8 @@ import { Breadcrumb, ExternalDataStripeSource } from '~/types'

import type { dataWarehouseSettingsLogicType } from './dataWarehouseSettingsLogicType'

const REFRESH_INTERVAL = 5000

export interface DataWarehouseSource {}

export const dataWarehouseSettingsLogic = kea<dataWarehouseSettingsLogicType>([
@@ -63,7 +65,14 @@ export const dataWarehouseSettingsLogic = kea<dataWarehouseSettingsLogicType>([
],
],
}),
listeners(({ actions }) => ({
listeners(({ actions, cache }) => ({
loadSourcesSuccess: () => {
clearTimeout(cache.refreshTimeout)

cache.refreshTimeout = setTimeout(() => {
actions.loadSources()
}, REFRESH_INTERVAL)
},
deleteSource: async ({ source }) => {
await api.externalDataSources.delete(source.id)
actions.loadSources()
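The listener turns the settings page into a light poll: each successful loadSources schedules the next load after REFRESH_INTERVAL, and clearing the previous timeout keeps at most one reload pending. A rough asyncio analogue of this reschedule-on-success pattern (illustrative only; the real behavior is the kea listener above):

import asyncio

REFRESH_INTERVAL = 5.0  # seconds, mirroring the 5000 ms constant above

async def poll_sources(load_sources) -> None:
    """Reload sources forever, waiting REFRESH_INTERVAL after each success.

    An exception ends the loop, just as a failed load never fires
    loadSourcesSuccess and therefore never reschedules the timeout.
    """
    while True:
        await load_sources()
        await asyncio.sleep(REFRESH_INTERVAL)
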
6 changes: 3 additions & 3 deletions posthog/batch_exports/service.py
@@ -16,7 +16,6 @@
ScheduleState,
)

from posthog import settings
from posthog.batch_exports.models import (
BatchExport,
BatchExportBackfill,
@@ -30,6 +29,7 @@
pause_schedule,
delete_schedule,
)
from posthog.constants import BATCH_EXPORTS_TASK_QUEUE


class BatchExportsInputsProtocol(typing.Protocol):
@@ -327,7 +327,7 @@ async def start_backfill_batch_export_workflow(temporal: Client, inputs: Backfil
"backfill-batch-export",
inputs,
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
task_queue=BATCH_EXPORTS_TASK_QUEUE,
)

return workflow_id
@@ -399,7 +399,7 @@ def sync_batch_export(batch_export: BatchExport, created: bool):
)
),
id=str(batch_export.id),
task_queue=settings.TEMPORAL_TASK_QUEUE,
task_queue=BATCH_EXPORTS_TASK_QUEUE,
),
spec=ScheduleSpec(
start_at=batch_export.start_at,
2 changes: 2 additions & 0 deletions posthog/constants.py
@@ -300,3 +300,5 @@ class FlagRequestType(str, Enum):


ENRICHED_DASHBOARD_INSIGHT_IDENTIFIER = "Feature Viewed"
DATA_WAREHOUSE_TASK_QUEUE = "data-warehouse-task-queue"
BATCH_EXPORTS_TASK_QUEUE = "no-sandbox-python-django"
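
Pinning the queue names in constants matters because a Temporal workflow started on one task queue is only picked up by workers polling that exact queue; a mismatched string leaves runs queued forever. A minimal sketch of starting a run against the shared constant (illustrative, not one of the commit's call sites):

from temporalio.client import Client

from posthog.constants import BATCH_EXPORTS_TASK_QUEUE

async def start_backfill(temporal: Client, inputs, workflow_id: str) -> str:
    # Only workers polling BATCH_EXPORTS_TASK_QUEUE will pick this run up;
    # the data warehouse workers never see it.
    await temporal.start_workflow(
        "backfill-batch-export",
        inputs,
        id=workflow_id,
        task_queue=BATCH_EXPORTS_TASK_QUEUE,
    )
    return workflow_id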
9 changes: 7 additions & 2 deletions posthog/management/commands/start_temporal_worker.py
@@ -9,13 +9,18 @@
from django.core.management.base import BaseCommand

from posthog.temporal.common.worker import start_worker

from posthog.temporal.batch_exports import WORKFLOWS as BATCH_EXPORTS_WORKFLOWS, ACTIVITIES as BATCH_EXPORTS_ACTIVITIES
from posthog.temporal.data_imports import WORKFLOWS as DATA_SYNC_WORKFLOWS, ACTIVITIES as DATA_SYNC_ACTIVITIES
from posthog.constants import DATA_WAREHOUSE_TASK_QUEUE, BATCH_EXPORTS_TASK_QUEUE

WORKFLOWS_DICT = {
"no-sandbox-python-django": BATCH_EXPORTS_WORKFLOWS,
BATCH_EXPORTS_TASK_QUEUE: BATCH_EXPORTS_WORKFLOWS,
DATA_WAREHOUSE_TASK_QUEUE: DATA_SYNC_WORKFLOWS,
}
ACTIVITIES_DICT = {
"no-sandbox-python-django": BATCH_EXPORTS_ACTIVITIES,
BATCH_EXPORTS_TASK_QUEUE: BATCH_EXPORTS_ACTIVITIES,
DATA_WAREHOUSE_TASK_QUEUE: DATA_SYNC_ACTIVITIES,
}


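Keying both dicts by queue name lets a single management command boot either worker flavor. A hedged sketch of the wiring (the real bootstrap is posthog.temporal.common.worker.start_worker, which also handles TLS and telemetry; this simplified version only shows the routing):

import asyncio

from temporalio.client import Client
from temporalio.worker import UnsandboxedWorkflowRunner, Worker

async def run_worker(host: str, task_queue: str) -> None:
    client = await Client.connect(host)
    worker = Worker(
        client,
        task_queue=task_queue,
        workflows=WORKFLOWS_DICT[task_queue],    # the queue name selects workflows...
        activities=ACTIVITIES_DICT[task_queue],  # ...and the matching activities
        workflow_runner=UnsandboxedWorkflowRunner(),
    )
    await worker.run()

# e.g. asyncio.run(run_worker("localhost:7233", DATA_WAREHOUSE_TASK_QUEUE))
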
2 changes: 2 additions & 0 deletions posthog/settings/airbyte.py
@@ -5,4 +5,6 @@
AIRBYTE_BUCKET_KEY = os.getenv("AIRBYTE_BUCKET_KEY", None)
AIRBYTE_BUCKET_SECRET = os.getenv("AIRBYTE_BUCKET_SECRET", None)
AIRBYTE_BUCKET_DOMAIN = os.getenv("AIRBYTE_BUCKET_DOMAIN", None)
# for DLT
BUCKET_URL = os.getenv("BUCKET_URL", None)
AIRBYTE_BUCKET_NAME = os.getenv("AIRBYTE_BUCKET_NAME", None)
1 change: 0 additions & 1 deletion posthog/settings/temporal.py
@@ -11,7 +11,6 @@
TEMPORAL_CLIENT_KEY = os.getenv("TEMPORAL_CLIENT_KEY", None)
TEMPORAL_WORKFLOW_MAX_ATTEMPTS = os.getenv("TEMPORAL_WORKFLOW_MAX_ATTEMPTS", "3")


BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 50 # 50MB
BATCH_EXPORT_SNOWFLAKE_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 100 # 100MB
BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 50 # 50MB
2 changes: 1 addition & 1 deletion posthog/temporal/batch_exports/__init__.py
@@ -45,7 +45,7 @@
SquashPersonOverridesWorkflow,
]

ACTIVITIES: Sequence[Callable] = [
ACTIVITIES = [
backfill_schedule,
create_batch_export_backfill_model,
create_export_run,
2 changes: 1 addition & 1 deletion posthog/temporal/batch_exports/backfill_batch_export.py
@@ -14,14 +14,14 @@
from django.conf import settings

from posthog.batch_exports.service import BackfillBatchExportInputs
from posthog.temporal.common.client import connect
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.batch_exports.batch_exports import (
CreateBatchExportBackfillInputs,
UpdateBatchExportBackfillStatusInputs,
create_batch_export_backfill_model,
update_batch_export_backfill_model_status,
)
from posthog.temporal.common.client import connect


class TemporalScheduleNotFoundError(Exception):
5 changes: 4 additions & 1 deletion posthog/temporal/batch_exports/batch_exports.py
@@ -22,7 +22,10 @@
update_batch_export_run_status,
)
from posthog.temporal.batch_exports.logger import bind_batch_exports_logger
from posthog.temporal.batch_exports.metrics import get_export_finished_metric, get_export_started_metric
from posthog.temporal.batch_exports.metrics import (
get_export_finished_metric,
get_export_started_metric,
)

SELECT_QUERY_TEMPLATE = Template(
"""
13 changes: 8 additions & 5 deletions posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -11,10 +11,6 @@
from temporalio.common import RetryPolicy

from posthog.batch_exports.service import BigQueryBatchExportInputs
from posthog.temporal.common.utils import (
HeartbeatDetails,
should_resume_from_activity_heartbeat,
)
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.batch_exports.batch_exports import (
BatchExportTemporaryFile,
@@ -28,7 +24,14 @@
)
from posthog.temporal.batch_exports.clickhouse import get_client
from posthog.temporal.batch_exports.logger import bind_batch_exports_logger
from posthog.temporal.batch_exports.metrics import get_bytes_exported_metric, get_rows_exported_metric
from posthog.temporal.batch_exports.metrics import (
get_bytes_exported_metric,
get_rows_exported_metric,
)
from posthog.temporal.common.utils import (
HeartbeatDetails,
should_resume_from_activity_heartbeat,
)


async def load_jsonl_file_to_bigquery_table(jsonl_file, table, table_schema, bigquery_client):
5 changes: 4 additions & 1 deletion posthog/temporal/batch_exports/postgres_batch_export.py
@@ -25,7 +25,10 @@
)
from posthog.temporal.batch_exports.clickhouse import get_client
from posthog.temporal.batch_exports.logger import bind_batch_exports_logger
from posthog.temporal.batch_exports.metrics import get_bytes_exported_metric, get_rows_exported_metric
from posthog.temporal.batch_exports.metrics import (
get_bytes_exported_metric,
get_rows_exported_metric,
)


@contextlib.asynccontextmanager
5 changes: 4 additions & 1 deletion posthog/temporal/batch_exports/s3_batch_export.py
@@ -26,7 +26,10 @@
)
from posthog.temporal.batch_exports.clickhouse import get_client
from posthog.temporal.batch_exports.logger import bind_batch_exports_logger
from posthog.temporal.batch_exports.metrics import get_bytes_exported_metric, get_rows_exported_metric
from posthog.temporal.batch_exports.metrics import (
get_bytes_exported_metric,
get_rows_exported_metric,
)


def get_allowed_template_variables(inputs) -> dict[str, str]:
17 changes: 10 additions & 7 deletions posthog/temporal/batch_exports/snowflake_batch_export.py
@@ -14,12 +14,6 @@
from temporalio.common import RetryPolicy

from posthog.batch_exports.service import SnowflakeBatchExportInputs
from posthog.temporal.common.utils import (
HeartbeatDetails,
HeartbeatParseError,
NotEnoughHeartbeatValuesError,
should_resume_from_activity_heartbeat,
)
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.batch_exports.batch_exports import (
BatchExportTemporaryFile,
@@ -33,7 +27,16 @@
)
from posthog.temporal.batch_exports.clickhouse import get_client
from posthog.temporal.batch_exports.logger import bind_batch_exports_logger
from posthog.temporal.batch_exports.metrics import get_bytes_exported_metric, get_rows_exported_metric
from posthog.temporal.batch_exports.metrics import (
get_bytes_exported_metric,
get_rows_exported_metric,
)
from posthog.temporal.common.utils import (
HeartbeatDetails,
HeartbeatParseError,
NotEnoughHeartbeatValuesError,
should_resume_from_activity_heartbeat,
)


class SnowflakeFileNotUploadedError(Exception):
37 changes: 37 additions & 0 deletions posthog/temporal/common/heartbeat.py
@@ -0,0 +1,37 @@
import asyncio
import collections.abc
import datetime as dt
import typing

import temporalio.activity


class AsyncHeartbeatDetails(typing.NamedTuple):
"""Details sent over in a Temporal Activity heartbeat."""

def make_activity_heartbeat_while_running(
self, function_to_run: collections.abc.Callable, heartbeat_every: dt.timedelta
) -> collections.abc.Callable[..., collections.abc.Coroutine]:
"""Return a callable that returns a coroutine that heartbeats with these HeartbeatDetails.
The returned callable wraps 'function_to_run' while heartbeating every 'heartbeat_every'
seconds.
"""

async def heartbeat() -> None:
"""Heartbeat every 'heartbeat_every' seconds."""
while True:
await asyncio.sleep(heartbeat_every.total_seconds())
temporalio.activity.heartbeat(self)

async def heartbeat_while_running(*args, **kwargs):
"""Wrap 'function_to_run' to asynchronously heartbeat while awaiting."""
heartbeat_task = asyncio.create_task(heartbeat())

try:
return await function_to_run(*args, **kwargs)
finally:
heartbeat_task.cancel()
await asyncio.wait([heartbeat_task])

return heartbeat_while_running
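
A hypothetical usage sketch for the helper above: wrap a long-running coroutine so the activity keeps heartbeating while the work is awaited (import_endpoint is a stand-in, not an actual PostHog function):

import datetime as dt

import temporalio.activity

from posthog.temporal.common.heartbeat import AsyncHeartbeatDetails

async def import_endpoint(job_id: str) -> None:
    ...  # stand-in for long-running import work

@temporalio.activity.defn
async def run_import(job_id: str) -> None:
    details = AsyncHeartbeatDetails()
    with_heartbeats = details.make_activity_heartbeat_while_running(
        import_endpoint, heartbeat_every=dt.timedelta(seconds=10)
    )
    # Heartbeats fire every 10 seconds until import_endpoint returns or raises.
    await with_heartbeats(job_id)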
7 changes: 1 addition & 6 deletions posthog/temporal/common/schedule.py
@@ -1,10 +1,5 @@
from asgiref.sync import async_to_sync
from temporalio.client import (
Client,
Schedule,
ScheduleUpdate,
ScheduleUpdateInput,
)
from temporalio.client import Client, Schedule, ScheduleUpdate, ScheduleUpdateInput


@async_to_sync
2 changes: 1 addition & 1 deletion posthog/temporal/common/worker.py
@@ -2,7 +2,7 @@
import sys
from datetime import timedelta

from temporalio.runtime import Runtime, TelemetryConfig, PrometheusConfig
from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig
from temporalio.worker import UnsandboxedWorkflowRunner, Worker

from posthog.temporal.common.client import connect
11 changes: 11 additions & 0 deletions posthog/temporal/data_imports/__init__.py
@@ -0,0 +1,11 @@
from posthog.temporal.data_imports.external_data_job import *

WORKFLOWS = [ExternalDataJobWorkflow]

ACTIVITIES = [
create_external_data_job_model,
update_external_data_job_model,
run_external_data_job,
move_draft_to_production_activity,
validate_schema_activity,
]
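
The star import brings in the workflow and activity definitions registered here. Purely for orientation, a loose sketch of how a workflow might chain these activities; the real ExternalDataJobWorkflow lives in posthog/temporal/data_imports/external_data_job.py, and its inputs, timeouts, and retry policies will differ:

import datetime as dt

from temporalio import workflow

@workflow.defn(name="external-data-job-sketch")
class ExternalDataJobSketch:
    @workflow.run
    async def run(self, inputs) -> None:
        # Hypothetical chaining: create the job record, run the import,
        # promote draft data, validate the schema, then finalize status.
        job_id = await workflow.execute_activity(
            create_external_data_job_model, inputs,
            start_to_close_timeout=dt.timedelta(minutes=1),
        )
        await workflow.execute_activity(
            run_external_data_job, job_id,
            start_to_close_timeout=dt.timedelta(hours=1),
        )
        await workflow.execute_activity(
            move_draft_to_production_activity, job_id,
            start_to_close_timeout=dt.timedelta(minutes=10),
        )
        await workflow.execute_activity(
            validate_schema_activity, job_id,
            start_to_close_timeout=dt.timedelta(minutes=10),
        )
        await workflow.execute_activity(
            update_external_data_job_model, job_id,
            start_to_close_timeout=dt.timedelta(minutes=1),
        )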