refactor: PostgreSQL batch export with spmc abstractions #26616

Closed
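
The diff below replaces the destination-specific iterator/writer plumbing with the shared single-producer, multi-consumer (SPMC) pipeline (RecordBatchQueue, Producer, run_consumer_loop). As a rough orientation, here is a minimal, generic asyncio sketch of that shape; it is not PostHog's actual Producer/Consumer API, just the pattern: one producer task streams record batches into a queue while a consumer drains the queue and flushes each batch to the destination.

import asyncio


async def produce(queue: asyncio.Queue) -> None:
    # Stand-in for streaming record batches out of ClickHouse.
    for batch in (["a", "b"], ["c"]):
        await queue.put(batch)
    await queue.put(None)  # Sentinel: no more batches.


async def consume(queue: asyncio.Queue) -> int:
    # Stand-in for flushing each batch, e.g. COPYing a temporary file into PostgreSQL.
    flushed = 0
    while (batch := await queue.get()) is not None:
        flushed += len(batch)
    return flushed


async def main() -> int:
    queue: asyncio.Queue = asyncio.Queue()
    producer_task = asyncio.create_task(produce(queue))
    records_completed = await consume(queue)
    await producer_task
    return records_completed


print(asyncio.run(main()))  # 3
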
267 changes: 175 additions & 92 deletions posthog/temporal/batch_exports/postgres_batch_export.py
@@ -28,25 +28,56 @@
default_fields,
execute_batch_export_insert_activity,
get_data_interval,
iter_model_records,
start_batch_export_run,
)
from posthog.temporal.batch_exports.metrics import (
get_bytes_exported_metric,
get_rows_exported_metric,
from posthog.temporal.batch_exports.heartbeat import (
BatchExportRangeHeartbeatDetails,
DateRange,
should_resume_from_activity_heartbeat,
)
from posthog.temporal.batch_exports.spmc import (
Consumer,
Producer,
RecordBatchQueue,
run_consumer_loop,
wait_for_schema_or_producer,
)
from posthog.temporal.batch_exports.temporary_file import (
BatchExportTemporaryFile,
WriterFormat,
)
from posthog.temporal.batch_exports.temporary_file import CSVBatchExportWriter
from posthog.temporal.batch_exports.utils import (
JsonType,
apeek_first_and_rewind,
cast_record_batch_json_columns,
make_retryable_with_exponential_backoff,
set_status_to_running_task,
)
from posthog.temporal.common.clickhouse import get_client
from posthog.temporal.common.heartbeat import Heartbeater
from posthog.temporal.common.logger import bind_temporal_worker_logger

NON_RETRYABLE_ERROR_TYPES = [
# Raised on errors that are related to database operation.
# For example: unexpected disconnect, database or other object not found.
"OperationalError",
# The schema name provided is invalid (usually because it doesn't exist).
"InvalidSchemaName",
# Missing permissions to, e.g., insert into table.
"InsufficientPrivilege",
# Issue with exported data compared to schema, retrying won't help.
"NotNullViolation",
# A user added a unique constraint on their table, but batch exports (particularly events)
# can cause duplicates.
"UniqueViolation",
# Something changed in the target table's schema that we were not expecting.
"UndefinedColumn",
# A VARCHAR column is too small.
"StringDataRightTruncation",
# Raised by PostgreSQL client. Self explanatory.
"DiskFull",
# Raised by our PostgreSQL client when failing to connect after several attempts.
"PostgreSQLConnectionError",
]

PostgreSQLField = tuple[str, typing.LiteralString]
Fields = collections.abc.Iterable[PostgreSQLField]

@@ -197,6 +228,8 @@
primary_key_clause = sql.SQL(", PRIMARY KEY ({fields})").format(
fields=sql.SQL(",").join(sql.Identifier(field[0]) for field in primary_key)
)
else:
primary_key_clause = ""

Check failure on line 232 in posthog/temporal/batch_exports/postgres_batch_export.py (GitHub Actions / Python code quality checks): Incompatible types in assignment (expression has type "str", variable has type "Composed")
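
One possible way to satisfy this check, sketched here rather than taken from the PR: keep both branches of primary_key_clause typed as a psycopg Composable instead of mixing in a plain str. The primary_key value below is a hypothetical stand-in for the field list used in the real code.

from psycopg import sql

primary_key: list[tuple[str, str]] | None = [("team_id", "INT"), ("distinct_id", "TEXT")]

primary_key_clause: sql.Composable
if primary_key is not None:
    primary_key_clause = sql.SQL(", PRIMARY KEY ({fields})").format(
        fields=sql.SQL(",").join(sql.Identifier(field[0]) for field in primary_key)
    )
else:
    primary_key_clause = sql.SQL("")  # Empty Composable keeps the variable's type consistent.
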

async with self.connection.transaction():
async with self.connection.cursor() as cursor:
@@ -353,15 +386,15 @@
tsv_file,
schema: str,
table_name: str,
schema_columns: list[str],
fields: Fields,
) -> None:
"""Execute a COPY FROM query with given connection to copy contents of tsv_file.

Arguments:
tsv_file: A file-like object to interpret as TSV to copy its contents.
schema: The schema where the table we are COPYing into exists.
table_name: The name of the table we are COPYing into.
schema_columns: The column names of the table we are COPYing into.
fields: The fields (column name, type pairs) of the table we are COPYing into.
"""
tsv_file.seek(0)

@@ -376,7 +409,7 @@
# TODO: Switch to binary encoding as CSV has a million edge cases.
sql.SQL("COPY {table_name} ({fields}) FROM STDIN WITH (FORMAT CSV, DELIMITER '\t')").format(
table_name=sql.Identifier(table_name),
fields=sql.SQL(",").join(sql.Identifier(column) for column in schema_columns),
fields=sql.SQL(",").join(sql.Identifier(field[0]) for field in fields),
)
) as copy:
while data := tsv_file.read():
@@ -454,6 +487,63 @@
return pg_schema


@dataclasses.dataclass
class PostgreSQLHeartbeatDetails(BatchExportRangeHeartbeatDetails):
"""The BigQuery batch export details included in every heartbeat."""

pass


class PostgreSQLConsumer(Consumer):
"""Implementation of a SPMC pipeline Consumer for PostgreSQL batch exports."""

def __init__(
self,
heartbeater: Heartbeater,
heartbeat_details: PostgreSQLHeartbeatDetails,
data_interval_start: dt.datetime | str | None,
postgresql_client: PostgreSQLClient,
postgresql_table: str,
postgresql_schema: str,
postgresql_table_fields: Fields,
):
super().__init__(heartbeater, heartbeat_details, data_interval_start)
self.postgresql_client = postgresql_client
self.postgresql_table = postgresql_table
self.postgresql_schema = postgresql_schema
self.postgresql_table_fields = postgresql_table_fields

async def flush(
self,
batch_export_file: BatchExportTemporaryFile,
records_since_last_flush: int,
bytes_since_last_flush: int,
flush_counter: int,
last_date_range: DateRange,
is_last: bool,
error: Exception | None,
):
"""Implement flushing by loading batch export files to BigQuery"""
await self.logger.adebug(
"Copying %s records of size %s bytes to PostgreSQL table '%s'",
records_since_last_flush,
bytes_since_last_flush,
self.postgresql_table,
)

await self.postgresql_client.copy_tsv_to_postgres(
batch_export_file,
self.postgresql_schema,
self.postgresql_table,
self.postgresql_table_fields,
)

self.rows_exported_counter.add(records_since_last_flush)
self.bytes_exported_counter.add(bytes_since_last_flush)

self.heartbeat_details.track_done_range(last_date_range, self.data_interval_start)


@activity.defn
async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> RecordsCompleted:
"""Activity streams data from ClickHouse to Postgres."""
@@ -468,36 +558,75 @@
)

async with (
Heartbeater(),
Heartbeater() as heartbeater,
set_status_to_running_task(run_id=inputs.run_id, logger=logger),
get_client(team_id=inputs.team_id) as client,
):
if not await client.is_alive():
raise ConnectionError("Cannot establish connection to ClickHouse")

_, details = await should_resume_from_activity_heartbeat(activity, PostgreSQLHeartbeatDetails)
if details is None:
details = PostgreSQLHeartbeatDetails()

done_ranges: list[DateRange] = details.done_ranges

model: BatchExportModel | BatchExportSchema | None = None
if inputs.batch_export_schema is None and "batch_export_model" in {
field.name for field in dataclasses.fields(inputs)
}:
model = inputs.batch_export_model
if model is not None:
model_name = model.name
extra_query_parameters = model.schema["values"] if model.schema is not None else None
fields = model.schema["fields"] if model.schema is not None else None
else:
model_name = "events"
extra_query_parameters = None
fields = None
else:
model = inputs.batch_export_schema
model_name = "custom"
extra_query_parameters = model["values"] if model is not None else {}
fields = model["fields"] if model is not None else None

record_batch_iterator = iter_model_records(
client=client,
model=model,
data_interval_start = (
dt.datetime.fromisoformat(inputs.data_interval_start) if inputs.data_interval_start else None
)
data_interval_end = dt.datetime.fromisoformat(inputs.data_interval_end)
full_range = (data_interval_start, data_interval_end)

queue = RecordBatchQueue()
producer = Producer(clickhouse_client=client)
producer_task = producer.start(
queue=queue,
model_name=model_name,
is_backfill=inputs.is_backfill,
team_id=inputs.team_id,
interval_start=inputs.data_interval_start,
interval_end=inputs.data_interval_end,
full_range=full_range,
done_ranges=done_ranges,
fields=fields,
destination_default_fields=postgres_default_fields(),
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
destination_default_fields=postgres_default_fields(),
is_backfill=inputs.is_backfill,
extra_query_parameters=extra_query_parameters,
)
records_completed = 0

record_batch_schema = await wait_for_schema_or_producer(queue, producer_task)
if record_batch_schema is None:
return records_completed

record_batch_schema = pa.schema(
# NOTE: For some reason, some record batches mark certain fields as non-nullable, whereas other
# record batches have the same fields as nullable.
# Until we figure it out, we set all fields to nullable. There are some fields we know
# are not nullable, but I'm opting for the more flexible option until we figure out why
# schemas differ between batches.
[field.with_nullable(True) for field in record_batch_schema if field.name != "_inserted_at"]
)
first_record_batch, record_batch_iterator = await apeek_first_and_rewind(record_batch_iterator)
if first_record_batch is None:
return 0

json_columns = ["properties", "set", "set_once", "person_properties"]
if model is None or (isinstance(model, BatchExportModel) and model.name == "events"):
table_fields: Fields = [
("uuid", "VARCHAR(200)"),
@@ -514,16 +643,7 @@
]

else:
column_names = [column for column in first_record_batch.schema.names if column != "_inserted_at"]
record_schema = first_record_batch.select(column_names).schema
table_fields = get_postgres_fields_from_record_schema(
record_schema, known_json_columns=["properties", "set", "set_once", "person_properties"]
)

schema_columns = [field[0] for field in table_fields]

rows_exported = get_rows_exported_metric()
bytes_exported = get_bytes_exported_metric()
table_fields = get_postgres_fields_from_record_schema(record_batch_schema, known_json_columns=json_columns)

requires_merge = (
isinstance(inputs.batch_export_model, BatchExportModel) and inputs.batch_export_model.name == "persons"
@@ -552,47 +672,31 @@
primary_key=primary_key,
) as pg_stage_table,
):

async def flush_to_postgres(
local_results_file,
records_since_last_flush,
bytes_since_last_flush,
flush_counter: int,
last_inserted_at,
last: bool,
error: Exception | None,
):
await logger.adebug(
"Copying %s records of size %s bytes",
records_since_last_flush,
bytes_since_last_flush,
)

table = pg_stage_table if requires_merge else pg_table
await pg_client.copy_tsv_to_postgres(
local_results_file,
inputs.schema,
table,
schema_columns,
)
rows_exported.add(records_since_last_flush)
bytes_exported.add(bytes_since_last_flush)

writer = CSVBatchExportWriter(
records_completed = await run_consumer_loop(
queue=queue,
consumer_cls=PostgreSQLConsumer,
producer_task=producer_task,
heartbeater=heartbeater,
heartbeat_details=details,
data_interval_end=data_interval_end,
data_interval_start=data_interval_start,
schema=record_batch_schema,
writer_format=WriterFormat.CSV,
max_bytes=settings.BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES,
flush_callable=flush_to_postgres,
field_names=schema_columns,
delimiter="\t",
quoting=csv.QUOTE_MINIMAL,
escape_char=None,
non_retryable_error_types=NON_RETRYABLE_ERROR_TYPES,
json_columns=(),
postgresql_client=pg_client,
postgresql_table=pg_stage_table if requires_merge else pg_table,
postgresql_schema=inputs.schema,
postgresql_table_fields=table_fields,
writer_file_kwargs={
"delimiter": "\t",
"field_names": [field[0] for field in table_fields],
"quoting": csv.QUOTE_MINIMAL,
"escape_char": None,
},
)

async with writer.open_temporary_file():
async for record_batch in record_batch_iterator:
record_batch = cast_record_batch_json_columns(record_batch, json_columns=())

await writer.write_record_batch(record_batch)

if requires_merge:
merge_key: Fields = (
("team_id", "INT"),
@@ -606,7 +710,7 @@
merge_key=merge_key,
)

return writer.records_total
return records_completed


@workflow.defn(name="postgres-export", failure_exception_types=[workflow.NondeterminismError])
@@ -683,27 +787,6 @@
insert_into_postgres_activity,
insert_inputs,
interval=inputs.interval,
non_retryable_error_types=[
# Raised on errors that are related to database operation.
# For example: unexpected disconnect, database or other object not found.
"OperationalError",
# The schema name provided is invalid (usually because it doesn't exist).
"InvalidSchemaName",
# Missing permissions to, e.g., insert into table.
"InsufficientPrivilege",
# Issue with exported data compared to schema, retrying won't help.
"NotNullViolation",
# A user added a unique constraint on their table, but batch exports (particularly events)
# can cause duplicates.
"UniqueViolation",
# Something changed in the target table's schema that we were not expecting.
"UndefinedColumn",
# A VARCHAR column is too small.
"StringDataRightTruncation",
# Raised by PostgreSQL client. Self explanatory.
"DiskFull",
# Raised by our PostgreSQL client when failing to connect after several attempts.
"PostgreSQLConnectionError",
],
non_retryable_error_types=NON_RETRYABLE_ERROR_TYPES,
finish_inputs=finish_inputs,
)