Commit
feat: Add BatchExport logs API
tomasfarias committed Sep 22, 2023
1 parent 17e3b59 commit 03bb0bd
Showing 5 changed files with 394 additions and 7 deletions.
20 changes: 18 additions & 2 deletions posthog/api/__init__.py
@@ -4,6 +4,8 @@
from posthog.batch_exports import http as batch_exports
from posthog.settings import EE_AVAILABLE
from posthog.warehouse.api import saved_query, table, view_link

from ..session_recordings.session_recording_api import SessionRecordingViewSet
from . import (
    activity_log,
    annotation,
@@ -40,7 +42,6 @@
)
from .dashboards import dashboard, dashboard_templates
from .data_management import DataManagementViewSet
from ..session_recordings.session_recording_api import SessionRecordingViewSet


@decorators.api_view(["GET", "HEAD", "POST", "PUT", "PATCH", "DELETE"])
@@ -132,7 +133,22 @@ def api_not_found(request):
batch_exports_router = projects_router.register(
    r"batch_exports", batch_exports.BatchExportViewSet, "batch_exports", ["team_id"]
)
batch_exports_router.register(r"runs", batch_exports.BatchExportRunViewSet, "runs", ["team_id", "batch_export_id"])
batch_export_runs_router = batch_exports_router.register(
    r"runs", batch_exports.BatchExportRunViewSet, "runs", ["team_id", "batch_export_id"]
)
batch_exports_router.register(
    r"logs",
    batch_exports.BatchExportLogViewSet,
    "batch_export_logs",
    ["team_id", "batch_export_id"],
)

batch_export_runs_router.register(
    r"logs",
    batch_exports.BatchExportLogViewSet,
    "batch_export_run_logs",
    ["team_id", "batch_export_id", "run_id"],
)

projects_router.register(r"warehouse_tables", table.TableViewSet, "project_warehouse_tables", ["team_id"])
projects_router.register(
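For orientation, the two registrations above should produce nested log endpoints along these lines (URL shapes inferred from the parent routers; illustrative, not part of the diff):

    /api/projects/:team_id/batch_exports/:batch_export_id/logs
    /api/projects/:team_id/batch_exports/:batch_export_id/runs/:run_id/logs

The same BatchExportLogViewSet backs both routes; the run-scoped one simply adds a run_id to the parents used for filtering.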
8 changes: 8 additions & 0 deletions posthog/api/test/batch_exports/operations.py
@@ -104,3 +104,11 @@ def patch_batch_export(client, team_id, batch_export_id, new_batch_export_data):
        new_batch_export_data,
        content_type="application/json",
    )


def get_batch_export_log_entries(client: TestClient, team_id: int, batch_export_id: str):
    return client.get(f"/api/projects/{team_id}/batch_exports/{batch_export_id}/logs")


def get_batch_export_run_log_entries(client: TestClient, team_id: int, batch_export_id: str, run_id):
    return client.get(f"/api/projects/{team_id}/batch_exports/{batch_export_id}/runs/{run_id}/logs")
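For illustration, these helpers are thin wrappers over the test client, so a test can call them directly. A minimal sketch, assuming a logged-in client and an existing batch export (as in the tests below):

    response = get_batch_export_log_entries(client, team_id=team.pk, batch_export_id=batch_export["id"])
    assert response.status_code == 200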
244 changes: 244 additions & 0 deletions posthog/api/test/batch_exports/test_log_entry.py
@@ -0,0 +1,244 @@
import datetime as dt
import uuid

import pytest
from freezegun import freeze_time

from posthog.api.test.batch_exports.conftest import start_test_worker
from posthog.api.test.batch_exports.operations import (
    create_batch_export_ok,
    get_batch_export_log_entries,
    get_batch_export_run_log_entries,
)
from posthog.api.test.test_organization import create_organization
from posthog.api.test.test_team import create_team
from posthog.api.test.test_user import create_user
from posthog.batch_exports.models import (
    BatchExportLogEntryLevel,
    fetch_batch_export_log_entries,
)
from posthog.client import sync_execute
from posthog.temporal.client import sync_connect


def create_batch_export_log_entry(
    *,
    team_id: int,
    batch_export_id: str,
    run_id: str | None,
    message: str,
    level: BatchExportLogEntryLevel,
):
    from posthog.clickhouse.log_entries import INSERT_LOG_ENTRY_SQL

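    # Seed the entry by writing directly to ClickHouse, so tests don't need a worker to produce logs.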
    sync_execute(
        INSERT_LOG_ENTRY_SQL,
        {
            "team_id": team_id,
            "log_source": "batch_exports",
            "log_source_id": batch_export_id,
            "instance_id": run_id,
            "timestamp": dt.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"),
            "level": level,
            "message": message,
        },
    )


@pytest.fixture
def organization():
    organization = create_organization("Test Org")

    yield organization

    organization.delete()


@pytest.fixture
def team(organization):
    team = create_team(organization)

    yield team

    team.delete()


@pytest.fixture
def batch_export(client, organization, team):
    user = create_user("[email protected]", "Test User", organization)
    client.force_login(user)

    temporal = sync_connect()

    destination_data = {
        "type": "S3",
        "config": {
            "bucket_name": "my-production-s3-bucket",
            "region": "us-east-1",
            "prefix": "posthog-events/",
            "aws_access_key_id": "abc123",
            "aws_secret_access_key": "secret",
        },
    }

    batch_export_data = {
        "name": "my-production-s3-bucket-destination",
        "destination": destination_data,
        "interval": "hour",
        "start_at": "2023-07-19 00:00:00",
        "end_at": "2023-07-20 00:00:00",
    }
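    # Creating a batch export registers a Temporal schedule, so a test worker runs during the API call.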
    with start_test_worker(temporal):
        batch_export = create_batch_export_ok(
            client,
            team.pk,
            batch_export_data,
        )

    yield batch_export


@pytest.mark.django_db
def test_simple_log_is_fetched(batch_export, team):
    """Test the simple case of fetching a batch export log entry."""
    with freeze_time("2023-09-22 01:00:00"):
        create_batch_export_log_entry(
            team_id=team.pk,
            batch_export_id=str(batch_export["id"]),
            run_id=None,
            message="Test log. Much INFO.",
            level=BatchExportLogEntryLevel.INFO,
        )

        results = fetch_batch_export_log_entries(
            team_id=team.pk,
            batch_export_id=batch_export["id"],
            after=dt.datetime(2023, 9, 22, 0, 59, 59),
            before=dt.datetime(2023, 9, 22, 1, 0, 1),
        )

    assert len(results) == 1
    assert results[0].message == "Test log. Much INFO."
    assert results[0].level == BatchExportLogEntryLevel.INFO
    assert results[0].batch_export_id == str(batch_export["id"])


@pytest.mark.django_db
def test_log_level_filter(batch_export, team):
    """Test fetching batch export log entries of a particular level."""
    with freeze_time("2023-09-22 01:00:00"):
        for level in (
            BatchExportLogEntryLevel.INFO,
            BatchExportLogEntryLevel.WARNING,
            BatchExportLogEntryLevel.ERROR,
            BatchExportLogEntryLevel.DEBUG,
        ):
            for message in ("Test log 1", "Test log 2"):
                create_batch_export_log_entry(
                    team_id=team.pk,
                    batch_export_id=str(batch_export["id"]),
                    run_id=None,
                    message=message,
                    level=level,
                )

    for level in (
        BatchExportLogEntryLevel.INFO,
        BatchExportLogEntryLevel.WARNING,
        BatchExportLogEntryLevel.ERROR,
        BatchExportLogEntryLevel.DEBUG,
    ):
        results = fetch_batch_export_log_entries(
            team_id=team.pk,
            batch_export_id=batch_export["id"],
            level_filter=[level],
            after=dt.datetime(2023, 9, 22, 0, 59, 59),
            before=dt.datetime(2023, 9, 22, 1, 0, 1),
        )

        assert len(results) == 2
        assert results[0].message == "Test log 2"
        assert results[0].level == level
        assert results[0].batch_export_id == str(batch_export["id"])
        assert results[1].message == "Test log 1"
        assert results[1].level == level
        assert results[1].batch_export_id == str(batch_export["id"])


@pytest.mark.django_db
def test_batch_export_log_api(client, batch_export, team):
    """Test fetching batch export log entries using the API."""
    create_batch_export_log_entry(
        team_id=team.pk,
        batch_export_id=str(batch_export["id"]),
        run_id=str(uuid.uuid4()),
        message="Test log. Much INFO.",
        level=BatchExportLogEntryLevel.INFO,
    )
    create_batch_export_log_entry(
        team_id=team.pk,
        batch_export_id=str(batch_export["id"]),
        run_id=str(uuid.uuid4()),
        message="Test log. Much ERROR.",
        level=BatchExportLogEntryLevel.ERROR,
    )

    response = get_batch_export_log_entries(
        client,
        team_id=team.pk,
        batch_export_id=batch_export["id"],
    )

    json_response = response.json()
    results = json_response["results"]

    assert response.status_code == 200
    assert json_response["count"] == 2
    assert len(results) == 2
    # Logs are ordered by timestamp DESC, so the ERROR log (created last) comes first.
    assert results[0]["message"] == "Test log. Much ERROR."
    assert results[0]["level"] == BatchExportLogEntryLevel.ERROR
    assert results[0]["batch_export_id"] == str(batch_export["id"])
    assert results[1]["message"] == "Test log. Much INFO."
    assert results[1]["level"] == BatchExportLogEntryLevel.INFO
    assert results[1]["batch_export_id"] == str(batch_export["id"])


@pytest.mark.django_db
def test_batch_export_run_log_api(client, batch_export, team):
    """Test fetching batch export run log entries using the API."""
    run_id = str(uuid.uuid4())

    create_batch_export_log_entry(
        team_id=team.pk,
        batch_export_id=str(batch_export["id"]),
        run_id=run_id,
        message="Test log. Much INFO.",
        level=BatchExportLogEntryLevel.INFO,
    )

    create_batch_export_log_entry(
        team_id=team.pk,
        batch_export_id=str(batch_export["id"]),
        # Logs from a different run shouldn't be in results.
        run_id=str(uuid.uuid4()),
        message="Test log. Much INFO.",
        level=BatchExportLogEntryLevel.INFO,
    )

    response = get_batch_export_run_log_entries(
        client,
        team_id=team.pk,
        batch_export_id=batch_export["id"],
        run_id=run_id,
    )

    json_response = response.json()
    results = json_response["results"]

    assert response.status_code == 200
    assert json_response["count"] == 1
    assert len(results) == 1
    assert results[0]["message"] == "Test log. Much INFO."
    assert results[0]["level"] == BatchExportLogEntryLevel.INFO
    assert results[0]["batch_export_id"] == str(batch_export["id"])
61 changes: 56 additions & 5 deletions posthog/batch_exports/http.py
@@ -1,15 +1,22 @@
import datetime as dt
from typing import Any

from django.db import transaction
from django.utils.timezone import now
from rest_framework import request, response, serializers, viewsets
from rest_framework import mixins, request, response, serializers, viewsets
from rest_framework.decorators import action
from rest_framework.exceptions import NotAuthenticated, NotFound, ValidationError
from rest_framework.pagination import CursorPagination
from rest_framework.permissions import IsAuthenticated
from rest_framework_dataclasses.serializers import DataclassSerializer

from posthog.api.routing import StructuredViewSetMixin
from posthog.batch_exports.models import BATCH_EXPORT_INTERVALS
from posthog.batch_exports.models import (
    BATCH_EXPORT_INTERVALS,
    BatchExportLogEntry,
    BatchExportLogEntryLevel,
    fetch_batch_export_log_entries,
)
from posthog.batch_exports.service import (
    BatchExportIdError,
    BatchExportServiceError,
@@ -20,10 +27,11 @@
    sync_batch_export,
    unpause_batch_export,
)
from django.db import transaction

from posthog.models import BatchExport, BatchExportDestination, BatchExportRun, User
from posthog.permissions import ProjectMembershipNecessaryPermissions, TeamMemberAccessPermission
from posthog.permissions import (
    ProjectMembershipNecessaryPermissions,
    TeamMemberAccessPermission,
)
from posthog.temporal.client import sync_connect
from posthog.utils import relative_date_parse

@@ -276,3 +284,46 @@ def perform_destroy(self, instance: BatchExport):
        temporal = sync_connect()
        delete_schedule(temporal, str(instance.pk))
        instance.save()


class BatchExportLogEntrySerializer(DataclassSerializer):
    class Meta:
        dataclass = BatchExportLogEntry


class BatchExportLogViewSet(StructuredViewSetMixin, mixins.ListModelMixin, viewsets.GenericViewSet):
    permission_classes = [IsAuthenticated, ProjectMembershipNecessaryPermissions, TeamMemberAccessPermission]
    serializer_class = BatchExportLogEntrySerializer

    def get_queryset(self):
        limit_raw = self.request.GET.get("limit")
        limit: int | None
        if limit_raw:
            try:
                limit = int(limit_raw)
            except ValueError:
                raise ValidationError("Query param limit must be omitted or an integer!")
        else:
            limit = None

        after_raw: str | None = self.request.GET.get("after")
        after: dt.datetime | None = None
        if after_raw is not None:
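            # fromisoformat() doesn't accept a trailing "Z" before Python 3.11, so normalize it to "+00:00".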
            after = dt.datetime.fromisoformat(after_raw.replace("Z", "+00:00"))

        before_raw: str | None = self.request.GET.get("before")
        before: dt.datetime | None = None
        if before_raw is not None:
            before = dt.datetime.fromisoformat(before_raw.replace("Z", "+00:00"))

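        # level_filter values must match BatchExportLogEntryLevel member names (e.g. INFO, ERROR).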
        level_filter = [BatchExportLogEntryLevel[t] for t in (self.request.GET.getlist("level_filter", []))]
        return fetch_batch_export_log_entries(
            team_id=self.parents_query_dict["team_id"],
            batch_export_id=self.parents_query_dict["batch_export_id"],
            run_id=self.parents_query_dict.get("run_id", None),
            after=after,
            before=before,
            search=self.request.GET.get("search"),
            limit=limit,
            level_filter=level_filter,
        )
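Taken together, get_queryset supports limit, after, before, search, and level_filter query parameters. A hypothetical request combining a few of them (values are illustrative only):

    GET /api/projects/:team_id/batch_exports/:batch_export_id/logs?level_filter=ERROR&after=2023-09-22T00:59:59Z&limit=50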