diff --git a/bin/celery-queues.env b/bin/celery-queues.env index 97920a2fb9262..c4f63d5aeaab7 100644 --- a/bin/celery-queues.env +++ b/bin/celery-queues.env @@ -1,3 +1,3 @@ # Default set of queues to be used by Celery. # Important: Add new queues to make Celery consume tasks from them. -CELERY_WORKER_QUEUES=celery,email,insight_export,insight_refresh,gevent \ No newline at end of file +CELERY_WORKER_QUEUES=celery,email,insight_export,insight_refresh,analytics_queries \ No newline at end of file diff --git a/docker-compose.base.yml b/docker-compose.base.yml index 72ed8dedde605..22cd812fe2790 100644 --- a/docker-compose.base.yml +++ b/docker-compose.base.yml @@ -60,6 +60,13 @@ services: image: maildev/maildev:2.0.5 restart: on-failure + flower: + image: mher/flower:2.0.0 + restart: on-failure + environment: + FLOWER_PORT: 5555 + CELERY_BROKER_URL: redis://redis:6379 + worker: &worker command: ./bin/docker-worker-celery --with-scheduler restart: on-failure diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 365ec2c5b452d..b4601915c95c3 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -27,6 +27,13 @@ services: ports: - '6379:6379' + flower: + extends: + file: docker-compose.base.yml + service: flower + ports: + - '5555:5555' + clickhouse: extends: file: docker-compose.base.yml diff --git a/ee/settings.py b/ee/settings.py index 5fbb4c81fe8ce..766f1533822c1 100644 --- a/ee/settings.py +++ b/ee/settings.py @@ -68,4 +68,7 @@ # Whether to enable the admin portal. Default false for self-hosted as if not setup properly can pose security issues. ADMIN_PORTAL_ENABLED = get_from_env("ADMIN_PORTAL_ENABLED", DEMO or DEBUG, type_cast=str_to_bool) -ASSET_GENERATION_MAX_TIMEOUT_MINUTES = get_from_env("ASSET_GENERATION_MAX_TIMEOUT_MINUTES", 10, type_cast=int) +ASSET_GENERATION_MAX_TIMEOUT_SECONDS = get_from_env("ASSET_GENERATION_MAX_TIMEOUT_SECONDS", 60.0, type_cast=float) +PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES = get_from_env( + "PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES", 10.0, type_cast=float +) diff --git a/ee/tasks/subscriptions/__init__.py b/ee/tasks/subscriptions/__init__.py index a5adc7a640763..235ea70c16d7e 100644 --- a/ee/tasks/subscriptions/__init__.py +++ b/ee/tasks/subscriptions/__init__.py @@ -1,6 +1,7 @@ from datetime import datetime, timedelta from typing import Optional +from django.conf import settings import structlog from prometheus_client import Counter from sentry_sdk import capture_exception, capture_message @@ -144,12 +145,17 @@ def schedule_all_subscriptions() -> None: deliver_subscription_report.delay(subscription.id) -@app.task() +report_timeout_seconds = settings.PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES * 60 * 1.5 + + +@app.task(soft_time_limit=report_timeout_seconds, timeout=report_timeout_seconds + 10) def deliver_subscription_report(subscription_id: int) -> None: + if not settings.TEST: + return return _deliver_subscription_report(subscription_id) -@app.task() +@app.task(soft_time_limit=30, time_limit=40) def handle_subscription_value_change( subscription_id: int, previous_value: str, invite_message: Optional[str] = None ) -> None: diff --git a/ee/tasks/subscriptions/subscription_utils.py b/ee/tasks/subscriptions/subscription_utils.py index 5df00e4a8ee85..3350a5cc561b7 100644 --- a/ee/tasks/subscriptions/subscription_utils.py +++ b/ee/tasks/subscriptions/subscription_utils.py @@ -60,7 +60,7 @@ def generate_assets( wait_for_parallel_celery_group( parallel_job, - max_timeout=timedelta(minutes=settings.ASSET_GENERATION_MAX_TIMEOUT_MINUTES), + max_timeout=timedelta(minutes=settings.PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES), ) return insights, assets diff --git a/ee/tasks/test/subscriptions/test_subscriptions_utils.py b/ee/tasks/test/subscriptions/test_subscriptions_utils.py index 35b2ca350ed8a..c8ff89adcea65 100644 --- a/ee/tasks/test/subscriptions/test_subscriptions_utils.py +++ b/ee/tasks/test/subscriptions/test_subscriptions_utils.py @@ -34,7 +34,7 @@ def setUp(self) -> None: self.subscription = create_subscription(team=self.team, insight=self.insight, created_by=self.user) def test_generate_assets_for_insight(self, mock_export_task: MagicMock, _mock_group: MagicMock) -> None: - with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1): + with self.settings(PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1): insights, assets = generate_assets(self.subscription) assert insights == [self.insight] @@ -44,7 +44,7 @@ def test_generate_assets_for_insight(self, mock_export_task: MagicMock, _mock_gr def test_generate_assets_for_dashboard(self, mock_export_task: MagicMock, _mock_group: MagicMock) -> None: subscription = create_subscription(team=self.team, dashboard=self.dashboard, created_by=self.user) - with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1): + with self.settings(PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1): insights, assets = generate_assets(subscription) assert len(insights) == len(self.tiles) @@ -54,7 +54,7 @@ def test_generate_assets_for_dashboard(self, mock_export_task: MagicMock, _mock_ def test_raises_if_missing_resource(self, _mock_export_task: MagicMock, _mock_group: MagicMock) -> None: subscription = create_subscription(team=self.team, created_by=self.user) - with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1), pytest.raises(Exception) as e: + with self.settings(PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1), pytest.raises(Exception) as e: generate_assets(subscription) assert str(e.value) == "There are no insights to be sent for this Subscription" @@ -68,7 +68,7 @@ def test_excludes_deleted_insights_for_dashboard(self, mock_export_task: MagicMo current_tile.insight.save() subscription = create_subscription(team=self.team, dashboard=self.dashboard, created_by=self.user) - with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1): + with self.settings(PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1): insights, assets = generate_assets(subscription) assert len(insights) == 1 @@ -90,7 +90,7 @@ def test_cancels_children_if_timed_out(self, _mock_export_task: MagicMock, mock_ mock_running_exports.children = [running_export_task] mock_running_exports.ready = mock_ready - with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=0.01), pytest.raises(Exception) as e: + with self.settings(PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES=0.01), pytest.raises(Exception) as e: generate_assets(self.subscription) assert str(e.value) == "Timed out waiting for celery task to finish" diff --git a/frontend/src/queries/nodes/DataNode/dataNodeLogic.ts b/frontend/src/queries/nodes/DataNode/dataNodeLogic.ts index a0b04feac18bb..87ab20e46205a 100644 --- a/frontend/src/queries/nodes/DataNode/dataNodeLogic.ts +++ b/frontend/src/queries/nodes/DataNode/dataNodeLogic.ts @@ -1,5 +1,4 @@ import clsx from 'clsx' -import equal from 'fast-deep-equal' import { actions, afterMount, @@ -16,7 +15,7 @@ import { } from 'kea' import { loaders } from 'kea-loaders' import { subscriptions } from 'kea-subscriptions' -import api, { ApiMethodOptions, getJSONOrThrow } from 'lib/api' +import api, { ApiMethodOptions } from 'lib/api' import { FEATURE_FLAGS } from 'lib/constants' import { dayjs } from 'lib/dayjs' import { featureFlagLogic } from 'lib/logic/featureFlagLogic' @@ -41,18 +40,8 @@ import { QueryResponse, QueryTiming, } from '~/queries/schema' -import { - isActorsQuery, - isEventsQuery, - isInsightActorsQuery, - isInsightQueryNode, - isLifecycleQuery, - isPersonsNode, - isStickinessQuery, - isTrendsQuery, -} from '~/queries/utils' +import { isActorsQuery, isEventsQuery, isInsightActorsQuery, isInsightQueryNode, isPersonsNode } from '~/queries/utils' -import { filtersToQueryNode } from '../InsightQuery/utils/filtersToQueryNode' import type { dataNodeLogicType } from './dataNodeLogicType' export interface DataNodeLogicProps { @@ -123,22 +112,6 @@ export const dataNodeLogic = kea([ if (props.doNotLoad) { return props.cachedResults } - if ( - isInsightQueryNode(props.query) && - !(values.hogQLInsightsLifecycleFlagEnabled && isLifecycleQuery(props.query)) && - !(values.hogQLInsightsTrendsFlagEnabled && isTrendsQuery(props.query)) && - !(values.hogQLInsightsStickinessFlagEnabled && isStickinessQuery(props.query)) && - props.cachedResults && - props.cachedResults['id'] && - props.cachedResults['filters'] && - equal(props.query, filtersToQueryNode(props.cachedResults['filters'])) - ) { - const url = `api/projects/${values.currentTeamId}/insights/${props.cachedResults['id']}?refresh=true` - const fetchResponse = await api.getResponse(url) - const data = await getJSONOrThrow(fetchResponse) - breakpoint() - return data - } if (props.cachedResults && !refresh) { if ( diff --git a/mypy-baseline.txt b/mypy-baseline.txt index 7747af86b3112..061cc288ffcd5 100644 --- a/mypy-baseline.txt +++ b/mypy-baseline.txt @@ -421,8 +421,7 @@ posthog/api/person.py:0: error: Argument 1 to "loads" has incompatible type "str posthog/api/person.py:0: error: Argument "user" to "log_activity" has incompatible type "User | AnonymousUser"; expected "User | None" [arg-type] posthog/api/person.py:0: error: Argument "user" to "log_activity" has incompatible type "User | AnonymousUser"; expected "User | None" [arg-type] posthog/hogql_queries/web_analytics/web_analytics_query_runner.py:0: error: Argument 1 to "append" of "list" has incompatible type "EventPropertyFilter"; expected "Expr" [arg-type] -posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Need type annotation for "timings" (hint: "timings: List[] = ...") [var-annotated] -posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Argument 1 to "extend" of "list" has incompatible type "list[QueryTiming] | None"; expected "Iterable[Any]" [arg-type] +posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Unused "type: ignore" comment [unused-ignore] posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Statement is unreachable [unreachable] posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Argument 1 to "FormulaAST" has incompatible type "map[Any]"; expected "list[list[float]]" [arg-type] posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Argument 1 to "FormulaAST" has incompatible type "map[Any]"; expected "list[list[float]]" [arg-type] diff --git a/package.json b/package.json index 7f1bdf639cac8..f860db3fd69e4 100644 --- a/package.json +++ b/package.json @@ -312,11 +312,11 @@ "stylelint --fix", "prettier --write" ], - "frontend/src/**.{js,jsx,mjs,ts,tsx}": [ + "frontend/src/**/*.{js,jsx,mjs,ts,tsx}": [ "eslint -c .eslintrc.js --fix", "prettier --write" ], - "plugin-server/**.{js,jsx,mjs,ts,tsx}": [ + "plugin-server/**/*.{js,jsx,mjs,ts,tsx}": [ "pnpm --dir plugin-server exec eslint --fix", "pnpm --dir plugin-server exec prettier --write" ], diff --git a/posthog/api/test/__snapshots__/test_decide.ambr b/posthog/api/test/__snapshots__/test_decide.ambr index d8daad640dd92..0082db5b23262 100644 --- a/posthog/api/test/__snapshots__/test_decide.ambr +++ b/posthog/api/test/__snapshots__/test_decide.ambr @@ -88,6 +88,22 @@ LIMIT 21 /*controller='team-detail',route='api/projects/%28%3FP%3Cid%3E%5B%5E/.%5D%2B%29/%3F%24'*/ ''' # --- +# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.10 + ''' + SELECT "posthog_pluginconfig"."id", + "posthog_pluginconfig"."web_token", + "posthog_pluginsourcefile"."updated_at", + "posthog_plugin"."updated_at", + "posthog_pluginconfig"."updated_at" + FROM "posthog_pluginconfig" + INNER JOIN "posthog_plugin" ON ("posthog_pluginconfig"."plugin_id" = "posthog_plugin"."id") + INNER JOIN "posthog_pluginsourcefile" ON ("posthog_plugin"."id" = "posthog_pluginsourcefile"."plugin_id") + WHERE ("posthog_pluginconfig"."enabled" + AND "posthog_pluginsourcefile"."filename" = 'site.ts' + AND "posthog_pluginsourcefile"."status" = 'TRANSPILED' + AND "posthog_pluginconfig"."team_id" = 2) + ''' +# --- # name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.2 ''' SELECT "posthog_organizationmembership"."id", diff --git a/posthog/celery.py b/posthog/celery.py index 3526c61567a45..90b57e69c7d3a 100644 --- a/posthog/celery.py +++ b/posthog/celery.py @@ -401,8 +401,8 @@ def redis_heartbeat(): get_client().set("POSTHOG_HEARTBEAT", int(time.time())) -@app.task(ignore_result=True, bind=True) -def process_query_task(self, team_id, query_id, query_json, limit_context=None, refresh_requested=False): +@app.task(ignore_result=True, queue="analytics_queries") +def process_query_task(team_id, query_id, query_json, limit_context=None, refresh_requested=False): """ Kick off query Once complete save results to redis @@ -822,7 +822,7 @@ def clear_clickhouse_deleted_person(): remove_deleted_person_data() -@app.task(ignore_result=True) +@app.task(ignore_result=True, queue="email") def redis_celery_queue_depth(): try: with pushed_metrics_registry("redis_celery_queue_depth_registry") as registry: diff --git a/posthog/hogql/context.py b/posthog/hogql/context.py index 7f45d66fa4f83..1b43362790f16 100644 --- a/posthog/hogql/context.py +++ b/posthog/hogql/context.py @@ -6,6 +6,7 @@ if TYPE_CHECKING: from posthog.hogql.database.database import Database + from posthog.models import Team @dataclass @@ -22,6 +23,8 @@ class HogQLContext: # Team making the queries team_id: Optional[int] + # Team making the queries - if team is passed in, then the team isn't queried when creating the database + team: Optional["Team"] = None # Virtual database we're querying, will be populated from team_id if not present database: Optional["Database"] = None # If set, will save string constants to this dict. Inlines strings into the query if None. diff --git a/posthog/hogql/database/database.py b/posthog/hogql/database/database.py index caac14649f573..b25c7c95c4ca3 100644 --- a/posthog/hogql/database/database.py +++ b/posthog/hogql/database/database.py @@ -1,4 +1,4 @@ -from typing import Any, ClassVar, Dict, List, Literal, Optional, TypedDict +from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Literal, Optional, TypedDict from zoneinfo import ZoneInfo, ZoneInfoNotFoundError from pydantic import ConfigDict, BaseModel @@ -51,6 +51,10 @@ from posthog.schema import HogQLQueryModifiers, PersonsOnEventsMode +if TYPE_CHECKING: + from posthog.models import Team + + class Database(BaseModel): model_config = ConfigDict(extra="allow") @@ -121,7 +125,9 @@ def add_warehouse_tables(self, **field_definitions: Any): setattr(self, f_name, f_def) -def create_hogql_database(team_id: int, modifiers: Optional[HogQLQueryModifiers] = None) -> Database: +def create_hogql_database( + team_id: int, modifiers: Optional[HogQLQueryModifiers] = None, team_arg: Optional["Team"] = None +) -> Database: from posthog.models import Team from posthog.hogql.query import create_default_modifiers_for_team from posthog.warehouse.models import ( @@ -130,7 +136,7 @@ def create_hogql_database(team_id: int, modifiers: Optional[HogQLQueryModifiers] DataWarehouseViewLink, ) - team = Team.objects.get(pk=team_id) + team = team_arg or Team.objects.get(pk=team_id) modifiers = create_default_modifiers_for_team(team, modifiers) database = Database(timezone=team.timezone, week_start_day=team.week_start_day) diff --git a/posthog/hogql/printer.py b/posthog/hogql/printer.py index 428ab73a70811..98de2dd1bfcf1 100644 --- a/posthog/hogql/printer.py +++ b/posthog/hogql/printer.py @@ -97,7 +97,7 @@ def prepare_ast_for_printing( settings: Optional[HogQLGlobalSettings] = None, ) -> ast.Expr: with context.timings.measure("create_hogql_database"): - context.database = context.database or create_hogql_database(context.team_id, context.modifiers) + context.database = context.database or create_hogql_database(context.team_id, context.modifiers, context.team) if context.modifiers.inCohortVia == InCohortVia.leftjoin_conjoined: with context.timings.measure("resolve_in_cohorts_conjoined"): diff --git a/posthog/hogql/query.py b/posthog/hogql/query.py index 63b99ea516f5e..3ea5c3d49eb7a 100644 --- a/posthog/hogql/query.py +++ b/posthog/hogql/query.py @@ -81,6 +81,7 @@ def execute_hogql_query( with timings.measure("prepare_ast"): hogql_query_context = HogQLContext( team_id=team.pk, + team=team, enable_select_queries=True, timings=timings, modifiers=query_modifiers, @@ -121,6 +122,7 @@ def execute_hogql_query( with timings.measure("print_ast"): clickhouse_context = HogQLContext( team_id=team.pk, + team=team, enable_select_queries=True, timings=timings, modifiers=query_modifiers, diff --git a/posthog/hogql_queries/insights/trends/test/test_trends.py b/posthog/hogql_queries/insights/trends/test/test_trends.py index d2f927070c24f..874d02e246dc4 100644 --- a/posthog/hogql_queries/insights/trends/test/test_trends.py +++ b/posthog/hogql_queries/insights/trends/test/test_trends.py @@ -200,6 +200,7 @@ def convert_filter_to_trends_query(filter: Filter) -> TrendsQuery: return tq +@override_settings(IN_UNIT_TESTING=True) class TestTrends(ClickhouseTestMixin, APIBaseTest): maxDiff = None diff --git a/posthog/hogql_queries/insights/trends/test/test_trends_query_runner.py b/posthog/hogql_queries/insights/trends/test/test_trends_query_runner.py index 2fe0570ab104a..95e14d0fbd676 100644 --- a/posthog/hogql_queries/insights/trends/test/test_trends_query_runner.py +++ b/posthog/hogql_queries/insights/trends/test/test_trends_query_runner.py @@ -1,5 +1,7 @@ from dataclasses import dataclass from typing import Dict, List, Optional +from unittest.mock import patch +from django.test import override_settings from freezegun import freeze_time from posthog.hogql_queries.insights.trends.trends_query_runner import TrendsQueryRunner from posthog.models.cohort.cohort import Cohort @@ -41,6 +43,7 @@ class SeriesTestData: properties: Dict[str, str | int] +@override_settings(IN_UNIT_TESTING=True) class TestQuery(ClickhouseTestMixin, APIBaseTest): default_date_from = "2020-01-09" default_date_to = "2020-01-19" @@ -1158,3 +1161,22 @@ def test_smoothing(self): ) assert response.results[0]["data"] == [1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0] + + @patch("posthog.hogql_queries.insights.trends.trends_query_runner.execute_hogql_query") + def test_should_throw_exception(self, patch_sync_execute): + patch_sync_execute.side_effect = Exception("Error thrown inside thread") + + with self.assertRaises(Exception) as e: + self._run_trends_query( + "2020-01-09", + "2020-01-20", + IntervalType.day, + [EventsNode(event="$pageview")], + None, + None, + ) + + self.assertEqual( + str(e.exception), + "Error thrown inside thread", + ) diff --git a/posthog/hogql_queries/insights/trends/trends_query_runner.py b/posthog/hogql_queries/insights/trends/trends_query_runner.py index cb4b5f5ca654d..b45ca18612bb0 100644 --- a/posthog/hogql_queries/insights/trends/trends_query_runner.py +++ b/posthog/hogql_queries/insights/trends/trends_query_runner.py @@ -3,7 +3,9 @@ from itertools import groupby from math import ceil from operator import itemgetter +import threading from typing import List, Optional, Any, Dict +from django.conf import settings from django.utils.timezone import datetime from posthog.caching.insights_api import ( @@ -40,6 +42,7 @@ EventsNode, HogQLQueryResponse, InCohortVia, + QueryTiming, TrendsQuery, TrendsQueryResponse, HogQLQueryModifiers, @@ -129,23 +132,69 @@ def to_actors_query(self) -> ast.SelectQuery | ast.SelectUnionQuery: def calculate(self): queries = self.to_query() - res = [] - timings = [] + res_matrix: List[List[Any] | Any | None] = [None] * len(queries) + timings_matrix: List[List[QueryTiming] | None] = [None] * len(queries) + errors: List[Exception] = [] - for index, query in enumerate(queries): - series_with_extra = self.series[index] + def run(index: int, query: ast.SelectQuery | ast.SelectUnionQuery, is_parallel: bool): + try: + series_with_extra = self.series[index] - response = execute_hogql_query( - query_type="TrendsQuery", - query=query, - team=self.team, - timings=self.timings, - modifiers=self.modifiers, - ) + response = execute_hogql_query( + query_type="TrendsQuery", + query=query, + team=self.team, + timings=self.timings, + modifiers=self.modifiers, + ) - timings.extend(response.timings) + timings_matrix[index] = response.timings + res_matrix[index] = self.build_series_response(response, series_with_extra, len(queries)) + except Exception as e: + errors.append(e) + finally: + if is_parallel: + from django.db import connection + + # This will only close the DB connection for the newly spawned thread and not the whole app + connection.close() + + # This exists so that we're not spawning threads during unit tests. We can't do + # this right now due to the lack of multithreaded support of Django + if settings.IN_UNIT_TESTING: # type: ignore + for index, query in enumerate(queries): + run(index, query, False) + elif len(queries) == 1: + run(0, queries[0], False) + else: + jobs = [threading.Thread(target=run, args=(index, query, True)) for index, query in enumerate(queries)] - res.extend(self.build_series_response(response, series_with_extra, len(queries))) + # Start the threads + for j in jobs: + j.start() + + # Ensure all of the threads have finished + for j in jobs: + j.join() + + # Raise any errors raised in a seperate thread + if len(errors) > 0: + raise errors[0] + + # Flatten res and timings + res = [] + for result in res_matrix: + if isinstance(result, List): + res.extend(result) + else: + res.append(result) + + timings = [] + for result in timings_matrix: + if isinstance(result, List): + timings.extend(result) + else: + timings.append(result) if ( self.query.trendsFilter is not None diff --git a/posthog/models/filters/base_filter.py b/posthog/models/filters/base_filter.py index 10ca241ad8f16..ca2ef9e4c575f 100644 --- a/posthog/models/filters/base_filter.py +++ b/posthog/models/filters/base_filter.py @@ -56,7 +56,11 @@ def __init__( # Set the HogQL context for the request self.hogql_context = self.kwargs.get( "hogql_context", - HogQLContext(within_non_hogql_query=True, team_id=self.team.pk if self.team else None), + HogQLContext( + within_non_hogql_query=True, + team_id=self.team.pk if self.team else None, + team=self.team if self.team else None, + ), ) if self.team: self.hogql_context.person_on_events_mode = self.team.person_on_events_mode diff --git a/posthog/settings/__init__.py b/posthog/settings/__init__.py index 2e3e1d1e112de..3593d44f40c57 100644 --- a/posthog/settings/__init__.py +++ b/posthog/settings/__init__.py @@ -110,6 +110,8 @@ PROM_PUSHGATEWAY_ADDRESS = os.getenv("PROM_PUSHGATEWAY_ADDRESS", None) +IN_UNIT_TESTING = get_from_env("IN_UNIT_TESTING", False, type_cast=str_to_bool) + # Extend and override these settings with EE's ones if "ee.apps.EnterpriseConfig" in INSTALLED_APPS: from ee.settings import * # noqa: F401, F403 diff --git a/posthog/tasks/exporter.py b/posthog/tasks/exporter.py index 69f968f207cea..ffec5d5b1142f 100644 --- a/posthog/tasks/exporter.py +++ b/posthog/tasks/exporter.py @@ -2,6 +2,7 @@ from prometheus_client import Counter, Histogram +from posthog import settings from posthog.celery import app from posthog.models import ExportedAsset @@ -40,6 +41,7 @@ retry_backoff=True, acks_late=True, ignore_result=False, + time_limit=settings.ASSET_GENERATION_MAX_TIMEOUT_SECONDS, ) def export_asset(exported_asset_id: int, limit: Optional[int] = None) -> None: from posthog.tasks.exports import csv_exporter, image_exporter