feat: Implement structlog batch exports logger #18334

Closed
wants to merge 23 commits
Changes from all commits (23 commits):
dae05d8
feat(batch-exports): Add backfill model and service support
tomasfarias Oct 11, 2023
b67e94f
test(batch-exports-backfills): Add Workflow test
tomasfarias Oct 13, 2023
e234068
chore(batch-exports-backfill): Bump migration
tomasfarias Oct 17, 2023
aaa40f1
feat(batch-exports): Add RedshiftBatchExportWorkflow
tomasfarias Oct 18, 2023
33500ae
feat(batch-exports): Add Redshift to BatchExport destinations
tomasfarias Oct 18, 2023
574fffe
feat(batch-exports): Support properties_data_type Redshift plugin par…
tomasfarias Oct 18, 2023
6e4ed85
refactor(batch-exports): Insert rows instead of using COPY
tomasfarias Oct 19, 2023
ee51aeb
fix: Remove unused migration
tomasfarias Nov 1, 2023
4627485
chore: Require aiokafka
tomasfarias Nov 2, 2023
5bd1201
feat: Implement new structlog batch exports logger
tomasfarias Nov 2, 2023
c1b1e00
refactor: Use new structlog logger everywhere
tomasfarias Nov 2, 2023
2042e24
test: Add tests, fix things
tomasfarias Nov 2, 2023
2006fc6
fix: Remove old tests
tomasfarias Nov 2, 2023
566d433
chore: Change typing of return logger
tomasfarias Nov 2, 2023
564e8da
chore: Bump structlog
tomasfarias Nov 2, 2023
d260481
chore: Extend docstrings
tomasfarias Nov 2, 2023
27a5f8a
fix: Don't use async logging as it's unsupported by temporal runtime
tomasfarias Nov 2, 2023
2182351
test: Add logger tests
tomasfarias Nov 3, 2023
d81f0b1
fix: Mix pytestmark lists
tomasfarias Nov 7, 2023
7d19cb1
fix: Remove unused imports
tomasfarias Nov 7, 2023
eb0b927
fix: Cleanup pytest warnings
tomasfarias Nov 7, 2023
d495f7c
fix: Create and drop dataset for bigquery tests
tomasfarias Nov 7, 2023
7f371cd
fix: Typing issue?
tomasfarias Nov 7, 2023
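
For context, the sketch below shows one way a structlog batch exports logger can be configured and bound with run metadata, consistent with the commit notes above (logging is kept synchronous since, per commit 27a5f8a, async logging is unsupported by the Temporal runtime). The helper names, processor chain, and bound fields are illustrative assumptions, not the exact API added by this PR.

# Minimal sketch, not the PR's actual implementation: configure structlog once,
# then bind batch export context so every log line carries it.
import structlog

def configure_logger() -> None:
    """Configure structlog for batch exports (hypothetical setup)."""
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.JSONRenderer(),
        ],
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )

def bind_batch_export_logger(team_id: int, batch_export_id: str, run_id: str):
    """Return a logger with batch export context bound to every event."""
    return structlog.get_logger("batch_exports").bind(
        team_id=team_id,
        batch_export_id=batch_export_id,
        workflow_run_id=run_id,
    )

# Example usage (illustrative): bind the context once per run.
# logger = bind_batch_export_logger(team_id=1, batch_export_id="abc", run_id="123")
# logger.info("Starting batch export run", records_exported=100)
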
114 changes: 0 additions & 114 deletions posthog/temporal/tests/batch_exports/test_batch_exports.py
@@ -1,31 +1,18 @@
import csv
import dataclasses
import datetime as dt
import io
import json
import logging
import operator
import random
import string
import uuid
from random import randint
from unittest.mock import patch

import pytest
from freezegun import freeze_time
from temporalio import activity, workflow

from posthog.clickhouse.log_entries import (
KAFKA_LOG_ENTRIES,
)
from posthog.temporal.tests.utils.datetimes import (
to_isoformat,
)
from posthog.temporal.tests.utils.events import generate_test_events_in_clickhouse
from posthog.temporal.workflows.batch_exports import (
BatchExportTemporaryFile,
KafkaLoggingHandler,
get_batch_exports_logger,
get_data_interval,
get_results_iterator,
get_rows_count,
@@ -540,104 +527,3 @@ def test_batch_export_temporary_file_write_records_to_tsv(records):
assert be_file.bytes_since_last_reset == 0
assert be_file.records_total == len(records)
assert be_file.records_since_last_reset == 0


def test_kafka_logging_handler_produces_to_kafka(caplog):
"""Test a mocked call to Kafka produce from the KafkaLoggingHandler."""
logger_name = "test-logger"
logger = logging.getLogger(logger_name)
handler = KafkaLoggingHandler(topic=KAFKA_LOG_ENTRIES)
handler.setLevel(logging.DEBUG)
logger.addHandler(handler)

team_id = random.randint(1, 10000)
batch_export_id = str(uuid.uuid4())
run_id = str(uuid.uuid4())
timestamp = "2023-09-21 00:01:01.000001"

expected_tuples = []
expected_kafka_produce_calls_kwargs = []

with patch("posthog.kafka_client.client._KafkaProducer.produce") as produce:
with caplog.at_level(logging.DEBUG):
with freeze_time(timestamp):
for level in (10, 20, 30, 40, 50):
random_message = "".join(random.choice(string.ascii_letters) for _ in range(30))

logger.log(
level,
random_message,
extra={
"team_id": team_id,
"batch_export_id": batch_export_id,
"workflow_run_id": run_id,
},
)

expected_tuples.append(
(
logger_name,
level,
random_message,
)
)
data = {
"message": random_message,
"team_id": team_id,
"log_source": "batch_exports",
"log_source_id": batch_export_id,
"instance_id": run_id,
"timestamp": timestamp,
"level": logging.getLevelName(level),
}
expected_kafka_produce_calls_kwargs.append({"topic": KAFKA_LOG_ENTRIES, "data": data, "key": None})

assert caplog.record_tuples == expected_tuples

kafka_produce_calls_kwargs = [call.kwargs for call in produce.call_args_list]
assert kafka_produce_calls_kwargs == expected_kafka_produce_calls_kwargs


@dataclasses.dataclass
class TestInputs:
team_id: int
data_interval_end: str | None = None
interval: str = "hour"
batch_export_id: str = ""


@dataclasses.dataclass
class TestInfo:
workflow_id: str
run_id: str
workflow_run_id: str
attempt: int


@pytest.mark.parametrize("context", [activity.__name__, workflow.__name__])
def test_batch_export_logger_adapter(context, caplog):
"""Test BatchExportLoggerAdapter sets the appropiate context variables."""
team_id = random.randint(1, 10000)
inputs = TestInputs(team_id=team_id)
logger = get_batch_exports_logger(inputs=inputs)

batch_export_id = str(uuid.uuid4())
run_id = str(uuid.uuid4())
attempt = random.randint(1, 10)
info = TestInfo(
workflow_id=f"{batch_export_id}-{dt.datetime.utcnow().isoformat()}",
run_id=run_id,
workflow_run_id=run_id,
attempt=attempt,
)

with patch("posthog.kafka_client.client._KafkaProducer.produce"):
with patch(context + ".info", return_value=info):
for level in (10, 20, 30, 40, 50):
logger.log(level, "test")

records = caplog.get_records("call")
assert all(record.team_id == team_id for record in records)
assert all(record.batch_export_id == batch_export_id for record in records)
assert all(record.workflow_run_id == run_id for record in records)
assert all(record.attempt == attempt for record in records)
BigQuery batch export workflow tests:
@@ -30,7 +30,7 @@
insert_into_bigquery_activity,
)

pytestmark = [pytest.mark.asyncio, pytest.mark.django_db]
pytestmark = [pytest.mark.asyncio, pytest.mark.asyncio_event_loop, pytest.mark.django_db]

TEST_TIME = dt.datetime.utcnow()

@@ -108,9 +108,6 @@ def bigquery_config() -> dict[str, str]:
"private_key_id": credentials["private_key_id"],
"token_uri": credentials["token_uri"],
"client_email": credentials["client_email"],
# Not part of the credentials.
# Hardcoded to test dataset.
"dataset_id": "BatchExports",
}


@@ -119,10 +116,25 @@ def bigquery_client() -> typing.Generator[bigquery.Client, None, None]:
"""Manage a bigquery.Client for testing."""
client = bigquery.Client()

try:
yield client
finally:
client.close()
yield client

client.close()


@pytest.fixture
def bigquery_dataset(bigquery_config, bigquery_client) -> typing.Generator[bigquery.Dataset, None, None]:
"""Manage a bigquery dataset for testing.

We clean up the dataset after every test. Could be quite time expensive, but guarantees a clean slate.
"""
dataset_id = f"{bigquery_config['project_id']}.BatchExportsTest_{str(uuid4()).replace('-', '')}"

dataset = bigquery.Dataset(dataset_id)
dataset = bigquery_client.create_dataset(dataset)

yield dataset

bigquery_client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)


@pytest.mark.skipif(
@@ -131,7 +143,7 @@ def bigquery_client() -> typing.Generator[bigquery.Client, None, None]:
)
@pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True)
async def test_insert_into_bigquery_activity_inserts_data_into_bigquery_table(
clickhouse_client, activity_environment, bigquery_client, bigquery_config, exclude_events
clickhouse_client, activity_environment, bigquery_client, bigquery_config, exclude_events, bigquery_dataset
):
"""Test that the insert_into_bigquery_activity function inserts data into a BigQuery table.

@@ -194,6 +206,7 @@ async def test_insert_into_bigquery_activity_inserts_data_into_bigquery_table(
insert_inputs = BigQueryInsertInputs(
team_id=team_id,
table_id=f"test_insert_activity_table_{team_id}",
dataset_id=bigquery_dataset.dataset_id,
data_interval_start=data_interval_start.isoformat(),
data_interval_end=data_interval_end.isoformat(),
exclude_events=exclude_events,
@@ -208,7 +221,7 @@ async def test_insert_into_bigquery_activity_inserts_data_into_bigquery_table(
assert_events_in_bigquery(
client=bigquery_client,
table_id=f"test_insert_activity_table_{team_id}",
dataset_id=bigquery_config["dataset_id"],
dataset_id=bigquery_dataset.dataset_id,
events=events + events_with_no_properties,
bq_ingested_timestamp=ingested_timestamp,
exclude_events=exclude_events,
@@ -221,12 +234,15 @@ def table_id(ateam, interval):


@pytest_asyncio.fixture
async def bigquery_batch_export(ateam, table_id, bigquery_config, interval, exclude_events, temporal_client):
async def bigquery_batch_export(
ateam, table_id, bigquery_config, interval, exclude_events, temporal_client, bigquery_dataset
):
destination_data = {
"type": "BigQuery",
"config": {
**bigquery_config,
"table_id": table_id,
"dataset_id": bigquery_dataset.dataset_id,
"exclude_events": exclude_events,
},
}
@@ -257,7 +273,6 @@ async def bigquery_batch_export(ateam, table_id, bigquery_config, interval, excl
@pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True)
async def test_bigquery_export_workflow(
clickhouse_client,
bigquery_config,
bigquery_client,
bigquery_batch_export,
interval,
@@ -303,7 +318,7 @@ async def test_bigquery_export_workflow(
inputs = BigQueryBatchExportInputs(
team_id=ateam.pk,
batch_export_id=str(bigquery_batch_export.id),
data_interval_end="2023-04-25 14:30:00.000000",
data_interval_end=data_interval_end.isoformat(),
interval=interval,
**bigquery_batch_export.destination.config,
)
@@ -340,7 +355,7 @@ async def test_bigquery_export_workflow(
assert_events_in_bigquery(
client=bigquery_client,
table_id=table_id,
dataset_id=bigquery_config["dataset_id"],
dataset_id=bigquery_batch_export.destination.config["dataset_id"],
events=events,
bq_ingested_timestamp=ingested_timestamp,
exclude_events=exclude_events,
@@ -439,6 +454,7 @@ async def never_finish_activity(_: BigQueryInsertInputs) -> str:
task_queue=settings.TEMPORAL_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=1),
)

await asyncio.sleep(5)
await handle.cancel()
