Use distributed_events_recent table
rossgray committed Jan 16, 2025
1 parent 1e3bc91 commit 1f84853
Showing 9 changed files with 240 additions and 50 deletions.
32 changes: 31 additions & 1 deletion posthog/batch_exports/sql.py
@@ -289,6 +289,37 @@
)
"""

CREATE_EVENTS_BATCH_EXPORT_VIEW_RECENT_DISTRIBUTED = f"""
CREATE OR REPLACE VIEW events_batch_export_recent_distributed ON CLUSTER {settings.CLICKHOUSE_CLUSTER} AS (
SELECT DISTINCT ON (team_id, event, cityHash64(distributed_events_recent.distinct_id), cityHash64(distributed_events_recent.uuid))
team_id AS team_id,
timestamp AS timestamp,
event AS event,
distinct_id AS distinct_id,
toString(uuid) AS uuid,
inserted_at AS _inserted_at,
created_at AS created_at,
elements_chain AS elements_chain,
toString(person_id) AS person_id,
nullIf(properties, '') AS properties,
nullIf(person_properties, '') AS person_properties,
nullIf(JSONExtractString(properties, '$set'), '') AS set,
nullIf(JSONExtractString(properties, '$set_once'), '') AS set_once
FROM
distributed_events_recent
PREWHERE
distributed_events_recent.inserted_at >= {{interval_start:DateTime64}}
AND distributed_events_recent.inserted_at < {{interval_end:DateTime64}}
WHERE
team_id = {{team_id:Int64}}
AND (length({{include_events:Array(String)}}) = 0 OR event IN {{include_events:Array(String)}})
AND (length({{exclude_events:Array(String)}}) = 0 OR event NOT IN {{exclude_events:Array(String)}})
ORDER BY
_inserted_at, event
SETTINGS optimize_aggregation_in_order=1
)
"""

CREATE_EVENTS_BATCH_EXPORT_VIEW_BACKFILL = f"""
CREATE OR REPLACE VIEW events_batch_export_backfill ON CLUSTER {settings.CLICKHOUSE_CLUSTER} AS (
SELECT DISTINCT ON (team_id, event, cityHash64(events.distinct_id), cityHash64(events.uuid))
@@ -319,7 +350,6 @@
)
"""

# TODO: is this the best query to use?
EVENT_COUNT_BY_INTERVAL = """
SELECT
toStartOfInterval(_inserted_at, INTERVAL {interval}) AS interval_start,
@@ -0,0 +1,9 @@
from posthog.batch_exports.sql import CREATE_EVENTS_BATCH_EXPORT_VIEW_RECENT_DISTRIBUTED
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions

operations = map(
run_sql_with_exceptions,
[
CREATE_EVENTS_BATCH_EXPORT_VIEW_RECENT_DISTRIBUTED,
],
)
7 changes: 5 additions & 2 deletions posthog/settings/temporal.py
@@ -66,6 +66,9 @@
)
CLICKHOUSE_OFFLINE_5MIN_CLUSTER_HOST: str | None = os.getenv("CLICKHOUSE_OFFLINE_5MIN_CLUSTER_HOST", None)

# What percentage of teams should use events_recent for batch exports (should be a value from 0 to 1 and we only support increments of 0.1)
# What percentage of teams should use distributed_events_recent for batch exports (should be a value from 0 to 1 and we
# only support increments of 0.1)
# TODO - remove this once migration is complete
BATCH_EXPORT_EVENTS_RECENT_ROLLOUT: float = get_from_env("BATCH_EXPORT_EVENTS_RECENT_ROLLOUT", 0.0, type_cast=float)
BATCH_EXPORT_DISTRIBUTED_EVENTS_RECENT_ROLLOUT: float = get_from_env(
"BATCH_EXPORT_DISTRIBUTED_EVENTS_RECENT_ROLLOUT", 0.0, type_cast=float
)
57 changes: 56 additions & 1 deletion posthog/temporal/batch_exports/batch_exports.py
@@ -199,6 +199,27 @@
"""
)

SELECT_FROM_EVENTS_VIEW_RECENT_DISTRIBUTED = Template(
"""
SELECT
$fields
FROM
events_batch_export_recent_distributed(
team_id={team_id},
interval_start={interval_start},
interval_end={interval_end},
include_events={include_events}::Array(String),
exclude_events={exclude_events}::Array(String)
) AS events
FORMAT ArrowStream
SETTINGS
-- This is half of configured MAX_MEMORY_USAGE for batch exports.
max_bytes_before_external_sort=50000000000,
max_replica_delay_for_distributed_queries=60,
fallback_to_stale_replicas_for_distributed_queries=0
"""
)

SELECT_FROM_EVENTS_VIEW_BACKFILL = Template(
"""
SELECT
@@ -348,8 +369,14 @@ async def iter_records_from_model_view(
else:
is_5_min_batch_export = False

# for 5 min batch exports we query the events_recent table, which is known to have zero replication lag, but
# may not be able to handle the load from all batch exports
if is_5_min_batch_export and not is_backfill:
query_template = SELECT_FROM_EVENTS_VIEW_RECENT
# for other batch exports that should use `events_recent` we use the `distributed_events_recent` table
# which is a distributed table that sits in front of the `events_recent` table
elif use_distributed_events_recent_table(is_backfill=is_backfill, team_id=team_id):
query_template = SELECT_FROM_EVENTS_VIEW_RECENT_DISTRIBUTED
elif str(team_id) in settings.UNCONSTRAINED_TIMESTAMP_TEAM_IDS:
query_template = SELECT_FROM_EVENTS_VIEW_UNBOUNDED
elif is_backfill:
@@ -449,6 +476,7 @@ def __init__(self, task: str):
super().__init__(f"Expected task '{task}' to be done by now")


# TODO - not sure this is being used anymore?
def start_produce_batch_export_record_batches(
client: ClickHouseClient,
model_name: str,
@@ -510,9 +538,15 @@ def start_produce_batch_export_record_batches(
else:
is_5_min_batch_export = False

# for 5 min batch exports we query the events_recent table, which is known to have zero replication lag, but
# may not be able to handle the load from all batch exports
if is_5_min_batch_export and not is_backfill:
query_template = SELECT_FROM_EVENTS_VIEW_RECENT
if str(team_id) in settings.UNCONSTRAINED_TIMESTAMP_TEAM_IDS:
# for other batch exports that should use `events_recent` we use the `distributed_events_recent` table
# which is a distributed table that sits in front of the `events_recent` table
elif use_distributed_events_recent_table(is_backfill=is_backfill, team_id=team_id):
query_template = SELECT_FROM_EVENTS_VIEW_RECENT_DISTRIBUTED
elif str(team_id) in settings.UNCONSTRAINED_TIMESTAMP_TEAM_IDS:
query_template = SELECT_FROM_EVENTS_VIEW_UNBOUNDED
elif is_backfill:
query_template = SELECT_FROM_EVENTS_VIEW_BACKFILL
@@ -653,6 +687,21 @@ async def raise_on_produce_task_failure(produce_task: asyncio.Task) -> None:
raise RecordBatchProducerError() from exc


def use_distributed_events_recent_table(is_backfill: bool, team_id: int) -> bool:
if is_backfill:
return False

events_recent_rollout: float = settings.BATCH_EXPORT_DISTRIBUTED_EVENTS_RECENT_ROLLOUT
# sanity check
if events_recent_rollout < 0:
events_recent_rollout = 0
elif events_recent_rollout > 1:
events_recent_rollout = 1

bucket = team_id % 10
return bucket < events_recent_rollout * 10


def iter_records(
client: ClickHouseClient,
team_id: int,
@@ -723,8 +772,14 @@ def iter_records(
else:
is_5_min_batch_export = False

# for 5 min batch exports we query the events_recent table, which is known to have zero replication lag, but
# may not be able to handle the load from all batch exports
if is_5_min_batch_export and not is_backfill:
query = SELECT_FROM_EVENTS_VIEW_RECENT
# for other batch exports that should use `events_recent` we use the `distributed_events_recent` table
# which is a distributed table that sits in front of the `events_recent` table
elif use_distributed_events_recent_table(is_backfill=is_backfill, team_id=team_id):
query = SELECT_FROM_EVENTS_VIEW_RECENT_DISTRIBUTED
elif str(team_id) in settings.UNCONSTRAINED_TIMESTAMP_TEAM_IDS:
query = SELECT_FROM_EVENTS_VIEW_UNBOUNDED
elif is_backfill:
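The rollout gate added in `use_distributed_events_recent_table` above spreads teams over ten buckets with `team_id % 10` and enables a bucket when it falls below `rollout * 10`, which is why the setting only supports increments of 0.1. A minimal sketch of that selection, using a hypothetical rollout value of 0.3 rather than the real `settings.BATCH_EXPORT_DISTRIBUTED_EVENTS_RECENT_ROLLOUT`:

def uses_distributed_events_recent(team_id: int, rollout: float = 0.3) -> bool:
    # Clamp the rollout fraction to [0, 1], mirroring the sanity check above.
    rollout = min(max(rollout, 0.0), 1.0)
    # Ten buckets; a rollout of 0.3 enables buckets 0, 1 and 2.
    return (team_id % 10) < rollout * 10

# With rollout=0.3, team 12 (bucket 2) is included while team 17 (bucket 7) is not.
assert uses_distributed_events_recent(12) is True
assert uses_distributed_events_recent(17) is False
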
30 changes: 20 additions & 10 deletions posthog/temporal/batch_exports/spmc.py
@@ -22,6 +22,7 @@
SELECT_FROM_EVENTS_VIEW,
SELECT_FROM_EVENTS_VIEW_BACKFILL,
SELECT_FROM_EVENTS_VIEW_RECENT,
SELECT_FROM_EVENTS_VIEW_RECENT_DISTRIBUTED,
SELECT_FROM_EVENTS_VIEW_UNBOUNDED,
SELECT_FROM_PERSONS_VIEW,
SELECT_FROM_PERSONS_VIEW_BACKFILL,
@@ -507,13 +508,11 @@ def is_5_min_batch_export(full_range: tuple[dt.datetime | None, dt.datetime]) ->
return False


def use_events_recent(full_range: tuple[dt.datetime | None, dt.datetime], is_backfill: bool, team_id: int) -> bool:
def use_distributed_events_recent_table(is_backfill: bool, team_id: int) -> bool:
if is_backfill:
return False
if is_5_min_batch_export(full_range):
return True

events_recent_rollout: float = settings.BATCH_EXPORT_EVENTS_RECENT_ROLLOUT
events_recent_rollout: float = settings.BATCH_EXPORT_DISTRIBUTED_EVENTS_RECENT_ROLLOUT
# sanity check
if events_recent_rollout < 0:
events_recent_rollout = 0
@@ -585,8 +584,14 @@ def start(
else:
parameters["include_events"] = []

if use_events_recent(full_range=full_range, is_backfill=is_backfill, team_id=team_id):
# for 5 min batch exports we query the events_recent table, which is known to have zero replication lag, but
# may not be able to handle the load from all batch exports
if is_5_min_batch_export(full_range=full_range) and not is_backfill:
query_template = SELECT_FROM_EVENTS_VIEW_RECENT
# for other batch exports that should use `events_recent` we use the `distributed_events_recent` table
# which is a distributed table that sits in front of the `events_recent` table
elif use_distributed_events_recent_table(is_backfill=is_backfill, team_id=team_id):
query_template = SELECT_FROM_EVENTS_VIEW_RECENT_DISTRIBUTED
elif str(team_id) in settings.UNCONSTRAINED_TIMESTAMP_TEAM_IDS:
query_template = SELECT_FROM_EVENTS_VIEW_UNBOUNDED
elif is_backfill:
@@ -660,14 +665,19 @@ async def produce_batch_export_record_batches_from_range(
this number of records.
"""
clickhouse_url = None
if use_events_recent(full_range=full_range, is_backfill=is_backfill, team_id=team_id):
# 5 min batch exports should query a single node, which is known to have zero replication lag
if is_5_min_batch_export(full_range=full_range):
clickhouse_url = settings.CLICKHOUSE_OFFLINE_5MIN_CLUSTER_HOST

# data can sometimes take a while to settle, so for 5 min batch exports
# we wait several seconds just to be safe
# Data can sometimes take a while to settle, so for 5 min batch exports we wait several seconds just to be safe.
# For all other batch exports we wait for 1 minute since we're querying the events_recent table using a
# distributed table and setting `max_replica_delay_for_distributed_queries` to 1 minute
if is_5_min_batch_export(full_range):
end_at = full_range[1]
await wait_for_delta_past_data_interval_end(end_at)
delta = dt.timedelta(seconds=30)
else:
delta = dt.timedelta(minutes=1)
end_at = full_range[1]
await wait_for_delta_past_data_interval_end(end_at, delta)

async with get_client(team_id=team_id, clickhouse_url=clickhouse_url) as client:
if not await client.is_alive():
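The hunk above replaces the fixed 5 minute wait with a delta that depends on the export type: 30 seconds for 5 minute exports, and 1 minute otherwise to match the `max_replica_delay_for_distributed_queries=60` setting used when querying through the distributed table. The implementation of `wait_for_delta_past_data_interval_end` is not shown in this commit; a rough sketch of the waiting behaviour it is assumed to provide, assuming a timezone-aware `data_interval_end`:

import asyncio
import datetime as dt

async def wait_for_delta_past_data_interval_end(
    data_interval_end: dt.datetime, delta: dt.timedelta = dt.timedelta(seconds=30)
) -> None:
    # Hypothetical sketch only: sleep until `delta` has elapsed past the end of
    # the data interval, so late-arriving rows have time to settle.
    target = data_interval_end + delta
    remaining = (target - dt.datetime.now(tz=dt.timezone.utc)).total_seconds()
    if remaining > 0:
        await asyncio.sleep(remaining)
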
22 changes: 22 additions & 0 deletions posthog/temporal/batch_exports/sql.py
@@ -134,6 +134,28 @@
"""
)

SELECT_FROM_EVENTS_VIEW_RECENT_DISTRIBUTED = Template(
"""
SELECT
$fields
FROM
events_batch_export_recent_distributed(
team_id={team_id},
interval_start={interval_start},
interval_end={interval_end},
include_events={include_events}::Array(String),
exclude_events={exclude_events}::Array(String)
) AS events
FORMAT ArrowStream
SETTINGS
-- This is half of configured MAX_MEMORY_USAGE for batch exports.
max_bytes_before_external_sort=50000000000,
max_replica_delay_for_distributed_queries=60,
fallback_to_stale_replicas_for_distributed_queries=0
"""
)


SELECT_FROM_EVENTS_VIEW_UNBOUNDED = Template(
"""
SELECT
17 changes: 14 additions & 3 deletions posthog/temporal/tests/batch_exports/conftest.py
@@ -173,6 +173,7 @@ async def create_clickhouse_tables_and_views(clickhouse_client, django_db_setup)
CREATE_EVENTS_BATCH_EXPORT_VIEW,
CREATE_EVENTS_BATCH_EXPORT_VIEW_BACKFILL,
CREATE_EVENTS_BATCH_EXPORT_VIEW_RECENT,
CREATE_EVENTS_BATCH_EXPORT_VIEW_RECENT_DISTRIBUTED,
CREATE_EVENTS_BATCH_EXPORT_VIEW_UNBOUNDED,
CREATE_PERSONS_BATCH_EXPORT_VIEW,
CREATE_PERSONS_BATCH_EXPORT_VIEW_BACKFILL,
@@ -184,6 +185,7 @@ async def create_clickhouse_tables_and_views(clickhouse_client, django_db_setup)
CREATE_EVENTS_BATCH_EXPORT_VIEW_BACKFILL,
CREATE_EVENTS_BATCH_EXPORT_VIEW_UNBOUNDED,
CREATE_EVENTS_BATCH_EXPORT_VIEW_RECENT,
CREATE_EVENTS_BATCH_EXPORT_VIEW_RECENT_DISTRIBUTED,
CREATE_PERSONS_BATCH_EXPORT_VIEW,
CREATE_PERSONS_BATCH_EXPORT_VIEW_BACKFILL,
)
@@ -260,6 +262,14 @@ def test_person_properties(request):
return {"utm_medium": "referral", "$initial_os": "Linux"}


@pytest.fixture
def use_distributed_events_recent_table(request):
try:
return request.param
except AttributeError:
return False


@pytest_asyncio.fixture
async def generate_test_data(
ateam,
@@ -270,12 +280,13 @@ async def generate_test_data(
interval,
test_properties,
test_person_properties,
use_distributed_events_recent_table,
):
"""Generate test data in ClickHouse."""
if interval != "every 5 minutes":
table = "sharded_events"
else:
if interval == "every 5 minutes" or use_distributed_events_recent_table:
table = "events_recent"
else:
table = "sharded_events"

events_to_export_created, _, _ = await generate_test_events_in_clickhouse(
client=clickhouse_client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2108,3 +2108,83 @@ def __init__(self, *args, **kwargs):
data_interval_end=data_interval_end,
batch_export_model=model,
)


# TODO - this can be removed once we've fully migrated to using distributed events_recent
# for all teams
@pytest.mark.parametrize("use_distributed_events_recent_table", [True, False])
async def test_insert_into_s3_activity_when_using_distributed_events_recent_table(
clickhouse_client,
bucket_name,
minio_client,
activity_environment,
compression,
exclude_events,
file_format,
data_interval_start,
data_interval_end,
generate_test_data,
ateam,
use_distributed_events_recent_table,
):
"""We're migrating to using distributed events_recent for all realtime batch exports (except for 5 minute exports).
This test ensures that the insert_into_s3_activity function works as expected when using the
distributed events_recent table.
It can be removed once we've fully migrated to using distributed events_recent for all teams and the tests always
use this new table.
"""

model = BatchExportModel(name="events", schema=None)

prefix = str(uuid.uuid4())

batch_export_schema: BatchExportSchema | None = None
batch_export_model: BatchExportModel | None = None
if isinstance(model, BatchExportModel):
batch_export_model = model
elif model is not None:

Check failure on line 2147 in posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py (GitHub Actions / Python code quality checks): Statement is unreachable
batch_export_schema = model

insert_inputs = S3InsertInputs(
bucket_name=bucket_name,
region="us-east-1",
prefix=prefix,
team_id=ateam.pk,
data_interval_start=data_interval_start.isoformat(),
data_interval_end=data_interval_end.isoformat(),
aws_access_key_id="object_storage_root_user",
aws_secret_access_key="object_storage_root_password",
endpoint_url=settings.OBJECT_STORAGE_ENDPOINT,
compression=compression,
exclude_events=exclude_events,
file_format=file_format,
batch_export_schema=batch_export_schema,
batch_export_model=batch_export_model,
)

with override_settings(
BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2,
BATCH_EXPORT_DISTRIBUTED_EVENTS_RECENT_ROLLOUT=1 if use_distributed_events_recent_table else 0,
): # 5MB, the minimum for Multipart uploads
records_exported = await activity_environment.run(insert_into_s3_activity, insert_inputs)

events_to_export_created, persons_to_export_created = generate_test_data
assert records_exported == len(events_to_export_created) or records_exported == len(persons_to_export_created)

await assert_clickhouse_records_in_s3(
s3_compatible_client=minio_client,
clickhouse_client=clickhouse_client,
bucket_name=bucket_name,
key_prefix=prefix,
team_id=ateam.pk,
data_interval_start=data_interval_start,
data_interval_end=data_interval_end,
batch_export_model=model,
exclude_events=exclude_events,
include_events=None,
compression=compression,
file_format=file_format,
is_backfill=False,
)