refactor: Remove early row count and update batch export metrics (#22810)

* refactor: Update metrics to fetch counts at request time

* refactor: Remove count from batch exports

* fix: Move import to method

* fix: Add function

* fix: Typing fixes

* fix: Move early return in main workflow activity

* test: Docstring updates and more tests

* fix: Actually use include and exclude events

* refactor: Switch to counting runs

* feat: Frontend display number of runs instead of events for batchexports

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (2)

* Update UI snapshots for `chromium` (2)

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
tomasfarias and github-actions[bot] authored Jun 13, 2024
1 parent f3c4647 commit de17595
Showing 19 changed files with 313 additions and 241 deletions.
4 changes: 2 additions & 2 deletions frontend/src/scenes/pipeline/AppMetricSparkLine.tsx
@@ -17,14 +17,14 @@ export function AppMetricSparkLine({ pipelineNode }: { pipelineNode: PipelineNod
const displayData: SparklineTimeSeries[] = [
{
color: 'success',
- name: 'Events sent',
+ name: pipelineNode.backend == 'batch_export' ? 'Runs succeeded' : 'Events sent',
values: successes,
},
]
if (appMetricsResponse?.metrics.failures.some((failure) => failure > 0)) {
displayData.push({
color: 'danger',
- name: 'Events dropped',
+ name: pipelineNode.backend == 'batch_export' ? 'Runs failed' : 'Events dropped',
values: failures,
})
}
73 changes: 32 additions & 41 deletions posthog/api/app_metrics.py
@@ -1,9 +1,8 @@
import datetime as dt
import uuid
from typing import Any

- from django.db.models import Q, Sum
- from django.db.models.functions import Coalesce, TruncDay
+ from django.db.models import Q, Count
+ from django.db.models.functions import TruncDay
from rest_framework import mixins, request, response, viewsets
from rest_framework.decorators import action

@@ -32,11 +31,8 @@ class AppMetricsViewSet(TeamAndOrgViewSetMixin, mixins.RetrieveModelMixin, views

def retrieve(self, request: request.Request, *args: Any, **kwargs: Any) -> response.Response:
try:
- rows = self.get_batch_export_runs_app_metrics_queryset(batch_export_id=kwargs["pk"])
+ dates, successes, failures = self.get_batch_export_runs_app_metrics_queryset(batch_export_id=kwargs["pk"])

- dates = [row["dates"].strftime("%Y-%m-%d") for row in rows]
- successes = [row["successes"] for row in rows]
- failures = [row["failures"] for row in rows]
return response.Response(
{
"metrics": {
@@ -83,30 +79,7 @@ def error_details(self, request: request.Request, *args: Any, **kwargs: Any) ->
return response.Response({"result": error_details})

def get_batch_export_runs_app_metrics_queryset(self, batch_export_id: str):
"""Use the Django ORM to fetch app metrics for batch export runs.
Attempts to (roughly) match the following (much more readable) query:
```
select
date_trunc('day', last_updated_at) as dates,
sum(case when status = 'Completed' then coalesce(records_total_count, 0) else 0) as successes,
sum(case when status != 'Completed' then coalesce(records_total_count, 0) else 0) as failures
from
posthog_batchexportrun
where
batch_export_id = :batch_export_id
and last_updated_at between :date_from and :date_to
and status != 'Running'
group by
date_trunc('day', last_updated_at)
order by
dates
```
A truncated 'last_updated_at' is used as the grouping date as it reflects when a particular run
was last updated. It feels easier to explain to users that if they see metrics for today, those
correspond to runs that happened today, even if the runs themselves exported data from a year ago
(because it was a backfill).
"""Use the Django ORM and ClickHouse to fetch app metrics for batch export runs.
Raises:
ValueError: If provided 'batch_export_id' is not a valid UUID.
@@ -120,22 +93,40 @@ def get_batch_export_runs_app_metrics_queryset(self, batch_export_id: str):
relative_date_parse(before, self.team.timezone_info) if before else dt.datetime.now(dt.timezone.utc)
)
date_range = (after_datetime, before_datetime)
- return (
- BatchExportRun.objects.filter(batch_export_id=batch_export_uuid, last_updated_at__range=date_range)
- .annotate(dates=TruncDay("last_updated_at"))
- .values("dates")
- .annotate(
- successes=Sum(
- Coalesce("records_total_count", 0), filter=Q(status=BatchExportRun.Status.COMPLETED), default=0
- ),
- failures=Sum(
- Coalesce("records_total_count", 0), filter=~Q(status=BatchExportRun.Status.COMPLETED), default=0
- ),
- )
- .order_by("dates")
+ runs = (
+ BatchExportRun.objects.select_related("batch_export__destination")
+ .filter(
+ batch_export_id=batch_export_uuid,
+ last_updated_at__range=date_range,
+ status__in=(
+ BatchExportRun.Status.COMPLETED,
+ BatchExportRun.Status.FAILED,
+ BatchExportRun.Status.FAILED_RETRYABLE,
+ ),
+ )
+ .annotate(day=TruncDay("last_updated_at"))
+ .values("day")
+ .annotate(
+ successes=Count("data_interval_end", filter=Q(status=BatchExportRun.Status.COMPLETED)),
+ failures=Count(
+ "data_interval_end",
+ filter=(Q(status=BatchExportRun.Status.FAILED) | Q(status=BatchExportRun.Status.FAILED_RETRYABLE)),
+ ),
+ )
+ .order_by("day")
.all()
)

+ dates = []
+ successes = []
+ failures = []
+ for run in runs:
+ dates.append(run["day"].strftime("%Y-%m-%d"))
+ successes.append(run["successes"])
+ failures.append(run["failures"])

+ return dates, successes, failures


class HistoricalExportsAppMetricsViewSet(
TeamAndOrgViewSetMixin,
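For readers skimming the diff: the helper now aggregates run counts per day and hands `retrieve` three parallel lists instead of a queryset of rows. A minimal, self-contained sketch of the same grouping logic in plain Python — hypothetical in-memory data standing in for the `BatchExportRun` queryset — makes the new shape concrete:

```python
import datetime as dt
from collections import defaultdict

# Hypothetical runs; in the real code these are BatchExportRun rows already
# filtered to COMPLETED / FAILED / FAILED_RETRYABLE statuses.
runs = [
    {"last_updated_at": dt.datetime(2021, 12, 5, 13, 0), "status": "Completed"},
    {"last_updated_at": dt.datetime(2021, 12, 5, 14, 0), "status": "Failed"},
    {"last_updated_at": dt.datetime(2021, 12, 4, 13, 0), "status": "FailedRetryable"},
]

per_day: dict[str, dict[str, int]] = defaultdict(lambda: {"successes": 0, "failures": 0})
for run in runs:
    day = run["last_updated_at"].strftime("%Y-%m-%d")
    key = "successes" if run["status"] == "Completed" else "failures"
    per_day[day][key] += 1

dates = sorted(per_day)
successes = [per_day[d]["successes"] for d in dates]
failures = [per_day[d]["failures"] for d in dates]
# dates == ['2021-12-04', '2021-12-05'], successes == [0, 1], failures == [1, 1]
```

Counting runs instead of summing `records_total_count` means the endpoint no longer depends on each run having recorded a row count, which is what allows the early count to be dropped elsewhere in this PR.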
46 changes: 38 additions & 8 deletions posthog/api/test/test_app_metrics.py
@@ -1,5 +1,6 @@
import datetime as dt
import json
+ import uuid
from unittest import mock

from freezegun.api import freeze_time
@@ -8,6 +9,7 @@
from posthog.api.test.batch_exports.conftest import start_test_worker
from posthog.api.test.batch_exports.operations import create_batch_export_ok
from posthog.batch_exports.models import BatchExportRun
+ from posthog.client import sync_execute
from posthog.models.activity_logging.activity_log import Detail, Trigger, log_activity
from posthog.models.plugin import Plugin, PluginConfig
from posthog.models.utils import UUIDT
@@ -18,6 +20,20 @@
SAMPLE_PAYLOAD = {"dateRange": ["2021-06-10", "2022-06-12"], "parallelism": 1}


+ def insert_event(team_id: int, timestamp: dt.datetime, event: str = "test-event"):
+ sync_execute(
+ "INSERT INTO `sharded_events` (uuid, team_id, event, timestamp) VALUES",
+ [
+ {
+ "uuid": uuid.uuid4(),
+ "team_id": team_id,
+ "event": event,
+ "timestamp": timestamp,
+ }
+ ],
+ )


@freeze_time("2021-12-05T13:23:00Z")
class TestAppMetricsAPI(ClickhouseTestMixin, APIBaseTest):
maxDiff = None
@@ -88,6 +104,7 @@ def test_retrieve_batch_export_runs_app_metrics(self):
"prefix": "posthog-events/",
"aws_access_key_id": "abc123",
"aws_secret_access_key": "secret",
"include_events": ["test-event"],
},
}

@@ -119,17 +136,31 @@ def test_retrieve_batch_export_runs_app_metrics(self):
data_interval_end=last_updated_at,
data_interval_start=last_updated_at - dt.timedelta(hours=1),
status=BatchExportRun.Status.COMPLETED,
- records_completed=3,
- records_total_count=3,
)
+ BatchExportRun.objects.create(
+ batch_export_id=batch_export_id,
+ data_interval_end=last_updated_at,
+ data_interval_start=last_updated_at - dt.timedelta(hours=1),
+ status=BatchExportRun.Status.COMPLETED,
+ )
+ BatchExportRun.objects.create(
+ batch_export_id=batch_export_id,
+ data_interval_end=last_updated_at,
+ data_interval_start=last_updated_at - dt.timedelta(hours=1),
+ status=BatchExportRun.Status.COMPLETED,
+ )

BatchExportRun.objects.create(
batch_export_id=batch_export_id,
data_interval_end=last_updated_at - dt.timedelta(hours=2),
data_interval_start=last_updated_at - dt.timedelta(hours=3),
status=BatchExportRun.Status.FAILED,
- records_completed=0,
- records_total_count=5,
)
+ BatchExportRun.objects.create(
+ batch_export_id=batch_export_id,
+ data_interval_end=last_updated_at - dt.timedelta(hours=2),
+ data_interval_start=last_updated_at - dt.timedelta(hours=3),
+ status=BatchExportRun.Status.FAILED_RETRYABLE,
+ )

response = self.client.get(f"/api/projects/@current/app_metrics/{batch_export_id}?date_from=-7d")
@@ -149,8 +180,8 @@ def test_retrieve_batch_export_runs_app_metrics(self):
],
"successes": [3, 3, 3, 3, 3, 3, 3],
"successes_on_retry": [0, 0, 0, 0, 0, 0, 0],
"failures": [5, 5, 5, 5, 5, 5, 5],
"totals": {"successes": 21, "successes_on_retry": 0, "failures": 35},
"failures": [2, 2, 2, 2, 2, 2, 2],
"totals": {"successes": 21, "successes_on_retry": 0, "failures": 14},
},
"errors": None,
},
@@ -166,6 +197,7 @@ def test_retrieve_batch_export_runs_app_metrics_defaults_to_zero(self):
"prefix": "posthog-events/",
"aws_access_key_id": "abc123",
"aws_secret_access_key": "secret",
"exclude_events": ["exclude-me"],
},
}

@@ -197,8 +229,6 @@ def test_retrieve_batch_export_runs_app_metrics_defaults_to_zero(self):
data_interval_end=last_updated_at,
data_interval_start=last_updated_at - dt.timedelta(hours=1),
status=BatchExportRun.Status.COMPLETED,
- records_completed=1,
- records_total_count=1,
)

response = self.client.get(f"/api/projects/@current/app_metrics/{batch_export_id}?date_from=-7d")
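A quick sanity check on the updated expectations (a standalone sketch, not part of the test suite): with three completed runs and two failed runs per day over the seven days covered by `date_from=-7d`, the per-day series and totals come out exactly as asserted above.

```python
# Each day in the 7-day window gets 3 COMPLETED runs and 2 failed runs
# (one FAILED, one FAILED_RETRYABLE), so run counts replace event counts.
days = 7
successes = [3] * days
failures = [2] * days

assert successes == [3, 3, 3, 3, 3, 3, 3]
assert failures == [2, 2, 2, 2, 2, 2, 2]
assert sum(successes) == 21  # "totals": {"successes": 21, ...}
assert sum(failures) == 14   # "totals": {"failures": 14, ...}
```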
50 changes: 3 additions & 47 deletions posthog/temporal/batch_exports/batch_exports.py
@@ -1,4 +1,3 @@
- import asyncio
import collections.abc
import dataclasses
import datetime as dt
@@ -27,7 +26,7 @@
get_export_finished_metric,
get_export_started_metric,
)
- from posthog.temporal.common.clickhouse import ClickHouseClient, get_client
+ from posthog.temporal.common.clickhouse import ClickHouseClient
from posthog.temporal.common.client import connect
from posthog.temporal.common.logger import bind_temporal_worker_logger

@@ -329,12 +328,11 @@ class StartBatchExportRunInputs:
is_backfill: bool = False


- RecordsTotalCount = int | None
BatchExportRunId = str


@activity.defn
- async def start_batch_export_run(inputs: StartBatchExportRunInputs) -> tuple[BatchExportRunId, RecordsTotalCount]:
+ async def start_batch_export_run(inputs: StartBatchExportRunInputs) -> BatchExportRunId:
"""Activity that creates an BatchExportRun and returns the count of records to export.
Intended to be used in all export workflows, usually at the start, to create a model
@@ -350,56 +348,14 @@ async def start_batch_export_run(inputs: StartBatchExportRunInputs) -> tuple[Bat
inputs.data_interval_end,
)

- delta = dt.datetime.fromisoformat(inputs.data_interval_end) - dt.datetime.fromisoformat(inputs.data_interval_start)
- async with get_client(team_id=inputs.team_id) as client:
- if not await client.is_alive():
- raise ConnectionError("Cannot establish connection to ClickHouse")

- try:
- count = await asyncio.wait_for(
- get_rows_count(
- client=client,
- team_id=inputs.team_id,
- interval_start=inputs.data_interval_start,
- interval_end=inputs.data_interval_end,
- exclude_events=inputs.exclude_events,
- include_events=inputs.include_events,
- is_backfill=inputs.is_backfill,
- ),
- timeout=(delta / 12).total_seconds(),
- )
- except asyncio.TimeoutError:
- count = None

- if count is None:
- logger.info(
- "Batch export for range %s - %s will continue without a count of rows to export",
- inputs.data_interval_start,
- inputs.data_interval_end,
- )
- elif count > 0:
- logger.info(
- "Batch export for range %s - %s will export %s rows",
- inputs.data_interval_start,
- inputs.data_interval_end,
- count,
- )
- else:
- logger.info(
- "Batch export for range %s - %s has no rows to export",
- inputs.data_interval_start,
- inputs.data_interval_end,
- )

run = await acreate_batch_export_run(
batch_export_id=uuid.UUID(inputs.batch_export_id),
data_interval_start=inputs.data_interval_start,
data_interval_end=inputs.data_interval_end,
status=BatchExportRun.Status.STARTING,
- records_total_count=count,
)

- return str(run.id), count
+ return str(run.id)


@dataclasses.dataclass
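The deleted block is worth a note: it bounded the ClickHouse row count to a twelfth of the batch interval with `asyncio.wait_for`, and treated a timeout as "continue without a count". A generic, runnable sketch of that pattern — with a stand-in coroutine instead of `get_rows_count` — for anyone wondering exactly what was dropped:

```python
import asyncio
import datetime as dt


async def slow_count() -> int:
    """Stand-in for the removed get_rows_count ClickHouse query."""
    await asyncio.sleep(2)
    return 42


async def main() -> None:
    interval_start = dt.datetime(2024, 6, 13, 12, 0, tzinfo=dt.timezone.utc)
    interval_end = dt.datetime(2024, 6, 13, 13, 0, tzinfo=dt.timezone.utc)
    delta = interval_end - interval_start

    try:
        # Allow at most 1/12 of the batch interval (5 minutes for an hourly batch).
        count = await asyncio.wait_for(slow_count(), timeout=(delta / 12).total_seconds())
    except asyncio.TimeoutError:
        count = None  # the old activity logged this and carried on without a count

    print(count)  # 42 here; None whenever the count is slower than its budget


asyncio.run(main())
```

With run counts now derived from `BatchExportRun` statuses at request time, this pre-flight query and its timeout handling are no longer needed.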
29 changes: 9 additions & 20 deletions posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -24,7 +24,6 @@
StartBatchExportRunInputs,
default_fields,
execute_batch_export_insert_activity,
- finish_batch_export_run,
get_data_interval,
iter_records,
start_batch_export_run,
@@ -251,6 +250,10 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
is_backfill=inputs.is_backfill,
)

+ first_record_batch, records_iterator = peek_first_and_rewind(records_iterator)
+ if first_record_batch is None:
+ return 0

bigquery_table = None
inserted_at = None

@@ -270,8 +273,6 @@ async def flush_to_bigquery(bigquery_table, table_schema):
rows_exported.add(jsonl_file.records_since_last_reset)
bytes_exported.add(jsonl_file.bytes_since_last_reset)

- first_record, records_iterator = peek_first_and_rewind(records_iterator)

if inputs.use_json_type is True:
json_type = "JSON"
json_columns = ["properties", "set", "set_once", "person_properties"]
@@ -296,8 +297,10 @@ async def flush_to_bigquery(bigquery_table, table_schema):
]

else:
- column_names = [column for column in first_record.schema.names if column != "_inserted_at"]
- record_schema = first_record.select(column_names).schema
+ column_names = [
+ column for column in first_record_batch.schema.names if column != "_inserted_at"
+ ]
+ record_schema = first_record_batch.select(column_names).schema
schema = get_bigquery_fields_from_record_schema(record_schema, known_json_columns=json_columns)

bigquery_table = await create_table_in_bigquery(
@@ -371,7 +374,7 @@ async def run(self, inputs: BigQueryBatchExportInputs):
include_events=inputs.include_events,
is_backfill=inputs.is_backfill,
)
- run_id, records_total_count = await workflow.execute_activity(
+ run_id = await workflow.execute_activity(
start_batch_export_run,
start_batch_export_run_inputs,
start_to_close_timeout=dt.timedelta(minutes=5),
@@ -390,20 +393,6 @@ async def run(self, inputs: BigQueryBatchExportInputs):
team_id=inputs.team_id,
)

- if records_total_count == 0:
- await workflow.execute_activity(
- finish_batch_export_run,
- finish_inputs,
- start_to_close_timeout=dt.timedelta(minutes=5),
- retry_policy=RetryPolicy(
- initial_interval=dt.timedelta(seconds=10),
- maximum_interval=dt.timedelta(seconds=60),
- maximum_attempts=0,
- non_retryable_error_types=["NotNullViolation", "IntegrityError"],
- ),
- )
- return

insert_inputs = BigQueryInsertInputs(
team_id=inputs.team_id,
table_id=inputs.table_id,
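The early exit now hinges on `peek_first_and_rewind` rather than a precomputed row count: the activity inspects the first record batch and returns 0 when there is nothing to export. A hedged sketch of that peek-and-rewind idiom — a stand-in for the helper, not necessarily PostHog's exact implementation:

```python
import itertools
from collections.abc import Iterator
from typing import Optional, TypeVar

T = TypeVar("T")


def peek_first_and_rewind(iterator: Iterator[T]) -> tuple[Optional[T], Iterator[T]]:
    """Return the first item (or None if empty) plus an iterator that still yields it."""
    try:
        first = next(iterator)
    except StopIteration:
        return None, iter(())
    # Chain the consumed item back in front so the caller still iterates from the start.
    return first, itertools.chain([first], iterator)


record_batches = iter([{"event": "pageview"}, {"event": "click"}])
first_record_batch, record_batches = peek_first_and_rewind(record_batches)
if first_record_batch is None:
    print("nothing to export")  # mirrors the new `return 0` in the activity
else:
    print([batch["event"] for batch in record_batches])  # ['pageview', 'click']
```

The same schema information that used to be read from `first_record` now comes from `first_record_batch`, which is why the column-name lines in the diff above switched variables.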