diff --git a/posthog/temporal/batch_exports/redshift_batch_export.py b/posthog/temporal/batch_exports/redshift_batch_export.py index 4229c294a12f53..52ce4e9db32ccc 100644 --- a/posthog/temporal/batch_exports/redshift_batch_export.py +++ b/posthog/temporal/batch_exports/redshift_batch_export.py @@ -166,7 +166,7 @@ def get_redshift_fields_from_record_schema( async def insert_records_to_redshift( - records: collections.abc.AsyncGenerator[dict[str, typing.Any]], + records: collections.abc.AsyncGenerator[dict[str, typing.Any], None], redshift_connection: psycopg.AsyncConnection, schema: str | None, table: str, @@ -382,7 +382,7 @@ def map_to_record(row: dict) -> dict: return record - async def record_generator() -> collections.abc.AsyncGenerator[dict[str, typing.Any]]: + async def record_generator() -> collections.abc.AsyncGenerator[dict[str, typing.Any], None]: async for record_batch in record_iterator: for record in record_batch.to_pylist(): yield map_to_record(record) diff --git a/posthog/temporal/common/asyncpa.py b/posthog/temporal/common/asyncpa.py index 9d40c847f9d906..c301538a50eb0f 100644 --- a/posthog/temporal/common/asyncpa.py +++ b/posthog/temporal/common/asyncpa.py @@ -100,7 +100,7 @@ class AsyncRecordBatchReader: def __init__(self, bytes_iter: typing.AsyncIterator[bytes]) -> None: self._reader = AsyncMessageReader(bytes_iter) - self._schema = None + self._schema: None | pa.Schema = None def __aiter__(self) -> "AsyncRecordBatchReader": return self diff --git a/posthog/temporal/tests/batch_exports/conftest.py b/posthog/temporal/tests/batch_exports/conftest.py index e679f729696717..6664c3bd7f5b17 100644 --- a/posthog/temporal/tests/batch_exports/conftest.py +++ b/posthog/temporal/tests/batch_exports/conftest.py @@ -3,8 +3,6 @@ import pytest_asyncio from psycopg import sql -from posthog.batch_exports.service import BatchExportModel - @pytest.fixture def interval(request) -> str: @@ -59,19 +57,6 @@ def batch_export_schema(request) -> dict | None: return None -@pytest.fixture -def batch_export_model(request) -> BatchExportModel | None: - """A parametrizable fixture to configure a batch export schema. - - By decorating a test function with @pytest.mark.parametrize("batch_export_model", ..., indirect=True) - it's possible to set the batch_export_schema that will be used to create a BatchExport. - """ - try: - return request.param - except AttributeError: - return BatchExportModel(name="events") - - @pytest_asyncio.fixture async def setup_postgres_test_db(postgres_config): """Fixture to manage a database for Redshift and Postgres export testing.