diff --git a/ee/clickhouse/views/experiments.py b/ee/clickhouse/views/experiments.py index d3a9f2afea364..391fc1a6aa5ab 100644 --- a/ee/clickhouse/views/experiments.py +++ b/ee/clickhouse/views/experiments.py @@ -339,6 +339,7 @@ def update(self, instance: Experiment, validated_data: dict, *args: Any, **kwarg # if ( # not instance.filters.get("events") # and not instance.filters.get("actions") + # and not instance.filters.get("data_warehouse") # and validated_data.get("start_date") # and not validated_data.get("filters") # ): diff --git a/ee/hogai/schema_generator/nodes.py b/ee/hogai/schema_generator/nodes.py index 560a9a7d5cc9e..c5e7ffbba85c4 100644 --- a/ee/hogai/schema_generator/nodes.py +++ b/ee/hogai/schema_generator/nodes.py @@ -144,7 +144,10 @@ def _construct_messages( human_messages, visualization_messages = filter_visualization_conversation(messages) first_ai_message = True - for human_message, ai_message in itertools.zip_longest(human_messages, visualization_messages): + for idx, (human_message, ai_message) in enumerate( + itertools.zip_longest(human_messages, visualization_messages) + ): + # Plans go first if ai_message: conversation.append( HumanMessagePromptTemplate.from_template( @@ -161,6 +164,7 @@ def _construct_messages( ).format(plan=generated_plan) ) + # Then questions if human_message: conversation.append( HumanMessagePromptTemplate.from_template(QUESTION_PROMPT, template_format="mustache").format( @@ -168,7 +172,8 @@ def _construct_messages( ) ) - if ai_message: + # Then schemas, but include only last generated schema because it doesn't need more context. + if ai_message and idx + 1 == len(visualization_messages): conversation.append( LangchainAssistantMessage(content=ai_message.answer.model_dump_json() if ai_message.answer else "") ) diff --git a/ee/hogai/schema_generator/test/test_nodes.py b/ee/hogai/schema_generator/test/test_nodes.py index af66234978794..795045af50b56 100644 --- a/ee/hogai/schema_generator/test/test_nodes.py +++ b/ee/hogai/schema_generator/test/test_nodes.py @@ -9,9 +9,11 @@ from ee.hogai.schema_generator.nodes import SchemaGeneratorNode, SchemaGeneratorToolsNode from ee.hogai.schema_generator.utils import SchemaGeneratorOutput from posthog.schema import ( + AssistantMessage, AssistantTrendsQuery, FailureMessage, HumanMessage, + RouterMessage, VisualizationMessage, ) from posthog.test.base import APIBaseTest, ClickhouseTestMixin @@ -169,6 +171,71 @@ def test_agent_reconstructs_conversation_and_merges_messages(self): self.assertNotIn("{{question}}", history[5].content) self.assertIn("Follow\nUp", history[5].content) + def test_agent_reconstructs_typical_conversation(self): + node = DummyGeneratorNode(self.team) + history = node._construct_messages( + { + "messages": [ + HumanMessage(content="Question 1"), + RouterMessage(content="trends"), + VisualizationMessage(answer=AssistantTrendsQuery(series=[]), plan="Plan 1"), + AssistantMessage(content="Summary 1"), + HumanMessage(content="Question 2"), + RouterMessage(content="funnel"), + VisualizationMessage(answer=AssistantTrendsQuery(series=[]), plan="Plan 2"), + AssistantMessage(content="Summary 2"), + HumanMessage(content="Question 3"), + RouterMessage(content="funnel"), + ], + "plan": "Plan 3", + } + ) + self.assertEqual(len(history), 8) + self.assertEqual(history[0].type, "human") + self.assertIn("mapping", history[0].content) + self.assertEqual(history[1].type, "human") + self.assertIn("Plan 1", history[1].content) + self.assertEqual(history[2].type, "human") + self.assertIn("Question 1", 
history[2].content) + self.assertEqual(history[3].type, "human") + self.assertIn("Plan 2", history[3].content) + self.assertEqual(history[4].type, "human") + self.assertIn("Question 2", history[4].content) + self.assertEqual(history[5].type, "ai") + self.assertEqual(history[6].type, "human") + self.assertIn("Plan 3", history[6].content) + self.assertEqual(history[7].type, "human") + self.assertIn("Question 3", history[7].content) + + def test_prompt(self): + node = DummyGeneratorNode(self.team) + state = { + "messages": [ + HumanMessage(content="Question 1"), + RouterMessage(content="trends"), + VisualizationMessage(answer=AssistantTrendsQuery(series=[]), plan="Plan 1"), + AssistantMessage(content="Summary 1"), + HumanMessage(content="Question 2"), + RouterMessage(content="funnel"), + VisualizationMessage(answer=AssistantTrendsQuery(series=[]), plan="Plan 2"), + AssistantMessage(content="Summary 2"), + HumanMessage(content="Question 3"), + RouterMessage(content="funnel"), + ], + "plan": "Plan 3", + } + with patch.object(DummyGeneratorNode, "_model") as generator_model_mock: + + def assert_prompt(prompt): + self.assertEqual(len(prompt), 4) + self.assertEqual(prompt[0].type, "system") + self.assertEqual(prompt[1].type, "human") + self.assertEqual(prompt[2].type, "ai") + self.assertEqual(prompt[3].type, "human") + + generator_model_mock.return_value = RunnableLambda(assert_prompt) + node.run(state, {}) + def test_failover_with_incorrect_schema(self): node = DummyGeneratorNode(self.team) with patch.object(DummyGeneratorNode, "_model") as generator_model_mock: diff --git a/ee/hogai/taxonomy_agent/test/test_nodes.py b/ee/hogai/taxonomy_agent/test/test_nodes.py index fe3d52266ec18..40127c19370b6 100644 --- a/ee/hogai/taxonomy_agent/test/test_nodes.py +++ b/ee/hogai/taxonomy_agent/test/test_nodes.py @@ -18,6 +18,7 @@ AssistantTrendsQuery, FailureMessage, HumanMessage, + RouterMessage, VisualizationMessage, ) from posthog.test.base import APIBaseTest, ClickhouseTestMixin, _create_event, _create_person @@ -116,6 +117,36 @@ def test_agent_reconstructs_conversation_with_failures(self): self.assertIn("Text", history[0].content) self.assertNotIn("{{question}}", history[0].content) + def test_agent_reconstructs_typical_conversation(self): + node = self._get_node() + history = node._construct_messages( + { + "messages": [ + HumanMessage(content="Question 1"), + RouterMessage(content="trends"), + VisualizationMessage(answer=AssistantTrendsQuery(series=[]), plan="Plan 1"), + AssistantMessage(content="Summary 1"), + HumanMessage(content="Question 2"), + RouterMessage(content="funnel"), + VisualizationMessage(answer=AssistantTrendsQuery(series=[]), plan="Plan 2"), + AssistantMessage(content="Summary 2"), + HumanMessage(content="Question 3"), + RouterMessage(content="funnel"), + ] + } + ) + self.assertEqual(len(history), 5) + self.assertEqual(history[0].type, "human") + self.assertIn("Question 1", history[0].content) + self.assertEqual(history[1].type, "ai") + self.assertEqual(history[1].content, "Plan 1") + self.assertEqual(history[2].type, "human") + self.assertIn("Question 2", history[2].content) + self.assertEqual(history[3].type, "ai") + self.assertEqual(history[3].content, "Plan 2") + self.assertEqual(history[4].type, "human") + self.assertIn("Question 3", history[4].content) + def test_agent_filters_out_low_count_events(self): _create_person(distinct_ids=["test"], team=self.team) for i in range(26): diff --git a/ee/hogai/test/test_utils.py b/ee/hogai/test/test_utils.py index 89f23d2fdd7b6..42e54d058c556 
100644 --- a/ee/hogai/test/test_utils.py +++ b/ee/hogai/test/test_utils.py @@ -1,7 +1,14 @@ from langchain_core.messages import HumanMessage as LangchainHumanMessage from ee.hogai.utils import filter_visualization_conversation, merge_human_messages -from posthog.schema import AssistantTrendsQuery, FailureMessage, HumanMessage, VisualizationMessage +from posthog.schema import ( + AssistantMessage, + AssistantTrendsQuery, + FailureMessage, + HumanMessage, + RouterMessage, + VisualizationMessage, +) from posthog.test.base import BaseTest @@ -37,3 +44,29 @@ def test_filter_trends_conversation(self): self.assertEqual( visualization_messages, [VisualizationMessage(answer=AssistantTrendsQuery(series=[]), plan="plan")] ) + + def test_filters_typical_conversation(self): + human_messages, visualization_messages = filter_visualization_conversation( + [ + HumanMessage(content="Question 1"), + RouterMessage(content="trends"), + VisualizationMessage(answer=AssistantTrendsQuery(series=[]), plan="Plan 1"), + AssistantMessage(content="Summary 1"), + HumanMessage(content="Question 2"), + RouterMessage(content="funnel"), + VisualizationMessage(answer=AssistantTrendsQuery(series=[]), plan="Plan 2"), + AssistantMessage(content="Summary 2"), + ] + ) + self.assertEqual(len(human_messages), 2) + self.assertEqual(len(visualization_messages), 2) + self.assertEqual( + human_messages, [LangchainHumanMessage(content="Question 1"), LangchainHumanMessage(content="Question 2")] + ) + self.assertEqual( + visualization_messages, + [ + VisualizationMessage(answer=AssistantTrendsQuery(series=[]), plan="Plan 1"), + VisualizationMessage(answer=AssistantTrendsQuery(series=[]), plan="Plan 2"), + ], + ) diff --git a/frontend/src/globals.d.ts b/frontend/src/globals.d.ts index 3c9a4a730cb38..e2f93e99a9665 100644 --- a/frontend/src/globals.d.ts +++ b/frontend/src/globals.d.ts @@ -22,30 +22,5 @@ declare global { } IMPERSONATED_SESSION?: boolean POSTHOG_JS_UUID_VERSION?: string - - EventSourcePolyfill: typeof EventSource } } - -/** Copied from https://github.com/DefinitelyTyped/DefinitelyTyped/blob/aaf94a0a/types/event-source-polyfill/index.d.ts#L43 */ -export declare class EventSource { - static readonly CLOSED: number - static readonly CONNECTING: number - static readonly OPEN: number - - constructor(url: string, eventSourceInitDict?: EventSource.EventSourceInitDict) - - readonly CLOSED: number - readonly CONNECTING: number - readonly OPEN: number - readonly url: string - readonly readyState: number - readonly withCredentials: boolean - onopen: (evt: MessageEvent) => any - onmessage: (evt: MessageEvent) => any - onerror: (evt: MessageEvent) => any - addEventListener(type: string, listener: (evt: MessageEvent) => void): void - dispatchEvent(evt: Event): boolean - removeEventListener(type: string, listener: (evt: MessageEvent) => void): void - close(): void -} diff --git a/frontend/src/lib/components/Alerts/alertsLogic.ts b/frontend/src/lib/components/Alerts/alertsLogic.ts index 126677eb92f32..e5be50bf46eb1 100644 --- a/frontend/src/lib/components/Alerts/alertsLogic.ts +++ b/frontend/src/lib/components/Alerts/alertsLogic.ts @@ -33,5 +33,19 @@ export const alertsLogic = kea([ afterMount(({ actions }) => actions.loadAlerts()), ]) -const alertComparatorKey = (alert: AlertType): number => - !alert.enabled ? 3 : alert.state === AlertState.NOT_FIRING ? 
2 : 1 +const alertComparatorKey = (alert: AlertType): number => { + if (!alert.enabled) { + return 100 + } + + switch (alert.state) { + case AlertState.FIRING: + return 1 + case AlertState.ERRORED: + return 2 + case AlertState.SNOOZED: + return 3 + case AlertState.NOT_FIRING: + return 4 + } +} diff --git a/frontend/src/lib/components/Alerts/views/Alerts.tsx b/frontend/src/lib/components/Alerts/views/Alerts.tsx index 4bb1129fd1146..d8ff606e31631 100644 --- a/frontend/src/lib/components/Alerts/views/Alerts.tsx +++ b/frontend/src/lib/components/Alerts/views/Alerts.tsx @@ -1,6 +1,5 @@ import { TZLabel } from '@posthog/apps-common' -import { IconCheck } from '@posthog/icons' -import { Tooltip } from '@posthog/lemon-ui' +import { LemonTag, Tooltip } from '@posthog/lemon-ui' import { useActions, useValues } from 'kea' import { router } from 'kea-router' import { FeedbackNotice } from 'lib/components/FeedbackNotice' @@ -49,7 +48,6 @@ export function Alerts({ alertId }: AlertsProps): JSX.Element { className={alert.enabled ? '' : 'text-muted'} title={
- {alert.enabled ? <AlertStateIndicator alert={alert} /> : null}
{name}
} @@ -58,6 +56,13 @@ export function Alerts({ alertId }: AlertsProps): JSX.Element { ) }, }, + { + title: 'Status', + dataIndex: 'state', + render: function renderStateIndicator(_, alert: AlertType) { + return alert.enabled ? <AlertStateIndicator alert={alert} /> : null + }, + }, { title: 'Last checked', sorter: true, @@ -98,7 +103,8 @@ export function Alerts({ alertId }: AlertsProps): JSX.Element { title: 'Enabled', dataIndex: 'enabled', key: 'enabled', - render: (enabled: any) => (enabled ? <IconCheck /> : null), + render: (enabled: any) => + enabled ? <LemonTag type="success">ENABLED</LemonTag> : <LemonTag type="muted">DISABLED</LemonTag>, }, ] diff --git a/frontend/src/lib/components/Alerts/views/EditAlertModal.tsx b/frontend/src/lib/components/Alerts/views/EditAlertModal.tsx index 404bc5e213a9f..ae12fccdab37f 100644 --- a/frontend/src/lib/components/Alerts/views/EditAlertModal.tsx +++ b/frontend/src/lib/components/Alerts/views/EditAlertModal.tsx @@ -35,11 +35,13 @@ export function AlertStateTable({ alert }: { alert: AlertType }): JSX.Element | return (
-            <h3>
-                Current status
-                {alert.state}
-                {alert.snoozed_until && ` until ${formatDate(dayjs(alert?.snoozed_until), 'MMM D, HH:mm')}`}{' '}
-            </h3>
+            <div className="flex items-center gap-2">
+                <h3 className="mb-0">Current status:</h3>
+                <AlertStateIndicator alert={alert} />
+                <div>
+                    {alert.snoozed_until && ` until ${formatDate(dayjs(alert?.snoozed_until), 'MMM D, HH:mm')}`}
+                </div>
+            </div>
@@ -92,7 +94,7 @@ export function EditAlertModal({ const { setAlertFormValue } = useActions(formLogic) const trendsLogic = trendsDataLogic({ dashboardItemId: insightShortId }) - const { alertSeries, isNonTimeSeriesDisplay, isBreakdownValid } = useValues(trendsLogic) + const { alertSeries, isNonTimeSeriesDisplay, isBreakdownValid, formula } = useValues(trendsLogic) const creatingNewAlert = alertForm.id === undefined @@ -161,13 +163,17 @@ export function EditAlertModal({ options={alertSeries?.map(({ event }, index) => ({ label: isBreakdownValid ? 'any breakdown value' + : formula + ? `Formula (${formula})` : `${alphabet[index]} - ${event}`, - value: index, + value: isBreakdownValid || formula ? 0 : index, }))} disabledReason={ - isBreakdownValid && - `For trends with breakdown, the alert will fire if any of the breakdown - values breaches the threshold.` + (isBreakdownValid && + `For trends with breakdown, the alert will fire if any of the breakdown + values breaches the threshold.`) || + (formula && + `When using formula mode, you can only alert on the formula value`) } /> @@ -273,7 +279,7 @@ export function EditAlertModal({ { value: InsightThresholdType.PERCENTAGE, label: '%', - tooltip: 'Percentage', + tooltip: 'Percent', }, { value: InsightThresholdType.ABSOLUTE, diff --git a/frontend/src/lib/components/Alerts/views/ManageAlertsModal.tsx b/frontend/src/lib/components/Alerts/views/ManageAlertsModal.tsx index 6c8b2ad4ae2ce..e46b7bb83a0fe 100644 --- a/frontend/src/lib/components/Alerts/views/ManageAlertsModal.tsx +++ b/frontend/src/lib/components/Alerts/views/ManageAlertsModal.tsx @@ -1,4 +1,3 @@ -import { IconCheck, IconX } from '@posthog/icons' import { Link } from '@posthog/lemon-ui' import { useActions, useValues } from 'kea' import { router } from 'kea-router' @@ -16,15 +15,16 @@ import { insightAlertsLogic, InsightAlertsLogicProps } from '../insightAlertsLog import { AlertType } from '../types' export function AlertStateIndicator({ alert }: { alert: AlertType }): JSX.Element { - return alert.state === AlertState.FIRING ? ( - <IconX /> - ) : ( - <IconCheck /> - ) + switch (alert.state) { + case AlertState.FIRING: + return <LemonTag type="danger">FIRING</LemonTag> + case AlertState.ERRORED: + return <LemonTag type="warning">ERRORED</LemonTag> + case AlertState.SNOOZED: + return <LemonTag type="default">SNOOZED</LemonTag> + case AlertState.NOT_FIRING: + return <LemonTag type="success">NOT FIRING</LemonTag> + } } interface AlertListItemProps { @@ -40,8 +40,8 @@ export function AlertListItem({ alert, onClick }: AlertListItemProps): JSX.Eleme
-                    {alert.name}
+                    {alert.enabled ? (
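For orientation, the reworked `alertComparatorKey` in `alertsLogic.ts` above maps each alert to a numeric sort rank (firing first, disabled always last) instead of the old two-way ternary. A minimal sketch of how such a comparator would be consumed; the `sortAlerts` helper is an illustration, not code from this diff:

    // Lower rank sorts first: FIRING (1) through NOT_FIRING (4), disabled (100) last
    const sortAlerts = (alerts: AlertType[]): AlertType[] =>
        [...alerts].sort((a, b) => alertComparatorKey(a) - alertComparatorKey(b))

Returning a rank rather than comparing states pairwise keeps the ordering stable and makes it trivial to slot new states such as SNOOZED between existing ones.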
diff --git a/frontend/src/lib/lemon-ui/LemonSegmentedButton/LemonSegmentedButton.tsx b/frontend/src/lib/lemon-ui/LemonSegmentedButton/LemonSegmentedButton.tsx index 1e0bfbfd2fa9d..bf6b46fbea608 100644 --- a/frontend/src/lib/lemon-ui/LemonSegmentedButton/LemonSegmentedButton.tsx +++ b/frontend/src/lib/lemon-ui/LemonSegmentedButton/LemonSegmentedButton.tsx @@ -96,6 +96,7 @@ export function LemonSegmentedButton({ }} icon={option.icon} data-attr={option['data-attr']} + tooltip={option.tooltip} center > {option.label} diff --git a/frontend/src/scenes/activity/live/liveEventsTableLogic.tsx b/frontend/src/scenes/activity/live/liveEventsTableLogic.tsx index 4e52a3d96ebf0..afcd2bbd25783 100644 --- a/frontend/src/scenes/activity/live/liveEventsTableLogic.tsx +++ b/frontend/src/scenes/activity/live/liveEventsTableLogic.tsx @@ -1,3 +1,4 @@ +import { fetchEventSource } from '@microsoft/fetch-event-source' import { lemonToast, Spinner } from '@posthog/lemon-ui' import { actions, connect, events, kea, listeners, path, props, reducers, selectors } from 'kea' import { liveEventsHostOrigin } from 'lib/utils/apiHost' @@ -119,8 +120,8 @@ export const liveEventsTableLogic = kea([ actions.updateEventsConnection() }, updateEventsConnection: async () => { - if (cache.eventsSource) { - cache.eventsSource.close() + if (cache.eventSourceController) { + cache.eventSourceController.abort() } if (values.streamPaused) { @@ -137,41 +138,40 @@ export const liveEventsTableLogic = kea([ url.searchParams.append('eventType', eventType) } - const source = new window.EventSourcePolyfill(url.toString(), { + cache.batch = [] + cache.eventSourceController = new AbortController() + + await fetchEventSource(url.toString(), { headers: { Authorization: `Bearer ${values.currentTeam.live_events_token}`, }, + signal: cache.eventSourceController.signal, + onmessage: (event) => { + lemonToast.dismiss(ERROR_TOAST_ID) + const eventData = JSON.parse(event.data) + cache.batch.push(eventData) + // If the batch is 10 or more events, or if it's been more than 300ms since the last batch + if (cache.batch.length >= 10 || performance.now() - (values.lastBatchTimestamp || 0) > 300) { + actions.addEvents(cache.batch) + cache.batch.length = 0 + } + }, + onerror: (error) => { + if (!cache.hasShownLiveStreamErrorToast && props.showLiveStreamErrorToast) { + console.error('Failed to poll events. You likely have no events coming in.', error) + lemonToast.error(`No live events found. Continuing to retry in the background…`, { + icon: , + toastId: ERROR_TOAST_ID, + autoClose: false, + }) + cache.hasShownLiveStreamErrorToast = true // Only show once + } + }, }) - - cache.batch = [] - source.onmessage = function (event: any) { - lemonToast.dismiss(ERROR_TOAST_ID) - const eventData = JSON.parse(event.data) - cache.batch.push(eventData) - // If the batch is 10 or more events, or if it's been more than 300ms since the last batch - if (cache.batch.length >= 10 || performance.now() - (values.lastBatchTimestamp || 0) > 300) { - actions.addEvents(cache.batch) - cache.batch.length = 0 - } - } - - source.onerror = function (e) { - if (!cache.hasShownLiveStreamErrorToast && props.showLiveStreamErrorToast) { - console.error('Failed to poll events. You likely have no events coming in.', e) - lemonToast.error(`No live events found. 
Continuing to retry in the background…`, { - icon: , - toastId: ERROR_TOAST_ID, - autoClose: false, - }) - cache.hasShownLiveStreamErrorToast = true // Only show once - } - } - - cache.eventsSource = source }, pauseStream: () => { - if (cache.eventsSource) { - cache.eventsSource.close() + if (cache.eventSourceController) { + cache.eventSourceController.abort() } }, resumeStream: () => { @@ -214,8 +214,8 @@ export const liveEventsTableLogic = kea([ }, 1500) }, beforeUnmount: () => { - if (cache.eventsSource) { - cache.eventsSource.close() + if (cache.eventSourceController) { + cache.eventSourceController.abort() } if (cache.statsInterval) { clearInterval(cache.statsInterval) diff --git a/frontend/src/scenes/data-warehouse/settings/source/dataWarehouseSourceSettingsLogic.ts b/frontend/src/scenes/data-warehouse/settings/source/dataWarehouseSourceSettingsLogic.ts index 3343eec78e0e0..eaedd3c9c06b0 100644 --- a/frontend/src/scenes/data-warehouse/settings/source/dataWarehouseSourceSettingsLogic.ts +++ b/frontend/src/scenes/data-warehouse/settings/source/dataWarehouseSourceSettingsLogic.ts @@ -4,7 +4,7 @@ import { forms } from 'kea-forms' import { loaders } from 'kea-loaders' import api from 'lib/api' import posthog from 'posthog-js' -import { SOURCE_DETAILS } from 'scenes/data-warehouse/new/sourceWizardLogic' +import { getErrorsForFields, SOURCE_DETAILS } from 'scenes/data-warehouse/new/sourceWizardLogic' import { ExternalDataJob, ExternalDataSource, ExternalDataSourceSchema } from '~/types' @@ -116,6 +116,9 @@ export const dataWarehouseSourceSettingsLogic = kea ({ sourceConfig: { defaults: {} as Record, + errors: (sourceValues) => { + return getErrorsForFields(values.sourceFieldConfig?.fields ?? [], sourceValues as any) + }, submit: async ({ payload = {} }) => { const newJobInputs = { ...values.source?.job_inputs, diff --git a/frontend/src/scenes/experiments/ExperimentView/Goal.tsx b/frontend/src/scenes/experiments/ExperimentView/Goal.tsx index 1bf11115771d1..867d82bc83a37 100644 --- a/frontend/src/scenes/experiments/ExperimentView/Goal.tsx +++ b/frontend/src/scenes/experiments/ExperimentView/Goal.tsx @@ -73,7 +73,13 @@ export function MetricDisplayOld({ filters }: { filters?: FilterType }): JSX.Ele return ( <> - {([...(filters?.events || []), ...(filters?.actions || [])] as ActionFilter[]) + {( + [ + ...(filters?.events || []), + ...(filters?.actions || []), + ...(filters?.data_warehouse || []), + ] as ActionFilter[] + ) .sort((a, b) => (a.order || 0) - (b.order || 0)) .map((event: ActionFilter, idx: number) => (
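The `liveEventsTableLogic` rewrite above replaces the window-level `EventSourcePolyfill` with `@microsoft/fetch-event-source`, with an `AbortController` standing in for the old `eventsSource.close()`. A condensed sketch of that pattern, assuming placeholder `url` and `token` values that are not part of this diff:

    import { fetchEventSource } from '@microsoft/fetch-event-source'

    const controller = new AbortController()
    await fetchEventSource(url, {
        headers: { Authorization: `Bearer ${token}` }, // token is a placeholder
        signal: controller.signal,
        onmessage: (event) => {
            // Each SSE message carries a JSON payload in event.data
            console.log(JSON.parse(event.data))
        },
        onerror: (err) => {
            // Returning normally (instead of throwing) lets the library keep retrying
            console.error(err)
        },
    })
    controller.abort() // tears down the stream, the equivalent of the old .close()

Unlike the native EventSource, fetchEventSource accepts arbitrary request headers, which is why the logic above can send its Authorization bearer token without a polyfill.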
diff --git a/frontend/src/scenes/experiments/experimentLogic.tsx b/frontend/src/scenes/experiments/experimentLogic.tsx index 230c97ec654f0..647c4bdabf279 100644 --- a/frontend/src/scenes/experiments/experimentLogic.tsx +++ b/frontend/src/scenes/experiments/experimentLogic.tsx @@ -653,7 +653,8 @@ export const experimentLogic = kea([ setExperiment: async ({ experiment }) => { const experimentEntitiesChanged = (experiment.filters?.events && experiment.filters.events.length > 0) || - (experiment.filters?.actions && experiment.filters.actions.length > 0) + (experiment.filters?.actions && experiment.filters.actions.length > 0) || + (experiment.filters?.data_warehouse && experiment.filters.data_warehouse.length > 0) if (!experiment.filters || Object.keys(experiment.filters).length === 0) { return @@ -668,7 +669,9 @@ export const experimentLogic = kea([ if (name === 'filters') { const experimentEntitiesChanged = - (value?.events && value.events.length > 0) || (value?.actions && value.actions.length > 0) + (value?.events && value.events.length > 0) || + (value?.actions && value.actions.length > 0) || + (value?.data_warehouse && value.data_warehouse.length > 0) if (!value || Object.keys(value).length === 0) { return @@ -686,7 +689,8 @@ export const experimentLogic = kea([ const experimentEntitiesChanged = (experiment.filters?.events && experiment.filters.events.length > 0) || - (experiment.filters?.actions && experiment.filters.actions.length > 0) + (experiment.filters?.actions && experiment.filters.actions.length > 0) || + (experiment.filters?.data_warehouse && experiment.filters.data_warehouse.length > 0) if (!experiment.filters || Object.keys(experiment.filters).length === 0) { return @@ -700,7 +704,8 @@ export const experimentLogic = kea([ const experiment = values.experiment const experimentEntitiesChanged = (experiment.filters?.events && experiment.filters.events.length > 0) || - (experiment.filters?.actions && experiment.filters.actions.length > 0) + (experiment.filters?.actions && experiment.filters.actions.length > 0) || + (experiment.filters?.data_warehouse && experiment.filters.data_warehouse.length > 0) if (!experiment.filters || Object.keys(experiment.filters).length === 0) { return @@ -1046,7 +1051,11 @@ export const experimentLogic = kea([ if (!filters) { return undefined } - entities = [...(filters?.events || []), ...(filters?.actions || [])] as ActionFilterType[] + entities = [ + ...(filters?.events || []), + ...(filters?.actions || []), + ...(filters?.data_warehouse || []), + ] as ActionFilterType[] } // Find out if we're using count per actor math aggregates averages per user diff --git a/frontend/src/scenes/web-analytics/tiles/WebAnalyticsTile.tsx b/frontend/src/scenes/web-analytics/tiles/WebAnalyticsTile.tsx index 1820360e0ec39..44f85a38d8eea 100644 --- a/frontend/src/scenes/web-analytics/tiles/WebAnalyticsTile.tsx +++ b/frontend/src/scenes/web-analytics/tiles/WebAnalyticsTile.tsx @@ -19,6 +19,23 @@ import { DataTableNode, InsightVizNode, NodeKind, QuerySchema, WebStatsBreakdown import { QueryContext, QueryContextColumnComponent, QueryContextColumnTitleComponent } from '~/queries/types' import { ChartDisplayType, GraphPointPayload, InsightLogicProps, ProductKey, PropertyFilterType } from '~/types' +const toUtcOffsetFormat = (value: number): string => { + if (value === 0) { + return 'UTC' + } + + const integerPart = Math.floor(value) + const sign = integerPart > 0 ? '+' : '-' + + // India has half-hour offsets, and Australia has 45-minute offsets, why? 
+ const absValue = Math.abs(value) + const decimalPart = absValue - Math.floor(absValue) + const decimalPartAsMinutes = Math.round(decimalPart * 60) + const formattedMinutes = decimalPartAsMinutes > 0 ? `:${decimalPartAsMinutes}` : '' + + // E.g. UTC-3, UTC, UTC+5:30, UTC+11:45 (format the absolute value so negative offsets don't render as "UTC--3") + return `UTC${sign}${Math.floor(absValue)}${formattedMinutes}` +} + const PercentageCell: QueryContextColumnComponent = ({ value }) => { if (typeof value === 'number') { return <span>{`${(value * 100).toFixed(1)}%`}</span> } @@ -121,6 +138,11 @@ const BreakdownValueCell: QueryContextColumnComponent = (props) => { ) } break + case WebStatsBreakdown.Timezone: + if (typeof value === 'number') { + return <>{toUtcOffsetFormat(value)}</> + } + break } if (typeof value === 'string') { diff --git a/package.json b/package.json index ce49021a3c85d..5338c97b374d5 100644 --- a/package.json +++ b/package.json @@ -75,6 +75,7 @@ "@lottiefiles/react-lottie-player": "^3.4.7", "@medv/finder": "^3.1.0", "@microlink/react-json-view": "^1.21.3", + "@microsoft/fetch-event-source": "^2.0.1", "@monaco-editor/react": "4.6.0", "@posthog/hogvm": "^1.0.61", "@posthog/icons": "0.9.2", @@ -159,7 +160,7 @@ "pmtiles": "^2.11.0", "postcss": "^8.4.31", "postcss-preset-env": "^9.3.0", - "posthog-js": "1.188.0", + "posthog-js": "1.189.0", "posthog-js-lite": "3.0.0", "prettier": "^2.8.8", "prop-types": "^15.7.2", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ccdb2a5ff1f1c..8cd136eb68bb8 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -46,6 +46,9 @@ dependencies: '@microlink/react-json-view': specifier: ^1.21.3 version: 1.22.2(@types/react@17.0.52)(react-dom@18.2.0)(react@18.2.0) + '@microsoft/fetch-event-source': + specifier: ^2.0.1 + version: 2.0.1 '@monaco-editor/react': specifier: 4.6.0 version: 4.6.0(monaco-editor@0.49.0)(react-dom@18.2.0)(react@18.2.0) @@ -299,8 +302,8 @@ dependencies: specifier: ^9.3.0 version: 9.3.0(postcss@8.4.31) posthog-js: - specifier: 1.188.0 - version: 1.188.0 + specifier: 1.189.0 + version: 1.189.0 posthog-js-lite: specifier: 3.0.0 version: 3.0.0 @@ -407,7 +410,7 @@ dependencies: optionalDependencies: fsevents: specifier: ^2.3.2 - version: 2.3.2 + version: 2.3.3 devDependencies: '@babel/core': @@ -5299,6 +5302,10 @@ packages: - encoding dev: false + /@microsoft/fetch-event-source@2.0.1: + resolution: {integrity: sha512-W6CLUJ2eBMw3Rec70qrsEW0jOm/3twwJv21mrmj2yORiaVmVYGS4sSS5yUwvQc1ZlDLYGPnClVWmUUMagKNsfA==} + dev: false + /@monaco-editor/loader@1.4.0(monaco-editor@0.49.0): resolution: {integrity: sha512-00ioBig0x642hytVspPl7DbQyaSWRaolYie/UFNjoTdvoKPzo6xrXLhTk9ixgIKcLH5b5vDOjVNiGyY+uDCUlg==} peerDependencies: @@ -6594,7 +6601,7 @@ packages: '@storybook/client-logger': 7.6.20 '@storybook/core-events': 7.6.20 '@storybook/global': 5.0.0 - qs: 6.13.0 + qs: 6.13.1 telejson: 7.2.0 tiny-invariant: 1.3.3 dev: true @@ -7108,7 +7115,7 @@ packages: dequal: 2.0.3 lodash: 4.17.21 memoizerific: 1.11.3 - qs: 6.13.0 + qs: 6.13.1 synchronous-promise: 2.0.17 ts-dedent: 2.2.0 util-deprecate: 1.0.2 @@ -7268,7 +7275,7 @@ packages: dependencies: '@storybook/client-logger': 7.6.20 memoizerific: 1.11.3 - qs: 6.13.0 + qs: 6.13.1 dev: true /@storybook/router@7.6.4: @@ -13168,6 +13175,7 @@ packages: engines: {node: ^8.16.0 || ^10.6.0 || >=11.0.0} os: [darwin] requiresBuild: true + dev: true optional: true /fsevents@2.3.3: @@ -17814,8 +17822,8 @@ packages: resolution: {integrity: sha512-dyajjnfzZD1tht4N7p7iwf7nBnR1MjVaVu+MKr+7gBgA39bn28wizCIJZztZPtHy4PY0YwtSGgwfBCuG/hnHgA==} dev: false - /posthog-js@1.188.0: - resolution: {integrity:
sha512-FdNCZcgM5sjADxES7VWbRntD39V2fvHunZry6Rrsp8VDG20TcAWc+koAuCMfEoU5jKxm/Ua37QnI9Xqfwg2fow==} + /posthog-js@1.189.0: + resolution: {integrity: sha512-nhx36zxxIY9PsvMqSnMDohweoUVU9c3TG9nXZb2tjFyKDbyb5N7TrLmgTkVMCxSxhdKkbnIZC4peNjn0/s8Z1g==} dependencies: core-js: 3.39.0 fflate: 0.4.8 @@ -18193,8 +18201,8 @@ packages: side-channel: 1.0.6 dev: true - /qs@6.13.0: - resolution: {integrity: sha512-+38qI9SOr8tfZ4QmJNplMUxqjbe7LKvvZgWdExBOmd+egZTtjLB67Gu0HRX3u/XOq7UU2Nx6nsjvS16Z9uwfpg==} + /qs@6.13.1: + resolution: {integrity: sha512-EJPeIn0CYrGu+hli1xilKAPXODtJ12T0sP63Ijx2/khC2JtuaN3JyNIpvmnkmaEtha9ocbG4A4cMcr+TvqvwQg==} engines: {node: '>=0.6'} dependencies: side-channel: 1.0.6 diff --git a/posthog/hogql/functions/mapping.py b/posthog/hogql/functions/mapping.py index 3a8615d6cf100..9eb0980b9d933 100644 --- a/posthog/hogql/functions/mapping.py +++ b/posthog/hogql/functions/mapping.py @@ -421,13 +421,15 @@ def compare_types(arg_types: list[ConstantType], sig_arg_types: tuple[ConstantTy "toString": HogQLFunctionMeta( "toString", 1, - 1, + 2, signatures=[ ((IntegerType(),), StringType()), ((StringType(),), StringType()), ((FloatType(),), StringType()), ((DateType(),), StringType()), + ((DateType(), StringType()), StringType()), ((DateTimeType(),), StringType()), + ((DateTimeType(), StringType()), StringType()), ], ), "toJSONString": HogQLFunctionMeta("toJSONString", 1, 1), diff --git a/posthog/hogql/printer.py b/posthog/hogql/printer.py index da81bdd32d37f..dee9988d97c0c 100644 --- a/posthog/hogql/printer.py +++ b/posthog/hogql/printer.py @@ -23,7 +23,7 @@ ) from posthog.hogql.context import HogQLContext from posthog.hogql.database.models import Table, FunctionCallTable, SavedQuery -from posthog.hogql.database.database import create_hogql_database +from posthog.hogql.database.database import Database, create_hogql_database from posthog.hogql.database.s3_table import S3Table from posthog.hogql.errors import ImpossibleASTError, InternalHogQLError, QueryError, ResolutionError from posthog.hogql.escape_sql import ( @@ -66,13 +66,18 @@ def team_id_guard_for_table(table_type: Union[ast.TableType, ast.TableAliasType] ) -def to_printed_hogql(query: ast.Expr, team: Team, modifiers: Optional[HogQLQueryModifiers] = None) -> str: +def to_printed_hogql( + query: ast.Expr, team: Team, modifiers: Optional[HogQLQueryModifiers] = None, database: Optional["Database"] = None +) -> str: """Prints the HogQL query without mutating the node""" return print_ast( clone_expr(query), dialect="hogql", context=HogQLContext( - team_id=team.pk, enable_select_queries=True, modifiers=create_default_modifiers_for_team(team, modifiers) + team_id=team.pk, + enable_select_queries=True, + modifiers=create_default_modifiers_for_team(team, modifiers), + database=database, ), pretty=True, ) diff --git a/posthog/hogql_queries/experiments/experiment_trends_query_runner.py b/posthog/hogql_queries/experiments/experiment_trends_query_runner.py index 895142a936da0..5f5a93a84cbdb 100644 --- a/posthog/hogql_queries/experiments/experiment_trends_query_runner.py +++ b/posthog/hogql_queries/experiments/experiment_trends_query_runner.py @@ -3,6 +3,9 @@ from django.conf import settings from posthog.constants import ExperimentNoResultsErrorKeys from posthog.hogql import ast +from posthog.hogql.context import HogQLContext +from posthog.hogql.database.database import create_hogql_database +from posthog.hogql.database.models import LazyJoin from posthog.hogql_queries.experiments import CONTROL_VARIANT_KEY from 
posthog.hogql_queries.experiments.trends_statistics import ( are_results_significant, @@ -19,6 +22,8 @@ BreakdownFilter, CachedExperimentTrendsQueryResponse, ChartDisplayType, + DataWarehouseNode, + DataWarehousePropertyFilter, EventPropertyFilter, EventsNode, ExperimentSignificanceCode, @@ -27,11 +32,12 @@ ExperimentVariantTrendsBaseStats, InsightDateRange, PropertyMathType, + PropertyOperator, TrendsFilter, TrendsQuery, TrendsQueryResponse, ) -from typing import Any, Optional +from typing import Any, Optional, cast import threading @@ -89,12 +95,18 @@ def _get_insight_date_range(self) -> InsightDateRange: explicitDate=True, ) - def _get_breakdown_filter(self) -> BreakdownFilter: + def _get_event_breakdown_filter(self) -> BreakdownFilter: return BreakdownFilter( breakdown=self.breakdown_key, breakdown_type="event", ) + def _get_data_warehouse_breakdown_filter(self) -> BreakdownFilter: + return BreakdownFilter( + breakdown=f"events.properties.{self.breakdown_key}", + breakdown_type="data_warehouse", + ) + def _prepare_count_query(self) -> TrendsQuery: """ This method takes the raw trend query and adapts it @@ -118,15 +130,32 @@ def _prepare_count_query(self) -> TrendsQuery: prepared_count_query.trendsFilter = TrendsFilter(display=ChartDisplayType.ACTIONS_LINE_GRAPH_CUMULATIVE) prepared_count_query.dateRange = self._get_insight_date_range() - prepared_count_query.breakdownFilter = self._get_breakdown_filter() - prepared_count_query.properties = [ - EventPropertyFilter( - key=self.breakdown_key, - value=self.variants, - operator="exact", - type="event", - ) - ] + if self._is_data_warehouse_query(prepared_count_query): + prepared_count_query.breakdownFilter = self._get_data_warehouse_breakdown_filter() + prepared_count_query.properties = [ + DataWarehousePropertyFilter( + key="events.event", + value="$feature_flag_called", + operator=PropertyOperator.EXACT, + type="data_warehouse", + ), + DataWarehousePropertyFilter( + key=f"events.properties.{self.breakdown_key}", + value=self.variants, + operator=PropertyOperator.EXACT, + type="data_warehouse", + ), + ] + else: + prepared_count_query.breakdownFilter = self._get_event_breakdown_filter() + prepared_count_query.properties = [ + EventPropertyFilter( + key=self.breakdown_key, + value=self.variants, + operator=PropertyOperator.EXACT, + type="event", + ) + ] return prepared_count_query @@ -152,7 +181,7 @@ def _prepare_exposure_query(self) -> TrendsQuery: if hasattr(count_event, "event"): prepared_exposure_query.dateRange = self._get_insight_date_range() - prepared_exposure_query.breakdownFilter = self._get_breakdown_filter() + prepared_exposure_query.breakdownFilter = self._get_event_breakdown_filter() prepared_exposure_query.trendsFilter = TrendsFilter( display=ChartDisplayType.ACTIONS_LINE_GRAPH_CUMULATIVE ) @@ -166,7 +195,7 @@ def _prepare_exposure_query(self) -> TrendsQuery: EventPropertyFilter( key=self.breakdown_key, value=self.variants, - operator="exact", + operator=PropertyOperator.EXACT, type="event", ) ] @@ -177,13 +206,13 @@ def _prepare_exposure_query(self) -> TrendsQuery: elif self.query.exposure_query: prepared_exposure_query = TrendsQuery(**self.query.exposure_query.model_dump()) prepared_exposure_query.dateRange = self._get_insight_date_range() - prepared_exposure_query.breakdownFilter = self._get_breakdown_filter() prepared_exposure_query.trendsFilter = TrendsFilter(display=ChartDisplayType.ACTIONS_LINE_GRAPH_CUMULATIVE) + prepared_exposure_query.breakdownFilter = self._get_event_breakdown_filter() 
prepared_exposure_query.properties = [ EventPropertyFilter( key=self.breakdown_key, value=self.variants, - operator="exact", + operator=PropertyOperator.EXACT, type="event", ) ] @@ -206,13 +235,13 @@ def _prepare_exposure_query(self) -> TrendsQuery: EventPropertyFilter( key="$feature_flag_response", value=self.variants, - operator="exact", + operator=PropertyOperator.EXACT, type="event", ), EventPropertyFilter( key="$feature_flag", value=[self.feature_flag.key], - operator="exact", + operator=PropertyOperator.EXACT, type="event", ), ], @@ -226,7 +255,86 @@ def calculate(self) -> ExperimentTrendsQueryResponse: def run(query_runner: TrendsQueryRunner, result_key: str, is_parallel: bool): try: - result = query_runner.calculate() + # Create a new database instance where we can attach our + # custom join to the events table. It will be passed through + # and used by the query runner. + database = create_hogql_database(team_id=self.team.pk) + if self._is_data_warehouse_query(query_runner.query): + series_node = cast(DataWarehouseNode, query_runner.query.series[0]) + table = database.get_table(series_node.table_name) + table.fields["events"] = LazyJoin( + from_field=[series_node.distinct_id_field], + join_table=database.get_table("events"), + join_function=lambda join_to_add, context, node: ( + ast.JoinExpr( + table=ast.SelectQuery( + select=[ + ast.Alias(alias=name, expr=ast.Field(chain=["events", *chain])) + for name, chain in { + **join_to_add.fields_accessed, + "timestamp": ["timestamp"], + "distinct_id": ["distinct_id"], + "properties": ["properties"], + }.items() + ], + select_from=ast.JoinExpr(table=ast.Field(chain=["events"])), + ), + # ASOF JOIN finds the most recent matching event that occurred at or before each data warehouse timestamp. + # + # Why this matters: + # When a user performs an action (recorded in data warehouse), we want to know which + # experiment variant they were assigned at that moment. The most recent $feature_flag_called + # event before their action represents their active variant assignment. + # + # Example: + # Data Warehouse: timestamp=2024-01-03 12:00, distinct_id=user1 + # Events: + # 2024-01-02: (user1, variant='control') <- This event will be joined + # 2024-01-03: (user1, variant='test') <- Ignored (occurs after data warehouse timestamp) + # + # This ensures we capture the correct causal relationship: which experiment variant + # was the user assigned to when they performed the action? 
+ join_type="ASOF LEFT JOIN", + alias=join_to_add.to_table, + constraint=ast.JoinConstraint( + expr=ast.And( + exprs=[ + ast.CompareOperation( + left=ast.Field(chain=[join_to_add.to_table, "event"]), + op=ast.CompareOperationOp.Eq, + right=ast.Constant(value="$feature_flag_called"), + ), + ast.CompareOperation( + left=ast.Field( + chain=[ + join_to_add.from_table, + series_node.distinct_id_field, + ] + ), + op=ast.CompareOperationOp.Eq, + right=ast.Field(chain=[join_to_add.to_table, "distinct_id"]), + ), + ast.CompareOperation( + left=ast.Field( + chain=[ + join_to_add.from_table, + series_node.timestamp_field, + ] + ), + op=ast.CompareOperationOp.GtEq, + right=ast.Field(chain=[join_to_add.to_table, "timestamp"]), + ), + ] + ), + constraint_type="ON", + ), + ) + ), + ) + + context = HogQLContext(team_id=self.team.pk, database=database) + + result = query_runner.calculate(context=context) shared_results[result_key] = result except Exception as e: errors.append(e) @@ -362,5 +470,8 @@ def _validate_event_variants(self, count_result: TrendsQueryResponse): if has_errors: raise ValidationError(detail=json.dumps(errors)) + def _is_data_warehouse_query(self, query: TrendsQuery) -> bool: + return isinstance(query.series[0], DataWarehouseNode) + def to_query(self) -> ast.SelectQuery: raise ValueError(f"Cannot convert source query of type {self.query.count_query.kind} to query") diff --git a/posthog/hogql_queries/experiments/test/test_experiment_trends_query_runner.py b/posthog/hogql_queries/experiments/test/test_experiment_trends_query_runner.py index 16e0dde1b1f36..5645566a954aa 100644 --- a/posthog/hogql_queries/experiments/test/test_experiment_trends_query_runner.py +++ b/posthog/hogql_queries/experiments/test/test_experiment_trends_query_runner.py @@ -1,27 +1,58 @@ from django.test import override_settings +from posthog.hogql.errors import QueryError from posthog.hogql_queries.experiments.experiment_trends_query_runner import ExperimentTrendsQueryRunner from posthog.models.experiment import Experiment, ExperimentHoldout from posthog.models.feature_flag.feature_flag import FeatureFlag from posthog.schema import ( + DataWarehouseNode, EventsNode, ExperimentSignificanceCode, ExperimentTrendsQuery, ExperimentTrendsQueryResponse, TrendsQuery, ) +from posthog.settings import ( + OBJECT_STORAGE_ACCESS_KEY_ID, + OBJECT_STORAGE_BUCKET, + OBJECT_STORAGE_ENDPOINT, + OBJECT_STORAGE_SECRET_ACCESS_KEY, + XDIST_SUFFIX, +) from posthog.test.base import APIBaseTest, ClickhouseTestMixin, _create_event, flush_persons_and_events from freezegun import freeze_time from typing import cast from django.utils import timezone -from datetime import timedelta +from datetime import datetime, timedelta from posthog.test.test_journeys import journeys_for from rest_framework.exceptions import ValidationError from posthog.constants import ExperimentNoResultsErrorKeys +import s3fs +from pyarrow import parquet as pq +import pyarrow as pa import json +from boto3 import resource +from botocore.config import Config +from posthog.warehouse.models.credential import DataWarehouseCredential +from posthog.warehouse.models.table import DataWarehouseTable + +TEST_BUCKET = "test_storage_bucket-posthog.hogql.datawarehouse.trendquery" + XDIST_SUFFIX + @override_settings(IN_UNIT_TESTING=True) class TestExperimentTrendsQueryRunner(ClickhouseTestMixin, APIBaseTest): + def teardown_method(self, method) -> None: + s3 = resource( + "s3", + endpoint_url=OBJECT_STORAGE_ENDPOINT, + aws_access_key_id=OBJECT_STORAGE_ACCESS_KEY_ID, + 
aws_secret_access_key=OBJECT_STORAGE_SECRET_ACCESS_KEY, + config=Config(signature_version="s3v4"), + region_name="us-east-1", + ) + bucket = s3.Bucket(OBJECT_STORAGE_BUCKET) + bucket.objects.filter(Prefix=TEST_BUCKET).delete() + def create_feature_flag(self, key="test-experiment"): return FeatureFlag.objects.create( name=f"Test experiment flag: {key}", @@ -47,15 +78,25 @@ def create_feature_flag(self, key="test-experiment"): created_by=self.user, ) - def create_experiment(self, name="test-experiment", feature_flag=None): + def create_experiment( + self, + name="test-experiment", + feature_flag=None, + start_date=None, + end_date=None, + ): if feature_flag is None: feature_flag = self.create_feature_flag(name) + if start_date is None: + start_date = timezone.now() + if end_date is None: + end_date = timezone.now() + timedelta(days=14) return Experiment.objects.create( name=name, team=self.team, feature_flag=feature_flag, - start_date=timezone.now(), - end_date=timezone.now() + timedelta(days=14), + start_date=start_date, + end_date=end_date, ) def create_holdout_for_experiment(self, experiment: Experiment): @@ -69,6 +110,67 @@ def create_holdout_for_experiment(self, experiment: Experiment): experiment.save() return holdout + def create_data_warehouse_table_with_payments(self): + if not OBJECT_STORAGE_ACCESS_KEY_ID or not OBJECT_STORAGE_SECRET_ACCESS_KEY: + raise Exception("Missing vars") + + fs = s3fs.S3FileSystem( + client_kwargs={ + "region_name": "us-east-1", + "endpoint_url": OBJECT_STORAGE_ENDPOINT, + "aws_access_key_id": OBJECT_STORAGE_ACCESS_KEY_ID, + "aws_secret_access_key": OBJECT_STORAGE_SECRET_ACCESS_KEY, + }, + ) + + path_to_s3_object = "s3://" + OBJECT_STORAGE_BUCKET + f"/{TEST_BUCKET}" + + id = pa.array(["1", "2", "3", "4", "5"]) + timestamp = pa.array( + [ + datetime(2023, 1, 1), + datetime(2023, 1, 2), + datetime(2023, 1, 3), + datetime(2023, 1, 6), + datetime(2023, 1, 7), + ] + ) + distinct_id = pa.array(["user_control_0", "user_test_1", "user_test_2", "user_test_3", "user_extra"]) + amount = pa.array([100, 50, 75, 80, 90]) + names = ["id", "timestamp", "distinct_id", "amount"] + + pq.write_to_dataset( + pa.Table.from_arrays([id, timestamp, distinct_id, amount], names=names), + path_to_s3_object, + filesystem=fs, + use_dictionary=True, + compression="snappy", + version="2.0", + ) + + table_name = "payments" + + credential = DataWarehouseCredential.objects.create( + access_key=OBJECT_STORAGE_ACCESS_KEY_ID, + access_secret=OBJECT_STORAGE_SECRET_ACCESS_KEY, + team=self.team, + ) + + DataWarehouseTable.objects.create( + name=table_name, + url_pattern=f"http://host.docker.internal:19000/{OBJECT_STORAGE_BUCKET}/{TEST_BUCKET}/*.parquet", + format=DataWarehouseTable.TableFormat.Parquet, + team=self.team, + columns={ + "id": "String", + "timestamp": "DateTime64(3, 'UTC')", + "distinct_id": "String", + "amount": "Int64", + }, + credential=credential, + ) + return table_name + @freeze_time("2020-01-01T12:00:00Z") def test_query_runner(self): feature_flag = self.create_feature_flag() @@ -376,6 +478,143 @@ def test_query_runner_with_holdout(self): self.assertEqual(test_result.absolute_exposure, 9) self.assertEqual(holdout_result.absolute_exposure, 4) + def test_query_runner_with_data_warehouse_series(self): + table_name = self.create_data_warehouse_table_with_payments() + + feature_flag = self.create_feature_flag() + experiment = self.create_experiment( + feature_flag=feature_flag, + start_date=datetime(2023, 1, 1), + end_date=datetime(2023, 1, 10), + ) + + feature_flag_property = 
f"$feature/{feature_flag.key}" + + count_query = TrendsQuery( + series=[ + DataWarehouseNode( + id=table_name, + distinct_id_field="distinct_id", + id_field="distinct_id", + table_name=table_name, + timestamp_field="timestamp", + ) + ] + ) + exposure_query = TrendsQuery(series=[EventsNode(event="$feature_flag_called")]) + + experiment_query = ExperimentTrendsQuery( + experiment_id=experiment.id, + kind="ExperimentTrendsQuery", + count_query=count_query, + exposure_query=exposure_query, + ) + + experiment.metrics = [{"type": "primary", "query": experiment_query.model_dump()}] + experiment.save() + + # Populate exposure events + for variant, count in [("control", 7), ("test", 9)]: + for i in range(count): + _create_event( + team=self.team, + event="$feature_flag_called", + distinct_id=f"user_{variant}_{i}", + properties={feature_flag_property: variant}, + timestamp=datetime(2023, 1, i + 1), + ) + + # "user_test_3" first exposure (feature_flag_property="control") is on 2023-01-03 + # "user_test_3" relevant exposure (feature_flag_property="test") is on 2023-01-04 + # "user_test_3" other event (feature_flag_property="control" is on 2023-01-05 + # "user_test_3" purchase is on 2023-01-06 + # "user_test_3" second exposure (feature_flag_property="control") is on 2023-01-09 + # "user_test_3" should fall into the "test" variant, not the "control" variant + _create_event( + team=self.team, + event="$feature_flag_called", + distinct_id="user_test_3", + properties={feature_flag_property: "control"}, + timestamp=datetime(2023, 1, 3), + ) + _create_event( + team=self.team, + event="Some other event", + distinct_id="user_test_3", + properties={feature_flag_property: "control"}, + timestamp=datetime(2023, 1, 5), + ) + _create_event( + team=self.team, + event="$feature_flag_called", + distinct_id="user_test_3", + properties={feature_flag_property: "control"}, + timestamp=datetime(2023, 1, 9), + ) + + flush_persons_and_events() + + query_runner = ExperimentTrendsQueryRunner( + query=ExperimentTrendsQuery(**experiment.metrics[0]["query"]), team=self.team + ) + with freeze_time("2023-01-07"): + result = query_runner.calculate() + + trend_result = cast(ExperimentTrendsQueryResponse, result) + + self.assertEqual(len(result.variants), 2) + + control_result = next(variant for variant in trend_result.variants if variant.key == "control") + test_result = next(variant for variant in trend_result.variants if variant.key == "test") + + self.assertEqual(control_result.count, 1) + self.assertEqual(test_result.count, 3) + self.assertEqual(control_result.absolute_exposure, 9) + self.assertEqual(test_result.absolute_exposure, 9) + + def test_query_runner_with_invalid_data_warehouse_table_name(self): + # parquet file isn't created, so we'll get an error + table_name = "invalid_table_name" + + feature_flag = self.create_feature_flag() + experiment = self.create_experiment( + feature_flag=feature_flag, + start_date=datetime(2023, 1, 1), + end_date=datetime(2023, 1, 10), + ) + + count_query = TrendsQuery( + series=[ + DataWarehouseNode( + id=table_name, + distinct_id_field="distinct_id", + id_field="distinct_id", + table_name=table_name, + timestamp_field="timestamp", + ) + ] + ) + exposure_query = TrendsQuery(series=[EventsNode(event="$feature_flag_called")]) + + experiment_query = ExperimentTrendsQuery( + experiment_id=experiment.id, + kind="ExperimentTrendsQuery", + count_query=count_query, + exposure_query=exposure_query, + ) + + experiment.metrics = [{"type": "primary", "query": experiment_query.model_dump()}] + 
experiment.save() + + query_runner = ExperimentTrendsQueryRunner( + query=ExperimentTrendsQuery(**experiment.metrics[0]["query"]), team=self.team + ) + with freeze_time("2023-01-07"): + with self.assertRaises(QueryError) as context: + query_runner.calculate() + + self.assertEqual(str(context.exception), 'Unknown table "invalid_table_name".') + @freeze_time("2020-01-01T12:00:00Z") def test_query_runner_with_avg_math(self): feature_flag = self.create_feature_flag() diff --git a/posthog/hogql_queries/insights/trends/trends_query_runner.py b/posthog/hogql_queries/insights/trends/trends_query_runner.py index 668cd8b2afb48..3a3dabc69641a 100644 --- a/posthog/hogql_queries/insights/trends/trends_query_runner.py +++ b/posthog/hogql_queries/insights/trends/trends_query_runner.py @@ -17,6 +17,7 @@ from posthog.clickhouse import query_tagging from posthog.hogql import ast from posthog.hogql.constants import MAX_SELECT_RETURNED_ROWS, LimitContext +from posthog.hogql.context import HogQLContext from posthog.hogql.printer import to_printed_hogql from posthog.hogql.query import execute_hogql_query from posthog.hogql.timings import HogQLTimings @@ -291,7 +292,7 @@ def to_actors_query_options(self) -> InsightActorsQueryOptionsResponse: compare=res_compare, ) - def calculate(self): + def calculate(self, context: Optional[HogQLContext] = None): queries = self.to_queries() if len(queries) == 0: @@ -303,7 +304,8 @@ def calculate(self): response_hogql_query = ast.SelectSetQuery.create_from_queries(queries, "UNION ALL") with self.timings.measure("printing_hogql_for_response"): - response_hogql = to_printed_hogql(response_hogql_query, self.team, self.modifiers) + database = context.database if context else None + response_hogql = to_printed_hogql(response_hogql_query, self.team, self.modifiers, database) res_matrix: list[list[Any] | Any | None] = [None] * len(queries) timings_matrix: list[list[QueryTiming] | None] = [None] * (2 + len(queries)) @@ -330,6 +332,7 @@ def run( timings=timings, modifiers=self.modifiers, limit_context=self.limit_context, + context=context, ) timings_matrix[index + 1] = response.timings diff --git a/posthog/hogql_queries/web_analytics/stats_table.py b/posthog/hogql_queries/web_analytics/stats_table.py index a5c2b6640bf94..fef2c71af7fdb 100644 --- a/posthog/hogql_queries/web_analytics/stats_table.py +++ b/posthog/hogql_queries/web_analytics/stats_table.py @@ -77,6 +77,7 @@ def to_main_query(self) -> ast.SelectQuery: ) GROUP BY "context.columns.breakdown_value" ORDER BY "context.columns.visitors" DESC, +"context.columns.views" DESC, "context.columns.breakdown_value" ASC """, timings=self.timings, @@ -118,6 +119,7 @@ def _to_main_query_with_session_properties(self) -> ast.SelectQuery: ) GROUP BY "context.columns.breakdown_value" ORDER BY "context.columns.visitors" DESC, +"context.columns.views" DESC, "context.columns.breakdown_value" ASC """, timings=self.timings, @@ -162,6 +164,7 @@ def to_entry_bounce_query(self) -> ast.SelectQuery: ) GROUP BY "context.columns.breakdown_value" ORDER BY "context.columns.visitors" DESC, +"context.columns.views" DESC, "context.columns.breakdown_value" ASC """, timings=self.timings, @@ -269,6 +272,7 @@ def to_path_scroll_bounce_query(self) -> ast.SelectQuery: ) AS scroll ON counts.breakdown_value = scroll.breakdown_value ORDER BY "context.columns.visitors" DESC, +"context.columns.views" DESC, "context.columns.breakdown_value" ASC """, timings=self.timings, @@ -346,6 +350,7 @@ def to_path_bounce_query(self) -> ast.SelectQuery: ) as bounce ON 
counts.breakdown_value = bounce.breakdown_value ORDER BY "context.columns.visitors" DESC, +"context.columns.views" DESC, "context.columns.breakdown_value" ASC """, timings=self.timings, @@ -500,9 +505,11 @@ def _counts_breakdown_value(self): case WebStatsBreakdown.CITY: return parse_expr("tuple(properties.$geoip_country_code, properties.$geoip_city_name)") case WebStatsBreakdown.TIMEZONE: - # Timezone offsets would be slightly more useful, but that's not easily achievable - # with Clickhouse, we might attempt to change this in the future - return ast.Field(chain=["properties", "$timezone"]) + # Get the difference between the UNIX timestamp at UTC and the UNIX timestamp at the event's timezone + # Value is in milliseconds, turn it to hours, works even for fractional timezone offsets (I'm looking at you, Australia) + return parse_expr( + "if(or(isNull(properties.$timezone), empty(properties.$timezone)), NULL, (toUnixTimestamp64Milli(parseDateTimeBestEffort(assumeNotNull(toString(timestamp, properties.$timezone)))) - toUnixTimestamp64Milli(timestamp)) / 3600000)" + ) case _: raise NotImplementedError("Breakdown not implemented") diff --git a/posthog/hogql_queries/web_analytics/test/test_web_stats_table.py b/posthog/hogql_queries/web_analytics/test/test_web_stats_table.py index a58b8353daa1a..616d406978ca1 100644 --- a/posthog/hogql_queries/web_analytics/test/test_web_stats_table.py +++ b/posthog/hogql_queries/web_analytics/test/test_web_stats_table.py @@ -1006,50 +1006,142 @@ def test_cohort_test_filters(self): assert results == [["/path1", 1, 1]] def test_timezone_filter(self): - d1, s1 = "d1", str(uuid7("2024-07-30")) - d2, s2 = "d2", str(uuid7("2024-07-30")) + date = "2024-07-30" + + for idx, (distinct_id, session_id) in enumerate( + [ + ("UTC", str(uuid7(date))), + ("Asia/Calcutta", str(uuid7(date))), + ("America/New_York", str(uuid7(date))), + ("America/Sao_Paulo", str(uuid7(date))), + ] + ): + _create_person( + team_id=self.team.pk, + distinct_ids=[distinct_id], + properties={"name": session_id, "email": f"{distinct_id}@example.com"}, + ) + + for i in range(idx + 1): + _create_event( + team=self.team, + event="$pageview", + distinct_id=distinct_id, + timestamp=date, + properties={"$session_id": session_id, "$pathname": f"/path{i}", "$timezone": distinct_id}, + ) + + results = self._run_web_stats_table_query( + "all", + None, + breakdown_by=WebStatsBreakdown.TIMEZONE, + ).results + + # Brasilia UTC-3, New York UTC-4, Calcutta UTC+5:30, UTC + assert results == [[-3.0, 1.0, 4.0], [-4.0, 1.0, 3.0], [5.5, 1.0, 2.0], [0.0, 1.0, 1.0]] + + def test_timezone_filter_dst_change(self): + did = "id" + sid = str(uuid7("2019-02-17")) _create_person( team_id=self.team.pk, - distinct_ids=[d1], - properties={"name": d1, "email": "test@example.com"}, + distinct_ids=[did], + properties={"name": sid, "email": f"test@example.com"}, + ) + + # Cross daylight savings time change in Brazil + for i in range(6): + _create_event( + team=self.team, + event="$pageview", + distinct_id=did, + timestamp=f"2019-02-17 0{i}:00:00", + properties={"$session_id": sid, "$pathname": f"/path1", "$timezone": "America/Sao_Paulo"}, + ) + + results = self._run_web_stats_table_query( + "all", + None, + breakdown_by=WebStatsBreakdown.TIMEZONE, + ).results + + # Change from UTC-2 to UTC-3 in the middle of the night + assert results == [[-3.0, 1.0, 4.0], [-2.0, 1.0, 2.0]] + + def test_timezone_filter_with_invalid_timezone(self): + date = "2024-07-30" + + for idx, (distinct_id, session_id) in enumerate( + [ + ("UTC", str(uuid7(date))), + 
("Timezone_not_exists", str(uuid7(date))), + ] + ): + _create_person( + team_id=self.team.pk, + distinct_ids=[distinct_id], + properties={"name": session_id, "email": f"{distinct_id}@example.com"}, + ) + + for i in range(idx + 1): + _create_event( + team=self.team, + event="$pageview", + distinct_id=distinct_id, + timestamp=date, + properties={"$session_id": session_id, "$pathname": f"/path{i}", "$timezone": distinct_id}, + ) + + with self.assertRaisesRegex(Exception, "Cannot load time zone"): + self._run_web_stats_table_query( + "all", + None, + breakdown_by=WebStatsBreakdown.TIMEZONE, + ) + + def test_timezone_filter_with_empty_timezone(self): + did = "id" + sid = str(uuid7("2019-02-17")) + + _create_person( + team_id=self.team.pk, + distinct_ids=[did], + properties={"name": sid, "email": f"test@example.com"}, ) + # Key not exists _create_event( team=self.team, event="$pageview", - distinct_id=d1, - timestamp="2024-07-30", - properties={"$session_id": s1, "$pathname": "/path1", "$timezone": "America/New_York"}, + distinct_id=did, + timestamp=f"2019-02-17 00:00:00", + properties={"$session_id": sid, "$pathname": f"/path1"}, ) - _create_person( - team_id=self.team.pk, - distinct_ids=[d2], - properties={"name": d2, "email": "d2@hedgebox.net"}, - ) + # Key exists, it's null _create_event( team=self.team, event="$pageview", - distinct_id=d2, - timestamp="2024-07-30", - properties={"$session_id": s2, "$pathname": "/path2", "$timezone": "America/Brasilia"}, + distinct_id=did, + timestamp=f"2019-02-17 00:00:00", + properties={"$session_id": sid, "$pathname": f"/path1", "$timezone": None}, ) + + # Key exists, it's empty string _create_event( team=self.team, event="$pageview", - distinct_id=d2, - timestamp="2024-07-30", - properties={"$session_id": s2, "$pathname": "/path3", "$timezone": "America/Brasilia"}, + distinct_id=did, + timestamp=f"2019-02-17 00:00:00", + properties={"$session_id": sid, "$pathname": f"/path1", "$timezone": ""}, ) - flush_persons_and_events() - results = self._run_web_stats_table_query( "all", None, breakdown_by=WebStatsBreakdown.TIMEZONE, - filter_test_accounts=True, ).results - assert results == [["America/Brasilia", 1.0, 2.0], ["America/New_York", 1.0, 1.0]] + # Don't crash, treat all of them null + assert results == [] diff --git a/posthog/migrations/0522_datawarehouse_salesforce_opportunity.py b/posthog/migrations/0522_datawarehouse_salesforce_opportunity.py new file mode 100644 index 0000000000000..d03baeda884bb --- /dev/null +++ b/posthog/migrations/0522_datawarehouse_salesforce_opportunity.py @@ -0,0 +1,34 @@ +# Generated by Django 4.2.15 on 2024-11-25 02:41 +from django.db import migrations, connection + + +def insert_salesforce_opportunity_schemas(apps, schema_editor): + with connection.cursor() as cursor: + cursor.execute("SELECT id, team_id FROM posthog_externaldatasource where source_type = 'Salesforce'") + salesforce_sources = cursor.fetchall() + + ExternalDataSchema = apps.get_model("posthog", "ExternalDataSchema") + for source in salesforce_sources: + schema = ExternalDataSchema.objects.create( + name="Opportunity", + source_id=source[0], + team_id=source[1], + should_sync=False, + sync_type=None, + sync_type_config={}, + ) + schema.save() + + +def reverse(apps, _): + pass + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0521_alter_errortrackingstackframe_context"), + ] + + operations = [ + migrations.RunPython(insert_salesforce_opportunity_schemas, reverse), + ] diff --git a/posthog/migrations/max_migration.txt 
b/posthog/migrations/max_migration.txt index 8d2eb0a249399..334dacbdf2b21 100644 --- a/posthog/migrations/max_migration.txt +++ b/posthog/migrations/max_migration.txt @@ -1 +1 @@ -0521_alter_errortrackingstackframe_context +0522_datawarehouse_salesforce_opportunity diff --git a/posthog/templates/head.html b/posthog/templates/head.html index 472d901fb218c..454a4c68d3053 100644 --- a/posthog/templates/head.html +++ b/posthog/templates/head.html @@ -16,8 +16,6 @@ - - {% if not opt_out_capture and js_posthog_api_key %}