Skip to content

Commit

Permalink
feat(data-warehouse): log entries on data warehouse backend (#23416)
Browse files Browse the repository at this point in the history
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
EDsCODE and github-actions[bot] authored Jul 16, 2024
1 parent e92af80 commit 0adb5d5
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 2 deletions.
2 changes: 1 addition & 1 deletion posthog/temporal/common/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def get_temporal_context() -> dict[str, str | int]:
log_source_id = workflow_id.split("-Backfill")[0]
log_source = "batch_exports_backfill"
elif workflow_type == "external-data-job":
# This works because the WorkflowID is made up like f"{external_data_source_id}-{data_interval_end}"
# This works because the WorkflowID is made up like f"{external_data_schema_id}-{data_interval_end}"
log_source_id = workflow_id.rsplit("-", maxsplit=3)[0]
log_source = "external_data_jobs"
else:
Expand Down
4 changes: 3 additions & 1 deletion posthog/warehouse/api/external_data_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from rest_framework.request import Request
from rest_framework.response import Response
from posthog.hogql.database.database import create_hogql_database
from posthog.api.log_entries import LogEntryMixin

from posthog.warehouse.data_load.service import (
external_data_workflow_exists,
Expand Down Expand Up @@ -166,13 +167,14 @@ class Meta:
fields = ["id", "name", "should_sync", "last_synced_at"]


class ExternalDataSchemaViewset(TeamAndOrgViewSetMixin, viewsets.ModelViewSet):
class ExternalDataSchemaViewset(TeamAndOrgViewSetMixin, LogEntryMixin, viewsets.ModelViewSet):
scope_object = "INTERNAL"
queryset = ExternalDataSchema.objects.all()
serializer_class = ExternalDataSchemaSerializer
filter_backends = [filters.SearchFilter]
search_fields = ["name"]
ordering = "-created_at"
log_source = "external_data_jobs"

def get_serializer_context(self) -> dict[str, Any]:
context = super().get_serializer_context()
Expand Down
157 changes: 157 additions & 0 deletions posthog/warehouse/api/test/test_log_entry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import datetime as dt
import pytest

from posthog.api.test.test_organization import create_organization
from posthog.api.test.test_team import create_team
from posthog.api.test.test_user import create_user
from posthog.client import sync_execute
from django.test.client import Client as TestClient
from posthog.warehouse.models import (
ExternalDataSchema,
ExternalDataJob,
ExternalDataSource,
DataWarehouseTable,
DataWarehouseCredential,
)
from posthog.utils import encode_get_request_params


def create_external_data_job_log_entry(
    *,
    team_id: int,
    external_data_schema_id: str,
    run_id: str | None,
    message: str,
    level: str,
):
    """Insert one ``external_data_jobs`` log entry row for use in tests.

    The row is written with ``sync_execute`` using ``INSERT_LOG_ENTRY_SQL``.
    ``external_data_schema_id`` is stored as ``log_source_id`` and ``run_id``
    as ``instance_id``; the timestamp is the current UTC time formatted with
    microsecond precision.
    """
    # Kept as a function-local import, matching where the original resolved it.
    from posthog.clickhouse.log_entries import INSERT_LOG_ENTRY_SQL

    now_utc = dt.datetime.now(dt.UTC)
    row = {
        "team_id": team_id,
        "log_source": "external_data_jobs",
        "log_source_id": external_data_schema_id,
        "instance_id": run_id,
        "timestamp": now_utc.strftime("%Y-%m-%d %H:%M:%S.%f"),
        "level": level,
        "message": message,
    }
    sync_execute(INSERT_LOG_ENTRY_SQL, row)


@pytest.fixture
def organization():
    """Provide an organization named "Test Org"; delete it once the test is done."""
    org = create_organization("Test Org")
    yield org
    # Teardown: runs after the dependent test has finished.
    org.delete()


@pytest.fixture
def team(organization):
    """Provide a team created in the ``organization`` fixture; delete it afterwards."""
    new_team = create_team(organization)
    yield new_team
    # Teardown: runs after the dependent test has finished.
    new_team.delete()


@pytest.fixture
def external_data_resources(client, organization, team):
    """Log a user in and create a source, table, schema and job for ``team``.

    Returns a dict with the created ``"source"``, ``"schema"`` and ``"job"``
    objects. The credential and warehouse table are created as supporting
    rows and are not returned.
    """
    # NOTE(review): this email literal looks like scraper-redacted text; confirm the intended address.
    client.force_login(create_user("[email protected]", "Test User", organization))

    table_columns = {
        "id": {"hogql": "StringDatabaseField", "clickhouse": "Nullable(String)", "schema_valid": True},
    }

    data_source = ExternalDataSource.objects.create(
        team=team,
        source_id="source_id",
        connection_id="connection_id",
        status=ExternalDataSource.Status.COMPLETED,
        source_type=ExternalDataSource.Type.STRIPE,
    )
    credential = DataWarehouseCredential.objects.create(access_key="blah", access_secret="blah", team=team)
    table = DataWarehouseTable.objects.create(
        name="table_1",
        format="Parquet",
        team=team,
        external_data_source=data_source,
        external_data_source_id=data_source.id,
        credential=credential,
        url_pattern="https://bucket.s3/data/*",
        columns=table_columns,
    )
    # No explicit status is set on the schema; it is linked to the warehouse table created above.
    data_schema = ExternalDataSchema.objects.create(
        team=team,
        name="table_1",
        source=data_source,
        table=table,
        should_sync=True,
        last_synced_at="2024-01-01",
    )
    running_job = ExternalDataJob.objects.create(
        pipeline=data_source,
        schema=data_schema,
        workflow_id="fake_workflow_id",
        team=team,
        status="Running",
        rows_synced=100000,
    )

    return dict(source=data_source, schema=data_schema, job=running_job)


def get_external_data_schema_run_log_entries(client: TestClient, team_id: int, external_data_schema_id: str, **extra):
    """GET the ``logs`` endpoint of an external data schema and return the response.

    Any extra keyword arguments are passed through ``encode_get_request_params``
    and sent as the request's query parameters.
    """
    logs_url = f"/api/projects/{team_id}/external_data_schemas/{external_data_schema_id}/logs"
    query_params = encode_get_request_params(extra)
    return client.get(logs_url, data=query_params)


@pytest.mark.django_db
def test_external_data_schema_log_api_with_level_filter(client, external_data_resources, team):
    """Test fetching external data schema log entries filtered by level and instance.

    Three entries are written for the same schema: an INFO and a DEBUG entry
    for the job's run, plus an INFO entry for a different instance id. The
    request filters on ``level="INFO"`` and the job's ``instance_id``, so
    exactly one entry is expected back.
    """
    run_id = external_data_resources["job"].pk
    schema_id = external_data_resources["schema"].pk

    # (instance_id, message, level) for each seeded log entry.
    seeded_entries = [
        (run_id, "Test log. Much INFO.", "INFO"),
        ("fake_workflow_id", "Test log. Much INFO.", "INFO"),
        (run_id, "Test log. Much DEBUG.", "DEBUG"),
    ]
    for instance_id, message, level in seeded_entries:
        create_external_data_job_log_entry(
            team_id=team.pk,
            external_data_schema_id=schema_id,
            run_id=instance_id,
            message=message,
            level=level,
        )

    response = get_external_data_schema_run_log_entries(
        client,
        team_id=team.pk,
        external_data_schema_id=schema_id,
        level="INFO",
        instance_id=run_id,
    )

    # Check the status first so a failing request reports its status code and
    # body instead of a JSON decode error or a KeyError on "results".
    assert response.status_code == 200, response.content

    json_response = response.json()
    results = json_response["results"]

    assert json_response["count"] == 1
    assert len(results) == 1
    assert results[0]["message"] == "Test log. Much INFO."
    assert results[0]["level"] == "INFO"
    assert results[0]["log_source_id"] == str(schema_id)

0 comments on commit 0adb5d5

Please sign in to comment.