diff --git a/.github/workflows/ci-backend-update-test-timing.yml b/.github/workflows/ci-backend-update-test-timing.yml index 22c0b6afca6ca..e04a8eff18d00 100644 --- a/.github/workflows/ci-backend-update-test-timing.yml +++ b/.github/workflows/ci-backend-update-test-timing.yml @@ -11,6 +11,10 @@ env: CLICKHOUSE_SECURE: 'False' CLICKHOUSE_VERIFY: 'False' TEST: 1 + OBJECT_STORAGE_ENABLED: 'True' + OBJECT_STORAGE_ENDPOINT: 'http://localhost:19000' + OBJECT_STORAGE_ACCESS_KEY_ID: 'object_storage_root_user' + OBJECT_STORAGE_SECRET_ACCESS_KEY: 'object_storage_root_password' jobs: django: diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index a3900fbbc9ba6..36cd465263a10 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -48,6 +48,8 @@ services: - ./docker/clickhouse/docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d - ./docker/clickhouse/config.xml:/etc/clickhouse-server/config.xml - ./docker/clickhouse/users-dev.xml:/etc/clickhouse-server/users.xml + extra_hosts: + - 'host.docker.internal:host-gateway' zookeeper: extends: diff --git a/frontend/src/lib/components/PropertyFilters/utils.ts b/frontend/src/lib/components/PropertyFilters/utils.ts index b73fe1e85ef17..dc65355ad187f 100644 --- a/frontend/src/lib/components/PropertyFilters/utils.ts +++ b/frontend/src/lib/components/PropertyFilters/utils.ts @@ -99,6 +99,7 @@ export const PROPERTY_FILTER_TYPE_TO_TAXONOMIC_FILTER_GROUP_TYPE: Omit< [PropertyFilterType.Session]: TaxonomicFilterGroupType.Sessions, [PropertyFilterType.HogQL]: TaxonomicFilterGroupType.HogQLExpression, [PropertyFilterType.Group]: TaxonomicFilterGroupType.GroupsPrefix, + [PropertyFilterType.DataWarehouse]: TaxonomicFilterGroupType.DataWarehouse, } export function formatPropertyLabel( diff --git a/frontend/src/lib/components/TaxonomicFilter/types.ts b/frontend/src/lib/components/TaxonomicFilter/types.ts index cfc4ce64c00a8..8872e148ed88a 100644 --- a/frontend/src/lib/components/TaxonomicFilter/types.ts +++ b/frontend/src/lib/components/TaxonomicFilter/types.ts @@ -79,6 +79,7 @@ export enum TaxonomicFilterGroupType { Actions = 'actions', Cohorts = 'cohorts', CohortsWithAllUsers = 'cohorts_with_all', + DataWarehouse = 'data_warehouse', Elements = 'elements', Events = 'events', EventProperties = 'event_properties', diff --git a/frontend/src/queries/nodes/InsightQuery/utils/queryNodeToFilter.ts b/frontend/src/queries/nodes/InsightQuery/utils/queryNodeToFilter.ts index 3d50497bf0862..8851be4a5c60b 100644 --- a/frontend/src/queries/nodes/InsightQuery/utils/queryNodeToFilter.ts +++ b/frontend/src/queries/nodes/InsightQuery/utils/queryNodeToFilter.ts @@ -4,6 +4,7 @@ import { isFunnelsFilter, isLifecycleFilter, isStickinessFilter, isTrendsFilter import { ActionsNode, BreakdownFilter, + DataWarehouseNode, EventsNode, FunnelsFilterLegacy, InsightNodeKind, @@ -17,6 +18,7 @@ import { } from '~/queries/schema' import { isActionsNode, + isDataWarehouseNode, isEventsNode, isFunnelsQuery, isLifecycleQuery, @@ -27,12 +29,24 @@ import { } from '~/queries/utils' import { ActionFilter, EntityTypes, FilterType, InsightType } from '~/types' -type FilterTypeActionsAndEvents = { events?: ActionFilter[]; actions?: ActionFilter[]; new_entity?: ActionFilter[] } +type FilterTypeActionsAndEvents = { + events?: ActionFilter[] + actions?: ActionFilter[] + data_warehouse?: ActionFilter[] + new_entity?: ActionFilter[] +} -export const seriesNodeToFilter = (node: EventsNode | ActionsNode, index?: number): ActionFilter => { +export const seriesNodeToFilter = ( + node: EventsNode | ActionsNode | DataWarehouseNode, + index?: number +): ActionFilter => { const entity: ActionFilter = objectClean({ - type: isActionsNode(node) ? EntityTypes.ACTIONS : EntityTypes.EVENTS, - id: (!isActionsNode(node) ? node.event : node.id) || null, + type: isDataWarehouseNode(node) + ? EntityTypes.DATA_WAREHOUSE + : isActionsNode(node) + ? EntityTypes.ACTIONS + : EntityTypes.EVENTS, + id: isDataWarehouseNode(node) ? node.table_name : (!isActionsNode(node) ? node.event : node.id) || null, order: index, name: node.name, custom_name: node.custom_name, @@ -47,10 +61,11 @@ export const seriesNodeToFilter = (node: EventsNode | ActionsNode, index?: numbe } export const seriesToActionsAndEvents = ( - series: (EventsNode | ActionsNode)[] + series: (EventsNode | ActionsNode | DataWarehouseNode)[] ): Required => { const actions: ActionFilter[] = [] const events: ActionFilter[] = [] + const data_warehouse: ActionFilter[] = [] const new_entity: ActionFilter[] = [] series.forEach((node, index) => { const entity = seriesNodeToFilter(node, index) @@ -58,12 +73,14 @@ export const seriesToActionsAndEvents = ( events.push(entity) } else if (isActionsNode(node)) { actions.push(entity) + } else if (isDataWarehouseNode(node)) { + data_warehouse.push(entity) } else { new_entity.push(entity) } }) - return { actions, events, new_entity } + return { actions, events, data_warehouse, new_entity } } export const hiddenLegendItemsToKeys = ( diff --git a/frontend/src/queries/nodes/InsightViz/utils.ts b/frontend/src/queries/nodes/InsightViz/utils.ts index 3ab18a60800b6..6bb24f64cdad2 100644 --- a/frontend/src/queries/nodes/InsightViz/utils.ts +++ b/frontend/src/queries/nodes/InsightViz/utils.ts @@ -1,7 +1,14 @@ import equal from 'fast-deep-equal' import { getEventNamesForAction, isEmptyObject } from 'lib/utils' -import { ActionsNode, BreakdownFilter, EventsNode, InsightQueryNode, TrendsQuery } from '~/queries/schema' +import { + ActionsNode, + BreakdownFilter, + DataWarehouseNode, + EventsNode, + InsightQueryNode, + TrendsQuery, +} from '~/queries/schema' import { isInsightQueryWithBreakdown, isInsightQueryWithSeries, @@ -59,7 +66,7 @@ export const getFormula = (query: InsightQueryNode): string | undefined => { } } -export const getSeries = (query: InsightQueryNode): (EventsNode | ActionsNode)[] | undefined => { +export const getSeries = (query: InsightQueryNode): (EventsNode | ActionsNode | DataWarehouseNode)[] | undefined => { if (isInsightQueryWithSeries(query)) { return query.series } else { diff --git a/frontend/src/queries/schema.json b/frontend/src/queries/schema.json index d8a23cae2f1b6..9bd300b2eb564 100644 --- a/frontend/src/queries/schema.json +++ b/frontend/src/queries/schema.json @@ -234,6 +234,9 @@ }, { "$ref": "#/definitions/ActionsNode" + }, + { + "$ref": "#/definitions/DataWarehouseNode" } ] }, @@ -268,6 +271,9 @@ }, { "$ref": "#/definitions/EmptyPropertyFilter" + }, + { + "$ref": "#/definitions/DataWarehousePropertyFilter" } ] }, @@ -473,7 +479,7 @@ ] }, "BreakdownType": { - "enum": ["cohort", "person", "event", "group", "session", "hogql"], + "enum": ["cohort", "person", "event", "group", "session", "hogql", "data_warehouse"], "type": "string" }, "BreakdownValueInt": { @@ -755,6 +761,105 @@ "required": ["kind", "source"], "type": "object" }, + "DataWarehouseNode": { + "additionalProperties": false, + "properties": { + "custom_name": { + "type": "string" + }, + "fixedProperties": { + "description": "Fixed properties in the query, can't be edited in the interface (e.g. scoping down by person)", + "items": { + "$ref": "#/definitions/AnyPropertyFilter" + }, + "type": "array" + }, + "id": { + "type": "string" + }, + "id_field": { + "type": "string" + }, + "kind": { + "const": "DataWarehouseNode", + "type": "string" + }, + "math": { + "anyOf": [ + { + "$ref": "#/definitions/BaseMathType" + }, + { + "$ref": "#/definitions/PropertyMathType" + }, + { + "$ref": "#/definitions/CountPerActorMathType" + }, + { + "$ref": "#/definitions/GroupMathType" + }, + { + "$ref": "#/definitions/HogQLMathType" + } + ] + }, + "math_group_type_index": { + "enum": [0, 1, 2, 3, 4], + "type": "number" + }, + "math_hogql": { + "type": "string" + }, + "math_property": { + "type": "string" + }, + "name": { + "type": "string" + }, + "properties": { + "description": "Properties configurable in the interface", + "items": { + "$ref": "#/definitions/AnyPropertyFilter" + }, + "type": "array" + }, + "response": { + "description": "Cached query response", + "type": "object" + }, + "table_name": { + "type": "string" + }, + "timestamp_field": { + "type": "string" + } + }, + "required": ["id", "id_field", "kind", "table_name", "timestamp_field"], + "type": "object" + }, + "DataWarehousePropertyFilter": { + "additionalProperties": false, + "properties": { + "key": { + "type": "string" + }, + "label": { + "type": "string" + }, + "operator": { + "$ref": "#/definitions/PropertyOperator" + }, + "type": { + "const": "data_warehouse", + "type": "string" + }, + "value": { + "$ref": "#/definitions/PropertyFilterValue" + } + }, + "required": ["key", "operator", "type"], + "type": "object" + }, "DatabaseSchemaQuery": { "additionalProperties": false, "properties": { @@ -955,7 +1060,7 @@ "type": "object" }, "EntityType": { - "enum": ["actions", "events", "new_entity"], + "enum": ["actions", "events", "data_warehouse", "new_entity"], "type": "string" }, "EventPropertyFilter": { @@ -2852,6 +2957,7 @@ "enum": [ "EventsNode", "ActionsNode", + "DataWarehouseNode", "EventsQuery", "PersonsNode", "HogQLQuery", @@ -3193,7 +3299,8 @@ "cohort", "recording", "group", - "hogql" + "hogql", + "data_warehouse" ], "type": "string" }, @@ -3997,6 +4104,9 @@ { "$ref": "#/definitions/PersonsNode" }, + { + "$ref": "#/definitions/DataWarehouseNode" + }, { "$ref": "#/definitions/TimeToSeeDataSessionsQuery" }, diff --git a/frontend/src/queries/schema.ts b/frontend/src/queries/schema.ts index 52f0393f6ec74..c65717c9c1d26 100644 --- a/frontend/src/queries/schema.ts +++ b/frontend/src/queries/schema.ts @@ -46,6 +46,7 @@ export enum NodeKind { // Data nodes EventsNode = 'EventsNode', ActionsNode = 'ActionsNode', + DataWarehouseNode = 'DataWarehouseNode', EventsQuery = 'EventsQuery', PersonsNode = 'PersonsNode', HogQLQuery = 'HogQLQuery', @@ -110,6 +111,7 @@ export type QuerySchema = | EventsNode // never queried directly | ActionsNode // old actions API endpoint | PersonsNode // old persons API endpoint + | DataWarehouseNode | TimeToSeeDataSessionsQuery // old API | EventsQuery | ActorsQuery @@ -364,12 +366,20 @@ export interface EventsNode extends EntityNode { } } +export interface DataWarehouseNode extends EntityNode { + id: string + kind: NodeKind.DataWarehouseNode + id_field: string + table_name: string + timestamp_field: string +} + export interface ActionsNode extends EntityNode { kind: NodeKind.ActionsNode id: integer } -export type AnyEntityNode = EventsNode | ActionsNode +export type AnyEntityNode = EventsNode | ActionsNode | DataWarehouseNode export interface QueryTiming { /** Key. Shortened to 'k' to save on data. */ diff --git a/frontend/src/queries/utils.ts b/frontend/src/queries/utils.ts index c6c12e8a3a03f..be59882cd850c 100644 --- a/frontend/src/queries/utils.ts +++ b/frontend/src/queries/utils.ts @@ -8,6 +8,7 @@ import { DatabaseSchemaQuery, DataTableNode, DataVisualizationNode, + DataWarehouseNode, DateRange, EventsNode, EventsQuery, @@ -88,6 +89,10 @@ export function isActionsNode(node?: Node | null): node is ActionsNode { return node?.kind === NodeKind.ActionsNode } +export function isDataWarehouseNode(node?: Node | null): node is DataWarehouseNode { + return node?.kind === NodeKind.DataWarehouseNode +} + export function isPersonsNode(node?: Node | null): node is PersonsNode { return node?.kind === NodeKind.PersonsNode } diff --git a/frontend/src/scenes/insights/InsightNav/insightNavLogic.tsx b/frontend/src/scenes/insights/InsightNav/insightNavLogic.tsx index 2028279e40573..d2f28906bfe95 100644 --- a/frontend/src/scenes/insights/InsightNav/insightNavLogic.tsx +++ b/frontend/src/scenes/insights/InsightNav/insightNavLogic.tsx @@ -12,6 +12,7 @@ import { insightMap } from '~/queries/nodes/InsightQuery/utils/queryNodeToFilter import { getDisplay, getShowPercentStackView, getShowValueOnSeries } from '~/queries/nodes/InsightViz/utils' import { ActionsNode, + DataWarehouseNode, EventsNode, FunnelsFilter, FunnelsQuery, @@ -70,9 +71,9 @@ export interface QueryPropertyCache } const cleanSeriesEntityMath = ( - entity: EventsNode | ActionsNode, + entity: EventsNode | ActionsNode | DataWarehouseNode, mathAvailability: MathAvailability -): EventsNode | ActionsNode => { +): EventsNode | ActionsNode | DataWarehouseNode => { const { math, math_property, math_group_type_index, math_hogql, ...baseEntity } = entity // TODO: This should be improved to keep a math that differs from the default. @@ -91,9 +92,9 @@ const cleanSeriesEntityMath = ( } const cleanSeriesMath = ( - series: (EventsNode | ActionsNode)[], + series: (EventsNode | ActionsNode | DataWarehouseNode)[], mathAvailability: MathAvailability -): (EventsNode | ActionsNode)[] => { +): (EventsNode | ActionsNode | DataWarehouseNode)[] => { return series.map((entity) => cleanSeriesEntityMath(entity, mathAvailability)) } diff --git a/frontend/src/scenes/insights/utils.tsx b/frontend/src/scenes/insights/utils.tsx index 50cc8fc63ec8c..7e30c7945871a 100644 --- a/frontend/src/scenes/insights/utils.tsx +++ b/frontend/src/scenes/insights/utils.tsx @@ -11,8 +11,8 @@ import { urls } from 'scenes/urls' import { dashboardsModel } from '~/models/dashboardsModel' import { FormatPropertyValueForDisplayFunction } from '~/models/propertyDefinitionsModel' import { examples } from '~/queries/examples' -import { ActionsNode, BreakdownFilter, EventsNode, PathsFilter } from '~/queries/schema' -import { isEventsNode } from '~/queries/utils' +import { ActionsNode, BreakdownFilter, DataWarehouseNode, EventsNode, PathsFilter } from '~/queries/schema' +import { isDataWarehouseNode, isEventsNode } from '~/queries/utils' import { ActionFilter, AnyPartialFilterType, @@ -59,7 +59,10 @@ export const getDisplayNameFromEntityFilter = ( return (isCustom ? customName : null) ?? name ?? (filter?.id ? `${filter?.id}` : null) } -export const getDisplayNameFromEntityNode = (node: EventsNode | ActionsNode, isCustom = true): string | null => { +export const getDisplayNameFromEntityNode = ( + node: EventsNode | ActionsNode | DataWarehouseNode, + isCustom = true +): string | null => { // Make sure names aren't blank strings const customName = ensureStringIsNotBlank(node?.custom_name) let name = ensureStringIsNotBlank(node?.name) @@ -70,7 +73,7 @@ export const getDisplayNameFromEntityNode = (node: EventsNode | ActionsNode, isC name = 'All events' } - const id = isEventsNode(node) ? node.event : node.id + const id = isDataWarehouseNode(node) ? node.table_name : isEventsNode(node) ? node.event : node.id // Return custom name. If that doesn't exist then the name, then the id, then just null. return (isCustom ? customName : null) ?? name ?? (id ? `${id}` : null) diff --git a/frontend/src/scenes/saved-insights/SavedInsights.tsx b/frontend/src/scenes/saved-insights/SavedInsights.tsx index 9f441785d2295..9879a4f76005d 100644 --- a/frontend/src/scenes/saved-insights/SavedInsights.tsx +++ b/frontend/src/scenes/saved-insights/SavedInsights.tsx @@ -172,6 +172,12 @@ export const QUERY_TYPES_METADATA: Record = { icon: IconAction, inMenu: true, }, + [NodeKind.DataWarehouseNode]: { + name: 'Data Warehouse', + description: 'List and explore data warehouse tables', + icon: IconTableChart, + inMenu: true, + }, [NodeKind.EventsQuery]: { name: 'Events Query', description: 'Hmmm, not every kind should be displayable I guess', diff --git a/frontend/src/types.ts b/frontend/src/types.ts index 178c0032da777..6d391d3571468 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -637,6 +637,7 @@ export enum PropertyFilterType { Recording = 'recording', Group = 'group', HogQL = 'hogql', + DataWarehouse = 'data_warehouse', } /** Sync with plugin-server/src/types.ts */ @@ -659,6 +660,11 @@ export interface PersonPropertyFilter extends BasePropertyFilter { operator: PropertyOperator } +export interface DataWarehousePropertyFilter extends BasePropertyFilter { + type: PropertyFilterType.DataWarehouse + operator: PropertyOperator +} + /** Sync with plugin-server/src/types.ts */ export interface ElementPropertyFilter extends BasePropertyFilter { type: PropertyFilterType.Element @@ -714,6 +720,7 @@ export type AnyPropertyFilter = | FeaturePropertyFilter | HogQLPropertyFilter | EmptyPropertyFilter + | DataWarehousePropertyFilter export type AnyFilterLike = AnyPropertyFilter | PropertyGroupFilter | PropertyGroupFilterValue @@ -879,7 +886,7 @@ export interface SessionRecordingsResponse { has_next: boolean } -export type EntityType = 'actions' | 'events' | 'new_entity' +export type EntityType = 'actions' | 'events' | 'data_warehouse' | 'new_entity' export interface Entity { id: string | number @@ -892,6 +899,7 @@ export interface Entity { export enum EntityTypes { ACTIONS = 'actions', EVENTS = 'events', + DATA_WAREHOUSE = 'data_warehouse', } export type EntityFilter = { @@ -1803,7 +1811,7 @@ export enum ChartDisplayType { BoldNumber = 'BoldNumber', } -export type BreakdownType = 'cohort' | 'person' | 'event' | 'group' | 'session' | 'hogql' +export type BreakdownType = 'cohort' | 'person' | 'event' | 'group' | 'session' | 'hogql' | 'data_warehouse' export type IntervalType = 'hour' | 'day' | 'week' | 'month' export type SmoothingType = number diff --git a/mypy-baseline.txt b/mypy-baseline.txt index 2e43733e26178..03c04a02a9667 100644 --- a/mypy-baseline.txt +++ b/mypy-baseline.txt @@ -331,7 +331,6 @@ posthog/models/property/util.py:0: error: Argument 1 to "append" of "list" has i posthog/models/property/util.py:0: error: Argument 1 to "append" of "list" has incompatible type "str | int"; expected "str" [arg-type] posthog/models/property/util.py:0: error: Argument 1 to "append" of "list" has incompatible type "str | int"; expected "str" [arg-type] posthog/queries/trends/util.py:0: error: Argument 1 to "translate_hogql" has incompatible type "str | None"; expected "str" [arg-type] -posthog/hogql/property.py:0: error: Non-overlapping equality check (left operand type: "Literal['person', 'cohort', 'element', 'static-cohort', 'precalculated-cohort', 'group', 'recording', 'behavioral', 'session']", right operand type: "Literal['feature']") [comparison-overlap] posthog/hogql/property.py:0: error: Argument "chain" to "Field" has incompatible type "list[str]"; expected "list[str | int]" [arg-type] posthog/hogql/property.py:0: note: "List" is invariant -- see https://mypy.readthedocs.io/en/stable/common_issues.html#variance posthog/hogql/property.py:0: note: Consider using "Sequence" instead, which is covariant @@ -379,16 +378,12 @@ posthog/hogql_queries/insights/trends/breakdown.py:0: error: Unsupported operand posthog/hogql_queries/insights/trends/breakdown.py:0: note: Left operand is of type "str | int" posthog/hogql_queries/insights/trends/breakdown.py:0: error: Incompatible return value type (got "list[tuple[str | int, Any | float | str | int]]", expected "list[tuple[float, float]]") [return-value] posthog/hogql_queries/insights/trends/breakdown.py:0: error: Item "None" of "BreakdownFilter | None" has no attribute "breakdown_type" [union-attr] -posthog/hogql_queries/insights/trends/breakdown.py:0: error: Argument "breakdown_type" to "get_properties_chain" has incompatible type "BreakdownType | Any | None"; expected "Literal['person', 'session', 'group', 'event']" [arg-type] posthog/hogql_queries/insights/trends/breakdown.py:0: error: Item "None" of "BreakdownFilter | None" has no attribute "breakdown" [union-attr] posthog/hogql_queries/insights/trends/breakdown.py:0: error: Argument "breakdown_field" to "get_properties_chain" has incompatible type "str | float | list[str | float] | Any | None"; expected "str" [arg-type] posthog/hogql_queries/insights/trends/breakdown.py:0: error: Item "None" of "BreakdownFilter | None" has no attribute "breakdown_group_type_index" [union-attr] posthog/hogql/metadata.py:0: error: Argument "metadata_source" to "translate_hogql" has incompatible type "SelectQuery | SelectUnionQuery"; expected "SelectQuery | None" [arg-type] posthog/hogql/metadata.py:0: error: Incompatible types in assignment (expression has type "Expr", variable has type "SelectQuery | SelectUnionQuery") [assignment] posthog/queries/breakdown_props.py:0: error: Argument 1 to "translate_hogql" has incompatible type "str | int"; expected "str" [arg-type] -posthog/hogql_queries/insights/funnels/base.py:0: error: Incompatible types in assignment (expression has type "FunnelExclusionEventsNode | FunnelExclusionActionsNode", variable has type "EventsNode | ActionsNode") [assignment] -posthog/hogql_queries/insights/funnels/base.py:0: error: Item "EventsNode" of "EventsNode | ActionsNode" has no attribute "funnelFromStep" [union-attr] -posthog/hogql_queries/insights/funnels/base.py:0: error: Item "ActionsNode" of "EventsNode | ActionsNode" has no attribute "funnelFromStep" [union-attr] posthog/queries/funnels/base.py:0: error: "HogQLContext" has no attribute "person_on_events_mode" [attr-defined] posthog/queries/funnels/base.py:0: error: Argument 1 to "translate_hogql" has incompatible type "str | int"; expected "str" [arg-type] ee/clickhouse/queries/funnels/funnel_correlation.py:0: error: Statement is unreachable [unreachable] diff --git a/posthog/constants.py b/posthog/constants.py index c9219ce1913b6..ad0129c4f99c0 100644 --- a/posthog/constants.py +++ b/posthog/constants.py @@ -38,6 +38,7 @@ class AvailableFeature(str, Enum): TREND_FILTER_TYPE_ACTIONS = "actions" TREND_FILTER_TYPE_EVENTS = "events" +TREND_FILTER_TYPE_DATA_WAREHOUSE = "data_warehouse" SESSION_RECORDINGS_FILTER_IDS = "session_ids" @@ -115,6 +116,7 @@ class AvailableFeature(str, Enum): ENTITIES = "entities" ACTIONS = "actions" EVENTS = "events" +DATA_WAREHOUSE_ENTITIES = "data_warehouse_entities" EXCLUSIONS = "exclusions" PROPERTIES = "properties" PROPERTY_GROUPS = "property_groups" diff --git a/posthog/hogql/functions/mapping.py b/posthog/hogql/functions/mapping.py index 2331fac2a2bb1..5490a0e453887 100644 --- a/posthog/hogql/functions/mapping.py +++ b/posthog/hogql/functions/mapping.py @@ -169,7 +169,7 @@ class HogQLFunctionMeta: "parseDateTime64BestEffortOrNull", 1, 1, - overloads=[((ast.DateTimeType, ast.DateType), "toDateTime")], + overloads=[((ast.DateTimeType, ast.DateType, ast.IntegerType), "toDateTime")], tz_aware=True, ), "toUUID": HogQLFunctionMeta("toUUIDOrNull", 1, 1), diff --git a/posthog/hogql/property.py b/posthog/hogql/property.py index dee266c6cec19..cf893b348eb7c 100644 --- a/posthog/hogql/property.py +++ b/posthog/hogql/property.py @@ -121,7 +121,11 @@ def property_to_expr( if property.type == "hogql": return parse_expr(property.key) elif ( - property.type == "event" or property.type == "feature" or property.type == "person" or property.type == "group" + property.type == "event" + or property.type == "feature" + or property.type == "person" + or property.type == "group" + or property.type == "data_warehouse" ): if scope == "person" and property.type != "person": raise NotImplementedException( @@ -134,6 +138,8 @@ def property_to_expr( chain = ["person", "properties"] elif property.type == "group": chain = [f"group_{property.group_type_index}", "properties"] + elif property.type == "data_warehouse": + chain = [] else: chain = ["properties"] field = ast.Field(chain=chain + [property.key]) diff --git a/posthog/hogql_queries/insights/funnels/base.py b/posthog/hogql_queries/insights/funnels/base.py index 13f7ead0c006f..f4c49e114ed53 100644 --- a/posthog/hogql_queries/insights/funnels/base.py +++ b/posthog/hogql_queries/insights/funnels/base.py @@ -24,6 +24,7 @@ ActionsNode, BreakdownAttributionType, BreakdownType, + DataWarehouseNode, EventsNode, FunnelExclusionActionsNode, FunnelTimeToConvertResults, @@ -356,26 +357,33 @@ def _format_single_funnel(self, results, with_breakdown=False): def _serialize_step( self, - step: ActionsNode | EventsNode, + step: ActionsNode | EventsNode | DataWarehouseNode, count: int, index: int, people: Optional[List[uuid.UUID]] = None, sampling_factor: Optional[float] = None, ) -> Dict[str, Any]: + action_id: Optional[str | int] if isinstance(step, EventsNode): name = step.event + action_id = step.event + type = "events" + elif isinstance(step, DataWarehouseNode): + raise NotImplementedError("DataWarehouseNode is not supported in funnels") else: action = Action.objects.get(pk=step.id) name = action.name + action_id = step.id + type = "actions" return { - "action_id": step.event if isinstance(step, EventsNode) else step.id, + "action_id": action_id, "name": name, "custom_name": step.custom_name, "order": index, "people": people if people else [], "count": correct_result_for_sampling(count, sampling_factor), - "type": "events" if isinstance(step, EventsNode) else "actions", + "type": type, } @property @@ -420,8 +428,10 @@ def _get_inner_event_query( step_cols = self._get_step_col(entity, index, entity_name) all_step_cols.extend(step_cols) - for exclusion_id, entity in enumerate(funnelsFilter.exclusions or []): - step_cols = self._get_step_col(entity, entity.funnelFromStep, entity_name, f"exclusion_{exclusion_id}_") + for exclusion_id, excluded_entity in enumerate(funnelsFilter.exclusions or []): + step_cols = self._get_step_col( + excluded_entity, excluded_entity.funnelFromStep, entity_name, f"exclusion_{exclusion_id}_" + ) # every exclusion entity has the form: exclusion__step_i & timestamp exclusion__latest_i # where i is the starting step for exclusion on that entity all_step_cols.extend(step_cols) @@ -572,6 +582,8 @@ def _build_step_query( # action action = Action.objects.get(pk=int(entity.id), team=self.context.team) event_expr = action_to_expr(action) + elif isinstance(entity, DataWarehouseNode): + raise NotImplementedError("DataWarehouseNode is not supported in funnels") elif entity.event is None: # all events event_expr = ast.Constant(value=True) diff --git a/posthog/hogql_queries/insights/funnels/funnel_unordered.py b/posthog/hogql_queries/insights/funnels/funnel_unordered.py index 03745309f9321..e0c7eba870f88 100644 --- a/posthog/hogql_queries/insights/funnels/funnel_unordered.py +++ b/posthog/hogql_queries/insights/funnels/funnel_unordered.py @@ -6,7 +6,7 @@ from posthog.hogql.parser import parse_expr from posthog.hogql_queries.insights.funnels.base import FunnelBase from posthog.hogql_queries.insights.funnels.utils import funnel_window_interval_unit_to_sql -from posthog.schema import ActionsNode, EventsNode +from posthog.schema import ActionsNode, EventsNode, DataWarehouseNode from posthog.queries.util import correct_result_for_sampling @@ -230,12 +230,15 @@ def _get_exclusion_condition(self) -> List[ast.Expr]: def _serialize_step( self, - step: ActionsNode | EventsNode, + step: ActionsNode | EventsNode | DataWarehouseNode, count: int, index: int, people: Optional[List[uuid.UUID]] = None, sampling_factor: Optional[float] = None, ) -> Dict[str, Any]: + if isinstance(step, DataWarehouseNode): + raise NotImplementedError("Data Warehouse queries are not supported in funnels") + return { "action_id": None, "name": f"Completed {index+1} step{'s' if index != 0 else ''}", diff --git a/posthog/hogql_queries/insights/stickiness_query_runner.py b/posthog/hogql_queries/insights/stickiness_query_runner.py index bc8c58ce6dea0..d0b4b65c67f9b 100644 --- a/posthog/hogql_queries/insights/stickiness_query_runner.py +++ b/posthog/hogql_queries/insights/stickiness_query_runner.py @@ -23,6 +23,7 @@ from posthog.models.filters.mixins.utils import cached_property from posthog.schema import ( ActionsNode, + DataWarehouseNode, EventsNode, StickinessQuery, HogQLQueryModifiers, @@ -31,12 +32,12 @@ class SeriesWithExtras: - series: EventsNode | ActionsNode + series: EventsNode | ActionsNode | DataWarehouseNode is_previous_period_series: Optional[bool] def __init__( self, - series: EventsNode | ActionsNode, + series: EventsNode | ActionsNode | DataWarehouseNode, is_previous_period_series: Optional[bool], ): self.series = series @@ -81,7 +82,7 @@ def _refresh_frequency(self): return refresh_frequency - def _aggregation_expressions(self, series: EventsNode | ActionsNode) -> ast.Expr: + def _aggregation_expressions(self, series: EventsNode | ActionsNode | DataWarehouseNode) -> ast.Expr: if series.math == "hogql" and series.math_hogql is not None: return parse_expr(series.math_hogql) elif series.math == "unique_group" and series.math_group_type_index is not None: @@ -323,9 +324,13 @@ def _sample_value(self) -> ast.RatioExpr: return ast.RatioExpr(left=ast.Constant(value=self.query.samplingFactor)) - def series_event(self, series: EventsNode | ActionsNode) -> str | None: + def series_event(self, series: EventsNode | ActionsNode | DataWarehouseNode) -> str | None: if isinstance(series, EventsNode): return series.event + + if isinstance(series, DataWarehouseNode): + return series.table_name + if isinstance(series, ActionsNode): # TODO: Can we load the Action in more efficiently? action = Action.objects.get(pk=int(series.id), team=self.team) diff --git a/posthog/hogql_queries/insights/trends/aggregation_operations.py b/posthog/hogql_queries/insights/trends/aggregation_operations.py index 3d9dcca085495..a679b2c76e7c3 100644 --- a/posthog/hogql_queries/insights/trends/aggregation_operations.py +++ b/posthog/hogql_queries/insights/trends/aggregation_operations.py @@ -1,9 +1,10 @@ -from typing import List, Optional, cast +from typing import List, Optional, cast, Union from posthog.hogql import ast from posthog.hogql.parser import parse_expr, parse_select from posthog.hogql_queries.utils.query_date_range import QueryDateRange from posthog.models.team.team import Team -from posthog.schema import ActionsNode, EventsNode +from posthog.schema import EventsNode, ActionsNode, DataWarehouseNode +from posthog.models.filters.mixins.utils import cached_property class QueryAlternator: @@ -49,14 +50,14 @@ def replace_select_from(self, join_expr: ast.JoinExpr) -> None: class AggregationOperations: team: Team - series: EventsNode | ActionsNode + series: Union[EventsNode, ActionsNode, DataWarehouseNode] query_date_range: QueryDateRange should_aggregate_values: bool def __init__( self, team: Team, - series: EventsNode | ActionsNode, + series: Union[EventsNode, ActionsNode, DataWarehouseNode], query_date_range: QueryDateRange, should_aggregate_values: bool, ) -> None: @@ -65,11 +66,18 @@ def __init__( self.query_date_range = query_date_range self.should_aggregate_values = should_aggregate_values + @cached_property + def _id_field(self) -> ast.Expr: + if isinstance(self.series, DataWarehouseNode): + return ast.Field(chain=["e", self.series.id_field]) + + return ast.Field(chain=["e", "uuid"]) + def select_aggregation(self) -> ast.Expr: if self.series.math == "hogql" and self.series.math_hogql is not None: return parse_expr(self.series.math_hogql) elif self.series.math == "total": - return parse_expr("count(e.uuid)") + return parse_expr("count({id_field})", placeholders={"id_field": self._id_field}) elif self.series.math == "dau": actor = "e.distinct_id" if self.team.aggregate_users_by_distinct_id else "e.person.id" return parse_expr(f"count(DISTINCT {actor})") @@ -99,7 +107,9 @@ def select_aggregation(self) -> ast.Expr: elif self.series.math == "p99": return self._math_quantile(0.99, None) - return parse_expr("count(e.uuid)") # All "count per actor" get replaced during query orchestration + return parse_expr( + "count({id_field})", placeholders={"id_field": self._id_field} + ) # All "count per actor" get replaced during query orchestration def actor_id(self) -> ast.Expr: if self.series.math == "unique_group" and self.series.math_group_type_index is not None: @@ -335,13 +345,14 @@ def _events_query( query = parse_select( """ SELECT - count(e.uuid) AS total + count({id_field}) AS total FROM events AS e SAMPLE {sample} WHERE {events_where_clause} GROUP BY {person_field} """, placeholders={ + "id_field": self._id_field, "events_where_clause": where_clause_combined, "sample": sample_value, "person_field": ast.Field( diff --git a/posthog/hogql_queries/insights/trends/breakdown.py b/posthog/hogql_queries/insights/trends/breakdown.py index 4106d45062d12..3e165e1a939ac 100644 --- a/posthog/hogql_queries/insights/trends/breakdown.py +++ b/posthog/hogql_queries/insights/trends/breakdown.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Union from posthog.hogql import ast from posthog.hogql.parser import parse_expr from posthog.hogql.timings import HogQLTimings @@ -12,18 +12,17 @@ from posthog.hogql_queries.insights.trends.display import TrendsDisplay from posthog.hogql_queries.insights.trends.utils import ( get_properties_chain, - series_event_name, ) from posthog.hogql_queries.utils.query_date_range import QueryDateRange from posthog.models.filters.mixins.utils import cached_property from posthog.models.team.team import Team -from posthog.schema import ActionsNode, EventsNode, HogQLQueryModifiers, InCohortVia, TrendsQuery +from posthog.schema import ActionsNode, EventsNode, DataWarehouseNode, HogQLQueryModifiers, InCohortVia, TrendsQuery class Breakdown: query: TrendsQuery team: Team - series: EventsNode | ActionsNode + series: Union[EventsNode, ActionsNode, DataWarehouseNode] query_date_range: QueryDateRange timings: HogQLTimings modifiers: HogQLQueryModifiers @@ -34,7 +33,7 @@ def __init__( self, team: Team, query: TrendsQuery, - series: EventsNode | ActionsNode, + series: Union[EventsNode, ActionsNode, DataWarehouseNode], query_date_range: QueryDateRange, timings: HogQLTimings, modifiers: HogQLQueryModifiers, @@ -222,7 +221,7 @@ def _get_breakdown_values(self) -> List[str | int]: with self.timings.measure("breakdown_values_query"): breakdown = BreakdownValues( team=self.team, - event_name=series_event_name(self.series) or "", + series=self.series, events_filter=self.events_filter, chart_display_type=self._trends_display().display_type, breakdown_filter=self.query.breakdownFilter, diff --git a/posthog/hogql_queries/insights/trends/breakdown_values.py b/posthog/hogql_queries/insights/trends/breakdown_values.py index 37f8551d4b276..9424fd229081c 100644 --- a/posthog/hogql_queries/insights/trends/breakdown_values.py +++ b/posthog/hogql_queries/insights/trends/breakdown_values.py @@ -5,7 +5,8 @@ from posthog.hogql.query import execute_hogql_query from posthog.hogql_queries.insights.trends.utils import get_properties_chain from posthog.models.team.team import Team -from posthog.schema import BreakdownFilter, BreakdownType, ChartDisplayType +from posthog.schema import BreakdownFilter, BreakdownType, ChartDisplayType, ActionsNode, EventsNode, DataWarehouseNode +from functools import cached_property BREAKDOWN_OTHER_STRING_LABEL = "$$_posthog_breakdown_other_$$" BREAKDOWN_OTHER_NUMERIC_LABEL = 9007199254740991 # pow(2, 53) - 1, for JS compatibility @@ -15,7 +16,7 @@ class BreakdownValues: team: Team - event_name: str + series: Union[EventsNode, ActionsNode, DataWarehouseNode] breakdown_field: Union[str, float, List[Union[str, float]]] breakdown_type: BreakdownType events_filter: ast.Expr @@ -28,13 +29,13 @@ class BreakdownValues: def __init__( self, team: Team, - event_name: str, + series: Union[EventsNode, ActionsNode, DataWarehouseNode], events_filter: ast.Expr, chart_display_type: ChartDisplayType, breakdown_filter: BreakdownFilter, ): self.team = team - self.event_name = event_name + self.series = series self.breakdown_field = breakdown_filter.breakdown # type: ignore self.breakdown_type = breakdown_filter.breakdown_type # type: ignore self.events_filter = events_filter @@ -72,7 +73,7 @@ def get_breakdown_values(self) -> List[str | int]: alias="value", expr=ast.Field( chain=get_properties_chain( - breakdown_type=self.breakdown_type, # type: ignore + breakdown_type=self.breakdown_type, breakdown_field=str(self.breakdown_field), group_type_index=self.group_type_index, ) @@ -88,9 +89,8 @@ def get_breakdown_values(self) -> List[str | int]: """ SELECT {select_field}, - count(e.uuid) as count - FROM - events e + count({id_field}) as count + FROM {table} e WHERE {events_where} GROUP BY @@ -104,6 +104,8 @@ def get_breakdown_values(self) -> List[str | int]: "events_where": self.events_filter, "select_field": select_field, "breakdown_limit": ast.Constant(value=breakdown_limit), + "table": self._table, + "id_field": self._id_field, }, ) @@ -166,3 +168,17 @@ def _to_bucketing_expression(self) -> ast.Expr: qunatile_expression = f"quantiles({','.join([f'{quantile:.2f}' for quantile in quantiles])})(value)" return parse_expr(f"arrayCompact(arrayMap(x -> floor(x, 2), {qunatile_expression}))") + + @cached_property + def _id_field(self) -> ast.Field: + if isinstance(self.series, DataWarehouseNode): + return ast.Field(chain=["e", self.series.id_field]) + + return ast.Field(chain=["e", "uuid"]) + + @cached_property + def _table(self) -> ast.Field: + if isinstance(self.series, DataWarehouseNode): + return ast.Field(chain=[self.series.table_name]) + + return ast.Field(chain=["events"]) diff --git a/posthog/hogql_queries/insights/trends/data_warehouse_trends_query_builder.py b/posthog/hogql_queries/insights/trends/data_warehouse_trends_query_builder.py new file mode 100644 index 0000000000000..e0d7a5779f39a --- /dev/null +++ b/posthog/hogql_queries/insights/trends/data_warehouse_trends_query_builder.py @@ -0,0 +1,409 @@ +from typing import List, Optional, cast +from posthog.hogql import ast +from posthog.hogql.parser import parse_select, parse_expr +from posthog.hogql.property import property_to_expr +from posthog.hogql.timings import HogQLTimings +from posthog.hogql_queries.insights.trends.aggregation_operations import ( + AggregationOperations, +) +from posthog.hogql_queries.insights.trends.breakdown import Breakdown +from posthog.hogql_queries.insights.trends.display import TrendsDisplay +from posthog.hogql_queries.utils.query_date_range import QueryDateRange +from posthog.models.filters.mixins.utils import cached_property +from posthog.models.team.team import Team +from posthog.schema import HogQLQueryModifiers, TrendsQuery, DataWarehouseNode +from posthog.hogql_queries.insights.trends.trends_query_builder_abstract import TrendsQueryBuilderAbstract + + +class DataWarehouseTrendsQueryBuilder(TrendsQueryBuilderAbstract): + query: TrendsQuery + team: Team + query_date_range: QueryDateRange + series: DataWarehouseNode + timings: HogQLTimings + modifiers: HogQLQueryModifiers + + def __init__( + self, + trends_query: TrendsQuery, + team: Team, + query_date_range: QueryDateRange, + series: DataWarehouseNode, + timings: HogQLTimings, + modifiers: HogQLQueryModifiers, + ): + self.query = trends_query + self.team = team + self.query_date_range = query_date_range + self.series = series + self.timings = timings + self.modifiers = modifiers + + def build_query(self) -> ast.SelectQuery | ast.SelectUnionQuery: + breakdown = self._breakdown(is_actors_query=False) + + events_query: ast.SelectQuery | ast.SelectUnionQuery + + if self._trends_display.should_aggregate_values(): + events_query = self._get_events_subquery(False, is_actors_query=False, breakdown=breakdown) + else: + date_subqueries = self._get_date_subqueries(breakdown=breakdown) + event_query = self._get_events_subquery(False, is_actors_query=False, breakdown=breakdown) + + events_query = ast.SelectUnionQuery(select_queries=[*date_subqueries, event_query]) + + inner_select = self._inner_select_query(inner_query=events_query, breakdown=breakdown) + full_query = self._outer_select_query(inner_query=inner_select, breakdown=breakdown) + + return full_query + + def _get_date_subqueries(self, breakdown: Breakdown, ignore_breakdowns: bool = False) -> List[ast.SelectQuery]: + if not breakdown.enabled or ignore_breakdowns: + return [ + cast( + ast.SelectQuery, + parse_select( + """ + SELECT + 0 AS total, + {date_to_start_of_interval} - {number_interval_period} AS day_start + FROM + numbers( + coalesce(dateDiff({interval}, {date_from}, {date_to}), 0) + ) + """, + placeholders={ + **self.query_date_range.to_placeholders(), + }, + ), + ), + cast( + ast.SelectQuery, + parse_select( + """ + SELECT + 0 AS total, + {date_from_start_of_interval} AS day_start + """, + placeholders={ + **self.query_date_range.to_placeholders(), + }, + ), + ), + ] + + return [ + cast( + ast.SelectQuery, + parse_select( + """ + SELECT + 0 AS total, + ticks.day_start as day_start, + breakdown_value + FROM ( + SELECT + {date_to_start_of_interval} - {number_interval_period} AS day_start + FROM + numbers( + coalesce(dateDiff({interval}, {date_from}, {date_to}), 0) + ) + UNION ALL + SELECT {date_from_start_of_interval} AS day_start + ) as ticks + CROSS JOIN ( + SELECT breakdown_value + FROM ( + SELECT {cross_join_breakdown_values} + ) + ARRAY JOIN breakdown_value as breakdown_value + ) as sec + ORDER BY breakdown_value, day_start + """, + placeholders={ + **self.query_date_range.to_placeholders(), + **breakdown.placeholders(), + }, + ), + ) + ] + + def _get_events_subquery( + self, + no_modifications: Optional[bool], + is_actors_query: bool, + breakdown: Breakdown, + breakdown_values_override: Optional[str | int] = None, + actors_query_time_frame: Optional[str | int] = None, + ) -> ast.SelectQuery: + day_start = ast.Alias( + alias="day_start", + expr=ast.Call( + name=f"toStartOf{self.query_date_range.interval_name.title()}", + args=[ast.Call(name="toDateTime", args=[ast.Field(chain=[self.series.timestamp_field])])], + ), + ) + + events_filter = self._events_filter( + ignore_breakdowns=False, + breakdown=breakdown, + is_actors_query=is_actors_query, + ) + + default_query = cast( + ast.SelectQuery, + parse_select( + """ + SELECT + {aggregation_operation} AS total + FROM {table} AS e + WHERE {events_filter} + """, + placeholders={ + "events_filter": events_filter, + "aggregation_operation": self._aggregation_operation.select_aggregation(), + "table": self._table_expr, + }, + ), + ) + + default_query.group_by = [] + + if not self._trends_display.should_aggregate_values() and not is_actors_query: + default_query.select.append(day_start) + default_query.group_by.append(ast.Field(chain=["day_start"])) + + # TODO: Move this logic into the below branches when working on adding breakdown support for the person modal + if is_actors_query: + default_query.select = [ast.Alias(alias="person_id", expr=ast.Field(chain=["e", "person", "id"]))] + default_query.distinct = True + default_query.group_by = [] + + # No breakdowns and no complex series aggregation + if ( + not breakdown.enabled + and not self._aggregation_operation.requires_query_orchestration() + and not self._aggregation_operation.aggregating_on_session_duration() + ) or no_modifications is True: + return default_query + # Both breakdowns and complex series aggregation + elif breakdown.enabled and self._aggregation_operation.requires_query_orchestration(): + raise NotImplementedError( + "Breakdowns and complex series aggregation are not supported for Data Warehouse queries" + ) + # Breakdowns and session duration math property + elif breakdown.enabled and self._aggregation_operation.aggregating_on_session_duration(): + raise NotImplementedError( + "Breakdowns and session duration math property are not supported for Data Warehouse queries" + ) + # Just breakdowns + elif breakdown.enabled: + if not is_actors_query: + default_query.select.append(breakdown.column_expr()) + default_query.group_by.append(ast.Field(chain=["breakdown_value"])) + # Just session duration math property + elif self._aggregation_operation.aggregating_on_session_duration(): + raise NotImplementedError("Session duration math property is not supported for Data Warehouse queries") + # Just complex series aggregation + elif self._aggregation_operation.requires_query_orchestration(): + raise NotImplementedError("Complex series aggregation is not supported for Data Warehouse queries") + + return default_query + + def _outer_select_query(self, breakdown: Breakdown, inner_query: ast.SelectQuery) -> ast.SelectQuery: + query = cast( + ast.SelectQuery, + parse_select( + """ + SELECT + groupArray(day_start) AS date, + groupArray(count) AS total + FROM {inner_query} + """, + placeholders={"inner_query": inner_query}, + ), + ) + + query = self._trends_display.modify_outer_query( + outer_query=query, + inner_query=inner_query, + dates_queries=ast.SelectUnionQuery( + select_queries=self._get_date_subqueries(ignore_breakdowns=True, breakdown=breakdown) + ), + ) + + query.order_by = [ast.OrderExpr(expr=ast.Call(name="sum", args=[ast.Field(chain=["count"])]), order="DESC")] + + if breakdown.enabled: + query.select.append( + ast.Alias( + alias="breakdown_value", + expr=ast.Call( + name="ifNull", + args=[ + ast.Call(name="toString", args=[ast.Field(chain=["breakdown_value"])]), + ast.Constant(value=""), + ], + ), + ) + ) + query.group_by = [ast.Field(chain=["breakdown_value"])] + query.order_by.append(ast.OrderExpr(expr=ast.Field(chain=["breakdown_value"]), order="ASC")) + + return query + + def _inner_select_query( + self, breakdown: Breakdown, inner_query: ast.SelectQuery | ast.SelectUnionQuery + ) -> ast.SelectQuery: + query = cast( + ast.SelectQuery, + parse_select( + """ + SELECT + sum(total) AS count + FROM {inner_query} + """, + placeholders={"inner_query": inner_query}, + ), + ) + + if ( + self.query.trendsFilter is not None + and self.query.trendsFilter.smoothingIntervals is not None + and self.query.trendsFilter.smoothingIntervals > 1 + ): + rolling_average = ast.Alias( + alias="count", + expr=ast.Call( + name="floor", + args=[ + ast.WindowFunction( + name="avg", + args=[ast.Call(name="sum", args=[ast.Field(chain=["total"])])], + over_expr=ast.WindowExpr( + order_by=[ast.OrderExpr(expr=ast.Field(chain=["day_start"]), order="ASC")], + frame_method="ROWS", + frame_start=ast.WindowFrameExpr( + frame_type="PRECEDING", + frame_value=int(self.query.trendsFilter.smoothingIntervals - 1), + ), + frame_end=ast.WindowFrameExpr(frame_type="CURRENT ROW"), + ), + ) + ], + ), + ) + query.select = [rolling_average] + + query.group_by = [] + query.order_by = [] + + if not self._trends_display.should_aggregate_values(): + query.select.append(ast.Field(chain=["day_start"])) + query.group_by.append(ast.Field(chain=["day_start"])) + query.order_by.append(ast.OrderExpr(expr=ast.Field(chain=["day_start"]), order="ASC")) + + if breakdown.enabled: + query.select.append(ast.Field(chain=["breakdown_value"])) + query.group_by.append(ast.Field(chain=["breakdown_value"])) + query.order_by.append(ast.OrderExpr(expr=ast.Field(chain=["breakdown_value"]), order="ASC")) + + if self._trends_display.should_wrap_inner_query(): + query = self._trends_display.wrap_inner_query(query, breakdown.enabled) + if breakdown.enabled: + query.select.append(ast.Field(chain=["breakdown_value"])) + + return query + + def _events_filter( + self, + is_actors_query: bool, + breakdown: Breakdown | None, + ignore_breakdowns: bool = False, + breakdown_values_override: Optional[str | int] = None, + actors_query_time_frame: Optional[str | int] = None, + ) -> ast.Expr: + series = self.series + filters: List[ast.Expr] = [] + + filters.extend( + [ + parse_expr( + "{timestamp_field} >= {date_from_with_adjusted_start_of_interval}", + placeholders={ + "timestamp_field": ast.Call( + name="toDateTime", args=[ast.Field(chain=[self.series.timestamp_field])] + ), + **self.query_date_range.to_placeholders(), + }, + ), + parse_expr( + "{timestamp_field} <= {date_to}", + placeholders={ + "timestamp_field": ast.Call( + name="toDateTime", args=[ast.Field(chain=[self.series.timestamp_field])] + ), + **self.query_date_range.to_placeholders(), + }, + ), + ] + ) + + # Properties + if self.query.properties is not None and self.query.properties != []: + filters.append(property_to_expr(self.query.properties, self.team)) + + # Series Filters + if series.properties is not None and series.properties != []: + filters.append(property_to_expr(series.properties, self.team)) + + # Breakdown + if not ignore_breakdowns and breakdown is not None: + if breakdown.enabled and not breakdown.is_histogram_breakdown: + breakdown_filter = breakdown.events_where_filter() + if breakdown_filter is not None: + filters.append(breakdown_filter) + + if len(filters) == 0: + return ast.Constant(value=True) + + return ast.And(exprs=filters) + + def _breakdown(self, is_actors_query: bool, breakdown_values_override: Optional[str | int] = None): + return Breakdown( + team=self.team, + query=self.query, + series=self.series, + query_date_range=self.query_date_range, + timings=self.timings, + modifiers=self.modifiers, + events_filter=self._events_filter( + breakdown=None, # Passing in None because we know we dont actually need it + ignore_breakdowns=True, + is_actors_query=is_actors_query, + breakdown_values_override=breakdown_values_override, + ), + breakdown_values_override=[breakdown_values_override] if breakdown_values_override is not None else None, + ) + + @cached_property + def _aggregation_operation(self) -> AggregationOperations: + if self.series.math is not None and self.series.math != "total": + raise NotImplementedError("Math types other than total are not supported for Data Warehouse queries") + + return AggregationOperations( + self.team, self.series, self.query_date_range, self._trends_display.should_aggregate_values() + ) + + @cached_property + def _trends_display(self) -> TrendsDisplay: + display = ( + self.query.trendsFilter.display + if self.query.trendsFilter is not None and self.query.trendsFilter.display is not None + else None + ) + return TrendsDisplay(display) + + @cached_property + def _table_expr(self) -> ast.Field: + return ast.Field(chain=[self.series.table_name]) diff --git a/posthog/hogql_queries/insights/trends/series_with_extras.py b/posthog/hogql_queries/insights/trends/series_with_extras.py index 5dfafbe260768..afc5c57f995e9 100644 --- a/posthog/hogql_queries/insights/trends/series_with_extras.py +++ b/posthog/hogql_queries/insights/trends/series_with_extras.py @@ -1,9 +1,9 @@ -from typing import Optional -from posthog.schema import ActionsNode, EventsNode, TrendsQuery +from typing import Optional, Union +from posthog.schema import TrendsQuery, ActionsNode, EventsNode, DataWarehouseNode class SeriesWithExtras: - series: EventsNode | ActionsNode + series: Union[EventsNode, ActionsNode, DataWarehouseNode] series_order: int is_previous_period_series: Optional[bool] overriden_query: Optional[TrendsQuery] @@ -11,7 +11,7 @@ class SeriesWithExtras: def __init__( self, - series: EventsNode | ActionsNode, + series: Union[EventsNode, ActionsNode, DataWarehouseNode], series_order: int, is_previous_period_series: Optional[bool], overriden_query: Optional[TrendsQuery], diff --git a/posthog/hogql_queries/insights/trends/test/__snapshots__/test_data_warehouse_query_builder.ambr b/posthog/hogql_queries/insights/trends/test/__snapshots__/test_data_warehouse_query_builder.ambr new file mode 100644 index 0000000000000..13aafb360c061 --- /dev/null +++ b/posthog/hogql_queries/insights/trends/test/__snapshots__/test_data_warehouse_query_builder.ambr @@ -0,0 +1,217 @@ +# serializer version: 1 +# name: TestDataWarehouseQueryBuilder.test_trends_breakdown + ''' + SELECT groupArray(value) + FROM + (SELECT e.prop_1 AS value, + count(e.id) AS count + FROM s3Cluster('posthog', 'http://host.docker.internal:19000/posthog/test_storage_bucket-posthog.hogql.datawarehouse.trendquery/*.parquet', 'object_storage_root_user', 'object_storage_root_password', 'Parquet', 'id String, prop_1 String, prop_2 String, created DateTime64(3, \'UTC\')') AS e + WHERE and(ifNull(greaterOrEquals(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC'), toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-01 00:00:00', 6, 'UTC')))), 0), ifNull(lessOrEquals(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC'), assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-07 23:59:59', 6, 'UTC'))), 0)) + GROUP BY value + ORDER BY count DESC, value DESC + LIMIT 25) + LIMIT 100 SETTINGS readonly=2, + max_execution_time=60, + allow_experimental_object_type=1 + ''' +# --- +# name: TestDataWarehouseQueryBuilder.test_trends_breakdown.1 + ''' + SELECT groupArray(day_start) AS date, + groupArray(count) AS total, + ifNull(toString(breakdown_value), '') AS breakdown_value + FROM + (SELECT sum(total) AS count, + day_start AS day_start, + breakdown_value AS breakdown_value + FROM + (SELECT 0 AS total, + ticks.day_start AS day_start, + sec.breakdown_value AS breakdown_value + FROM + (SELECT minus(toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-07 23:59:59', 6, 'UTC'))), toIntervalDay(numbers.number)) AS day_start + FROM numbers(coalesce(dateDiff('day', assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-01 00:00:00', 6, 'UTC')), assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-07 23:59:59', 6, 'UTC'))), 0)) AS numbers + UNION ALL SELECT toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-01 00:00:00', 6, 'UTC'))) AS day_start) AS ticks + CROSS JOIN + (SELECT breakdown_value + FROM + (SELECT ['$$_posthog_breakdown_other_$$', 'd', 'c', 'b', 'a'] AS breakdown_value) ARRAY + JOIN breakdown_value AS breakdown_value) AS sec + ORDER BY sec.breakdown_value ASC, day_start ASC + UNION ALL SELECT count(e.id) AS total, + toStartOfDay(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC')) AS day_start, + transform(ifNull(e.prop_1, '$$_posthog_breakdown_null_$$'), ['$$_posthog_breakdown_other_$$', 'd', 'c', 'b', 'a'], ['$$_posthog_breakdown_other_$$', 'd', 'c', 'b', 'a'], '$$_posthog_breakdown_other_$$') AS breakdown_value + FROM s3Cluster('posthog', 'http://host.docker.internal:19000/posthog/test_storage_bucket-posthog.hogql.datawarehouse.trendquery/*.parquet', 'object_storage_root_user', 'object_storage_root_password', 'Parquet', 'id String, prop_1 String, prop_2 String, created DateTime64(3, \'UTC\')') AS e + WHERE and(ifNull(greaterOrEquals(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC'), toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-01 00:00:00', 6, 'UTC')))), 0), ifNull(lessOrEquals(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC'), assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-07 23:59:59', 6, 'UTC'))), 0), or(ifNull(equals(transform(ifNull(e.prop_1, '$$_posthog_breakdown_null_$$'), ['$$_posthog_breakdown_other_$$', 'd', 'c', 'b', 'a'], ['$$_posthog_breakdown_other_$$', 'd', 'c', 'b', 'a'], '$$_posthog_breakdown_other_$$'), '$$_posthog_breakdown_other_$$'), 0), equals(e.prop_1, 'd'), equals(e.prop_1, 'c'), equals(e.prop_1, 'b'), equals(e.prop_1, 'a'))) + GROUP BY day_start, + breakdown_value) + GROUP BY day_start, + breakdown_value + ORDER BY day_start ASC, breakdown_value ASC) + GROUP BY breakdown_value + ORDER BY sum(count) DESC, breakdown_value ASC + LIMIT 100 SETTINGS readonly=2, + max_execution_time=60, + allow_experimental_object_type=1 + ''' +# --- +# name: TestDataWarehouseQueryBuilder.test_trends_breakdown_with_property + ''' + SELECT groupArray(value) + FROM + (SELECT e.prop_1 AS value, + count(e.id) AS count + FROM s3Cluster('posthog', 'http://host.docker.internal:19000/posthog/test_storage_bucket-posthog.hogql.datawarehouse.trendquery/*.parquet', 'object_storage_root_user', 'object_storage_root_password', 'Parquet', 'id String, prop_1 String, prop_2 String, created DateTime64(3, \'UTC\')') AS e + WHERE and(ifNull(greaterOrEquals(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC'), toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-01 00:00:00', 6, 'UTC')))), 0), ifNull(lessOrEquals(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC'), assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-07 23:59:59', 6, 'UTC'))), 0), equals(e.prop_1, 'a')) + GROUP BY value + ORDER BY count DESC, value DESC + LIMIT 25) + LIMIT 100 SETTINGS readonly=2, + max_execution_time=60, + allow_experimental_object_type=1 + ''' +# --- +# name: TestDataWarehouseQueryBuilder.test_trends_breakdown_with_property.1 + ''' + SELECT groupArray(day_start) AS date, + groupArray(count) AS total, + ifNull(toString(breakdown_value), '') AS breakdown_value + FROM + (SELECT sum(total) AS count, + day_start AS day_start, + breakdown_value AS breakdown_value + FROM + (SELECT 0 AS total, + ticks.day_start AS day_start, + sec.breakdown_value AS breakdown_value + FROM + (SELECT minus(toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-07 23:59:59', 6, 'UTC'))), toIntervalDay(numbers.number)) AS day_start + FROM numbers(coalesce(dateDiff('day', assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-01 00:00:00', 6, 'UTC')), assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-07 23:59:59', 6, 'UTC'))), 0)) AS numbers + UNION ALL SELECT toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-01 00:00:00', 6, 'UTC'))) AS day_start) AS ticks + CROSS JOIN + (SELECT breakdown_value + FROM + (SELECT ['$$_posthog_breakdown_other_$$', 'a'] AS breakdown_value) ARRAY + JOIN breakdown_value AS breakdown_value) AS sec + ORDER BY sec.breakdown_value ASC, day_start ASC + UNION ALL SELECT count(e.id) AS total, + toStartOfDay(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC')) AS day_start, + transform(ifNull(e.prop_1, '$$_posthog_breakdown_null_$$'), ['$$_posthog_breakdown_other_$$', 'a'], ['$$_posthog_breakdown_other_$$', 'a'], '$$_posthog_breakdown_other_$$') AS breakdown_value + FROM s3Cluster('posthog', 'http://host.docker.internal:19000/posthog/test_storage_bucket-posthog.hogql.datawarehouse.trendquery/*.parquet', 'object_storage_root_user', 'object_storage_root_password', 'Parquet', 'id String, prop_1 String, prop_2 String, created DateTime64(3, \'UTC\')') AS e + WHERE and(ifNull(greaterOrEquals(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC'), toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-01 00:00:00', 6, 'UTC')))), 0), ifNull(lessOrEquals(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC'), assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-07 23:59:59', 6, 'UTC'))), 0), equals(e.prop_1, 'a'), or(ifNull(equals(transform(ifNull(e.prop_1, '$$_posthog_breakdown_null_$$'), ['$$_posthog_breakdown_other_$$', 'a'], ['$$_posthog_breakdown_other_$$', 'a'], '$$_posthog_breakdown_other_$$'), '$$_posthog_breakdown_other_$$'), 0), equals(e.prop_1, 'a'))) + GROUP BY day_start, + breakdown_value) + GROUP BY day_start, + breakdown_value + ORDER BY day_start ASC, breakdown_value ASC) + GROUP BY breakdown_value + ORDER BY sum(count) DESC, breakdown_value ASC + LIMIT 100 SETTINGS readonly=2, + max_execution_time=60, + allow_experimental_object_type=1 + ''' +# --- +# name: TestDataWarehouseQueryBuilder.test_trends_data_warehouse + ''' + SELECT groupArray(day_start) AS date, + groupArray(count) AS total + FROM + (SELECT sum(total) AS count, + day_start AS day_start + FROM + (SELECT 0 AS total, + minus(toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-07 23:59:59', 6, 'UTC'))), toIntervalDay(numbers.number)) AS day_start + FROM numbers(coalesce(dateDiff('day', assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-01 00:00:00', 6, 'UTC')), assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-07 23:59:59', 6, 'UTC'))), 0)) AS numbers + UNION ALL SELECT 0 AS total, + toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-01 00:00:00', 6, 'UTC'))) AS day_start + UNION ALL SELECT count(e.id) AS total, + toStartOfDay(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC')) AS day_start + FROM s3Cluster('posthog', 'http://host.docker.internal:19000/posthog/test_storage_bucket-posthog.hogql.datawarehouse.trendquery/*.parquet', 'object_storage_root_user', 'object_storage_root_password', 'Parquet', 'id String, prop_1 String, prop_2 String, created DateTime64(3, \'UTC\')') AS e + WHERE and(ifNull(greaterOrEquals(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC'), toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-01 00:00:00', 6, 'UTC')))), 0), ifNull(lessOrEquals(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC'), assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-07 23:59:59', 6, 'UTC'))), 0)) + GROUP BY day_start) + GROUP BY day_start + ORDER BY day_start ASC) + ORDER BY sum(count) DESC + LIMIT 100 SETTINGS readonly=2, + max_execution_time=60, + allow_experimental_object_type=1 + ''' +# --- +# name: TestDataWarehouseQueryBuilder.test_trends_entity_property + ''' + SELECT groupArray(day_start) AS date, + groupArray(count) AS total + FROM + (SELECT sum(total) AS count, + day_start AS day_start + FROM + (SELECT 0 AS total, + minus(toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-07 23:59:59', 6, 'UTC'))), toIntervalDay(numbers.number)) AS day_start + FROM numbers(coalesce(dateDiff('day', assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-01 00:00:00', 6, 'UTC')), assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-07 23:59:59', 6, 'UTC'))), 0)) AS numbers + UNION ALL SELECT 0 AS total, + toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-01 00:00:00', 6, 'UTC'))) AS day_start + UNION ALL SELECT count(e.id) AS total, + toStartOfDay(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC')) AS day_start + FROM s3Cluster('posthog', 'http://host.docker.internal:19000/posthog/test_storage_bucket-posthog.hogql.datawarehouse.trendquery/*.parquet', 'object_storage_root_user', 'object_storage_root_password', 'Parquet', 'id String, prop_1 String, prop_2 String, created DateTime64(3, \'UTC\')') AS e + WHERE and(ifNull(greaterOrEquals(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC'), toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-01 00:00:00', 6, 'UTC')))), 0), ifNull(lessOrEquals(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC'), assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-07 23:59:59', 6, 'UTC'))), 0), equals(e.prop_1, 'a')) + GROUP BY day_start) + GROUP BY day_start + ORDER BY day_start ASC) + ORDER BY sum(count) DESC + LIMIT 100 SETTINGS readonly=2, + max_execution_time=60, + allow_experimental_object_type=1 + ''' +# --- +# name: TestDataWarehouseQueryBuilder.test_trends_other_property_invalid + ''' + SELECT groupArray(day_start) AS date, + groupArray(count) AS total + FROM + (SELECT sum(total) AS count, + day_start AS day_start + FROM + (SELECT 0 AS total, + minus(toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-07 23:59:59', 6, 'UTC'))), toIntervalDay(numbers.number)) AS day_start + FROM numbers(coalesce(dateDiff('day', assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-01 00:00:00', 6, 'UTC')), assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-07 23:59:59', 6, 'UTC'))), 0)) AS numbers + UNION ALL SELECT 0 AS total, + toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-01 00:00:00', 6, 'UTC'))) AS day_start + UNION ALL SELECT count(e.id) AS total, + toStartOfDay(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC')) AS day_start + FROM s3Cluster('posthog', 'http://host.docker.internal:19000/posthog/test_storage_bucket-posthog.hogql.datawarehouse.trendquery/*.parquet', 'object_storage_root_user', 'object_storage_root_password', 'Parquet', 'id String, prop_1 String, prop_2 String, created DateTime64(3, \'UTC\')') AS e + WHERE and(ifNull(greaterOrEquals(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC'), toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-01 00:00:00', 6, 'UTC')))), 0), ifNull(lessOrEquals(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC'), assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-07 23:59:59', 6, 'UTC'))), 0), true) + GROUP BY day_start) + GROUP BY day_start + ORDER BY day_start ASC) + ORDER BY sum(count) DESC + LIMIT 100 SETTINGS readonly=2, + max_execution_time=60, + allow_experimental_object_type=1 + ''' +# --- +# name: TestDataWarehouseQueryBuilder.test_trends_property + ''' + SELECT groupArray(day_start) AS date, + groupArray(count) AS total + FROM + (SELECT sum(total) AS count, + day_start AS day_start + FROM + (SELECT 0 AS total, + minus(toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-07 23:59:59', 6, 'UTC'))), toIntervalDay(numbers.number)) AS day_start + FROM numbers(coalesce(dateDiff('day', assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-01 00:00:00', 6, 'UTC')), assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-07 23:59:59', 6, 'UTC'))), 0)) AS numbers + UNION ALL SELECT 0 AS total, + toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-01 00:00:00', 6, 'UTC'))) AS day_start + UNION ALL SELECT count(e.id) AS total, + toStartOfDay(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC')) AS day_start + FROM s3Cluster('posthog', 'http://host.docker.internal:19000/posthog/test_storage_bucket-posthog.hogql.datawarehouse.trendquery/*.parquet', 'object_storage_root_user', 'object_storage_root_password', 'Parquet', 'id String, prop_1 String, prop_2 String, created DateTime64(3, \'UTC\')') AS e + WHERE and(ifNull(greaterOrEquals(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC'), toStartOfDay(assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-01 00:00:00', 6, 'UTC')))), 0), ifNull(lessOrEquals(toDateTime(toTimeZone(e.created, 'UTC'), 'UTC'), assumeNotNull(parseDateTime64BestEffortOrNull('2023-01-07 23:59:59', 6, 'UTC'))), 0), equals(e.prop_1, 'a')) + GROUP BY day_start) + GROUP BY day_start + ORDER BY day_start ASC) + ORDER BY sum(count) DESC + LIMIT 100 SETTINGS readonly=2, + max_execution_time=60, + allow_experimental_object_type=1 + ''' +# --- diff --git a/posthog/hogql_queries/insights/trends/test/test_data_warehouse_query_builder.py b/posthog/hogql_queries/insights/trends/test/test_data_warehouse_query_builder.py new file mode 100644 index 0000000000000..19ecbca78bc28 --- /dev/null +++ b/posthog/hogql_queries/insights/trends/test/test_data_warehouse_query_builder.py @@ -0,0 +1,278 @@ +from datetime import datetime +from freezegun import freeze_time +from posthog.hogql.modifiers import create_default_modifiers_for_team + +from posthog.hogql.query import execute_hogql_query +from posthog.hogql.timings import HogQLTimings +from posthog.hogql_queries.insights.trends.data_warehouse_trends_query_builder import DataWarehouseTrendsQueryBuilder +from posthog.hogql_queries.utils.query_date_range import QueryDateRange +from posthog.schema import ( + BreakdownFilter, + BreakdownType, + ChartDisplayType, + DateRange, + DataWarehouseNode, + TrendsQuery, + TrendsFilter, +) +from posthog.test.base import BaseTest +from posthog.warehouse.models import DataWarehouseTable, DataWarehouseCredential + +from boto3 import resource +from botocore.config import Config +from posthog.settings import ( + OBJECT_STORAGE_ACCESS_KEY_ID, + OBJECT_STORAGE_BUCKET, + OBJECT_STORAGE_ENDPOINT, + OBJECT_STORAGE_SECRET_ACCESS_KEY, +) +import s3fs +from pyarrow import parquet as pq +import pyarrow as pa + +from posthog.test.base import ( + ClickhouseTestMixin, + snapshot_clickhouse_queries, +) +from posthog.hogql_queries.legacy_compatibility.filter_to_query import ( + clean_entity_properties, +) + +TEST_BUCKET = "test_storage_bucket-posthog.hogql.datawarehouse.trendquery" + + +class TestDataWarehouseQueryBuilder(ClickhouseTestMixin, BaseTest): + def teardown_method(self, method) -> None: + s3 = resource( + "s3", + endpoint_url=OBJECT_STORAGE_ENDPOINT, + aws_access_key_id=OBJECT_STORAGE_ACCESS_KEY_ID, + aws_secret_access_key=OBJECT_STORAGE_SECRET_ACCESS_KEY, + config=Config(signature_version="s3v4"), + region_name="us-east-1", + ) + bucket = s3.Bucket(OBJECT_STORAGE_BUCKET) + bucket.objects.filter(Prefix=TEST_BUCKET).delete() + + def get_response(self, trends_query: TrendsQuery): + query_date_range = QueryDateRange( + date_range=trends_query.dateRange, + team=self.team, + interval=trends_query.interval, + now=datetime.now(), + ) + + timings = HogQLTimings() + modifiers = create_default_modifiers_for_team(self.team) + + if isinstance(trends_query.series[0], DataWarehouseNode): + query_builder = DataWarehouseTrendsQueryBuilder( + trends_query=trends_query, + team=self.team, + query_date_range=query_date_range, + series=trends_query.series[0], + timings=timings, + modifiers=modifiers, + ) + else: + raise Exception("Unsupported series type") + + query = query_builder.build_query() + + return execute_hogql_query( + query_type="TrendsQuery", + query=query, + team=self.team, + timings=timings, + ) + + def create_parquet_file(self): + fs = s3fs.S3FileSystem( + client_kwargs={ + "region_name": "us-east-1", + "endpoint_url": OBJECT_STORAGE_ENDPOINT, + "aws_access_key_id": OBJECT_STORAGE_ACCESS_KEY_ID, + "aws_secret_access_key": OBJECT_STORAGE_SECRET_ACCESS_KEY, + }, + ) + + path_to_s3_object = "s3://" + OBJECT_STORAGE_BUCKET + f"/{TEST_BUCKET}" + + id = pa.array(["1", "2", "3", "4"]) + created = pa.array([datetime(2023, 1, 1), datetime(2023, 1, 2), datetime(2023, 1, 3), datetime(2023, 1, 4)]) + prop_1 = pa.array(["a", "b", "c", "d"]) + prop_2 = pa.array(["e", "f", "g", "h"]) + names = ["id", "created", "prop_1", "prop_2"] + + pq.write_to_dataset( + pa.Table.from_arrays([id, created, prop_1, prop_2], names=names), + path_to_s3_object, + filesystem=fs, + use_dictionary=True, + compression="snappy", + version="2.0", + ) + + table_name = "test_table_1" + + credential = DataWarehouseCredential.objects.create( + access_key=OBJECT_STORAGE_ACCESS_KEY_ID, access_secret=OBJECT_STORAGE_SECRET_ACCESS_KEY, team=self.team + ) + + # TODO: use env vars + DataWarehouseTable.objects.create( + name=table_name, + url_pattern=f"http://host.docker.internal:19000/{OBJECT_STORAGE_BUCKET}/{TEST_BUCKET}/*.parquet", + format=DataWarehouseTable.TableFormat.Parquet, + team=self.team, + columns={ + "id": "String", + "created": "DateTime64(3, 'UTC')", + "prop_1": "String", + "prop_2": "String", + }, + credential=credential, + ) + + return table_name + + @snapshot_clickhouse_queries + def test_trends_data_warehouse(self): + table_name = self.create_parquet_file() + + trends_query = TrendsQuery( + kind="TrendsQuery", + dateRange=DateRange(date_from="2023-01-01"), + series=[DataWarehouseNode(id=table_name, table_name=table_name, id_field="id", timestamp_field="created")], + ) + + with freeze_time("2023-01-07"): + response = self.get_response(trends_query=trends_query) + + assert response.columns is not None + assert set(response.columns).issubset({"date", "total"}) + assert response.results[0][1] == [1, 1, 1, 1, 0, 0, 0] + + @snapshot_clickhouse_queries + def test_trends_entity_property(self): + table_name = self.create_parquet_file() + + trends_query = TrendsQuery( + kind="TrendsQuery", + dateRange=DateRange(date_from="2023-01-01"), + series=[ + DataWarehouseNode( + id=table_name, + table_name=table_name, + id_field="id", + timestamp_field="created", + properties=clean_entity_properties([{"key": "prop_1", "value": "a", "type": "data_warehouse"}]), + ) + ], + ) + + with freeze_time("2023-01-07"): + response = self.get_response(trends_query=trends_query) + + assert response.columns is not None + assert set(response.columns).issubset({"date", "total"}) + assert response.results[0][1] == [1, 0, 0, 0, 0, 0, 0] + + @snapshot_clickhouse_queries + def test_trends_property(self): + table_name = self.create_parquet_file() + + trends_query = TrendsQuery( + kind="TrendsQuery", + dateRange=DateRange(date_from="2023-01-01"), + series=[DataWarehouseNode(id=table_name, table_name=table_name, id_field="id", timestamp_field="created")], + properties=clean_entity_properties([{"key": "prop_1", "value": "a", "type": "data_warehouse"}]), + ) + + with freeze_time("2023-01-07"): + response = self.get_response(trends_query=trends_query) + + assert response.columns is not None + assert set(response.columns).issubset({"date", "total"}) + assert response.results[0][1] == [1, 0, 0, 0, 0, 0, 0] + + @snapshot_clickhouse_queries + def test_trends_breakdown(self): + table_name = self.create_parquet_file() + + trends_query = TrendsQuery( + kind="TrendsQuery", + dateRange=DateRange(date_from="2023-01-01"), + series=[DataWarehouseNode(id=table_name, table_name=table_name, id_field="id", timestamp_field="created")], + breakdownFilter=BreakdownFilter(breakdown_type=BreakdownType.data_warehouse, breakdown="prop_1"), + ) + + with freeze_time("2023-01-07"): + response = self.get_response(trends_query=trends_query) + + assert response.columns is not None + assert set(response.columns).issubset({"date", "total", "breakdown_value"}) + assert response.results[0][1] == [1, 0, 0, 0, 0, 0, 0] + assert response.results[0][2] == "a" + + assert response.results[1][1] == [0, 1, 0, 0, 0, 0, 0] + assert response.results[1][2] == "b" + + assert response.results[2][1] == [0, 0, 1, 0, 0, 0, 0] + assert response.results[2][2] == "c" + + assert response.results[3][1] == [0, 0, 0, 1, 0, 0, 0] + assert response.results[3][2] == "d" + + assert response.results[4][1] == [0, 0, 0, 0, 0, 0, 0] + assert response.results[4][2] == "$$_posthog_breakdown_other_$$" + + @snapshot_clickhouse_queries + def test_trends_breakdown_with_property(self): + table_name = self.create_parquet_file() + + trends_query = TrendsQuery( + kind="TrendsQuery", + dateRange=DateRange(date_from="2023-01-01"), + series=[DataWarehouseNode(id=table_name, table_name=table_name, id_field="id", timestamp_field="created")], + properties=clean_entity_properties([{"key": "prop_1", "value": "a", "type": "data_warehouse"}]), + breakdownFilter=BreakdownFilter(breakdown_type=BreakdownType.data_warehouse, breakdown="prop_1"), + ) + + with freeze_time("2023-01-07"): + response = self.get_response(trends_query=trends_query) + + assert response.columns is not None + assert set(response.columns).issubset({"date", "total", "breakdown_value"}) + assert response.results[0][1] == [1, 0, 0, 0, 0, 0, 0] + assert response.results[0][2] == "a" + + assert response.results[1][1] == [0, 0, 0, 0, 0, 0, 0] + assert response.results[1][2] == "$$_posthog_breakdown_other_$$" + + def assert_column_names_with_display_type(self, display_type: ChartDisplayType): + # KLUDGE: creating data on every variant + table_name = self.create_parquet_file() + + trends_query = TrendsQuery( + kind="TrendsQuery", + dateRange=DateRange(date_from="2023-01-01"), + series=[DataWarehouseNode(id=table_name, table_name=table_name, id_field="id", timestamp_field="created")], + trendsFilter=TrendsFilter(display=display_type), + ) + + with freeze_time("2023-01-07"): + response = self.get_response(trends_query) + + assert response.columns is not None + assert set(response.columns).issubset({"date", "total"}) + + def test_column_names_with_display_type(self): + self.assert_column_names_with_display_type(ChartDisplayType.ActionsAreaGraph) + self.assert_column_names_with_display_type(ChartDisplayType.ActionsBar) + self.assert_column_names_with_display_type(ChartDisplayType.ActionsBarValue) + self.assert_column_names_with_display_type(ChartDisplayType.ActionsLineGraph) + self.assert_column_names_with_display_type(ChartDisplayType.ActionsPie) + self.assert_column_names_with_display_type(ChartDisplayType.BoldNumber) + self.assert_column_names_with_display_type(ChartDisplayType.WorldMap) + self.assert_column_names_with_display_type(ChartDisplayType.ActionsLineGraphCumulative) diff --git a/posthog/hogql_queries/insights/trends/test/test_query_builder.py b/posthog/hogql_queries/insights/trends/test/test_query_builder.py index c978c9f2767a5..75000f70276a8 100644 --- a/posthog/hogql_queries/insights/trends/test/test_query_builder.py +++ b/posthog/hogql_queries/insights/trends/test/test_query_builder.py @@ -4,7 +4,7 @@ from posthog.hogql.query import execute_hogql_query from posthog.hogql.timings import HogQLTimings -from posthog.hogql_queries.insights.trends.query_builder import TrendsQueryBuilder +from posthog.hogql_queries.insights.trends.trends_query_builder import TrendsQueryBuilder from posthog.hogql_queries.utils.query_date_range import QueryDateRange from posthog.schema import ( BaseMathType, @@ -12,6 +12,7 @@ BreakdownType, ChartDisplayType, DateRange, + DataWarehouseNode, EventsNode, HogQLQueryResponse, TrendsFilter, @@ -48,6 +49,9 @@ def get_response(self, trends_query: TrendsQuery) -> HogQLQueryResponse: timings = HogQLTimings() modifiers = create_default_modifiers_for_team(self.team) + if isinstance(trends_query.series[0], DataWarehouseNode): + raise Exception("Data Warehouse queries are not supported in this test") + query_builder = TrendsQueryBuilder( trends_query=trends_query, team=self.team, diff --git a/posthog/hogql_queries/insights/trends/test/test_trends.py b/posthog/hogql_queries/insights/trends/test/test_trends.py index 3ace5c53541fe..e8e0f6ea126fe 100644 --- a/posthog/hogql_queries/insights/trends/test/test_trends.py +++ b/posthog/hogql_queries/insights/trends/test/test_trends.py @@ -34,6 +34,7 @@ Organization, Person, ) + from posthog.models.group.util import create_group from posthog.models.instance_setting import ( get_instance_setting, @@ -46,6 +47,7 @@ BreakdownFilter, DateRange, EventsNode, + DataWarehouseNode, PropertyGroupFilter, TrendsFilter, TrendsQuery, @@ -169,7 +171,7 @@ def convert_filter_to_trends_query(filter: Filter) -> TrendsQuery: ) ) - series: List[EventsNode | ActionsNode] = [*events, *actions] + series: List[Union[EventsNode, ActionsNode, DataWarehouseNode]] = [*events, *actions] tq = TrendsQuery( series=series, diff --git a/posthog/hogql_queries/insights/trends/test/test_utils.py b/posthog/hogql_queries/insights/trends/test/test_utils.py index 450cc5e66ab95..e4ea8d833dad9 100644 --- a/posthog/hogql_queries/insights/trends/test/test_utils.py +++ b/posthog/hogql_queries/insights/trends/test/test_utils.py @@ -1,38 +1,39 @@ import pytest from posthog.hogql_queries.insights.trends.utils import get_properties_chain +from posthog.schema import BreakdownType def test_properties_chain_person(): - p1 = get_properties_chain(breakdown_type="person", breakdown_field="field", group_type_index=None) + p1 = get_properties_chain(breakdown_type=BreakdownType.person, breakdown_field="field", group_type_index=None) assert p1 == ["person", "properties", "field"] - p2 = get_properties_chain(breakdown_type="person", breakdown_field="field", group_type_index=1) + p2 = get_properties_chain(breakdown_type=BreakdownType.person, breakdown_field="field", group_type_index=1) assert p2 == ["person", "properties", "field"] def test_properties_chain_session(): - p1 = get_properties_chain(breakdown_type="session", breakdown_field="anything", group_type_index=None) + p1 = get_properties_chain(breakdown_type=BreakdownType.session, breakdown_field="anything", group_type_index=None) assert p1 == ["session", "duration"] - p2 = get_properties_chain(breakdown_type="session", breakdown_field="", group_type_index=None) + p2 = get_properties_chain(breakdown_type=BreakdownType.session, breakdown_field="", group_type_index=None) assert p2 == ["session", "duration"] - p3 = get_properties_chain(breakdown_type="session", breakdown_field="", group_type_index=1) + p3 = get_properties_chain(breakdown_type=BreakdownType.session, breakdown_field="", group_type_index=1) assert p3 == ["session", "duration"] def test_properties_chain_groups(): - p1 = get_properties_chain(breakdown_type="group", breakdown_field="anything", group_type_index=1) + p1 = get_properties_chain(breakdown_type=BreakdownType.group, breakdown_field="anything", group_type_index=1) assert p1 == ["group_1", "properties", "anything"] with pytest.raises(Exception) as e: - get_properties_chain(breakdown_type="group", breakdown_field="anything", group_type_index=None) + get_properties_chain(breakdown_type=BreakdownType.group, breakdown_field="anything", group_type_index=None) assert "group_type_index missing from params" in str(e.value) def test_properties_chain_events(): - p1 = get_properties_chain(breakdown_type="event", breakdown_field="anything", group_type_index=None) + p1 = get_properties_chain(breakdown_type=BreakdownType.event, breakdown_field="anything", group_type_index=None) assert p1 == ["properties", "anything"] - p2 = get_properties_chain(breakdown_type="event", breakdown_field="anything_else", group_type_index=1) + p2 = get_properties_chain(breakdown_type=BreakdownType.event, breakdown_field="anything_else", group_type_index=1) assert p2 == ["properties", "anything_else"] diff --git a/posthog/hogql_queries/insights/trends/query_builder.py b/posthog/hogql_queries/insights/trends/trends_query_builder.py similarity index 99% rename from posthog/hogql_queries/insights/trends/query_builder.py rename to posthog/hogql_queries/insights/trends/trends_query_builder.py index 67e25cfb6ea22..a0d306c23e4cf 100644 --- a/posthog/hogql_queries/insights/trends/query_builder.py +++ b/posthog/hogql_queries/insights/trends/trends_query_builder.py @@ -14,9 +14,10 @@ from posthog.models.filters.mixins.utils import cached_property from posthog.models.team.team import Team from posthog.schema import ActionsNode, EventsNode, HogQLQueryModifiers, TrendsQuery +from posthog.hogql_queries.insights.trends.trends_query_builder_abstract import TrendsQueryBuilderAbstract -class TrendsQueryBuilder: +class TrendsQueryBuilder(TrendsQueryBuilderAbstract): query: TrendsQuery team: Team query_date_range: QueryDateRange diff --git a/posthog/hogql_queries/insights/trends/trends_query_builder_abstract.py b/posthog/hogql_queries/insights/trends/trends_query_builder_abstract.py new file mode 100644 index 0000000000000..d321fc0a77ecf --- /dev/null +++ b/posthog/hogql_queries/insights/trends/trends_query_builder_abstract.py @@ -0,0 +1,52 @@ +import abc +from posthog.hogql import ast +from typing import List, Optional +from posthog.hogql_queries.insights.trends.breakdown import Breakdown + + +class TrendsQueryBuilderAbstract(metaclass=abc.ABCMeta): + @abc.abstractmethod + def build_query(self) -> ast.SelectQuery | ast.SelectUnionQuery: + pass + + # Private functions not really necessary but keeping here for uniformity for now + + @abc.abstractmethod + def _get_date_subqueries(self, breakdown: Breakdown, ignore_breakdowns: bool = False) -> List[ast.SelectQuery]: + pass + + @abc.abstractmethod + def _get_events_subquery( + self, + no_modifications: Optional[bool], + is_actors_query: bool, + breakdown: Breakdown, + breakdown_values_override: Optional[str | int] = None, + actors_query_time_frame: Optional[str | int] = None, + ) -> ast.SelectQuery: + pass + + @abc.abstractmethod + def _outer_select_query(self, breakdown: Breakdown, inner_query: ast.SelectQuery) -> ast.SelectQuery: + pass + + @abc.abstractmethod + def _inner_select_query( + self, breakdown: Breakdown, inner_query: ast.SelectQuery | ast.SelectUnionQuery + ) -> ast.SelectQuery: + pass + + @abc.abstractmethod + def _events_filter( + self, + is_actors_query: bool, + breakdown: Breakdown | None, + ignore_breakdowns: bool = False, + breakdown_values_override: Optional[str | int] = None, + actors_query_time_frame: Optional[str | int] = None, + ) -> ast.Expr: + pass + + @abc.abstractmethod + def _breakdown(self, is_actors_query: bool, breakdown_values_override: Optional[str | int] = None): + pass diff --git a/posthog/hogql_queries/insights/trends/trends_query_runner.py b/posthog/hogql_queries/insights/trends/trends_query_runner.py index c2cb8be2581bb..c386b5e3a7696 100644 --- a/posthog/hogql_queries/insights/trends/trends_query_runner.py +++ b/posthog/hogql_queries/insights/trends/trends_query_runner.py @@ -1,3 +1,4 @@ +from typing import Union from copy import deepcopy from datetime import timedelta from itertools import groupby @@ -28,7 +29,9 @@ BREAKDOWN_OTHER_STRING_LABEL, ) from posthog.hogql_queries.insights.trends.display import TrendsDisplay -from posthog.hogql_queries.insights.trends.query_builder import TrendsQueryBuilder +from posthog.hogql_queries.insights.trends.trends_query_builder_abstract import TrendsQueryBuilderAbstract +from posthog.hogql_queries.insights.trends.trends_query_builder import TrendsQueryBuilder +from posthog.hogql_queries.insights.trends.data_warehouse_trends_query_builder import DataWarehouseTrendsQueryBuilder from posthog.hogql_queries.insights.trends.series_with_extras import SeriesWithExtras from posthog.hogql_queries.query_runner import QueryRunner from posthog.hogql_queries.utils.formula_ast import FormulaAST @@ -49,6 +52,7 @@ CompareItem, DayItem, EventsNode, + DataWarehouseNode, HogQLQueryResponse, InCohortVia, InsightActorsQueryOptionsResponse, @@ -118,14 +122,27 @@ def to_queries(self) -> List[ast.SelectQuery | ast.SelectUnionQuery]: else: query_date_range = self.query_previous_date_range - query_builder = TrendsQueryBuilder( - trends_query=series.overriden_query or self.query, - team=self.team, - query_date_range=query_date_range, - series=series.series, - timings=self.timings, - modifiers=self.modifiers, - ) + query_builder: TrendsQueryBuilderAbstract + + if isinstance(series.series, DataWarehouseNode): + query_builder = DataWarehouseTrendsQueryBuilder( + trends_query=series.overriden_query or self.query, + team=self.team, + query_date_range=query_date_range, + series=series.series, + timings=self.timings, + modifiers=self.modifiers, + ) + else: + query_builder = TrendsQueryBuilder( + trends_query=series.overriden_query or self.query, + team=self.team, + query_date_range=query_date_range, + series=series.series, + timings=self.timings, + modifiers=self.modifiers, + ) + queries.append(query_builder.build_query()) return queries @@ -140,6 +157,10 @@ def to_actors_query( with self.timings.measure("trends_to_actors_query"): series = self.query.series[series_index] + # TODO: Add support for DataWarehouseNode + if isinstance(series, DataWarehouseNode): + raise Exception("DataWarehouseNode is not supported for actors query") + if compare == Compare.previous: query_date_range = self.query_previous_date_range @@ -187,6 +208,10 @@ def to_actors_query_options(self) -> InsightActorsQueryOptionsResponse: # Breakdowns for series in self.query.series: + # TODO: Add support for DataWarehouseNode + if isinstance(series, DataWarehouseNode): + continue + # TODO: Work out if we will have issues only getting breakdown values for # the "current" period and not "previous" period for when "compare" is turned on query_date_range = self.query_date_range @@ -513,13 +538,17 @@ def query_previous_date_range(self): now=datetime.now(), ) - def series_event(self, series: EventsNode | ActionsNode) -> str | None: + def series_event(self, series: Union[EventsNode, ActionsNode, DataWarehouseNode]) -> str | None: if isinstance(series, EventsNode): return series.event if isinstance(series, ActionsNode): # TODO: Can we load the Action in more efficiently? action = Action.objects.get(pk=int(series.id), team=self.team) return action.name + + if isinstance(series, DataWarehouseNode): + return series.table_name + return None def update_hogql_modifiers(self) -> None: diff --git a/posthog/hogql_queries/insights/trends/utils.py b/posthog/hogql_queries/insights/trends/utils.py index cd877757b2e24..eac127eb51bce 100644 --- a/posthog/hogql_queries/insights/trends/utils.py +++ b/posthog/hogql_queries/insights/trends/utils.py @@ -1,15 +1,15 @@ -from typing import List, Literal, Optional, Union -from posthog.schema import ActionsNode, EventsNode +from typing import List, Optional, Union +from posthog.schema import ActionsNode, DataWarehouseNode, EventsNode, BreakdownType -def series_event_name(series: EventsNode | ActionsNode) -> str | None: +def series_event_name(series: Union[EventsNode, ActionsNode, DataWarehouseNode]) -> str | None: if isinstance(series, EventsNode): return series.event return None def get_properties_chain( - breakdown_type: Union[Literal["person"], Literal["session"], Literal["group"], Literal["event"]], + breakdown_type: BreakdownType | None, breakdown_field: str, group_type_index: Optional[float | int], ) -> List[str | int]: @@ -25,4 +25,7 @@ def get_properties_chain( elif breakdown_type == "group" and group_type_index is None: raise Exception("group_type_index missing from params") + if breakdown_type == "data_warehouse": + return [breakdown_field] + return ["properties", breakdown_field] diff --git a/posthog/models/entity/entity.py b/posthog/models/entity/entity.py index aced3a18a8842..91865f9fa50f9 100644 --- a/posthog/models/entity/entity.py +++ b/posthog/models/entity/entity.py @@ -5,7 +5,7 @@ from django.conf import settings from rest_framework.exceptions import ValidationError -from posthog.constants import TREND_FILTER_TYPE_ACTIONS, TREND_FILTER_TYPE_EVENTS +from posthog.constants import TREND_FILTER_TYPE_ACTIONS, TREND_FILTER_TYPE_EVENTS, TREND_FILTER_TYPE_DATA_WAREHOUSE from posthog.models.action import Action from posthog.models.filters.mixins.funnel import FunnelFromToStepsMixin from posthog.models.filters.mixins.property import PropertyMixin @@ -49,7 +49,7 @@ class Entity(PropertyMixin): """ id: Optional[int | str] - type: Literal["events", "actions"] + type: Literal["events", "actions", "data_warehouse"] order: Optional[int] name: Optional[str] custom_name: Optional[str] @@ -63,13 +63,20 @@ class Entity(PropertyMixin): # The clean room way to do this would be passing the index _alongside_ the object, but OOP abuse is much less work index: int + # data warehouse fields + id_field: Optional[str] + timestamp_field: Optional[str] + def __init__(self, data: Dict[str, Any]) -> None: self.id = data.get("id") if data.get("type") not in [ TREND_FILTER_TYPE_ACTIONS, TREND_FILTER_TYPE_EVENTS, + TREND_FILTER_TYPE_DATA_WAREHOUSE, ]: - raise ValueError("Type needs to be either TREND_FILTER_TYPE_ACTIONS or TREND_FILTER_TYPE_EVENTS") + raise ValueError( + "Type needs to be either TREND_FILTER_TYPE_ACTIONS or TREND_FILTER_TYPE_EVENTS OR TREND_FILTER_TYPE_DATA_WAREHOUSE" + ) self.type = data["type"] order_provided = data.get("order") if order_provided is not None: @@ -86,6 +93,8 @@ def __init__(self, data: Dict[str, Any]) -> None: self.math_group_type_index = validate_group_type_index( "math_group_type_index", data.get("math_group_type_index") ) + self.id_field = data.get("id_field") + self.timestamp_field = data.get("timestamp_field") self._action: Optional[Action] = None self._data = data # push data to instance object so mixins are handled properly diff --git a/posthog/models/filters/mixins/common.py b/posthog/models/filters/mixins/common.py index edac192f55547..f0717c3d47f0b 100644 --- a/posthog/models/filters/mixins/common.py +++ b/posthog/models/filters/mixins/common.py @@ -25,6 +25,7 @@ BREAKDOWNS, CLIENT_QUERY_ID, COMPARE, + DATA_WAREHOUSE_ENTITIES, DATE_FROM, DATE_TO, DISPLAY, @@ -44,6 +45,7 @@ SHOWN_AS, SMOOTHING_INTERVALS, TREND_FILTER_TYPE_ACTIONS, + TREND_FILTER_TYPE_DATA_WAREHOUSE, TREND_FILTER_TYPE_EVENTS, TRENDS_WORLD_MAP, BreakdownAttributionType, @@ -467,6 +469,14 @@ def entities(self) -> List[Entity]: if isinstance(events, str): events = json.loads(events) processed_entities.extend([Entity({**entity, "type": TREND_FILTER_TYPE_EVENTS}) for entity in events]) + if self._data.get(DATA_WAREHOUSE_ENTITIES): + data_warehouse_entities = self._data.get(DATA_WAREHOUSE_ENTITIES, []) + if isinstance(data_warehouse_entities, str): + data_warehouse_entities = json.loads(data_warehouse_entities) + processed_entities.extend( + [Entity({**entity, "type": TREND_FILTER_TYPE_DATA_WAREHOUSE}) for entity in data_warehouse_entities] + ) + processed_entities.sort(key=lambda entity: entity.order if entity.order else -1) # Set sequential index values on entities for index, entity in enumerate(processed_entities): @@ -485,6 +495,10 @@ def actions(self) -> List[Entity]: def events(self) -> List[Entity]: return [entity for entity in self.entities if entity.type == TREND_FILTER_TYPE_EVENTS] + @cached_property + def data_warehouse_entities(self) -> List[Entity]: + return [entity for entity in self.entities if entity.type == TREND_FILTER_TYPE_DATA_WAREHOUSE] + @cached_property def exclusions(self) -> List[ExclusionEntity]: _exclusions: List[ExclusionEntity] = [] diff --git a/posthog/models/property/property.py b/posthog/models/property/property.py index dee63194ba41e..d0e0f94439cf5 100644 --- a/posthog/models/property/property.py +++ b/posthog/models/property/property.py @@ -30,6 +30,7 @@ class BehavioralPropertyType(str, Enum): ValueT = Union[str, int, List[str]] PropertyType = Literal[ "event", + "feature", "person", "cohort", "element", @@ -40,6 +41,7 @@ class BehavioralPropertyType(str, Enum): "behavioral", "session", "hogql", + "data_warehouse", ] PropertyName = str @@ -88,6 +90,7 @@ class BehavioralPropertyType(str, Enum): VALIDATE_PROP_TYPES = { "event": ["key", "value"], "person": ["key", "value"], + "data_warehouse": ["key", "value"], "cohort": ["key", "value"], "element": ["key", "value"], "static-cohort": ["key", "value"], diff --git a/posthog/schema.py b/posthog/schema.py index d9774ce049f71..b9f2cf1e034da 100644 --- a/posthog/schema.py +++ b/posthog/schema.py @@ -105,6 +105,7 @@ class BreakdownType(str, Enum): group = "group" session = "session" hogql = "hogql" + data_warehouse = "data_warehouse" class BreakdownValueInt(RootModel[int]): @@ -205,6 +206,7 @@ class EmptyPropertyFilter(BaseModel): class EntityType(str, Enum): actions = "actions" events = "events" + data_warehouse = "data_warehouse" new_entity = "new_entity" @@ -490,6 +492,7 @@ class LifecycleToggle(str, Enum): class NodeKind(str, Enum): EventsNode = "EventsNode" ActionsNode = "ActionsNode" + DataWarehouseNode = "DataWarehouseNode" EventsQuery = "EventsQuery" PersonsNode = "PersonsNode" HogQLQuery = "HogQLQuery" @@ -589,6 +592,7 @@ class PropertyFilterType(str, Enum): recording = "recording" group = "group" hogql = "hogql" + data_warehouse = "data_warehouse" class PropertyMathType(str, Enum): @@ -1069,6 +1073,17 @@ class ChartSettings(BaseModel): yAxis: Optional[List[ChartAxis]] = None +class DataWarehousePropertyFilter(BaseModel): + model_config = ConfigDict( + extra="forbid", + ) + key: str + label: Optional[str] = None + operator: PropertyOperator + type: Literal["data_warehouse"] = "data_warehouse" + value: Optional[Union[str, float, List[Union[str, float]]]] = None + + class ElementPropertyFilter(BaseModel): model_config = ConfigDict( extra="forbid", @@ -1639,11 +1654,69 @@ class DashboardFilter(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ] ] = None +class DataWarehouseNode(BaseModel): + model_config = ConfigDict( + extra="forbid", + ) + custom_name: Optional[str] = None + fixedProperties: Optional[ + List[ + Union[ + EventPropertyFilter, + PersonPropertyFilter, + ElementPropertyFilter, + SessionPropertyFilter, + CohortPropertyFilter, + RecordingDurationFilter, + GroupPropertyFilter, + FeaturePropertyFilter, + HogQLPropertyFilter, + EmptyPropertyFilter, + DataWarehousePropertyFilter, + ] + ] + ] = Field( + default=None, + description="Fixed properties in the query, can't be edited in the interface (e.g. scoping down by person)", + ) + id: str + id_field: str + kind: Literal["DataWarehouseNode"] = "DataWarehouseNode" + math: Optional[ + Union[BaseMathType, PropertyMathType, CountPerActorMathType, Literal["unique_group"], Literal["hogql"]] + ] = None + math_group_type_index: Optional[MathGroupTypeIndex] = None + math_hogql: Optional[str] = None + math_property: Optional[str] = None + name: Optional[str] = None + properties: Optional[ + List[ + Union[ + EventPropertyFilter, + PersonPropertyFilter, + ElementPropertyFilter, + SessionPropertyFilter, + CohortPropertyFilter, + RecordingDurationFilter, + GroupPropertyFilter, + FeaturePropertyFilter, + HogQLPropertyFilter, + EmptyPropertyFilter, + DataWarehousePropertyFilter, + ] + ] + ] = Field(default=None, description="Properties configurable in the interface") + response: Optional[Dict[str, Any]] = Field(default=None, description="Cached query response") + table_name: str + timestamp_field: str + + class DatabaseSchemaQuery(BaseModel): model_config = ConfigDict( extra="forbid", @@ -1672,6 +1745,7 @@ class EntityNode(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ] ] = Field( @@ -1699,6 +1773,7 @@ class EntityNode(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ] ] = Field(default=None, description="Properties configurable in the interface") @@ -1724,6 +1799,7 @@ class EventsNode(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ] ] = Field( @@ -1753,6 +1829,7 @@ class EventsNode(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ] ] = Field(default=None, description="Properties configurable in the interface") @@ -1781,6 +1858,7 @@ class EventsQuery(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ] ] = Field( @@ -1805,6 +1883,7 @@ class EventsQuery(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ] ] = Field(default=None, description="Properties configurable in the interface") @@ -1831,6 +1910,7 @@ class FunnelExclusionActionsNode(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ] ] = Field( @@ -1861,6 +1941,7 @@ class FunnelExclusionActionsNode(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ] ] = Field(default=None, description="Properties configurable in the interface") @@ -1886,6 +1967,7 @@ class FunnelExclusionEventsNode(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ] ] = Field( @@ -1917,6 +1999,7 @@ class FunnelExclusionEventsNode(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ] ] = Field(default=None, description="Properties configurable in the interface") @@ -1942,6 +2025,7 @@ class HogQLFilters(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ] ] = None @@ -1981,6 +2065,7 @@ class PersonsNode(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ] ] = Field( @@ -2003,6 +2088,7 @@ class PersonsNode(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ] ] = Field(default=None, description="Properties configurable in the interface") @@ -2029,6 +2115,7 @@ class PropertyGroupFilterValue(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ], ] ] @@ -2133,6 +2220,7 @@ class ActionsNode(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ] ] = Field( @@ -2161,6 +2249,7 @@ class ActionsNode(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ] ] = Field(default=None, description="Properties configurable in the interface") @@ -2251,6 +2340,7 @@ class RetentionQuery(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ], PropertyGroupFilter, @@ -2287,13 +2377,16 @@ class StickinessQuery(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ], PropertyGroupFilter, ] ] = Field(default=None, description="Property filters for all series") samplingFactor: Optional[float] = Field(default=None, description="Sampling rate") - series: List[Union[EventsNode, ActionsNode]] = Field(..., description="Events and actions to include") + series: List[Union[EventsNode, ActionsNode, DataWarehouseNode]] = Field( + ..., description="Events and actions to include" + ) stickinessFilter: Optional[StickinessFilter] = Field( default=None, description="Properties specific to the stickiness insight" ) @@ -2327,6 +2420,7 @@ class TrendsQuery(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ], PropertyGroupFilter, @@ -2334,7 +2428,9 @@ class TrendsQuery(BaseModel): ] = Field(default=None, description="Property filters for all series") response: Optional[TrendsQueryResponse] = None samplingFactor: Optional[float] = Field(default=None, description="Sampling rate") - series: List[Union[EventsNode, ActionsNode]] = Field(..., description="Events and actions to include") + series: List[Union[EventsNode, ActionsNode, DataWarehouseNode]] = Field( + ..., description="Events and actions to include" + ) trendsFilter: Optional[TrendsFilter] = Field(default=None, description="Properties specific to the trends insight") @@ -2379,6 +2475,7 @@ class FilterType(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ], PropertyGroupFilter, @@ -2418,13 +2515,16 @@ class FunnelsQuery(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ], PropertyGroupFilter, ] ] = Field(default=None, description="Property filters for all series") samplingFactor: Optional[float] = Field(default=None, description="Sampling rate") - series: List[Union[EventsNode, ActionsNode]] = Field(..., description="Events and actions to include") + series: List[Union[EventsNode, ActionsNode, DataWarehouseNode]] = Field( + ..., description="Events and actions to include" + ) class InsightsQueryBase(BaseModel): @@ -2451,6 +2551,7 @@ class InsightsQueryBase(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ], PropertyGroupFilter, @@ -2488,6 +2589,7 @@ class LifecycleQuery(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ], PropertyGroupFilter, @@ -2495,7 +2597,9 @@ class LifecycleQuery(BaseModel): ] = Field(default=None, description="Property filters for all series") response: Optional[LifecycleQueryResponse] = None samplingFactor: Optional[float] = Field(default=None, description="Sampling rate") - series: List[Union[EventsNode, ActionsNode]] = Field(..., description="Events and actions to include") + series: List[Union[EventsNode, ActionsNode, DataWarehouseNode]] = Field( + ..., description="Events and actions to include" + ) class NamedParametersTypeofDateRangeForFilter(BaseModel): @@ -2530,6 +2634,7 @@ class PathsQuery(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ], PropertyGroupFilter, @@ -2636,6 +2741,7 @@ class ActorsQuery(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ] ] = None @@ -2656,6 +2762,7 @@ class ActorsQuery(BaseModel): FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] ] ] = None @@ -2777,6 +2884,7 @@ class QueryRequest(BaseModel): EventsNode, ActionsNode, PersonsNode, + DataWarehouseNode, TimeToSeeDataSessionsQuery, EventsQuery, ActorsQuery, @@ -2814,6 +2922,7 @@ class QuerySchemaRoot( EventsNode, ActionsNode, PersonsNode, + DataWarehouseNode, TimeToSeeDataSessionsQuery, EventsQuery, ActorsQuery, @@ -2844,6 +2953,7 @@ class QuerySchemaRoot( EventsNode, ActionsNode, PersonsNode, + DataWarehouseNode, TimeToSeeDataSessionsQuery, EventsQuery, ActorsQuery, diff --git a/posthog/types.py b/posthog/types.py index f1db8b9565253..c3fa98e0f3ea1 100644 --- a/posthog/types.py +++ b/posthog/types.py @@ -7,10 +7,12 @@ from posthog.schema import ( ActionsNode, CohortPropertyFilter, + DataWarehouseNode, ElementPropertyFilter, EmptyPropertyFilter, EventPropertyFilter, EventsNode, + DataWarehousePropertyFilter, FeaturePropertyFilter, FunnelExclusionActionsNode, FunnelExclusionEventsNode, @@ -54,7 +56,8 @@ FeaturePropertyFilter, HogQLPropertyFilter, EmptyPropertyFilter, + DataWarehousePropertyFilter, ] -EntityNode: TypeAlias = Union[EventsNode, ActionsNode] +EntityNode: TypeAlias = Union[EventsNode, ActionsNode, DataWarehouseNode] ExclusionEntityNode: TypeAlias = Union[FunnelExclusionEventsNode, FunnelExclusionActionsNode]