Skip to content

Commit

Permalink
feat(queries): Add possibility to run queries async (#18571)
Browse files Browse the repository at this point in the history
  • Loading branch information
webjunkie authored Nov 20, 2023
1 parent 58ffae9 commit ff4bfee
Show file tree
Hide file tree
Showing 26 changed files with 686 additions and 511 deletions.
129 changes: 0 additions & 129 deletions ee/clickhouse/test/test_client.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# name: ClickhouseTestExperimentSecondaryResults.test_basic_secondary_metric_results
'
/* user_id:126 celery:posthog.celery.sync_insight_caching_state */
/* user_id:132 celery:posthog.celery.sync_insight_caching_state */
SELECT team_id,
date_diff('second', max(timestamp), now()) AS age
FROM events
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
17 changes: 14 additions & 3 deletions frontend/src/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ import { EVENT_PROPERTY_DEFINITIONS_PER_PAGE } from 'scenes/data-management/prop
import { ActivityLogItem, ActivityScope } from 'lib/components/ActivityLog/humanizeActivity'
import { ActivityLogProps } from 'lib/components/ActivityLog/ActivityLog'
import { SavedSessionRecordingPlaylistsResult } from 'scenes/session-recordings/saved-playlists/savedSessionRecordingPlaylistsLogic'
import { QuerySchema } from '~/queries/schema'
import { QuerySchema, QueryStatus } from '~/queries/schema'
import { decompressSync, strFromU8 } from 'fflate'
import { getCurrentExporterData } from '~/exporter/exporterViewLogic'
import { encodeParams } from 'kea-router'
Expand Down Expand Up @@ -542,6 +542,10 @@ class ApiRequest {
return this.projectsDetail(teamId).addPathComponent('query')
}

public queryStatus(queryId: string, teamId?: TeamType['id']): ApiRequest {
return this.query(teamId).addPathComponent(queryId)
}

// Notebooks
public notebooks(teamId?: TeamType['id']): ApiRequest {
return this.projectsDetail(teamId).addPathComponent('notebooks')
Expand Down Expand Up @@ -1722,6 +1726,12 @@ const api = {
},
},

queryStatus: {
async get(queryId: string): Promise<QueryStatus> {
return await new ApiRequest().queryStatus(queryId).get()
},
},

queryURL: (): string => {
return new ApiRequest().query().assembleFullUrl(true)
},
Expand All @@ -1730,7 +1740,8 @@ const api = {
query: T,
options?: ApiMethodOptions,
queryId?: string,
refresh?: boolean
refresh?: boolean,
async?: boolean
): Promise<
T extends { [response: string]: any }
? T['response'] extends infer P | undefined
Expand All @@ -1740,7 +1751,7 @@ const api = {
> {
return await new ApiRequest()
.query()
.create({ ...options, data: { query, client_query_id: queryId, refresh: refresh } })
.create({ ...options, data: { query, client_query_id: queryId, refresh: refresh, async } })
},

/** Fetch data from specified URL. The result already is JSON-parsed. */
Expand Down
1 change: 1 addition & 0 deletions frontend/src/lib/constants.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ export const FEATURE_FLAGS = {
ROLE_BASED_ACCESS: 'role-based-access', // owner: #team-experiments, @liyiy
QUERY_RUNNING_TIME: 'query_running_time', // owner: @mariusandra
QUERY_TIMINGS: 'query-timings', // owner: @mariusandra
QUERY_ASYNC: 'query-async', // owner: @webjunkie
POSTHOG_3000: 'posthog-3000', // owner: @Twixes
POSTHOG_3000_NAV: 'posthog-3000-nav', // owner: @Twixes
ENABLE_PROMPTS: 'enable-prompts', // owner: @lharries
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/queries/nodes/DataNode/dataNodeLogic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ export const dataNodeLogic = kea<dataNodeLogicType>([
abortQuery: async ({ queryId }) => {
try {
const { currentTeamId } = values
await api.create(`api/projects/${currentTeamId}/insights/cancel`, { client_query_id: queryId })
await api.delete(`api/projects/${currentTeamId}/query/${queryId}/`)
} catch (e) {
console.warn('Failed cancelling query', e)
}
Expand Down
44 changes: 42 additions & 2 deletions frontend/src/queries/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@ import {
isStickinessFilter,
isTrendsFilter,
} from 'scenes/insights/sharedUtils'
import { flattenObject, toParams } from 'lib/utils'
import { flattenObject, delay, toParams } from 'lib/utils'
import { queryNodeToFilter } from './nodes/InsightQuery/utils/queryNodeToFilter'
import { now } from 'lib/dayjs'
import { currentSessionId } from 'lib/internalMetrics'
import { featureFlagLogic } from 'lib/logic/featureFlagLogic'
import { FEATURE_FLAGS } from 'lib/constants'

const QUERY_ASYNC_MAX_INTERVAL_SECONDS = 10
const QUERY_ASYNC_TOTAL_POLL_SECONDS = 300

//get export context for a given query
export function queryExportContext<N extends DataNode = DataNode>(
query: N,
Expand Down Expand Up @@ -91,6 +94,43 @@ export function queryExportContext<N extends DataNode = DataNode>(
throw new Error(`Unsupported query: ${query.kind}`)
}

async function executeQuery<N extends DataNode = DataNode>(
queryNode: N,
methodOptions?: ApiMethodOptions,
refresh?: boolean,
queryId?: string
): Promise<NonNullable<N['response']>> {
const queryAsyncEnabled = Boolean(featureFlagLogic.findMounted()?.values.featureFlags?.[FEATURE_FLAGS.QUERY_ASYNC])
const excludedKinds = ['HogQLMetadata']
const queryAsync = queryAsyncEnabled && !excludedKinds.includes(queryNode.kind)
const response = await api.query(queryNode, methodOptions, queryId, refresh, queryAsync)

if (!queryAsync || !response.query_async) {
return response
}

const pollStart = performance.now()
let currentDelay = 300 // start low, because all queries will take at minimum this

while (performance.now() - pollStart < QUERY_ASYNC_TOTAL_POLL_SECONDS * 1000) {
await delay(currentDelay)
currentDelay = Math.min(currentDelay * 2, QUERY_ASYNC_MAX_INTERVAL_SECONDS * 1000)

if (methodOptions?.signal?.aborted) {
const customAbortError = new Error('Query aborted')
customAbortError.name = 'AbortError'
throw customAbortError
}

const statusResponse = await api.queryStatus.get(response.id)

if (statusResponse.complete || statusResponse.error) {
return statusResponse.results
}
}
throw new Error('Query timed out')
}

// Return data for a given query
export async function query<N extends DataNode = DataNode>(
queryNode: N,
Expand Down Expand Up @@ -216,7 +256,7 @@ export async function query<N extends DataNode = DataNode>(
response = await fetchLegacyInsights()
}
} else {
response = await api.query(queryNode, methodOptions, queryId, refresh)
response = await executeQuery(queryNode, methodOptions, refresh, queryId)
if (isHogQLQuery(queryNode) && response && typeof response === 'object') {
logParams.clickhouse_sql = (response as HogQLQueryResponse)?.clickhouse
}
Expand Down
45 changes: 45 additions & 0 deletions frontend/src/queries/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -2413,6 +2413,51 @@
}
]
},
"QueryStatus": {
"additionalProperties": false,
"properties": {
"complete": {
"default": false,
"type": "boolean"
},
"end_time": {
"format": "date-time",
"type": "string"
},
"error": {
"default": false,
"type": "boolean"
},
"error_message": {
"default": "",
"type": "string"
},
"expiration_time": {
"format": "date-time",
"type": "string"
},
"id": {
"type": "string"
},
"query_async": {
"default": true,
"type": "boolean"
},
"results": {},
"start_time": {
"format": "date-time",
"type": "string"
},
"task_id": {
"type": "string"
},
"team_id": {
"type": "integer"
}
},
"required": ["id", "query_async", "team_id", "error", "complete", "error_message"],
"type": "object"
},
"QueryTiming": {
"additionalProperties": false,
"properties": {
Expand Down
22 changes: 22 additions & 0 deletions frontend/src/queries/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,28 @@ export interface QueryResponse {
next_allowed_client_refresh?: string
}

export type QueryStatus = {
id: string
/** @default true */
query_async: boolean
/** @asType integer */
team_id: number
/** @default false */
error: boolean
/** @default false */
complete: boolean
/** @default "" */
error_message: string
results?: any
/** @format date-time */
start_time?: string
/** @format date-time */
end_time?: string
/** @format date-time */
expiration_time?: string
task_id?: string
}

export interface LifecycleQueryResponse extends QueryResponse {
results: Record<string, any>[]
}
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
"build:esbuild": "node frontend/build.mjs",
"schema:build": "pnpm run schema:build:json && pnpm run schema:build:python",
"schema:build:json": "ts-json-schema-generator -f tsconfig.json --path 'frontend/src/queries/schema.ts' --no-type-check > frontend/src/queries/schema.json && prettier --write frontend/src/queries/schema.json",
"schema:build:python": "datamodel-codegen --class-name='SchemaRoot' --collapse-root-models --disable-timestamp --use-one-literal-as-default --use-default-kwarg --use-subclass-enum --input frontend/src/queries/schema.json --input-file-type jsonschema --output posthog/schema.py --output-model-type pydantic_v2.BaseModel && ruff format posthog/schema.py",
"schema:build:python": "datamodel-codegen --class-name='SchemaRoot' --collapse-root-models --disable-timestamp --use-one-literal-as-default --use-default --use-default-kwarg --use-subclass-enum --input frontend/src/queries/schema.json --input-file-type jsonschema --output posthog/schema.py --output-model-type pydantic_v2.BaseModel && ruff format posthog/schema.py",
"grammar:build": "npm run grammar:build:python && npm run grammar:build:cpp",
"grammar:build:python": "cd posthog/hogql/grammar && antlr -Dlanguage=Python3 HogQLLexer.g4 && antlr -visitor -no-listener -Dlanguage=Python3 HogQLParser.g4",
"grammar:build:cpp": "cd posthog/hogql/grammar && antlr -o ../../../hogql_parser -Dlanguage=Cpp HogQLLexer.g4 && antlr -o ../../../hogql_parser -visitor -no-listener -Dlanguage=Cpp HogQLParser.g4",
Expand Down
Loading

0 comments on commit ff4bfee

Please sign in to comment.