diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml
index 35b8a5591c876..08e50a4367b73 100644
--- a/docker-compose.dev.yml
+++ b/docker-compose.dev.yml
@@ -55,6 +55,7 @@ services:
         extends:
             file: docker-compose.base.yml
             service: clickhouse
+        hostname: clickhouse
         ports:
             - '8123:8123'
             - '8443:8443'
diff --git a/posthog/clickhouse/cancel.py b/posthog/clickhouse/cancel.py
index e05eea7ad3d64..475ae160ccb81 100644
--- a/posthog/clickhouse/cancel.py
+++ b/posthog/clickhouse/cancel.py
@@ -2,13 +2,54 @@
 
 from posthog.api.services.query import logger
 from posthog.clickhouse.client import sync_execute
+from posthog.clickhouse.client.connection import default_client
 from posthog.settings import CLICKHOUSE_CLUSTER
 
 
 def cancel_query_on_cluster(team_id: int, client_query_id: str) -> None:
-    result = sync_execute(
-        f"KILL QUERY ON CLUSTER '{CLICKHOUSE_CLUSTER}' WHERE query_id LIKE %(client_query_id)s",
-        {"client_query_id": f"{team_id}_{client_query_id}%"},
-    )
-    logger.info("Cancelled query %s for team %s, result: %s", client_query_id, team_id, result)
+    """
+    Cancel the ClickHouse queries whose query_id matches "<team_id>_<client_query_id>%".
+
+    Looks the query up in system.processes across all replicas and, when a host is
+    found, issues KILL QUERY on that host only. Otherwise (no match, or the lookup
+    failed) falls back to KILL QUERY ON CLUSTER.
+    """
+    # One LIKE pattern is shared by the lookup and by both KILL statements.
+    query_id_pattern = f"{team_id}_{client_query_id}%"
+    initiator_host = None
+
+    try:
+        result = sync_execute(
+            f"""
+            SELECT hostname()
+            FROM clusterAllReplicas('{CLICKHOUSE_CLUSTER}', system.processes)
+            WHERE query_id LIKE %(client_query_id)s
+            SETTINGS max_execution_time = 5
+            """,
+            {"client_query_id": query_id_pattern},
+        )
+        initiator_host = result[0][0] if result else None
+    except Exception as e:
+        # Best effort: a failed lookup must not block the cluster-wide fallback below.
+        logger.info("Failed to find initiator host for query %s: %s", client_query_id, e)
+
+    if initiator_host:
+        logger.debug(
+            "Found initiator host %s for query %s, cancelling query on host", initiator_host, client_query_id
+        )
+        with default_client(host=initiator_host) as client:
+            result = sync_execute(
+                "KILL QUERY WHERE query_id LIKE %(client_query_id)s",
+                {"client_query_id": query_id_pattern},
+                sync_client=client,
+            )
+    else:
+        logger.debug("No initiator host found for query %s, cancelling query on cluster", client_query_id)
+        result = sync_execute(
+            f"KILL QUERY ON CLUSTER '{CLICKHOUSE_CLUSTER}' WHERE query_id LIKE %(client_query_id)s",
+            {"client_query_id": query_id_pattern},
+        )
+
+    logger.info("Cancelled query %s for team %s, result: %s", client_query_id, team_id, result)
+
     statsd.incr("clickhouse.query.cancellation_requested", tags={"team_id": team_id})
diff --git a/posthog/clickhouse/client/connection.py b/posthog/clickhouse/client/connection.py
index 35c72a305faea..26348eeddffb6 100644
--- a/posthog/clickhouse/client/connection.py
+++ b/posthog/clickhouse/client/connection.py
@@ -43,13 +43,13 @@ def get_pool(workload: Workload, team_id=None, readonly=False):
     return make_ch_pool()
 
 
-def default_client():
+def default_client(host=settings.CLICKHOUSE_HOST):
     """
     Return a bare bones client for use in places where we are only interested in general ClickHouse state
     DO NOT USE THIS FOR QUERYING DATA
     """
     return SyncClient(
-        host=settings.CLICKHOUSE_HOST,
+        host=host,
         # We set "system" here as we don't necessarily have a "default" database,
         # which is what the clickhouse_driver would use by default. We are
         # assuming that this exists and we have permissions to access it. This
diff --git a/posthog/clickhouse/client/execute.py b/posthog/clickhouse/client/execute.py
index 598a5c8b771aa..7eb9ad3024817 100644
--- a/posthog/clickhouse/client/execute.py
+++ b/posthog/clickhouse/client/execute.py
@@ -98,6 +98,7 @@ def sync_execute(
     workload: Workload = Workload.DEFAULT,
     team_id: Optional[int] = None,
     readonly=False,
+    sync_client: Optional[SyncClient] = None,
 ):
     if TEST and flush:
         try:
@@ -120,7 +121,7 @@ def sync_execute(
     if get_query_tag_value("id") == "posthog.tasks.tasks.process_query_task":
         workload = Workload.ONLINE
 
-    with get_pool(workload, team_id, readonly).get_client() as client:
+    with sync_client or get_pool(workload, team_id, readonly).get_client() as client:
         start_time = perf_counter()
 
         prepared_sql, prepared_args, tags = _prepare_query(client=client, query=query, args=args, workload=workload)