refactor: Structlog batch exports logging (#18458)
* fix: Checkout master before checking for hogql changes

* fix: But go back to branch after done

* fix: Instead just diff with origin/master

* fix: Install libantlr, what's the worst that can happen?

* fix: Install antlr the hogql-way

* fix: Let's just checkout everything

* feat(batch-exports): Add backfill model and service support

* test(batch-exports-backfills): Add Workflow test

* chore(batch-exports-backfill): Bump migration

* feat(batch-exports): Add RedshiftBatchExportWorkflow

* feat(batch-exports): Add Redshift to BatchExport destinations

* feat(batch-exports): Support properties_data_type Redshift plugin parameter

* refactor(batch-exports): Insert rows instead of using COPY

* fix: Remove unused migration

* chore: Require aiokafka

* feat: Implement new structlog batch exports logger

* refactor: Use new structlog logger everywhere

* test: Add tests, fix things

* fix: Remove old tests

* chore: Change typing of return logger

* chore: Bump structlog

* chore: Extend docstrings

* fix: Don't use async logging as it's unsupported by temporal runtime

* test: Add logger tests

* fix: Mix pytestmark lists

* fix: Remove unused imports

* fix: Cleanup pytest warnings

* fix: Create and drop dataset for bigquery tests

* fix: Typing issue?

* fix: Let's just checkout everything

* fix: Use global event loop in tests

* fix: Blow-up cache

* fix: Truncate only if table exists

* revert-me: Skip all postgres tests

* fix: Connect to kafka in localhost

* fix: Lazily connect to Kafka

* fix: Resolve conflicts

* fix: Capture temporal context once and bind it to the logger

* fix: Make configure logger sync

* fix: Keep strong reference to background tasks

* fix: Continue consuming from log queue even if we fail to produce

* fix: Also catch the producer not starting

* fix: Remove unused await

* fix: Log kafka producer error after logger is configured
tomasfarias authored Nov 13, 2023
1 parent fded6fd commit a8f6d92
Showing 18 changed files with 849 additions and 364 deletions.
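
For context on the logging changes listed in the commit message, below is a minimal sketch, not PostHog's actual implementation, of the pattern those bullets describe: a synchronous structlog configuration, Temporal run context bound once via contextvars, and a queue-backed Kafka producer running as a background task held by a strong reference, whose consumer loop keeps draining the queue even when producing fails. The topic name, broker address, and all helper names here are illustrative assumptions.

# A minimal sketch of the queue-backed structlog-to-Kafka pattern described in the
# commit message. Names, topic, and broker address are assumptions, not PostHog's code.
import asyncio
import json

import structlog
from aiokafka import AIOKafkaProducer  # assumed available, per "chore: Require aiokafka"

LOG_QUEUE: asyncio.Queue = asyncio.Queue()
BACKGROUND_TASKS: set[asyncio.Task] = set()  # strong references so tasks are not garbage collected


def put_in_queue(logger, method_name, event_dict):
    """structlog processor: enqueue a copy of the event dict without blocking the caller."""
    LOG_QUEUE.put_nowait(dict(event_dict))
    return event_dict


def configure_logger() -> None:
    """Synchronous configuration, safe to call from Temporal workflow or activity code."""
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,  # picks up context bound once per run
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            put_in_queue,
            structlog.processors.JSONRenderer(),
        ]
    )


async def consume_log_queue(topic: str = "log_entries") -> None:
    """Drain LOG_QUEUE into Kafka; never let a produce error stop consumption."""
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    try:
        await producer.start()
    except Exception:
        producer = None  # also catch the producer not starting; keep consuming regardless

    while True:
        event_dict = await LOG_QUEUE.get()
        try:
            if producer is not None:
                await producer.send_and_wait(topic, json.dumps(event_dict).encode("utf-8"))
        except Exception:
            pass  # continue consuming from the log queue even if we fail to produce
        finally:
            LOG_QUEUE.task_done()


def start_log_consumer() -> asyncio.Task:
    """Start the consumer from within a running event loop; keep a strong reference to the task."""
    task = asyncio.create_task(consume_log_queue())
    BACKGROUND_TASKS.add(task)
    task.add_done_callback(BACKGROUND_TASKS.discard)
    return task

In this sketch, an activity would call configure_logger() and start_log_consumer() once, bind the run context with structlog.contextvars.bind_contextvars(team_id=..., batch_export_id=..., workflow_run_id=...), and then log through structlog.get_logger() as usual; the actual field names, topic, and module layout in PostHog may differ.
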
2 changes: 1 addition & 1 deletion .github/actions/run-backend-tests/action.yml
@@ -83,7 +83,7 @@ runs:
- uses: syphar/restore-virtualenv@v1
id: cache-backend-tests
with:
custom_cache_key_element: v1
custom_cache_key_element: v2

- uses: syphar/restore-pip-download-cache@v1
if: steps.cache-backend-tests.outputs.cache-hit != 'true'
4 changes: 2 additions & 2 deletions .github/workflows/ci-backend.yml
@@ -109,7 +109,7 @@ jobs:
- uses: syphar/restore-virtualenv@v1
id: cache-backend-tests
with:
custom_cache_key_element: v1-
custom_cache_key_element: v2-

- uses: syphar/restore-pip-download-cache@v1
if: steps.cache-backend-tests.outputs.cache-hit != 'true'
@@ -331,7 +331,7 @@ jobs:
- uses: syphar/restore-virtualenv@v1
id: cache-backend-tests
with:
custom_cache_key_element: v1-
custom_cache_key_element: v2-

- uses: syphar/restore-pip-download-cache@v1
if: steps.cache-backend-tests.outputs.cache-hit != 'true'
2 changes: 1 addition & 1 deletion posthog/temporal/tests/batch_exports/conftest.py
@@ -39,4 +39,4 @@ async def truncate_events(clickhouse_client):
This is useful if during the test setup we insert a lot of events we wish to clean-up.
"""
yield
await clickhouse_client.execute_query("TRUNCATE TABLE `sharded_events`")
await clickhouse_client.execute_query("TRUNCATE TABLE IF EXISTS `sharded_events`")
116 changes: 1 addition & 115 deletions posthog/temporal/tests/batch_exports/test_batch_exports.py
@@ -1,38 +1,25 @@
import csv
import dataclasses
import datetime as dt
import io
import json
import logging
import operator
import random
import string
import uuid
from random import randint
from unittest.mock import patch

import pytest
from freezegun import freeze_time
from temporalio import activity, workflow

from posthog.clickhouse.log_entries import (
KAFKA_LOG_ENTRIES,
)
from posthog.temporal.tests.utils.datetimes import (
to_isoformat,
)
from posthog.temporal.tests.utils.events import generate_test_events_in_clickhouse
from posthog.temporal.workflows.batch_exports import (
BatchExportTemporaryFile,
KafkaLoggingHandler,
get_batch_exports_logger,
get_data_interval,
get_results_iterator,
get_rows_count,
json_dumps_bytes,
)

pytestmark = [pytest.mark.django_db, pytest.mark.asyncio]
pytestmark = [pytest.mark.asyncio, pytest.mark.django_db]


async def test_get_rows_count(clickhouse_client):
@@ -540,104 +527,3 @@ def test_batch_export_temporary_file_write_records_to_tsv(records):
assert be_file.bytes_since_last_reset == 0
assert be_file.records_total == len(records)
assert be_file.records_since_last_reset == 0


def test_kafka_logging_handler_produces_to_kafka(caplog):
"""Test a mocked call to Kafka produce from the KafkaLoggingHandler."""
logger_name = "test-logger"
logger = logging.getLogger(logger_name)
handler = KafkaLoggingHandler(topic=KAFKA_LOG_ENTRIES)
handler.setLevel(logging.DEBUG)
logger.addHandler(handler)

team_id = random.randint(1, 10000)
batch_export_id = str(uuid.uuid4())
run_id = str(uuid.uuid4())
timestamp = "2023-09-21 00:01:01.000001"

expected_tuples = []
expected_kafka_produce_calls_kwargs = []

with patch("posthog.kafka_client.client._KafkaProducer.produce") as produce:
with caplog.at_level(logging.DEBUG):
with freeze_time(timestamp):
for level in (10, 20, 30, 40, 50):
random_message = "".join(random.choice(string.ascii_letters) for _ in range(30))

logger.log(
level,
random_message,
extra={
"team_id": team_id,
"batch_export_id": batch_export_id,
"workflow_run_id": run_id,
},
)

expected_tuples.append(
(
logger_name,
level,
random_message,
)
)
data = {
"message": random_message,
"team_id": team_id,
"log_source": "batch_exports",
"log_source_id": batch_export_id,
"instance_id": run_id,
"timestamp": timestamp,
"level": logging.getLevelName(level),
}
expected_kafka_produce_calls_kwargs.append({"topic": KAFKA_LOG_ENTRIES, "data": data, "key": None})

assert caplog.record_tuples == expected_tuples

kafka_produce_calls_kwargs = [call.kwargs for call in produce.call_args_list]
assert kafka_produce_calls_kwargs == expected_kafka_produce_calls_kwargs


@dataclasses.dataclass
class TestInputs:
team_id: int
data_interval_end: str | None = None
interval: str = "hour"
batch_export_id: str = ""


@dataclasses.dataclass
class TestInfo:
workflow_id: str
run_id: str
workflow_run_id: str
attempt: int


@pytest.mark.parametrize("context", [activity.__name__, workflow.__name__])
def test_batch_export_logger_adapter(context, caplog):
"""Test BatchExportLoggerAdapter sets the appropiate context variables."""
team_id = random.randint(1, 10000)
inputs = TestInputs(team_id=team_id)
logger = get_batch_exports_logger(inputs=inputs)

batch_export_id = str(uuid.uuid4())
run_id = str(uuid.uuid4())
attempt = random.randint(1, 10)
info = TestInfo(
workflow_id=f"{batch_export_id}-{dt.datetime.utcnow().isoformat()}",
run_id=run_id,
workflow_run_id=run_id,
attempt=attempt,
)

with patch("posthog.kafka_client.client._KafkaProducer.produce"):
with patch(context + ".info", return_value=info):
for level in (10, 20, 30, 40, 50):
logger.log(level, "test")

records = caplog.get_records("call")
assert all(record.team_id == team_id for record in records)
assert all(record.batch_export_id == batch_export_id for record in records)
assert all(record.workflow_run_id == run_id for record in records)
assert all(record.attempt == attempt for record in records)
@@ -30,7 +30,13 @@
insert_into_bigquery_activity,
)

pytestmark = [pytest.mark.asyncio, pytest.mark.django_db]
SKIP_IF_MISSING_GOOGLE_APPLICATION_CREDENTIALS = pytest.mark.skipif(
"GOOGLE_APPLICATION_CREDENTIALS" not in os.environ,
reason="Google credentials not set in environment",
)

pytestmark = [SKIP_IF_MISSING_GOOGLE_APPLICATION_CREDENTIALS, pytest.mark.asyncio, pytest.mark.django_db]


TEST_TIME = dt.datetime.utcnow()

@@ -108,9 +114,6 @@ def bigquery_config() -> dict[str, str]:
"private_key_id": credentials["private_key_id"],
"token_uri": credentials["token_uri"],
"client_email": credentials["client_email"],
# Not part of the credentials.
# Hardcoded to test dataset.
"dataset_id": "BatchExports",
}


@@ -119,19 +122,30 @@ def bigquery_client() -> typing.Generator[bigquery.Client, None, None]:
"""Manage a bigquery.Client for testing."""
client = bigquery.Client()

try:
yield client
finally:
client.close()
yield client

client.close()


@pytest.fixture
def bigquery_dataset(bigquery_config, bigquery_client) -> typing.Generator[bigquery.Dataset, None, None]:
"""Manage a bigquery dataset for testing.
We clean up the dataset after every test. Could be quite time expensive, but guarantees a clean slate.
"""
dataset_id = f"{bigquery_config['project_id']}.BatchExportsTest_{str(uuid4()).replace('-', '')}"

dataset = bigquery.Dataset(dataset_id)
dataset = bigquery_client.create_dataset(dataset)

yield dataset

bigquery_client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)


@pytest.mark.skipif(
"GOOGLE_APPLICATION_CREDENTIALS" not in os.environ,
reason="Google credentials not set in environment",
)
@pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True)
async def test_insert_into_bigquery_activity_inserts_data_into_bigquery_table(
clickhouse_client, activity_environment, bigquery_client, bigquery_config, exclude_events
clickhouse_client, activity_environment, bigquery_client, bigquery_config, exclude_events, bigquery_dataset
):
"""Test that the insert_into_bigquery_activity function inserts data into a BigQuery table.
@@ -194,6 +208,7 @@ async def test_insert_into_bigquery_activity_inserts_data_into_bigquery_table(
insert_inputs = BigQueryInsertInputs(
team_id=team_id,
table_id=f"test_insert_activity_table_{team_id}",
dataset_id=bigquery_dataset.dataset_id,
data_interval_start=data_interval_start.isoformat(),
data_interval_end=data_interval_end.isoformat(),
exclude_events=exclude_events,
@@ -208,7 +223,7 @@ async def test_insert_into_bigquery_activity_inserts_data_into_bigquery_table(
assert_events_in_bigquery(
client=bigquery_client,
table_id=f"test_insert_activity_table_{team_id}",
dataset_id=bigquery_config["dataset_id"],
dataset_id=bigquery_dataset.dataset_id,
events=events + events_with_no_properties,
bq_ingested_timestamp=ingested_timestamp,
exclude_events=exclude_events,
@@ -221,12 +236,15 @@ def table_id(ateam, interval):


@pytest_asyncio.fixture
async def bigquery_batch_export(ateam, table_id, bigquery_config, interval, exclude_events, temporal_client):
async def bigquery_batch_export(
ateam, table_id, bigquery_config, interval, exclude_events, temporal_client, bigquery_dataset
):
destination_data = {
"type": "BigQuery",
"config": {
**bigquery_config,
"table_id": table_id,
"dataset_id": bigquery_dataset.dataset_id,
"exclude_events": exclude_events,
},
}
@@ -249,15 +267,10 @@ async def bigquery_batch_export(ateam, table_id, bigquery_config, interval, excl
await adelete_batch_export(batch_export, temporal_client)


@pytest.mark.skipif(
"GOOGLE_APPLICATION_CREDENTIALS" not in os.environ,
reason="Google credentials not set in environment",
)
@pytest.mark.parametrize("interval", ["hour", "day"])
@pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True)
async def test_bigquery_export_workflow(
clickhouse_client,
bigquery_config,
bigquery_client,
bigquery_batch_export,
interval,
@@ -303,7 +316,7 @@ async def test_bigquery_export_workflow(
inputs = BigQueryBatchExportInputs(
team_id=ateam.pk,
batch_export_id=str(bigquery_batch_export.id),
data_interval_end="2023-04-25 14:30:00.000000",
data_interval_end=data_interval_end.isoformat(),
interval=interval,
**bigquery_batch_export.destination.config,
)
@@ -340,17 +353,13 @@ async def test_bigquery_export_workflow(
assert_events_in_bigquery(
client=bigquery_client,
table_id=table_id,
dataset_id=bigquery_config["dataset_id"],
dataset_id=bigquery_batch_export.destination.config["dataset_id"],
events=events,
bq_ingested_timestamp=ingested_timestamp,
exclude_events=exclude_events,
)


@pytest.mark.skipif(
"GOOGLE_APPLICATION_CREDENTIALS" not in os.environ,
reason="Google credentials not set in environment",
)
async def test_bigquery_export_workflow_handles_insert_activity_errors(ateam, bigquery_batch_export, interval):
"""Test that BigQuery Export Workflow can gracefully handle errors when inserting BigQuery data."""
data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")
@@ -397,10 +406,6 @@ async def insert_into_bigquery_activity_mocked(_: BigQueryInsertInputs) -> str:
assert run.latest_error == "ValueError: A useful error message"


@pytest.mark.skipif(
"GOOGLE_APPLICATION_CREDENTIALS" not in os.environ,
reason="Google credentials not set in environment",
)
async def test_bigquery_export_workflow_handles_cancellation(ateam, bigquery_batch_export, interval):
"""Test that BigQuery Export Workflow can gracefully handle cancellations when inserting BigQuery data."""
data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")
@@ -439,6 +444,7 @@ async def never_finish_activity(_: BigQueryInsertInputs) -> str:
task_queue=settings.TEMPORAL_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=1),
)

await asyncio.sleep(5)
await handle.cancel()
