Commit

Merge branch 'master' into sleek-sparkline
Twixes committed Jan 17, 2024
2 parents 2df738c + 103280f commit 2a86c49
Showing 22 changed files with 166 additions and 64 deletions.
2 changes: 1 addition & 1 deletion bin/celery-queues.env
@@ -1,3 +1,3 @@
# Default set of queues to be used by Celery.
# Important: Add new queues to make Celery consume tasks from them.
CELERY_WORKER_QUEUES=celery,email,insight_export,insight_refresh,gevent
CELERY_WORKER_QUEUES=celery,email,insight_export,insight_refresh,analytics_queries
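
The swap of gevent for analytics_queries only matters if the worker entrypoint actually passes this variable through to Celery. A minimal sketch of that wiring (not PostHog's real bin/docker-worker-celery; the broker URL is an assumption):

import os

from celery import Celery

app = Celery("sketch", broker=os.environ.get("REDIS_URL", "redis://localhost:6379"))

# A worker only consumes queues it is told about, so a task routed to the new
# "analytics_queries" queue sits unprocessed until the queue is listed here.
queues = os.environ.get("CELERY_WORKER_QUEUES", "celery").split(",")

# Equivalent to: celery -A <app> worker -Q celery,email,...,analytics_queries
app.worker_main(argv=["worker", "--queues", ",".join(queues)])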
7 changes: 7 additions & 0 deletions docker-compose.base.yml
@@ -60,6 +60,13 @@ services:
image: maildev/maildev:2.0.5
restart: on-failure

flower:
image: mher/flower:2.0.0
restart: on-failure
environment:
FLOWER_PORT: 5555
CELERY_BROKER_URL: redis://redis:6379

worker: &worker
command: ./bin/docker-worker-celery --with-scheduler
restart: on-failure
7 changes: 7 additions & 0 deletions docker-compose.dev.yml
@@ -27,6 +27,13 @@ services:
ports:
- '6379:6379'

flower:
extends:
file: docker-compose.base.yml
service: flower
ports:
- '5555:5555'

clickhouse:
extends:
file: docker-compose.base.yml
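
Together with the base definition above, this exposes Flower's dashboard at http://localhost:5555 when the dev stack is up. Flower 2.x also serves a small REST API on the same port; a hedged sketch of polling it (endpoint per the Flower docs; response fields vary by version):

import requests

# Lists the workers Flower currently sees, along with their stats.
resp = requests.get("http://localhost:5555/api/workers", timeout=5)
resp.raise_for_status()
for worker_name, info in resp.json().items():
    print(worker_name, sorted(info))  # per-worker info dict; fields vary by version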
5 changes: 4 additions & 1 deletion ee/settings.py
@@ -68,4 +68,7 @@
# Whether to enable the admin portal. Default false for self-hosted as if not setup properly can pose security issues.
ADMIN_PORTAL_ENABLED = get_from_env("ADMIN_PORTAL_ENABLED", DEMO or DEBUG, type_cast=str_to_bool)

ASSET_GENERATION_MAX_TIMEOUT_MINUTES = get_from_env("ASSET_GENERATION_MAX_TIMEOUT_MINUTES", 10, type_cast=int)
ASSET_GENERATION_MAX_TIMEOUT_SECONDS = get_from_env("ASSET_GENERATION_MAX_TIMEOUT_SECONDS", 60.0, type_cast=float)
PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES = get_from_env(
"PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES", 10.0, type_cast=float
)
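
All three settings funnel through get_from_env, which, roughly, reads the variable and coerces it with type_cast, falling back to the default when unset. A hedged re-implementation for illustration only (not PostHog's actual helper):

import os
from typing import Any, Callable

def get_from_env(key: str, default: Any = None, *, type_cast: Callable[[str], Any] = str) -> Any:
    # Hypothetical reconstruction; the real helper may treat empty values differently.
    value = os.environ.get(key)
    if value is None:
        return default
    return type_cast(value)

# PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES=2.5 -> 2.5; unset -> the 10.0-minute default.
timeout_minutes = get_from_env("PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES", 10.0, type_cast=float)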
10 changes: 8 additions & 2 deletions ee/tasks/subscriptions/__init__.py
@@ -1,6 +1,7 @@
from datetime import datetime, timedelta
from typing import Optional

from django.conf import settings
import structlog
from prometheus_client import Counter
from sentry_sdk import capture_exception, capture_message
@@ -144,12 +145,17 @@ def schedule_all_subscriptions() -> None:
deliver_subscription_report.delay(subscription.id)


@app.task()
report_timeout_seconds = settings.PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES * 60 * 1.5


@app.task(soft_time_limit=report_timeout_seconds, timeout=report_timeout_seconds + 10)
def deliver_subscription_report(subscription_id: int) -> None:
if not settings.TEST:
return
return _deliver_subscription_report(subscription_id)


@app.task()
@app.task(soft_time_limit=30, time_limit=40)
def handle_subscription_value_change(
subscription_id: int, previous_value: str, invite_message: Optional[str] = None
) -> None:
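
With the 10-minute default, report_timeout_seconds works out to 10 * 60 * 1.5 = 900. One caveat worth flagging: Celery's hard-limit option is time_limit, not timeout — Celery stores unrecognized decorator options without complaint, so the timeout= kwarg above presumably has no effect and was likely meant as time_limit. A hedged sketch of the intended semantics:

from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded

app = Celery("sketch", broker="redis://localhost:6379")
report_timeout_seconds = 10.0 * 60 * 1.5  # 900s with the default setting

@app.task(soft_time_limit=report_timeout_seconds, time_limit=report_timeout_seconds + 10)
def deliver_subscription_report_sketch(subscription_id: int) -> None:
    try:
        ...  # render assets and send the report
    except SoftTimeLimitExceeded:
        # The soft limit raises inside the task, leaving ~10s of headroom
        # before the hard limit terminates the worker process.
        pass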
2 changes: 1 addition & 1 deletion ee/tasks/subscriptions/subscription_utils.py
@@ -60,7 +60,7 @@ def generate_assets(

wait_for_parallel_celery_group(
parallel_job,
max_timeout=timedelta(minutes=settings.ASSET_GENERATION_MAX_TIMEOUT_MINUTES),
max_timeout=timedelta(minutes=settings.PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES),
)

return insights, assets
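
The rename pins down which setting governs this wait. wait_for_parallel_celery_group itself presumably works along these lines — a sketch, not the real helper; note the error message matches test_cancels_children_if_timed_out below:

import time
from datetime import timedelta

def wait_for_parallel_celery_group_sketch(parallel_job, max_timeout: timedelta):
    # Poll the group until every child finishes or the deadline passes.
    deadline = time.monotonic() + max_timeout.total_seconds()
    while not parallel_job.ready():
        if time.monotonic() > deadline:
            for child in parallel_job.children or []:
                child.revoke()  # cancel still-running exports
            raise Exception("Timed out waiting for celery task to finish")
        time.sleep(0.1)
    return parallel_job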
10 changes: 5 additions & 5 deletions ee/tasks/test/subscriptions/test_subscriptions_utils.py
@@ -34,7 +34,7 @@ def setUp(self) -> None:
self.subscription = create_subscription(team=self.team, insight=self.insight, created_by=self.user)

def test_generate_assets_for_insight(self, mock_export_task: MagicMock, _mock_group: MagicMock) -> None:
with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1):
with self.settings(PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1):
insights, assets = generate_assets(self.subscription)

assert insights == [self.insight]
@@ -44,7 +44,7 @@ def test_generate_assets_for_insight(self, mock_export_task: MagicMock, _mock_gr
def test_generate_assets_for_dashboard(self, mock_export_task: MagicMock, _mock_group: MagicMock) -> None:
subscription = create_subscription(team=self.team, dashboard=self.dashboard, created_by=self.user)

with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1):
with self.settings(PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1):
insights, assets = generate_assets(subscription)

assert len(insights) == len(self.tiles)
@@ -54,7 +54,7 @@ def test_raises_if_missing_resource(self, _mock_export_task: MagicMock, _mock_gr
def test_raises_if_missing_resource(self, _mock_export_task: MagicMock, _mock_group: MagicMock) -> None:
subscription = create_subscription(team=self.team, created_by=self.user)

with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1), pytest.raises(Exception) as e:
with self.settings(PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1), pytest.raises(Exception) as e:
generate_assets(subscription)

assert str(e.value) == "There are no insights to be sent for this Subscription"
@@ -68,7 +68,7 @@ def test_excludes_deleted_insights_for_dashboard(self, mock_export_task: MagicMo
current_tile.insight.save()
subscription = create_subscription(team=self.team, dashboard=self.dashboard, created_by=self.user)

with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1):
with self.settings(PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES=1):
insights, assets = generate_assets(subscription)

assert len(insights) == 1
@@ -90,7 +90,7 @@ def test_cancels_children_if_timed_out(self, _mock_export_task: MagicMock, mock_
mock_running_exports.children = [running_export_task]
mock_running_exports.ready = mock_ready

with self.settings(ASSET_GENERATION_MAX_TIMEOUT_MINUTES=0.01), pytest.raises(Exception) as e:
with self.settings(PARALLEL_ASSET_GENERATION_MAX_TIMEOUT_MINUTES=0.01), pytest.raises(Exception) as e:
generate_assets(self.subscription)

assert str(e.value) == "Timed out waiting for celery task to finish"
31 changes: 2 additions & 29 deletions frontend/src/queries/nodes/DataNode/dataNodeLogic.ts
@@ -1,5 +1,4 @@
import clsx from 'clsx'
import equal from 'fast-deep-equal'
import {
actions,
afterMount,
@@ -16,7 +15,7 @@ import {
} from 'kea'
import { loaders } from 'kea-loaders'
import { subscriptions } from 'kea-subscriptions'
import api, { ApiMethodOptions, getJSONOrThrow } from 'lib/api'
import api, { ApiMethodOptions } from 'lib/api'
import { FEATURE_FLAGS } from 'lib/constants'
import { dayjs } from 'lib/dayjs'
import { featureFlagLogic } from 'lib/logic/featureFlagLogic'
@@ -41,18 +40,8 @@ import {
QueryResponse,
QueryTiming,
} from '~/queries/schema'
import {
isActorsQuery,
isEventsQuery,
isInsightActorsQuery,
isInsightQueryNode,
isLifecycleQuery,
isPersonsNode,
isStickinessQuery,
isTrendsQuery,
} from '~/queries/utils'
import { isActorsQuery, isEventsQuery, isInsightActorsQuery, isInsightQueryNode, isPersonsNode } from '~/queries/utils'

import { filtersToQueryNode } from '../InsightQuery/utils/filtersToQueryNode'
import type { dataNodeLogicType } from './dataNodeLogicType'

export interface DataNodeLogicProps {
@@ -123,22 +112,6 @@ export const dataNodeLogic = kea<dataNodeLogicType>([
if (props.doNotLoad) {
return props.cachedResults
}
if (
isInsightQueryNode(props.query) &&
!(values.hogQLInsightsLifecycleFlagEnabled && isLifecycleQuery(props.query)) &&
!(values.hogQLInsightsTrendsFlagEnabled && isTrendsQuery(props.query)) &&
!(values.hogQLInsightsStickinessFlagEnabled && isStickinessQuery(props.query)) &&
props.cachedResults &&
props.cachedResults['id'] &&
props.cachedResults['filters'] &&
equal(props.query, filtersToQueryNode(props.cachedResults['filters']))
) {
const url = `api/projects/${values.currentTeamId}/insights/${props.cachedResults['id']}?refresh=true`
const fetchResponse = await api.getResponse(url)
const data = await getJSONOrThrow(fetchResponse)
breakpoint()
return data
}

if (props.cachedResults && !refresh) {
if (
3 changes: 1 addition & 2 deletions mypy-baseline.txt
@@ -421,8 +421,7 @@ posthog/api/person.py:0: error: Argument 1 to "loads" has incompatible type "str
posthog/api/person.py:0: error: Argument "user" to "log_activity" has incompatible type "User | AnonymousUser"; expected "User | None" [arg-type]
posthog/api/person.py:0: error: Argument "user" to "log_activity" has incompatible type "User | AnonymousUser"; expected "User | None" [arg-type]
posthog/hogql_queries/web_analytics/web_analytics_query_runner.py:0: error: Argument 1 to "append" of "list" has incompatible type "EventPropertyFilter"; expected "Expr" [arg-type]
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Need type annotation for "timings" (hint: "timings: List[<type>] = ...") [var-annotated]
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Argument 1 to "extend" of "list" has incompatible type "list[QueryTiming] | None"; expected "Iterable[Any]" [arg-type]
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Statement is unreachable [unreachable]
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Argument 1 to "FormulaAST" has incompatible type "map[Any]"; expected "list[list[float]]" [arg-type]
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Argument 1 to "FormulaAST" has incompatible type "map[Any]"; expected "list[list[float]]" [arg-type]
4 changes: 2 additions & 2 deletions package.json
@@ -312,11 +312,11 @@
"stylelint --fix",
"prettier --write"
],
"frontend/src/**.{js,jsx,mjs,ts,tsx}": [
"frontend/src/**/*.{js,jsx,mjs,ts,tsx}": [
"eslint -c .eslintrc.js --fix",
"prettier --write"
],
"plugin-server/**.{js,jsx,mjs,ts,tsx}": [
"plugin-server/**/*.{js,jsx,mjs,ts,tsx}": [
"pnpm --dir plugin-server exec eslint --fix",
"pnpm --dir plugin-server exec prettier --write"
],
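
In the micromatch globs that lint-staged uses, ** acts as a recursive globstar only when it stands alone as a path segment, so frontend/src/**.ts behaved like frontend/src/*.ts and skipped nested files. Python's glob module follows the same rule, which makes for a quick hedged illustration:

import glob

# "**.tsx" degrades to "*.tsx": top-level matches only.
top_level = glob.glob("frontend/src/**.tsx", recursive=True)

# "**/*.tsx" matches at any depth under frontend/src.
all_depths = glob.glob("frontend/src/**/*.tsx", recursive=True)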
16 changes: 16 additions & 0 deletions posthog/api/test/__snapshots__/test_decide.ambr
@@ -88,6 +88,22 @@
LIMIT 21 /*controller='team-detail',route='api/projects/%28%3FP%3Cid%3E%5B%5E/.%5D%2B%29/%3F%24'*/
'''
# ---
# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.10
'''
SELECT "posthog_pluginconfig"."id",
"posthog_pluginconfig"."web_token",
"posthog_pluginsourcefile"."updated_at",
"posthog_plugin"."updated_at",
"posthog_pluginconfig"."updated_at"
FROM "posthog_pluginconfig"
INNER JOIN "posthog_plugin" ON ("posthog_pluginconfig"."plugin_id" = "posthog_plugin"."id")
INNER JOIN "posthog_pluginsourcefile" ON ("posthog_plugin"."id" = "posthog_pluginsourcefile"."plugin_id")
WHERE ("posthog_pluginconfig"."enabled"
AND "posthog_pluginsourcefile"."filename" = 'site.ts'
AND "posthog_pluginsourcefile"."status" = 'TRANSPILED'
AND "posthog_pluginconfig"."team_id" = 2)
'''
# ---
# name: TestDecide.test_decide_doesnt_error_out_when_database_is_down.2
'''
SELECT "posthog_organizationmembership"."id",
6 changes: 3 additions & 3 deletions posthog/celery.py
@@ -401,8 +401,8 @@ def redis_heartbeat():
get_client().set("POSTHOG_HEARTBEAT", int(time.time()))


@app.task(ignore_result=True, bind=True)
def process_query_task(self, team_id, query_id, query_json, limit_context=None, refresh_requested=False):
@app.task(ignore_result=True, queue="analytics_queries")
def process_query_task(team_id, query_id, query_json, limit_context=None, refresh_requested=False):
"""
Kick off query
Once complete save results to redis
@@ -822,7 +822,7 @@ def clear_clickhouse_deleted_person():
remove_deleted_person_data()


@app.task(ignore_result=True)
@app.task(ignore_result=True, queue="email")
def redis_celery_queue_depth():
try:
with pushed_metrics_registry("redis_celery_queue_depth_registry") as registry:
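
process_query_task drops its unused bind=True/self and gains an explicit queue, and redis_celery_queue_depth moves to the email queue. A task's queue option controls where .delay() publishes it, which is why the CELERY_WORKER_QUEUES change above now lists analytics_queries. A hedged sketch of the producer side:

from celery import Celery

app = Celery("sketch", broker="redis://localhost:6379")

@app.task(ignore_result=True, queue="analytics_queries")
def process_query_sketch(team_id: int, query_id: str) -> None:
    ...  # run the query and save results to redis

# Publishes to "analytics_queries"; workers not consuming that queue
# (see bin/celery-queues.env) will never pick it up.
process_query_sketch.delay(1, "some-query-id")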
3 changes: 3 additions & 0 deletions posthog/hogql/context.py
@@ -6,6 +6,7 @@

if TYPE_CHECKING:
from posthog.hogql.database.database import Database
from posthog.models import Team


@dataclass
@@ -22,6 +23,8 @@ class HogQLContext:

# Team making the queries
team_id: Optional[int]
# Team making the queries - if team is passed in, then the team isn't queried when creating the database
team: Optional["Team"] = None
# Virtual database we're querying, will be populated from team_id if not present
database: Optional["Database"] = None
# If set, will save string constants to this dict. Inlines strings into the query if None.
12 changes: 9 additions & 3 deletions posthog/hogql/database/database.py
@@ -1,4 +1,4 @@
from typing import Any, ClassVar, Dict, List, Literal, Optional, TypedDict
from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Literal, Optional, TypedDict
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from pydantic import ConfigDict, BaseModel

@@ -51,6 +51,10 @@
from posthog.schema import HogQLQueryModifiers, PersonsOnEventsMode


if TYPE_CHECKING:
from posthog.models import Team


class Database(BaseModel):
model_config = ConfigDict(extra="allow")

@@ -121,7 +125,9 @@ def add_warehouse_tables(self, **field_definitions: Any):
setattr(self, f_name, f_def)


def create_hogql_database(team_id: int, modifiers: Optional[HogQLQueryModifiers] = None) -> Database:
def create_hogql_database(
team_id: int, modifiers: Optional[HogQLQueryModifiers] = None, team_arg: Optional["Team"] = None
) -> Database:
from posthog.models import Team
from posthog.hogql.query import create_default_modifiers_for_team
from posthog.warehouse.models import (
@@ -130,7 +136,7 @@ def create_hogql_database(team_id: int, modifiers: Optional[HogQLQueryModifiers]
DataWarehouseViewLink,
)

team = Team.objects.get(pk=team_id)
team = team_arg or Team.objects.get(pk=team_id)
modifiers = create_default_modifiers_for_team(team, modifiers)
database = Database(timezone=team.timezone, week_start_day=team.week_start_day)

2 changes: 1 addition & 1 deletion posthog/hogql/printer.py
@@ -97,7 +97,7 @@ def prepare_ast_for_printing(
settings: Optional[HogQLGlobalSettings] = None,
) -> ast.Expr:
with context.timings.measure("create_hogql_database"):
context.database = context.database or create_hogql_database(context.team_id, context.modifiers)
context.database = context.database or create_hogql_database(context.team_id, context.modifiers, context.team)

if context.modifiers.inCohortVia == InCohortVia.leftjoin_conjoined:
with context.timings.measure("resolve_in_cohorts_conjoined"):
2 changes: 2 additions & 0 deletions posthog/hogql/query.py
@@ -81,6 +81,7 @@ def execute_hogql_query(
with timings.measure("prepare_ast"):
hogql_query_context = HogQLContext(
team_id=team.pk,
team=team,
enable_select_queries=True,
timings=timings,
modifiers=query_modifiers,
@@ -121,6 +122,7 @@
with timings.measure("print_ast"):
clickhouse_context = HogQLContext(
team_id=team.pk,
team=team,
enable_select_queries=True,
timings=timings,
modifiers=query_modifiers,
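
The context.py, database.py, printer.py, and query.py hunks are one change: callers that already hold a Team pass it through HogQLContext, and create_hogql_database then skips the Team.objects.get(pk=team_id) round trip. A condensed sketch of the resulting flow (assuming HogQLContext's other fields keep their defaults):

from posthog.hogql.context import HogQLContext
from posthog.hogql.database.database import create_hogql_database

def build_context(team):
    context = HogQLContext(team_id=team.pk, team=team, enable_select_queries=True)
    # prepare_ast_for_printing now resolves the database without re-querying Team:
    context.database = context.database or create_hogql_database(
        context.team_id, context.modifiers, context.team
    )
    return context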
1 change: 1 addition & 0 deletions posthog/hogql_queries/insights/trends/test/test_trends.py
@@ -200,6 +200,7 @@ def convert_filter_to_trends_query(filter: Filter) -> TrendsQuery:
return tq


@override_settings(IN_UNIT_TESTING=True)
class TestTrends(ClickhouseTestMixin, APIBaseTest):
maxDiff = None

@@ -1,5 +1,7 @@
from dataclasses import dataclass
from typing import Dict, List, Optional
from unittest.mock import patch
from django.test import override_settings
from freezegun import freeze_time
from posthog.hogql_queries.insights.trends.trends_query_runner import TrendsQueryRunner
from posthog.models.cohort.cohort import Cohort
@@ -41,6 +43,7 @@ class SeriesTestData:
properties: Dict[str, str | int]


@override_settings(IN_UNIT_TESTING=True)
class TestQuery(ClickhouseTestMixin, APIBaseTest):
default_date_from = "2020-01-09"
default_date_to = "2020-01-19"
@@ -1158,3 +1161,22 @@ def test_smoothing(self):
)

assert response.results[0]["data"] == [1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0]

@patch("posthog.hogql_queries.insights.trends.trends_query_runner.execute_hogql_query")
def test_should_throw_exception(self, patch_sync_execute):
patch_sync_execute.side_effect = Exception("Error thrown inside thread")

with self.assertRaises(Exception) as e:
self._run_trends_query(
"2020-01-09",
"2020-01-20",
IntervalType.day,
[EventsNode(event="$pageview")],
None,
None,
)

self.assertEqual(
str(e.exception),
"Error thrown inside thread",
)
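
The new test patches execute_hogql_query to raise and asserts the message survives, which relies on the runner joining its per-series worker threads and re-raising the first captured exception in the caller. A hedged sketch of that pattern (the runner's actual threading code may differ):

from concurrent.futures import ThreadPoolExecutor

def run_series_jobs(jobs):
    # Submit each series computation to a thread; .result() re-raises any
    # exception from the worker in the calling thread, so a failure like
    # "Error thrown inside thread" surfaces to assertRaises.
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(job) for job in jobs]
    return [f.result() for f in futures]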