diff --git a/posthog/api/__init__.py b/posthog/api/__init__.py
index dc84bf86b158f..b5ca842983f8a 100644
--- a/posthog/api/__init__.py
+++ b/posthog/api/__init__.py
@@ -4,6 +4,8 @@
 from posthog.batch_exports import http as batch_exports
 from posthog.settings import EE_AVAILABLE
 from posthog.warehouse.api import saved_query, table, view_link
+
+from ..session_recordings.session_recording_api import SessionRecordingViewSet
 from . import (
     activity_log,
     annotation,
@@ -40,7 +42,6 @@
 )
 from .dashboards import dashboard, dashboard_templates
 from .data_management import DataManagementViewSet
-from ..session_recordings.session_recording_api import SessionRecordingViewSet
 
 
 @decorators.api_view(["GET", "HEAD", "POST", "PUT", "PATCH", "DELETE"])
@@ -132,7 +133,22 @@ def api_not_found(request):
 batch_exports_router = projects_router.register(
     r"batch_exports", batch_exports.BatchExportViewSet, "batch_exports", ["team_id"]
 )
-batch_exports_router.register(r"runs", batch_exports.BatchExportRunViewSet, "runs", ["team_id", "batch_export_id"])
+batch_export_runs_router = batch_exports_router.register(
+    r"runs", batch_exports.BatchExportRunViewSet, "runs", ["team_id", "batch_export_id"]
+)
+batch_exports_router.register(
+    r"logs",
+    batch_exports.BatchExportLogViewSet,
+    "batch_export_run_logs",
+    ["team_id", "batch_export_id"],
+)
+
+batch_export_runs_router.register(
+    r"logs",
+    batch_exports.BatchExportLogViewSet,
+    "batch_export_logs",
+    ["team_id", "batch_export_id", "run_id"],
+)
 
 projects_router.register(r"warehouse_tables", table.TableViewSet, "project_warehouse_tables", ["team_id"])
 projects_router.register(
diff --git a/posthog/api/test/batch_exports/operations.py b/posthog/api/test/batch_exports/operations.py
index 2000099a9385e..90d515130a9c4 100644
--- a/posthog/api/test/batch_exports/operations.py
+++ b/posthog/api/test/batch_exports/operations.py
@@ -104,3 +104,11 @@ def patch_batch_export(client, team_id, batch_export_id, new_batch_export_data):
         new_batch_export_data,
         content_type="application/json",
     )
+
+
+def get_batch_export_log_entries(client: TestClient, team_id: int, batch_export_id: str):
+    return client.get(f"/api/projects/{team_id}/batch_exports/{batch_export_id}/logs")
+
+
+def get_batch_export_run_log_entries(client: TestClient, team_id: int, batch_export_id: str, run_id):
+    return client.get(f"/api/projects/{team_id}/batch_exports/{batch_export_id}/runs/{run_id}/logs")
diff --git a/posthog/api/test/batch_exports/test_log_entry.py b/posthog/api/test/batch_exports/test_log_entry.py
new file mode 100644
index 0000000000000..e7d63433e0054
--- /dev/null
+++ b/posthog/api/test/batch_exports/test_log_entry.py
@@ -0,0 +1,244 @@
+import datetime as dt
+import uuid
+
+import pytest
+from freezegun import freeze_time
+
+from posthog.api.test.batch_exports.conftest import start_test_worker
+from posthog.api.test.batch_exports.operations import (
+    create_batch_export_ok,
+    get_batch_export_log_entries,
+    get_batch_export_run_log_entries,
+)
+from posthog.api.test.test_organization import create_organization
+from posthog.api.test.test_team import create_team
+from posthog.api.test.test_user import create_user
+from posthog.batch_exports.models import (
+    BatchExportLogEntryLevel,
+    fetch_batch_export_log_entries,
+)
+from posthog.client import sync_execute
+from posthog.temporal.client import sync_connect
+
+
+def create_batch_export_log_entry(
+    *,
+    team_id: int,
+    batch_export_id: str,
+    run_id: str | None,
+    message: str,
+    level: BatchExportLogEntryLevel,
+):
+    from posthog.clickhouse.log_entries import INSERT_LOG_ENTRY_SQL
+
+    sync_execute(
+        INSERT_LOG_ENTRY_SQL,
+        {
+            "team_id": team_id,
+            "log_source": "batch_exports",
+            "log_source_id": batch_export_id,
+            "instance_id": run_id,
+            "timestamp": dt.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"),
+            "level": level,
+            "message": message,
+        },
+    )
+
+
+@pytest.fixture
+def organization():
+    organization = create_organization("Test Org")
+
+    yield organization
+
+    organization.delete()
+
+
+@pytest.fixture
+def team(organization):
+    team = create_team(organization)
+
+    yield team
+
+    team.delete()
+
+
+@pytest.fixture
+def batch_export(client, organization, team):
+    user = create_user("test@user.com", "Test User", organization)
+    client.force_login(user)
+
+    temporal = sync_connect()
+
+    destination_data = {
+        "type": "S3",
+        "config": {
+            "bucket_name": "my-production-s3-bucket",
+            "region": "us-east-1",
+            "prefix": "posthog-events/",
+            "aws_access_key_id": "abc123",
+            "aws_secret_access_key": "secret",
+        },
+    }
+
+    batch_export_data = {
+        "name": "my-production-s3-bucket-destination",
+        "destination": destination_data,
+        "interval": "hour",
+        "start_at": "2023-07-19 00:00:00",
+        "end_at": "2023-07-20 00:00:00",
+    }
+    with start_test_worker(temporal):
+        batch_export = create_batch_export_ok(
+            client,
+            team.pk,
+            batch_export_data,
+        )
+
+        yield batch_export
+
+
+@pytest.mark.django_db
+def test_simple_log_is_fetched(batch_export, team):
+    """Test the simple case of fetching a batch export log entry."""
+    with freeze_time("2023-09-22 01:00:00"):
+        create_batch_export_log_entry(
+            team_id=team.pk,
+            batch_export_id=str(batch_export["id"]),
+            run_id=None,
+            message="Test log. Much INFO.",
+            level=BatchExportLogEntryLevel.INFO,
+        )
+
+        results = fetch_batch_export_log_entries(
+            team_id=team.pk,
+            batch_export_id=batch_export["id"],
+            after=dt.datetime(2023, 9, 22, 0, 59, 59),
+            before=dt.datetime(2023, 9, 22, 1, 0, 1),
+        )
+
+    assert len(results) == 1
+    assert results[0].message == "Test log. Much INFO."
+    assert results[0].level == BatchExportLogEntryLevel.INFO
+    assert results[0].batch_export_id == str(batch_export["id"])
+
+
+@pytest.mark.django_db
+def test_log_level_filter(batch_export, team):
+    """Test fetching batch export log entries of a particular level."""
+    with freeze_time("2023-09-22 01:00:00"):
+        for level in (
+            BatchExportLogEntryLevel.INFO,
+            BatchExportLogEntryLevel.WARNING,
+            BatchExportLogEntryLevel.ERROR,
+            BatchExportLogEntryLevel.DEBUG,
+        ):
+            for message in ("Test log 1", "Test log 2"):
+                create_batch_export_log_entry(
+                    team_id=team.pk,
+                    batch_export_id=str(batch_export["id"]),
+                    run_id=None,
+                    message=message,
+                    level=level,
+                )
+
+    for level in (
+        BatchExportLogEntryLevel.INFO,
+        BatchExportLogEntryLevel.WARNING,
+        BatchExportLogEntryLevel.ERROR,
+        BatchExportLogEntryLevel.DEBUG,
+    ):
+        results = fetch_batch_export_log_entries(
+            team_id=team.pk,
+            batch_export_id=batch_export["id"],
+            level_filter=[level],
+            after=dt.datetime(2023, 9, 22, 0, 59, 59),
+            before=dt.datetime(2023, 9, 22, 1, 0, 1),
+        )
+
+        assert len(results) == 2
+        assert results[0].message == "Test log 2"
+        assert results[0].level == level
+        assert results[0].batch_export_id == str(batch_export["id"])
+        assert results[1].message == "Test log 1"
+        assert results[1].level == level
+        assert results[1].batch_export_id == str(batch_export["id"])
+
+
+@pytest.mark.django_db
+def test_batch_export_log_api(client, batch_export, team):
+    """Test fetching batch export log entries using the API."""
+    create_batch_export_log_entry(
+        team_id=team.pk,
+        batch_export_id=str(batch_export["id"]),
+        run_id=str(uuid.uuid4()),
+        message="Test log. Much INFO.",
+        level=BatchExportLogEntryLevel.INFO,
+    )
+    create_batch_export_log_entry(
+        team_id=team.pk,
+        batch_export_id=str(batch_export["id"]),
+        run_id=str(uuid.uuid4()),
+        message="Test log. Much ERROR.",
+        level=BatchExportLogEntryLevel.ERROR,
+    )
+
+    response = get_batch_export_log_entries(
+        client,
+        team_id=team.pk,
+        batch_export_id=batch_export["id"],
+    )
+
+    json_response = response.json()
+    results = json_response["results"]
+
+    assert response.status_code == 200
+    assert json_response["count"] == 2
+    assert len(results) == 2
+    # Logs are ordered by timestamp DESC, so ERROR log comes first.
+    assert results[0]["message"] == "Test log. Much ERROR."
+    assert results[0]["level"] == BatchExportLogEntryLevel.ERROR
+    assert results[0]["batch_export_id"] == str(batch_export["id"])
+    assert results[1]["message"] == "Test log. Much INFO."
+    assert results[1]["level"] == BatchExportLogEntryLevel.INFO
+    assert results[1]["batch_export_id"] == str(batch_export["id"])
+
+
+@pytest.mark.django_db
+def test_batch_export_run_log_api(client, batch_export, team):
+    """Test fetching batch export run log entries using the API."""
+    run_id = str(uuid.uuid4())
+
+    create_batch_export_log_entry(
+        team_id=team.pk,
+        batch_export_id=str(batch_export["id"]),
+        run_id=run_id,
+        message="Test log. Much INFO.",
+        level=BatchExportLogEntryLevel.INFO,
+    )
+
+    create_batch_export_log_entry(
+        team_id=team.pk,
+        batch_export_id=str(batch_export["id"]),
+        # Logs from a different run shouldn't be in results.
+        run_id=str(uuid.uuid4()),
+        message="Test log. Much INFO.",
+        level=BatchExportLogEntryLevel.INFO,
+    )
+
+    response = get_batch_export_run_log_entries(
+        client,
+        team_id=team.pk,
+        batch_export_id=batch_export["id"],
+        run_id=run_id,
+    )
+
+    json_response = response.json()
+    results = json_response["results"]
+
+    assert response.status_code == 200
+    assert json_response["count"] == 1
+    assert len(results) == 1
+    assert results[0]["message"] == "Test log. Much INFO."
+    assert results[0]["level"] == BatchExportLogEntryLevel.INFO
+    assert results[0]["batch_export_id"] == str(batch_export["id"])
diff --git a/posthog/batch_exports/http.py b/posthog/batch_exports/http.py
index 56ad985cfcdec..149c65b0eb569 100644
--- a/posthog/batch_exports/http.py
+++ b/posthog/batch_exports/http.py
@@ -1,15 +1,22 @@
 import datetime as dt
 from typing import Any
 
+from django.db import transaction
 from django.utils.timezone import now
-from rest_framework import request, response, serializers, viewsets
+from rest_framework import mixins, request, response, serializers, viewsets
 from rest_framework.decorators import action
 from rest_framework.exceptions import NotAuthenticated, NotFound, ValidationError
 from rest_framework.pagination import CursorPagination
 from rest_framework.permissions import IsAuthenticated
+from rest_framework_dataclasses.serializers import DataclassSerializer
 
 from posthog.api.routing import StructuredViewSetMixin
-from posthog.batch_exports.models import BATCH_EXPORT_INTERVALS
+from posthog.batch_exports.models import (
+    BATCH_EXPORT_INTERVALS,
+    BatchExportLogEntry,
+    BatchExportLogEntryLevel,
+    fetch_batch_export_log_entries,
+)
 from posthog.batch_exports.service import (
     BatchExportIdError,
     BatchExportServiceError,
@@ -20,10 +27,11 @@
     sync_batch_export,
     unpause_batch_export,
 )
-from django.db import transaction
-
 from posthog.models import BatchExport, BatchExportDestination, BatchExportRun, User
-from posthog.permissions import ProjectMembershipNecessaryPermissions, TeamMemberAccessPermission
+from posthog.permissions import (
+    ProjectMembershipNecessaryPermissions,
+    TeamMemberAccessPermission,
+)
 from posthog.temporal.client import sync_connect
 from posthog.utils import relative_date_parse
 
@@ -276,3 +284,46 @@ def perform_destroy(self, instance: BatchExport):
         temporal = sync_connect()
         delete_schedule(temporal, str(instance.pk))
         instance.save()
+
+
+class BatchExportLogEntrySerializer(DataclassSerializer):
+    class Meta:
+        dataclass = BatchExportLogEntry
+
+
+class BatchExportLogViewSet(StructuredViewSetMixin, mixins.ListModelMixin, viewsets.GenericViewSet):
+    permission_classes = [IsAuthenticated, ProjectMembershipNecessaryPermissions, TeamMemberAccessPermission]
+    serializer_class = BatchExportLogEntrySerializer
+
+    def get_queryset(self):
+        limit_raw = self.request.GET.get("limit")
+        limit: int | None
+        if limit_raw:
+            try:
+                limit = int(limit_raw)
+            except ValueError:
+                raise ValidationError("Query param limit must be omitted or an integer!")
+        else:
+            limit = None
+
+        after_raw: str | None = self.request.GET.get("after")
+        after: dt.datetime | None = None
+        if after_raw is not None:
+            after = dt.datetime.fromisoformat(after_raw.replace("Z", "+00:00"))
+
+        before_raw: str | None = self.request.GET.get("before")
+        before: dt.datetime | None = None
+        if before_raw is not None:
+            before = dt.datetime.fromisoformat(before_raw.replace("Z", "+00:00"))
+
+        level_filter = [BatchExportLogEntryLevel[t] for t in (self.request.GET.getlist("level_filter", []))]
+        return fetch_batch_export_log_entries(
+            team_id=self.parents_query_dict["team_id"],
+            batch_export_id=self.parents_query_dict["batch_export_id"],
+            run_id=self.parents_query_dict.get("run_id", None),
+            after=after,
+            before=before,
+            search=self.request.GET.get("search"),
+            limit=limit,
+            level_filter=level_filter,
+        )
diff --git a/posthog/batch_exports/models.py b/posthog/batch_exports/models.py
index 49eb3a8260fec..3f268e93fce77 100644
--- a/posthog/batch_exports/models.py
+++ b/posthog/batch_exports/models.py
@@ -1,7 +1,12 @@
+import dataclasses
+import datetime as dt
+import enum
+import typing
 from datetime import timedelta
 
 from django.db import models
 
+from posthog.client import sync_execute
 from posthog.models.utils import UUIDModel
 
 
@@ -147,3 +152,66 @@ def interval_time_delta(self) -> timedelta:
         elif self.interval == "week":
             return timedelta(weeks=1)
         raise ValueError("Invalid interval")
+
+
+class BatchExportLogEntryLevel(str, enum.Enum):
+    DEBUG = "DEBUG"
+    LOG = "LOG"
+    INFO = "INFO"
+    WARNING = "WARNING"
+    ERROR = "ERROR"
+
+
+@dataclasses.dataclass(frozen=True)
+class BatchExportLogEntry:
+    team_id: int
+    batch_export_id: str
+    run_id: str
+    timestamp: dt.datetime
+    level: BatchExportLogEntryLevel
+    message: str
+
+
+def fetch_batch_export_log_entries(
+    *,
+    batch_export_id: str,
+    team_id: int,
+    run_id: str | None = None,
+    after: dt.datetime | None = None,
+    before: dt.datetime | None = None,
+    search: str | None = None,
+    limit: int | None = None,
+    level_filter: list[BatchExportLogEntryLevel] = [],
+) -> list[BatchExportLogEntry]:
+    clickhouse_where_parts: list[str] = []
+    clickhouse_kwargs: dict[str, typing.Any] = {}
+
+    clickhouse_where_parts.append("log_source_id = %(log_source_id)s")
+    clickhouse_kwargs["log_source_id"] = batch_export_id
+    clickhouse_where_parts.append("team_id = %(team_id)s")
+    clickhouse_kwargs["team_id"] = team_id
+
+    if run_id is not None:
+        clickhouse_where_parts.append("instance_id = %(instance_id)s")
+        clickhouse_kwargs["instance_id"] = run_id
+    if after is not None:
+        clickhouse_where_parts.append("timestamp > toDateTime64(%(after)s, 6)")
+        clickhouse_kwargs["after"] = after.isoformat().replace("+00:00", "")
+    if before is not None:
+        clickhouse_where_parts.append("timestamp < toDateTime64(%(before)s, 6)")
+        clickhouse_kwargs["before"] = before.isoformat().replace("+00:00", "")
+    if search:
+        clickhouse_where_parts.append("message ILIKE %(search)s")
+        clickhouse_kwargs["search"] = f"%{search}%"
+    if len(level_filter) > 0:
+        clickhouse_where_parts.append("level in %(levels)s")
+        clickhouse_kwargs["levels"] = level_filter
+
+    clickhouse_query = f"""
+        SELECT team_id, log_source_id AS batch_export_id, instance_id AS run_id, timestamp, level, message FROM log_entries
+        WHERE {' AND '.join(clickhouse_where_parts)} ORDER BY timestamp DESC {f'LIMIT {limit}' if limit else ''}
+    """
+
+    return [
+        BatchExportLogEntry(*result) for result in typing.cast(list, sync_execute(clickhouse_query, clickhouse_kwargs))
+    ]
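
Not part of the patch itself, but a minimal usage sketch of the two routes this diff registers, `/api/projects/:team_id/batch_exports/:batch_export_id/logs` and `/api/projects/:team_id/batch_exports/:batch_export_id/runs/:run_id/logs`, with the query parameters that `BatchExportLogViewSet.get_queryset` parses (`level_filter`, `after`, `before`, `search`, `limit`). It assumes an authenticated Django test client and pre-existing `team_id`, `batch_export_id`, and `run_id` values:

# Illustrative only; `client`, `team_id`, `batch_export_id`, and `run_id` are assumed to exist.
response = client.get(
    f"/api/projects/{team_id}/batch_exports/{batch_export_id}/logs",
    {
        "level_filter": ["ERROR", "WARNING"],  # member names of BatchExportLogEntryLevel, repeated as query params
        "after": "2023-09-22T00:59:59Z",       # ISO 8601; a trailing "Z" is rewritten to "+00:00" before parsing
        "search": "Test log",                  # substring match (ILIKE) against the log message
        "limit": 50,                           # must parse as an integer if supplied
    },
)
assert response.status_code == 200
entries = response.json()["results"]  # serialized BatchExportLogEntry dataclasses

# Logs scoped to a single run use the nested route registered on batch_export_runs_router.
run_response = client.get(
    f"/api/projects/{team_id}/batch_exports/{batch_export_id}/runs/{run_id}/logs"
)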