refactor: Postgres (+ Redshift) batch exports now async #18501

Closed
wants to merge 42 commits
Changes from all commits
Commits
42 commits
b63aeda
fix: Checkout master before checking for hogql changes
tomasfarias Nov 7, 2023
0fc1685
fix: But go back to branch after done
tomasfarias Nov 7, 2023
26f71a5
fix: Instead just diff with origin/master
tomasfarias Nov 7, 2023
5be9d81
fix: Install libantlr, what's the worst that can happen?
tomasfarias Nov 7, 2023
15a5026
fix: Install antlr the hogql-way
tomasfarias Nov 7, 2023
421891c
fix: Let's just checkout everything
tomasfarias Nov 7, 2023
2a04396
feat(batch-exports): Add backfill model and service support
tomasfarias Oct 11, 2023
d68a93a
test(batch-exports-backfills): Add Workflow test
tomasfarias Oct 13, 2023
ba05a2f
chore(batch-exports-backfill): Bump migration
tomasfarias Oct 17, 2023
1160cc5
feat(batch-exports): Add RedshiftBatchExportWorkflow
tomasfarias Oct 18, 2023
8aa9e51
feat(batch-exports): Add Redshift to BatchExport destinations
tomasfarias Oct 18, 2023
9770a8f
feat(batch-exports): Support properties_data_type Redshift plugin par…
tomasfarias Oct 18, 2023
916620e
refactor(batch-exports): Insert rows instead of using COPY
tomasfarias Oct 19, 2023
c31b28a
fix: Remove unused migration
tomasfarias Nov 1, 2023
1c4fdc4
chore: Require aiokafka
tomasfarias Nov 2, 2023
291ecb9
feat: Implement new structlog batch exports logger
tomasfarias Nov 2, 2023
91af5c2
refactor: Use new structlog logger everywhere
tomasfarias Nov 2, 2023
edd3de8
test: Add tests, fix things
tomasfarias Nov 2, 2023
784fd18
fix: Remove old tests
tomasfarias Nov 2, 2023
b4a472f
chore: Change typing of return logger
tomasfarias Nov 2, 2023
13f4cab
chore: Bump structlog
tomasfarias Nov 2, 2023
470a277
chore: Extend docstrings
tomasfarias Nov 2, 2023
33e6d85
fix: Don't use async logging as it's unsupported by temporal runtime
tomasfarias Nov 2, 2023
f6f7380
test: Add logger tests
tomasfarias Nov 3, 2023
31ca0ec
fix: Mix pytestmark lists
tomasfarias Nov 7, 2023
17be12b
fix: Remove unused imports
tomasfarias Nov 7, 2023
1574357
fix: Cleanup pytest warnings
tomasfarias Nov 7, 2023
9ea3b82
fix: Create and drop dataset for bigquery tests
tomasfarias Nov 7, 2023
6695832
fix: Typing issue?
tomasfarias Nov 7, 2023
44b3db7
fix: Let's just checkout everything
tomasfarias Nov 7, 2023
5c03610
fix: Use global event loop in tests
tomasfarias Nov 7, 2023
5f8d5ba
fix: Blow-up cache
tomasfarias Nov 7, 2023
2ed879c
fix: Truncate only if table exists
tomasfarias Nov 7, 2023
6c24472
revert-me: Skip all postgres tests
tomasfarias Nov 7, 2023
89db2f3
fix: Connect to kafka in localhost
tomasfarias Nov 8, 2023
288a97c
fix: Lazily connect to Kafka
tomasfarias Nov 8, 2023
de5e10b
refactor: Postgres batch exports now async
tomasfarias Nov 9, 2023
aaa29a2
refactor: And redshift too
tomasfarias Nov 9, 2023
dc1afc9
fix: Let's use from psycopg import sql instead
tomasfarias Nov 9, 2023
ccc0808
fix: Typing issues
tomasfarias Nov 9, 2023
194bce8
test: Update Redshift tests
tomasfarias Nov 9, 2023
1d569ba
fix: Typing issues
tomasfarias Nov 9, 2023
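
The commits "refactor: Postgres batch exports now async" and "refactor(batch-exports): Insert rows instead of using COPY" summarize the core change: the export activities talk to Postgres through psycopg's asynchronous API and insert rows directly rather than relying on COPY. The snippet below is an editor-added sketch of that general psycopg 3 async-insert pattern, not code from this PR; the DSN, schema, table, and column names are illustrative assumptions.

import asyncio

import psycopg
from psycopg import sql


async def insert_rows(dsn: str, schema: str, table: str, rows: list[tuple[str, str]]) -> None:
    """Insert rows with an async psycopg connection instead of COPY."""
    async with await psycopg.AsyncConnection.connect(dsn) as connection:
        async with connection.cursor() as cursor:
            query = sql.SQL("INSERT INTO {}.{} (event, properties) VALUES (%s, %s)").format(
                sql.Identifier(schema), sql.Identifier(table)
            )
            # executemany issues one INSERT per row; batching and retries are out of scope here.
            await cursor.executemany(query, rows)


# Hypothetical DSN and data, for illustration only.
asyncio.run(insert_rows("postgresql://localhost/exports", "exports", "events", [("pageview", "{}")]))
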
2 changes: 1 addition & 1 deletion .github/actions/run-backend-tests/action.yml
@@ -83,7 +83,7 @@ runs:
- uses: syphar/restore-virtualenv@v1
id: cache-backend-tests
with:
custom_cache_key_element: v1
custom_cache_key_element: v2

- uses: syphar/restore-pip-download-cache@v1
if: steps.cache-backend-tests.outputs.cache-hit != 'true'
4 changes: 2 additions & 2 deletions .github/workflows/ci-backend.yml
@@ -109,7 +109,7 @@ jobs:
- uses: syphar/restore-virtualenv@v1
id: cache-backend-tests
with:
custom_cache_key_element: v1-
custom_cache_key_element: v2-

- uses: syphar/restore-pip-download-cache@v1
if: steps.cache-backend-tests.outputs.cache-hit != 'true'
@@ -331,7 +331,7 @@ jobs:
- uses: syphar/restore-virtualenv@v1
id: cache-backend-tests
with:
custom_cache_key_element: v1-
custom_cache_key_element: v2-

- uses: syphar/restore-pip-download-cache@v1
if: steps.cache-backend-tests.outputs.cache-hit != 'true'
72 changes: 71 additions & 1 deletion posthog/temporal/tests/batch_exports/conftest.py
@@ -1,5 +1,7 @@
import psycopg
import pytest
import pytest_asyncio
from psycopg import sql


@pytest.fixture
@@ -39,4 +41,72 @@ async def truncate_events(clickhouse_client):
This is useful if during the test setup we insert a lot of events we wish to clean-up.
"""
yield
await clickhouse_client.execute_query("TRUNCATE TABLE `sharded_events`")
await clickhouse_client.execute_query("TRUNCATE TABLE IF EXISTS `sharded_events`")


@pytest_asyncio.fixture
async def setup_postgres_test_db(postgres_config):
"""Fixture to manage a database for Redshift export testing.

Managing a test database involves the following steps:
1. Creating a test database.
2. Initializing a connection to that database.
3. Creating a test schema.
4. Yielding the connection to be used in tests.
5. After tests, drop the test schema and any tables in it.
6. Drop the test database.
"""
connection = await psycopg.AsyncConnection.connect(
user=postgres_config["user"],
password=postgres_config["password"],
host=postgres_config["host"],
port=postgres_config["port"],
)
await connection.set_autocommit(True)

async with connection.cursor() as cursor:
await cursor.execute(
sql.SQL("SELECT 1 FROM pg_database WHERE datname = %s"),
(postgres_config["database"],),
)

if await cursor.fetchone() is None:
await cursor.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(postgres_config["database"])))

await connection.close()

# We need a new connection to connect to the database we just created.
connection = await psycopg.AsyncConnection.connect(
user=postgres_config["user"],
password=postgres_config["password"],
host=postgres_config["host"],
port=postgres_config["port"],
dbname=postgres_config["database"],
)
await connection.set_autocommit(True)

async with connection.cursor() as cursor:
await cursor.execute(
sql.SQL("CREATE SCHEMA IF NOT EXISTS {}").format(sql.Identifier(postgres_config["schema"]))
)

yield

async with connection.cursor() as cursor:
await cursor.execute(sql.SQL("DROP SCHEMA {} CASCADE").format(sql.Identifier(postgres_config["schema"])))

await connection.close()

# We need a new connection to drop the database, as we cannot drop the current database.
connection = await psycopg.AsyncConnection.connect(
user=postgres_config["user"],
password=postgres_config["password"],
host=postgres_config["host"],
port=postgres_config["port"],
)
await connection.set_autocommit(True)

async with connection.cursor() as cursor:
await cursor.execute(sql.SQL("DROP DATABASE {}").format(sql.Identifier(postgres_config["database"])))

await connection.close()
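
Not part of the diff: the docstring above lists the create-database, create-schema, and drop steps the fixture walks through. As a rough usage sketch, a test could depend on setup_postgres_test_db alongside the postgres_config fixture and connect to the database it prepares; the test body below is an editor-added assumption, not taken from this PR.

import psycopg
import pytest

pytestmark = [pytest.mark.asyncio]


async def test_connects_to_prepared_database(postgres_config, setup_postgres_test_db):
    """Open a connection to the database created by the fixture and run a trivial query."""
    connection = await psycopg.AsyncConnection.connect(
        user=postgres_config["user"],
        password=postgres_config["password"],
        host=postgres_config["host"],
        port=postgres_config["port"],
        dbname=postgres_config["database"],
    )
    async with connection.cursor() as cursor:
        await cursor.execute("SELECT 1")
        assert await cursor.fetchone() == (1,)
    await connection.close()
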
116 changes: 1 addition & 115 deletions posthog/temporal/tests/batch_exports/test_batch_exports.py
@@ -1,38 +1,25 @@
import csv
import dataclasses
import datetime as dt
import io
import json
import logging
import operator
import random
import string
import uuid
from random import randint
from unittest.mock import patch

import pytest
from freezegun import freeze_time
from temporalio import activity, workflow

from posthog.clickhouse.log_entries import (
KAFKA_LOG_ENTRIES,
)
from posthog.temporal.tests.utils.datetimes import (
to_isoformat,
)
from posthog.temporal.tests.utils.events import generate_test_events_in_clickhouse
from posthog.temporal.workflows.batch_exports import (
BatchExportTemporaryFile,
KafkaLoggingHandler,
get_batch_exports_logger,
get_data_interval,
get_results_iterator,
get_rows_count,
json_dumps_bytes,
)

pytestmark = [pytest.mark.django_db, pytest.mark.asyncio]
pytestmark = [pytest.mark.asyncio, pytest.mark.django_db]


async def test_get_rows_count(clickhouse_client):
@@ -540,104 +527,3 @@ def test_batch_export_temporary_file_write_records_to_tsv(records):
assert be_file.bytes_since_last_reset == 0
assert be_file.records_total == len(records)
assert be_file.records_since_last_reset == 0


def test_kafka_logging_handler_produces_to_kafka(caplog):
"""Test a mocked call to Kafka produce from the KafkaLoggingHandler."""
logger_name = "test-logger"
logger = logging.getLogger(logger_name)
handler = KafkaLoggingHandler(topic=KAFKA_LOG_ENTRIES)
handler.setLevel(logging.DEBUG)
logger.addHandler(handler)

team_id = random.randint(1, 10000)
batch_export_id = str(uuid.uuid4())
run_id = str(uuid.uuid4())
timestamp = "2023-09-21 00:01:01.000001"

expected_tuples = []
expected_kafka_produce_calls_kwargs = []

with patch("posthog.kafka_client.client._KafkaProducer.produce") as produce:
with caplog.at_level(logging.DEBUG):
with freeze_time(timestamp):
for level in (10, 20, 30, 40, 50):
random_message = "".join(random.choice(string.ascii_letters) for _ in range(30))

logger.log(
level,
random_message,
extra={
"team_id": team_id,
"batch_export_id": batch_export_id,
"workflow_run_id": run_id,
},
)

expected_tuples.append(
(
logger_name,
level,
random_message,
)
)
data = {
"message": random_message,
"team_id": team_id,
"log_source": "batch_exports",
"log_source_id": batch_export_id,
"instance_id": run_id,
"timestamp": timestamp,
"level": logging.getLevelName(level),
}
expected_kafka_produce_calls_kwargs.append({"topic": KAFKA_LOG_ENTRIES, "data": data, "key": None})

assert caplog.record_tuples == expected_tuples

kafka_produce_calls_kwargs = [call.kwargs for call in produce.call_args_list]
assert kafka_produce_calls_kwargs == expected_kafka_produce_calls_kwargs


@dataclasses.dataclass
class TestInputs:
team_id: int
data_interval_end: str | None = None
interval: str = "hour"
batch_export_id: str = ""


@dataclasses.dataclass
class TestInfo:
workflow_id: str
run_id: str
workflow_run_id: str
attempt: int


@pytest.mark.parametrize("context", [activity.__name__, workflow.__name__])
def test_batch_export_logger_adapter(context, caplog):
"""Test BatchExportLoggerAdapter sets the appropiate context variables."""
team_id = random.randint(1, 10000)
inputs = TestInputs(team_id=team_id)
logger = get_batch_exports_logger(inputs=inputs)

batch_export_id = str(uuid.uuid4())
run_id = str(uuid.uuid4())
attempt = random.randint(1, 10)
info = TestInfo(
workflow_id=f"{batch_export_id}-{dt.datetime.utcnow().isoformat()}",
run_id=run_id,
workflow_run_id=run_id,
attempt=attempt,
)

with patch("posthog.kafka_client.client._KafkaProducer.produce"):
with patch(context + ".info", return_value=info):
for level in (10, 20, 30, 40, 50):
logger.log(level, "test")

records = caplog.get_records("call")
assert all(record.team_id == team_id for record in records)
assert all(record.batch_export_id == batch_export_id for record in records)
assert all(record.workflow_run_id == run_id for record in records)
assert all(record.attempt == attempt for record in records)
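
The tests removed above covered the old KafkaLoggingHandler and get_batch_exports_logger pair, which the commits "feat: Implement new structlog batch exports logger" and "refactor: Use new structlog logger everywhere" replace with a structlog-based logger. The new logger's implementation is not shown in this section; the snippet below is only an editor-added sketch of the generic structlog contextvars pattern such a logger typically builds on, not PostHog's actual configuration.

import structlog

# Merge bound context variables (team_id, batch_export_id, ...) into every log event.
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer(),
    ]
)

# Hypothetical identifiers, for illustration only.
structlog.contextvars.bind_contextvars(team_id=1, batch_export_id="example-id", workflow_run_id="example-run")
logger = structlog.get_logger()
logger.info("exporting batch", destination="Postgres")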