diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts
index bded3aa4f319d..85909b193e4b7 100644
--- a/frontend/src/lib/api.ts
+++ b/frontend/src/lib/api.ts
@@ -5,7 +5,6 @@
 import { ActivityLogItem } from 'lib/components/ActivityLog/humanizeActivity'
 import { apiStatusLogic } from 'lib/logic/apiStatusLogic'
 import { objectClean, toParams } from 'lib/utils'
 import posthog from 'posthog-js'
-import { LogEntry } from 'scenes/pipeline/pipelineNodeLogsLogic'
 import { SavedSessionRecordingPlaylistsResult } from 'scenes/session-recordings/saved-playlists/savedSessionRecordingPlaylistsLogic'

 import { getCurrentExporterData } from '~/exporter/exporterViewLogic'
@@ -50,6 +49,7 @@
 import {
     InsightModel,
     IntegrationType,
     ListOrganizationMembersParams,
+    LogEntry,
     MediaUploadResponse,
     NewEarlyAccessFeatureType,
     NotebookListItemType,
@@ -1685,6 +1685,17 @@ const api = {
         async listIcons(params: { query?: string } = {}): Promise<HogFunctionIconResponse[]> {
             return await new ApiRequest().hogFunctions().withAction('icons').withQueryString(params).get()
         },
+
+        async createTestInvocation(
+            id: HogFunctionType['id'],
+            data: {
+                configuration: Partial<HogFunctionType>
+                mock_async_functions: boolean
+                event: any
+            }
+        ): Promise<any> {
+            return await new ApiRequest().hogFunction(id).withAction('invocations').create({ data })
+        },
     },

     annotations: {
diff --git a/frontend/src/lib/components/CodeEditors.tsx b/frontend/src/lib/components/CodeEditors.tsx
index 26f74b3f7153a..d6d195d025cc8 100644
--- a/frontend/src/lib/components/CodeEditors.tsx
+++ b/frontend/src/lib/components/CodeEditors.tsx
@@ -74,7 +74,7 @@ export function CodeEditor({ options, onMount, ...editorProps }: CodeEditorProps
 }

 export function CodeEditorResizeable({
-    height: defaultHeight = 200,
+    height: defaultHeight,
     minHeight = '5rem',
     maxHeight = '90vh',
     ...props
@@ -84,7 +84,7 @@ export function CodeEditorResizeable({
     maxHeight?: string | number
 }): JSX.Element {
     const [height, setHeight] = useState(defaultHeight)
-    const [manualHeight, setManualHeight] = useState()
+    const [manualHeight, setManualHeight] = useState(defaultHeight)

     const ref = useRef(null)
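For context on the CodeEditors change above: with the hard-coded default height removed, CodeEditorResizeable only fixes its size when a caller passes one, and an explicit height now also seeds the manually-resized height. A rough usage sketch (the wrapper component and JSON value are invented for illustration):

    import { CodeEditorResizeable } from 'lib/components/CodeEditors'

    export function EditorSizingExamples(): JSX.Element {
        return (
            <>
                {/* No height given: starts at minHeight and can grow up to maxHeight */}
                <CodeEditorResizeable language="json" value={'{ "name": "example" }'} />
                {/* Explicit height: fixed initial size, which now also initialises the manual height */}
                <CodeEditorResizeable language="json" value={'{ "name": "example" }'} height={400} />
            </>
        )
    }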
diff --git a/frontend/src/scenes/pipeline/hogfunctions/HogFunctionTest.tsx b/frontend/src/scenes/pipeline/hogfunctions/HogFunctionTest.tsx
new file mode 100644
index 0000000000000..2a681ce4ae47c
--- /dev/null
+++ b/frontend/src/scenes/pipeline/hogfunctions/HogFunctionTest.tsx
@@ -0,0 +1,188 @@
+import { TZLabel } from '@posthog/apps-common'
+import { IconInfo, IconX } from '@posthog/icons'
+import { LemonButton, LemonLabel, LemonSwitch, LemonTable, LemonTag, Tooltip } from '@posthog/lemon-ui'
+import clsx from 'clsx'
+import { useActions, useValues } from 'kea'
+import { Form } from 'kea-forms'
+import { CodeEditorResizeable } from 'lib/components/CodeEditors'
+import { LemonField } from 'lib/lemon-ui/LemonField'
+
+import { hogFunctionTestLogic, HogFunctionTestLogicProps } from './hogFunctionTestLogic'
+
+const HogFunctionTestEditor = ({
+    value,
+    onChange,
+}: {
+    value: string
+    onChange?: (value?: string) => void
+}): JSX.Element => {
+    return <CodeEditorResizeable language="json" value={value} onChange={onChange} />
+}
+
+export function HogFunctionTestPlaceholder(): JSX.Element {
+    return (
+        <div className="border rounded p-3 space-y-2">
+            <h2 className="mb-0">Testing</h2>
+            <p>Save your configuration to enable testing</p>
+        </div>
+    )
+}
+
+export function HogFunctionTest(props: HogFunctionTestLogicProps): JSX.Element {
+    const { isTestInvocationSubmitting, testResult, expanded } = useValues(hogFunctionTestLogic(props))
+    const { submitTestInvocation, setTestResult, toggleExpanded } = useActions(hogFunctionTestLogic(props))
+
+    return (
+        <Form logic={hogFunctionTestLogic} props={props} formKey="testInvocation" enableFormOnSubmit>
+            <div className={clsx('border rounded p-3 space-y-2', !expanded && 'cursor-pointer')}>
+                <div className="flex items-center gap-2 justify-end">
+                    <div className="flex-1">
+                        {!expanded ? (
+                            <LemonButton fullWidth onClick={() => toggleExpanded()}>
+                                <h2 className="mb-0">Testing</h2>
+                            </LemonButton>
+                        ) : (
+                            <h2 className="mb-0">Testing</h2>
+                        )}
+                    </div>
+
+                    {expanded && (
+                        <>
+                            {testResult ? (
+                                <LemonButton
+                                    type="primary"
+                                    onClick={() => setTestResult(null)}
+                                    loading={isTestInvocationSubmitting}
+                                >
+                                    Clear test result
+                                </LemonButton>
+                            ) : (
+                                <>
+                                    <LemonField name="mock_async_functions">
+                                        {({ value, onChange }) => (
+                                            <LemonSwitch
+                                                onChange={onChange}
+                                                checked={value}
+                                                label={
+                                                    <Tooltip
+                                                        title={
+                                                            <>
+                                                                When selected, async functions such as `fetch` will not
+                                                                actually be called but instead will be mocked out with
+                                                                the fetch content logged instead
+                                                            </>
+                                                        }
+                                                    >
+                                                        <LemonLabel className="flex gap-1 items-center">
+                                                            Mock out async functions
+                                                            <IconInfo className="text-lg" />
+                                                        </LemonLabel>
+                                                    </Tooltip>
+                                                }
+                                            />
+                                        )}
+                                    </LemonField>
+
+                                    <LemonButton
+                                        type="primary"
+                                        onClick={submitTestInvocation}
+                                        loading={isTestInvocationSubmitting}
+                                    >
+                                        Test function
+                                    </LemonButton>
+                                </>
+                            )}
+
+                            <LemonButton icon={<IconX />} onClick={() => toggleExpanded()} tooltip="Hide testing" />
+                        </>
+                    )}
+                </div>
+
+                {expanded && (
+                    <>
+                        {testResult ? (
+                            <div className="space-y-2">
+                                <div className="flex gap-2 items-center">
+                                    <LemonLabel>Test invocation result</LemonLabel>
+                                    <LemonTag type={testResult.status === 'success' ? 'success' : 'danger'}>
+                                        {testResult.status}
+                                    </LemonTag>
+                                </div>
+
+                                <LemonTable
+                                    dataSource={testResult.logs}
+                                    columns={[
+                                        {
+                                            title: 'Timestamp',
+                                            key: 'timestamp',
+                                            dataIndex: 'timestamp',
+                                            render: (timestamp) => <TZLabel time={timestamp as string} />,
+                                            width: 0,
+                                        },
+                                        {
+                                            width: 100,
+                                            title: 'Level',
+                                            key: 'level',
+                                            dataIndex: 'level',
+                                        },
+                                        {
+                                            title: 'Message',
+                                            key: 'message',
+                                            dataIndex: 'message',
+                                            render: (message) => <code>{message}</code>,
+                                        },
+                                    ]}
+                                    className="ph-no-capture"
+                                    rowKey="timestamp"
+                                    pagination={{ pageSize: 200, hideOnSinglePage: true }}
+                                />
+                            </div>
+                        ) : (
+                            <div className="space-y-2">
+                                <LemonField name="globals">
+                                    {({ value, onChange }) => (
+                                        <>
+                                            <div className="flex-1">
+                                                <LemonLabel>
+                                                    The globals object is the context in which your function will be
+                                                    tested. It should contain all the data that your function will need
+                                                    to run
+                                                </LemonLabel>
+                                            </div>
+                                            <HogFunctionTestEditor value={value} onChange={onChange} />
+                                        </>
+                                    )}
+                                </LemonField>
+                            </div>
+                        )}
+                    </>
+                )}
+            </div>
+        </Form>
+    )
+}
diff --git a/frontend/src/scenes/pipeline/hogfunctions/PipelineHogFunctionConfiguration.tsx b/frontend/src/scenes/pipeline/hogfunctions/PipelineHogFunctionConfiguration.tsx
index 68e50c49db023..8faa305bda9f7 100644
--- a/frontend/src/scenes/pipeline/hogfunctions/PipelineHogFunctionConfiguration.tsx
+++ b/frontend/src/scenes/pipeline/hogfunctions/PipelineHogFunctionConfiguration.tsx
@@ -26,6 +26,7 @@ import { EntityTypes } from '~/types'

 import { HogFunctionIconEditable } from './HogFunctionIcon'
 import { HogFunctionInputWithSchema } from './HogFunctionInputs'
+import { HogFunctionTest, HogFunctionTestPlaceholder } from './HogFunctionTest'
 import { pipelineHogFunctionConfigurationLogic } from './pipelineHogFunctionConfigurationLogic'

 export function PipelineHogFunctionConfiguration({
@@ -343,6 +344,8 @@ export function PipelineHogFunctionConfiguration({
                                 )}

+                            {id ? <HogFunctionTest id={id} /> : <HogFunctionTestPlaceholder />}
+
                             <div className="flex gap-2 justify-end">{saveButtons}</div>
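For reference, a minimal sketch of how the new test endpoint is driven from app code (the function id and payload values below are invented; `createTestInvocation` is the api.ts method added above):

    import api from 'lib/api'

    // Run a hog function once against a hand-written event, mocking out fetch calls
    const result = await api.hogFunctions.createTestInvocation('my-hog-function-id', {
        configuration: {}, // optional partial overrides of the saved configuration
        mock_async_functions: true,
        event: { event: '$pageview', properties: { $current_url: 'https://example.com' } },
    })
    console.log(result.status, result.logs) // 'success' | 'error', plus the captured log entries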
diff --git a/frontend/src/scenes/pipeline/hogfunctions/hogFunctionTestLogic.tsx b/frontend/src/scenes/pipeline/hogfunctions/hogFunctionTestLogic.tsx
new file mode 100644
index 0000000000000..412eb59c3ddb9
--- /dev/null
+++ b/frontend/src/scenes/pipeline/hogfunctions/hogFunctionTestLogic.tsx
@@ -0,0 +1,95 @@
+import { lemonToast } from '@posthog/lemon-ui'
+import { actions, afterMount, connect, kea, key, path, props, reducers } from 'kea'
+import { forms } from 'kea-forms'
+import api from 'lib/api'
+import { tryJsonParse } from 'lib/utils'
+
+import { LogEntry } from '~/types'
+
+import type { hogFunctionTestLogicType } from './hogFunctionTestLogicType'
+import { pipelineHogFunctionConfigurationLogic, sanitizeConfiguration } from './pipelineHogFunctionConfigurationLogic'
+import { createExampleEvent } from './utils/event-conversion'
+
+export interface HogFunctionTestLogicProps {
+    id: string
+}
+
+export type HogFunctionTestInvocationForm = {
+    globals: string // HogFunctionInvocationGlobals
+    mock_async_functions: boolean
+}
+
+export type HogFunctionTestInvocationResult = {
+    status: 'success' | 'error'
+    logs: LogEntry[]
+}
+
+export const hogFunctionTestLogic = kea<hogFunctionTestLogicType>([
+    props({} as HogFunctionTestLogicProps),
+    key((props) => props.id),
+    path((id) => ['scenes', 'pipeline', 'hogfunctions', 'hogFunctionTestLogic', id]),
+    connect((props: HogFunctionTestLogicProps) => ({
+        values: [pipelineHogFunctionConfigurationLogic({ id: props.id }), ['configuration', 'configurationHasErrors']],
+        actions: [pipelineHogFunctionConfigurationLogic({ id: props.id }), ['touchConfigurationField']],
+    })),
+    actions({
+        setTestResult: (result: HogFunctionTestInvocationResult | null) => ({ result }),
+        toggleExpanded: (expanded?: boolean) => ({ expanded }),
+    }),
+    reducers({
+        expanded: [
+            false as boolean,
+            {
+                toggleExpanded: (_, { expanded }) => (expanded === undefined ? !_ : expanded),
+            },
+        ],
+
+        testResult: [
+            null as HogFunctionTestInvocationResult | null,
+            {
+                setTestResult: (_, { result }) => result,
+            },
+        ],
+    }),
+    forms(({ props, actions, values }) => ({
+        testInvocation: {
+            defaults: {
+                mock_async_functions: true,
+            } as HogFunctionTestInvocationForm,
+            alwaysShowErrors: true,
+            errors: ({ globals }) => {
+                return {
+                    globals: !globals ? 'Required' : tryJsonParse(globals) ? undefined : 'Invalid JSON',
+                }
+            },
+            submit: async (data) => {
+                // Submit the test invocation
+                // Set the response somewhere
+
+                if (values.configurationHasErrors) {
+                    lemonToast.error('Please fix the configuration errors before testing.')
+                    // TODO: How to get the form to show errors without submitting?
+                    return
+                }
+
+                const event = tryJsonParse(data.globals)
+                const configuration = sanitizeConfiguration(values.configuration)
+
+                try {
+                    const res = await api.hogFunctions.createTestInvocation(props.id, {
+                        event,
+                        mock_async_functions: data.mock_async_functions,
+                        configuration,
+                    })
+
+                    actions.setTestResult(res)
+                } catch (e) {
+                    lemonToast.error(`An unexpected error occurred while trying to test the function. ${e}`)
+                }
+            },
+        },
+    })),
+    afterMount(({ actions }) => {
+        actions.setTestInvocationValue('globals', JSON.stringify(createExampleEvent(), null, 2))
+    }),
+])
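The form above treats the globals field as valid only when tryJsonParse returns a value; a helper with that behavior looks roughly like this (a sketch only, the real helper lives in lib/utils and may differ in signature):

    export function tryJsonParse(value: string): any {
        try {
            return JSON.parse(value)
        } catch {
            // Invalid JSON maps to undefined, which the form reports as 'Invalid JSON'
            return undefined
        }
    }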
diff --git a/frontend/src/scenes/pipeline/hogfunctions/pipelineHogFunctionConfigurationLogic.tsx b/frontend/src/scenes/pipeline/hogfunctions/pipelineHogFunctionConfigurationLogic.tsx
index 5937c80dc4eb7..d0ebcbe5e6ad1 100644
--- a/frontend/src/scenes/pipeline/hogfunctions/pipelineHogFunctionConfigurationLogic.tsx
+++ b/frontend/src/scenes/pipeline/hogfunctions/pipelineHogFunctionConfigurationLogic.tsx
@@ -9,6 +9,7 @@ import { urls } from 'scenes/urls'

 import {
     FilterType,
+    HogFunctionConfigurationType,
     HogFunctionTemplateType,
     HogFunctionType,
     PipelineNodeTab,
@@ -24,8 +25,6 @@ export interface PipelineHogFunctionConfigurationLogicProps {
     id?: string
 }

-export type HogFunctionConfigurationType = Omit<HogFunctionType, 'id' | 'created_at' | 'created_by' | 'status'>
-
 const NEW_FUNCTION_TEMPLATE: HogFunctionTemplateType = {
     id: 'new',
     name: '',
@@ -68,7 +67,37 @@ function sanitizeFilters(filters?: FilterType): PluginConfigTypeNew['filters'] {
     return Object.keys(sanitized).length > 0 ? sanitized : undefined
 }

-// Should likely be somewhat similar to pipelineBatchExportConfigurationLogic
+export function sanitizeConfiguration(data: HogFunctionConfigurationType): HogFunctionConfigurationType {
+    const sanitizedInputs: Record<string, any> = {}
+
+    data.inputs_schema?.forEach((input) => {
+        const value = data.inputs?.[input.key]?.value
+
+        if (input.type === 'json' && typeof value === 'string') {
+            try {
+                sanitizedInputs[input.key] = {
+                    value: JSON.parse(value),
+                }
+            } catch (e) {
+                // Ignore
+            }
+        } else {
+            sanitizedInputs[input.key] = {
+                value: value,
+            }
+        }
+    })
+
+    const payload: HogFunctionConfigurationType = {
+        ...data,
+        filters: data.filters ? sanitizeFilters(data.filters) : null,
+        inputs: sanitizedInputs,
+        icon_url: data.icon_url?.replace('&temp=true', ''), // Remove temp=true so it doesn't try to suggest new options next time
+    }
+
+    return payload
+}
+
 export const pipelineHogFunctionConfigurationLogic = kea<pipelineHogFunctionConfigurationLogicType>([
     props({} as PipelineHogFunctionConfigurationLogicProps),
     key(({ id, templateId }: PipelineHogFunctionConfigurationLogicProps) => {
@@ -140,42 +169,20 @@ export const pipelineHogFunctionConfigurationLogic = kea<pipelineHogFunctionConfigurationLogicType>([
             try {
-                const sanitizedInputs = {}
-
-                data.inputs_schema?.forEach((input) => {
-                    const value = data.inputs?.[input.key]?.value
-
-                    if (input.type === 'json' && typeof value === 'string') {
-                        try {
-                            sanitizedInputs[input.key] = {
-                                value: JSON.parse(value),
-                            }
-                        } catch (e) {
-                            // Ignore
-                        }
-                    } else {
-                        sanitizedInputs[input.key] = {
-                            value: value,
-                        }
-                    }
-                })
-
-                const payload: HogFunctionConfigurationType = {
-                    ...data,
-                    filters: data.filters ? sanitizeFilters(data.filters) : null,
-                    inputs: sanitizedInputs,
-                    icon_url: data.icon_url?.replace('&temp=true', ''), // Remove temp=true so it doesn't try to suggest new options next time
-                }
+                const payload = sanitizeConfiguration(data)

                 if (props.templateId) {
                     // Only sent on create
                     ;(payload as any).template_id = props.templateId
                 }

-                if (!props.id) {
-                    return await api.hogFunctions.create(payload)
-                }
-                return await api.hogFunctions.update(props.id, payload)
+                const res = props.id
+                    ? await api.hogFunctions.update(props.id, payload)
+                    : await api.hogFunctions.create(payload)
+
+                lemonToast.success('Configuration saved')
+
+                return res
             } catch (e) {
                 const maybeValidationError = (e as any).data
                 if (maybeValidationError?.type === 'validation_error') {
@@ -214,15 +221,17 @@ export const pipelineHogFunctionConfigurationLogic = kea<pipelineHogFunctionConfigurationLogicType>([
                 data.inputs_schema?.forEach((input) => {
-                    if (input.required && !inputs[input.key]) {
-                        inputErrors[input.key] = 'This field is required'
+                    const key = input.key
+                    const value = inputs[key]?.value
+                    if (input.required && !value) {
+                        inputErrors[key] = 'This field is required'
                     }

-                    if (input.type === 'json' && typeof inputs[input.key] === 'string') {
+                    if (input.type === 'json' && typeof value === 'string') {
                         try {
-                            JSON.parse(inputs[input.key].value)
+                            JSON.parse(value)
                         } catch (e) {
-                            inputErrors[input.key] = 'Invalid JSON'
+                            inputErrors[key] = 'Invalid JSON'
                         }
                     }
                 })
@@ -313,6 +322,10 @@ export const pipelineHogFunctionConfigurationLogic = kea<pipelineHogFunctionConfigurationLogicType>([
+        setConfigurationValue: () => {
+            // Clear the manually set errors otherwise the submission won't work
+            actions.setConfigurationManualErrors({})
+        },
     })),
     afterMount(({ props, actions, cache }) => {
         if (props.templateId) {
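A quick sketch of what sanitizeConfiguration does to a JSON-typed input (the schema and values are invented for illustration):

    import { sanitizeConfiguration } from './pipelineHogFunctionConfigurationLogic'

    const configuration = {
        inputs_schema: [{ key: 'body', type: 'json' }],
        inputs: { body: { value: '{"team": 1}' } }, // JSON inputs arrive from the editor as strings
        filters: undefined,
    } as any

    const payload = sanitizeConfiguration(configuration)
    // payload.inputs.body.value is now the object { team: 1 }, and payload.filters is null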
diff --git a/frontend/src/scenes/pipeline/hogfunctions/utils/event-conversion.ts b/frontend/src/scenes/pipeline/hogfunctions/utils/event-conversion.ts
new file mode 100644
index 0000000000000..b806ac03c2b4a
--- /dev/null
+++ b/frontend/src/scenes/pipeline/hogfunctions/utils/event-conversion.ts
@@ -0,0 +1,24 @@
+import { dayjs } from 'lib/dayjs'
+import { uuid } from 'lib/utils'
+
+// NOTE: This is just for testing - it technically returns ParsedClickhouseEvent but it's not worth importing that type
+export const createExampleEvent = (): any => ({
+    uuid: uuid(),
+    event: '$pageview',
+    distinct_id: '12345',
+    properties: {
+        $browser: 'Chrome',
+        $device_type: 'Desktop',
+        $current_url: `${window.location.origin}/project/1/activity/explore`,
+        $pathname: '/project/1/activity/explore',
+        $browser_version: 125,
+    },
+    timestamp: dayjs().toISOString(),
+    created_at: dayjs().toISOString(),
+    url: `${window.location.origin}/project/1/activity/explore`,
+    person_id: uuid(),
+    person_created_at: dayjs().toISOString(),
+    person_properties: {
+        email: 'user@example.com',
+    },
+})
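Because the example event is plain data, a test can start from it and override only the fields under test; a small sketch (the event name and properties are invented):

    import { createExampleEvent } from './utils/event-conversion'

    const purchaseEvent = {
        ...createExampleEvent(),
        event: 'purchase',
        properties: { price: 42, currency: 'USD' },
    }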
diff --git a/frontend/src/scenes/pipeline/pipelineNodeLogsLogic.tsx b/frontend/src/scenes/pipeline/pipelineNodeLogsLogic.tsx
index 0e5c3466e83f9..fdf15ae7b39aa 100644
--- a/frontend/src/scenes/pipeline/pipelineNodeLogsLogic.tsx
+++ b/frontend/src/scenes/pipeline/pipelineNodeLogsLogic.tsx
@@ -7,14 +7,14 @@ import { dayjs } from 'lib/dayjs'
 import { pipelineNodeLogic, PipelineNodeLogicProps } from 'scenes/pipeline/pipelineNodeLogic'

 import api from '~/lib/api'
-import { BatchExportLogEntry, PluginLogEntry } from '~/types'
+import { BatchExportLogEntry, LogEntry, PluginLogEntry } from '~/types'

 import { teamLogic } from '../teamLogic'
 import type { pipelineNodeLogsLogicType } from './pipelineNodeLogsLogicType'
 import { PipelineBackend } from './types'
 import { LogLevelDisplay, logLevelsToTypeFilters, LogTypeDisplay } from './utils'

-export type LogEntry = BatchExportLogEntry | PluginLogEntry
+export type PipelineNodeLogEntry = BatchExportLogEntry | PluginLogEntry | LogEntry

 export enum PipelineLogLevel {
     Debug = 'DEBUG',
@@ -42,10 +42,10 @@ export const pipelineNodeLogsLogic = kea<pipelineNodeLogsLogicType>([
     }),
     loaders(({ props: { id }, values, actions, cache }) => ({
         logs: [
-            [] as LogEntry[],
+            [] as PipelineNodeLogEntry[],
             {
                 loadLogs: async () => {
-                    let results: LogEntry[]
+                    let results: PipelineNodeLogEntry[]
                     if (values.node.backend === PipelineBackend.BatchExport) {
                         results = await api.batchExportLogs.search(
                             values.node.id,
@@ -76,7 +76,7 @@ export const pipelineNodeLogsLogic = kea<pipelineNodeLogsLogicType>([
                     return results
                 },
                 loadMoreLogs: async () => {
-                    let results: LogEntry[]
+                    let results: PipelineNodeLogEntry[]
                     if (values.node.backend === PipelineBackend.BatchExport) {
                         results = await api.batchExportLogs.search(
                             id as string,
@@ -116,7 +116,7 @@ export const pipelineNodeLogsLogic = kea<pipelineNodeLogsLogicType>([
                 },
             ],
             backgroundLogs: [
-                [] as LogEntry[],
+                [] as PipelineNodeLogEntry[],
                 {
                     pollBackgroundLogs: async () => {
                         // we fetch new logs in the background and allow the user to expand
                         if (values.logsLoading) {
                             return values.backgroundLogs
                         }

-                        let results: LogEntry[]
+                        let results: PipelineNodeLogEntry[]
                         if (values.node.backend === PipelineBackend.BatchExport) {
                             results = await api.batchExportLogs.search(
                                 id as string,
@@ -167,7 +167,7 @@ export const pipelineNodeLogsLogic = kea<pipelineNodeLogsLogicType>([
                 },
             ],
             backgroundLogs: [
-                [] as LogEntry[],
+                [] as PipelineNodeLogEntry[],
                 {
                     clearBackgroundLogs: () => [],
                 },
@@ -195,7 +195,7 @@ export const pipelineNodeLogsLogic = kea<pipelineNodeLogsLogicType>([
     selectors(({ actions }) => ({
         leadingEntry: [
             (s) => [s.logs, s.backgroundLogs],
-            (logs: LogEntry[], backgroundLogs: LogEntry[]): LogEntry | null => {
+            (logs: PipelineNodeLogEntry[], backgroundLogs: PipelineNodeLogEntry[]): PipelineNodeLogEntry | null => {
                 if (backgroundLogs.length) {
                     return backgroundLogs[0]
                 }
@@ -207,7 +207,7 @@ export const pipelineNodeLogsLogic = kea<pipelineNodeLogsLogicType>([
         ],
         trailingEntry: [
             (s) => [s.logs, s.backgroundLogs],
-            (logs: LogEntry[], backgroundLogs: LogEntry[]): LogEntry | null => {
+            (logs: PipelineNodeLogEntry[], backgroundLogs: PipelineNodeLogEntry[]): PipelineNodeLogEntry | null => {
                 if (logs.length) {
                     return logs[logs.length - 1]
                 }
@@ -219,13 +219,14 @@ export const pipelineNodeLogsLogic = kea<pipelineNodeLogsLogicType>([
         ],
         columns: [
            (s) => [s.node],
-            (node): LemonTableColumns<LogEntry> => {
+            (node): LemonTableColumns<PipelineNodeLogEntry> => {
                 return [
                     {
                         title: 'Timestamp',
                         key: 'timestamp',
                         dataIndex: 'timestamp',
-                        sorter: (a: LogEntry, b: LogEntry) => dayjs(a.timestamp).unix() - dayjs(b.timestamp).unix(),
+                        sorter: (a: PipelineNodeLogEntry, b: PipelineNodeLogEntry) =>
+                            dayjs(a.timestamp).unix() - dayjs(b.timestamp).unix(),
                         render: (timestamp: string) => <TZLabel time={timestamp} />,
                         width: 0,
                     },
@@ -295,7 +296,7 @@ export const pipelineNodeLogsLogic = kea<pipelineNodeLogsLogicType>([
                         dataIndex: 'message',
                         render: (message: string) => <code>{message}</code>,
                     },
-                ] as LemonTableColumns<LogEntry>
+                ] as LemonTableColumns<PipelineNodeLogEntry>
             },
         ],
     })),
diff --git a/frontend/src/types.ts b/frontend/src/types.ts
index e83e07503cf72..0d0888297d166 100644
--- a/frontend/src/types.ts
+++ b/frontend/src/types.ts
@@ -1942,6 +1942,15 @@ export interface PluginErrorType {
     event?: Record<string, any>
 }

+// The general log entry format that eventually everything should match
+export type LogEntry = {
+    log_source_id: string
+    instance_id: string
+    timestamp: string
+    level: 'DEBUG' | 'INFO' | 'WARN' | 'ERROR'
+    message: string
+}
+
 export enum PluginLogEntryType {
     Debug = 'DEBUG',
     Log = 'LOG',
@@ -4207,6 +4216,8 @@ export type HogFunctionType = {
     template?: HogFunctionTemplateType
 }

+export type HogFunctionConfigurationType = Omit<HogFunctionType, 'id' | 'created_at' | 'created_by' | 'status'>
+
 export type HogFunctionTemplateType = Pick<
     HogFunctionType,
     'id' | 'name' | 'description' | 'hog' | 'inputs_schema' | 'filters' | 'icon_url'
diff --git a/mypy-baseline.txt b/mypy-baseline.txt
index 274d7c4303d3a..8d95d21224471 100644
--- a/mypy-baseline.txt
+++ b/mypy-baseline.txt
@@ -63,7 +63,6 @@
 posthog/redis.py:0: error: Import cycle from Django settings module prevents
typ posthog/plugins/utils.py:0: error: Subclass of "str" and "bytes" cannot exist: would have incompatible method signatures [unreachable] posthog/plugins/utils.py:0: error: Statement is unreachable [unreachable] posthog/clickhouse/kafka_engine.py:0: error: Import cycle from Django settings module prevents type inference for 'KAFKA_HOSTS_FOR_CLICKHOUSE' [misc] -posthog/plugins/reload.py:0: error: Import cycle from Django settings module prevents type inference for 'PLUGINS_RELOAD_REDIS_URL' [misc] posthog/models/project.py:0: error: Incompatible type for "project" of "Team" (got "_T", expected "Project | Combinable") [misc] posthog/models/project.py:0: error: "_T" has no attribute "organization" [attr-defined] posthog/models/project.py:0: error: Incompatible return value type (got "tuple[_T, Team]", expected "tuple[Project, Team]") [return-value] diff --git a/plugin-server/package.json b/plugin-server/package.json index 611ebe0c5be78..612aeb27f94da 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -50,7 +50,7 @@ "@google-cloud/storage": "^5.8.5", "@maxmind/geoip2-node": "^3.4.0", "@posthog/clickhouse": "^1.7.0", - "@posthog/hogvm": "^1.0.12", + "@posthog/hogvm": "^1.0.14", "@posthog/plugin-scaffold": "1.4.4", "@sentry/node": "^7.49.0", "@sentry/profiling-node": "^0.3.0", @@ -131,6 +131,7 @@ "parse-prometheus-text-format": "^1.1.1", "pino-pretty": "^9.1.0", "prettier": "^2.8.8", + "supertest": "^7.0.0", "ts-node": "^10.9.1", "typescript": "^4.7.4" }, diff --git a/plugin-server/pnpm-lock.yaml b/plugin-server/pnpm-lock.yaml index 9a7d7d36ee6a8..ba81f39c0ec9b 100644 --- a/plugin-server/pnpm-lock.yaml +++ b/plugin-server/pnpm-lock.yaml @@ -44,8 +44,8 @@ dependencies: specifier: ^1.7.0 version: 1.7.0 '@posthog/hogvm': - specifier: ^1.0.12 - version: 1.0.12 + specifier: ^1.0.14 + version: 1.0.14 '@posthog/plugin-scaffold': specifier: 1.4.4 version: 1.4.4 @@ -282,6 +282,9 @@ devDependencies: prettier: specifier: ^2.8.8 version: 2.8.8 + supertest: + specifier: ^7.0.0 + version: 7.0.0 ts-node: specifier: ^10.9.1 version: 10.9.1(@swc/core@1.3.55)(@types/node@16.18.25)(typescript@4.9.5) @@ -3107,8 +3110,8 @@ packages: engines: {node: '>=12'} dev: false - /@posthog/hogvm@1.0.12: - resolution: {integrity: sha512-S8kO3X3BAfLp3SzluRmmST6aII+G2kYjGXC7373XPHIghGpFNlNq1gpllYvDjjGM2yVQbOBLzi5UvlzK0nG6rw==} + /@posthog/hogvm@1.0.14: + resolution: {integrity: sha512-mIdVcKGnJUqgfwnn/urNLZwkZtWMLIjsEmqtGUOX8Kw++log4QuBIvMf1eYY1yeVI4hC9oldr1GJttltwRAv5g==} dev: false /@posthog/plugin-scaffold@1.4.4: @@ -4232,6 +4235,10 @@ packages: engines: {node: '>=8'} dev: false + /asap@2.0.6: + resolution: {integrity: sha512-BSHWgDSAiKs50o2Re8ppvp3seVHXSRM44cdSsT9FfNEUUZLOGWVCsiWaRPWM1Znn+mqZ1OfVZ3z3DWEzSp7hRA==} + dev: true + /asn1.js@5.4.1: resolution: {integrity: sha512-+I//4cYPccV8LdmBLiX8CYvf9Sp3vQsrqu2QNXRcrbiWvcx/UdlFiqUJJzxRQxgsZmvhXhn4cSKeSmoFjVdupA==} dependencies: @@ -4970,6 +4977,10 @@ packages: engines: {node: '>= 6'} dev: true + /component-emitter@1.3.1: + resolution: {integrity: sha512-T0+barUSQRTUQASh8bx02dl+DhF54GtIDY13Y3m9oWTklKbb3Wv974meRpeZ3lp1JpLVECWWNHC4vaG2XHXouQ==} + dev: true + /compressible@2.0.18: resolution: {integrity: sha512-AF3r7P5dWxL8MxyITRMlORQNaOA2IkAFaTr4k7BUumjPtRpGDTZpl0Pb1XCO6JeDCBdp126Cgs9sMxqSjgYyRg==} engines: {node: '>= 0.6'} @@ -5057,6 +5068,10 @@ packages: engines: {node: '>= 0.6'} dev: false + /cookiejar@2.1.4: + resolution: {integrity: sha512-LDx6oHrK+PhzLKJU9j5S7/Y3jM/mUHvD/DeI1WQmJn652iPC5Y4TBzC9l+5OMOXlyTTA+SmVUPm0HQUwpD5Jqw==} + dev: true + 
/core-js-compat@3.30.1: resolution: {integrity: sha512-d690npR7MC6P0gq4npTl5n2VQeNAmUrJ90n+MHiKS7W2+xno4o3F5GDEuylSdi6EJ3VssibSGXOa1r3YXD3Mhw==} dependencies: @@ -5410,6 +5425,13 @@ packages: minimist: 1.2.8 dev: true + /dezalgo@1.0.4: + resolution: {integrity: sha512-rXSP0bf+5n0Qonsb+SVVfNfIsimO4HEtmnIpPHY8Q1UCzKlQrDMfdobr8nJOOsRgWCyMRqeSBQzmWUMq7zvVig==} + dependencies: + asap: 2.0.6 + wrappy: 1.0.2 + dev: true + /diff-sequences@28.1.1: resolution: {integrity: sha512-FU0iFaH/E23a+a718l8Qa/19bF9p06kgE0KipMOMadwa3SjnaElKzPaUC0vnibs6/B/9ni97s61mcejk8W1fQw==} engines: {node: ^12.13.0 || ^14.15.0 || ^16.10.0 || >=17.0.0} @@ -6196,6 +6218,23 @@ packages: mime-types: 2.1.35 dev: true + /form-data@4.0.0: + resolution: {integrity: sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==} + engines: {node: '>= 6'} + dependencies: + asynckit: 0.4.0 + combined-stream: 1.0.8 + mime-types: 2.1.35 + dev: true + + /formidable@3.5.1: + resolution: {integrity: sha512-WJWKelbRHN41m5dumb0/k8TeAx7Id/y3a+Z7QfhxP/htI9Js5zYaEDtG8uMgG0vM0lOlqnmjE99/kfpOYi/0Og==} + dependencies: + dezalgo: 1.0.4 + hexoid: 1.0.0 + once: 1.4.0 + dev: true + /forwarded@0.2.0: resolution: {integrity: sha512-buRG0fpBtRHSTCOASe6hD258tEubFoRLb4ZNA6NxMVHNw2gOcwHo9wyablzMzOA5z9xA9L1KNjk/Nt6MT9aYow==} engines: {node: '>= 0.6'} @@ -6667,6 +6706,11 @@ packages: readable-stream: 3.6.2 dev: true + /hexoid@1.0.0: + resolution: {integrity: sha512-QFLV0taWQOZtvIRIAdBChesmogZrtuXvVWsFHZTk2SU+anspqZ2vMnoLg7IE1+Uk16N19APic1BuF8bC8c2m5g==} + engines: {node: '>=8'} + dev: true + /hmac-drbg@1.0.1: resolution: {integrity: sha512-Tti3gMqLdZfhOQY1Mzf/AanLiqh1WTiJgEj26ZuYQ9fbkLomzGchCws4FyrSd4VkpBfiNhaE1On+lOz894jvXg==} dependencies: @@ -8054,7 +8098,6 @@ packages: /methods@1.1.2: resolution: {integrity: sha512-iclAHeNqNm68zFtnZ0e+1L2yUIdvzNoauKU4WBA3VvH/vPFieF7qfRlwUZU+DA9P9bPXIS90ulxoUoCH23sV2w==} engines: {node: '>= 0.6'} - dev: false /micromatch@4.0.5: resolution: {integrity: sha512-DMy+ERcEW2q8Z2Po+WNXuw3c5YaUSFjAO5GsJqfEl7UjvtIuFKO6ZrKvcItdy98dwFI2N1tg3zNIdKaQT+aNdA==} @@ -8088,6 +8131,12 @@ packages: hasBin: true dev: false + /mime@2.6.0: + resolution: {integrity: sha512-USPkMeET31rOMiarsBNIHZKLGgvKc/LrjofAnBlOttf5ajRvqiRA8QsenbcooctK6d6Ts6aqZXBA+XbkKthiQg==} + engines: {node: '>=4.0.0'} + hasBin: true + dev: true + /mime@3.0.0: resolution: {integrity: sha512-jSCU7/VB1loIWBZe14aEYHU/+1UMEHoaO7qxCOVJOw9GgH72VAWppxNcjU+x9a2k3GSIBXNKxXQFqRvvZ7vr3A==} engines: {node: '>=10.0.0'} @@ -9078,7 +9127,6 @@ packages: engines: {node: '>=0.6'} dependencies: side-channel: 1.0.4 - dev: false /querystring-es3@0.2.1: resolution: {integrity: sha512-773xhDQnZBMFobEiztv8LIl70ch5MSF/jUQVlhwFyBILqq96anmoctVIYz+ZRp0qbCKATTn6ev02M3r7Ga5vqA==} @@ -9853,6 +9901,33 @@ packages: minimist: 1.2.8 dev: true + /superagent@9.0.2: + resolution: {integrity: sha512-xuW7dzkUpcJq7QnhOsnNUgtYp3xRwpt2F7abdRYIpCsAt0hhUqia0EdxyXZQQpNmGtsCzYHryaKSV3q3GJnq7w==} + engines: {node: '>=14.18.0'} + dependencies: + component-emitter: 1.3.1 + cookiejar: 2.1.4 + debug: 4.3.4 + fast-safe-stringify: 2.1.1 + form-data: 4.0.0 + formidable: 3.5.1 + methods: 1.1.2 + mime: 2.6.0 + qs: 6.11.0 + transitivePeerDependencies: + - supports-color + dev: true + + /supertest@7.0.0: + resolution: {integrity: sha512-qlsr7fIC0lSddmA3tzojvzubYxvlGtzumcdHgPwbFWMISQwL22MhM2Y3LNt+6w9Yyx7559VW5ab70dgphm8qQA==} + engines: {node: '>=14.18.0'} + dependencies: + methods: 1.1.2 + superagent: 9.0.2 + transitivePeerDependencies: + - supports-color + dev: true + /supports-color@2.0.0: 
resolution: {integrity: sha512-KKNVtd6pCYgPIKU4cp2733HWYCpplQhddZLBUryaAHou723x+FRzQ5Df824Fj+IyyuiQTRoub4SnIFfIcrp70g==} engines: {node: '>=0.8.0'} diff --git a/plugin-server/src/cdp/async-function-executor.ts b/plugin-server/src/cdp/async-function-executor.ts index 89e6c6c299ba9..41c5e75299894 100644 --- a/plugin-server/src/cdp/async-function-executor.ts +++ b/plugin-server/src/cdp/async-function-executor.ts @@ -1,48 +1,59 @@ import { Webhook } from '@posthog/plugin-scaffold' -import { KAFKA_CDP_FUNCTION_CALLBACKS } from '../config/kafka-topics' import { PluginsServerConfig } from '../types' import { trackedFetch } from '../utils/fetch' import { status } from '../utils/status' import { RustyHook } from '../worker/rusty-hook' -import { - HogFunctionInvocationAsyncRequest, - HogFunctionInvocationAsyncResponse, - HogFunctionMessageToQueue, -} from './types' +import { HogFunctionInvocationAsyncResponse, HogFunctionInvocationResult } from './types' + +export type AsyncFunctionExecutorOptions = { + sync?: boolean +} export class AsyncFunctionExecutor { constructor(private serverConfig: PluginsServerConfig, private rustyHook: RustyHook) {} - async execute(request: HogFunctionInvocationAsyncRequest): Promise { + async execute( + request: HogFunctionInvocationResult, + options: AsyncFunctionExecutorOptions = { sync: false } + ): Promise { + if (!request.asyncFunctionRequest) { + throw new Error('No async function request provided') + } + const loggingContext = { hogFunctionId: request.hogFunctionId, invocationId: request.id, - asyncFunctionName: request.asyncFunctionName, + asyncFunctionName: request.asyncFunctionRequest.name, } status.info('🦔', `[AsyncFunctionExecutor] Executing async function`, loggingContext) - switch (request.asyncFunctionName) { + switch (request.asyncFunctionRequest.name) { case 'fetch': - return await this.asyncFunctionFetch(request) + return await this.asyncFunctionFetch(request, options) default: - status.error('🦔', `[HogExecutor] Unknown async function: ${request.asyncFunctionName}`, loggingContext) + status.error( + '🦔', + `[HogExecutor] Unknown async function: ${request.asyncFunctionRequest.name}`, + loggingContext + ) } } private async asyncFunctionFetch( - request: HogFunctionInvocationAsyncRequest - ): Promise { + request: HogFunctionInvocationResult, + options?: AsyncFunctionExecutorOptions + ): Promise { // TODO: validate the args - const args = request.asyncFunctionArgs ?? [] + const args = request.asyncFunctionRequest!.args ?? [] const url: string = args[0] - const options = args[1] + const fetchOptions = args[1] - const method = options.method || 'POST' - const headers = options.headers || { + const method = fetchOptions.method || 'POST' + const headers = fetchOptions.headers || { 'Content-Type': 'application/json', } - const body = options.body || {} + const body = fetchOptions.body || {} const webhook: Webhook = { url, @@ -51,25 +62,28 @@ export class AsyncFunctionExecutor { body: typeof body === 'string' ? 
body : JSON.stringify(body, undefined, 4),
         }

-        // NOTE: Purposefully disabled for now - once we have callback support we can re-enable
-        // const SPECIAL_CONFIG_ID = -3 // Hardcoded to mean Hog
-        // const success = await this.rustyHook.enqueueIfEnabledForTeam({
-        //     webhook: webhook,
-        //     teamId: hogFunction.team_id,
-        //     pluginId: SPECIAL_CONFIG_ID,
-        //     pluginConfigId: SPECIAL_CONFIG_ID,
-        // })
-        const success = false
-        // TODO: Temporary test code
+        if (!options?.sync === false) {
+            // NOTE: Purposefully disabled for now - once we have callback support we can re-enable
+            // const SPECIAL_CONFIG_ID = -3 // Hardcoded to mean Hog
+            // const success = await this.rustyHook.enqueueIfEnabledForTeam({
+            //     webhook: webhook,
+            //     teamId: hogFunction.team_id,
+            //     pluginId: SPECIAL_CONFIG_ID,
+            //     pluginConfigId: SPECIAL_CONFIG_ID,
+            // })
+        }
+
+        const success = false
+
         if (!success) {
             status.info('🦔', `[HogExecutor] Webhook not sent via rustyhook, sending directly instead`)
-            const response: HogFunctionInvocationAsyncResponse = {
-                ...request,
+
+            const asyncFunctionResponse: HogFunctionInvocationAsyncResponse['asyncFunctionResponse'] = {
+                timings: [],
             }

             try {
+                const start = performance.now()
                 const fetchResponse = await trackedFetch(url, {
                     method: webhook.method,
                     body: webhook.body,
                     headers: webhook.headers,
                 })

                 let body = await fetchResponse.text()
                 try {
                     body = JSON.parse(body)
                 } catch (err) {
-                    body
+                    // Ignore
                 }

-                response.vmResponse = {
+                const duration = performance.now() - start
+
+                asyncFunctionResponse.timings.push({
+                    kind: 'async_function',
+                    duration_ms: duration,
+                })
+
+                asyncFunctionResponse.vmResponse = {
                     status: fetchResponse.status,
                     body: body,
                 }
             } catch (err) {
                 status.error('🦔', `[HogExecutor] Error during fetch`, { ...request, error: String(err) })
-                response.error = 'Something went wrong with the fetch request.'
+                asyncFunctionResponse.error = 'Something went wrong with the fetch request.'
} - return { - topic: KAFKA_CDP_FUNCTION_CALLBACKS, - value: response, - key: response.id, + const response: HogFunctionInvocationAsyncResponse = { + ...request, + asyncFunctionResponse, } + + return response } } } diff --git a/plugin-server/src/cdp/cdp-processed-events-consumer.ts b/plugin-server/src/cdp/cdp-consumers.ts similarity index 71% rename from plugin-server/src/cdp/cdp-processed-events-consumer.ts rename to plugin-server/src/cdp/cdp-consumers.ts index 04bef4f999772..2a8891807d794 100644 --- a/plugin-server/src/cdp/cdp-processed-events-consumer.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -1,3 +1,5 @@ +import { convertJSToHog } from '@posthog/hogvm' +import express from 'express' import { features, librdkafkaVersion, Message } from 'node-rdkafka' import { Histogram } from 'prom-client' @@ -7,25 +9,28 @@ import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars import { createKafkaProducer } from '../kafka/producer' import { addSentryBreadcrumbsEventListeners } from '../main/ingestion-queues/kafka-metrics' import { runInstrumentedFunction } from '../main/utils' -import { GroupTypeToColumnIndex, Hub, PluginsServerConfig, RawClickHouseEvent, TeamId } from '../types' +import { GroupTypeToColumnIndex, Hub, PluginsServerConfig, RawClickHouseEvent, TeamId, TimestampFormat } from '../types' import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper' import { PostgresRouter } from '../utils/db/postgres' import { status } from '../utils/status' +import { castTimestampOrNow } from '../utils/utils' import { AppMetrics } from '../worker/ingestion/app-metrics' import { GroupTypeManager } from '../worker/ingestion/group-type-manager' import { OrganizationManager } from '../worker/ingestion/organization-manager' import { TeamManager } from '../worker/ingestion/team-manager' import { RustyHook } from '../worker/rusty-hook' import { AsyncFunctionExecutor } from './async-function-executor' -import { HogExecutor } from './hog-executor' +import { addLog, HogExecutor } from './hog-executor' import { HogFunctionManager } from './hog-function-manager' import { + HogFunctionInvocation, HogFunctionInvocationAsyncResponse, HogFunctionInvocationGlobals, HogFunctionInvocationResult, HogFunctionMessageToQueue, + HogFunctionType, } from './types' -import { convertToHogFunctionInvocationGlobals } from './utils' +import { convertToHogFunctionInvocationGlobals, convertToParsedClickhouseEvent } from './utils' // Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals require('@sentry/tracing') @@ -89,19 +94,32 @@ abstract class CdpConsumerBase { await Promise.all( results.map(async (result) => { - result.logs.forEach((x) => { + // Tricky: We want to pull all the logs out as we don't want them to be passed around to any subsequent functions + const logs = result.logs + result.logs = [] + + logs.forEach((x) => { + const sanitized = { + ...x, + timestamp: castTimestampOrNow(x.timestamp, TimestampFormat.ClickHouse), + } + // Convert timestamps to ISO strings messagesToProduce.push({ topic: KAFKA_LOG_ENTRIES, - value: x, + value: sanitized, key: x.instance_id, }) }) - if (result.asyncFunction) { - const res = await this.asyncFunctionExecutor!.execute(result.asyncFunction) + if (result.asyncFunctionRequest) { + const res = await this.asyncFunctionExecutor!.execute(result) if (res) { - messagesToProduce.push(res) + messagesToProduce.push({ + topic: KAFKA_CDP_FUNCTION_CALLBACKS, + value: res, + key: res.id, + }) } } }) @@ -282,7 
+300,7 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase { } events.push( convertToHogFunctionInvocationGlobals( - clickHouseEvent, + convertToParsedClickhouseEvent(clickHouseEvent), team, this.config.SITE_URL ?? 'http://localhost:8000', groupTypes @@ -355,4 +373,101 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase { return events } + + public addApiRoutes(app: express.Application) { + app.post('/api/projects/:team_id/hog_functions/:id/invocations', async (req, res): Promise => { + try { + const { id, team_id } = req.params + const { event, mock_async_functions, configuration } = req.body + + status.info('⚡️', 'Received invocation', { id, team_id, body: req.body }) + + if (!event) { + res.status(400).json({ error: 'Missing event' }) + return + } + + const [hogFunction, team] = await Promise.all([ + this.hogFunctionManager.fetchHogFunction(req.params.id), + this.teamManager.fetchTeam(parseInt(team_id)), + ]).catch(() => { + return [null, null] + }) + if (!hogFunction || !team || hogFunction.team_id !== team.id) { + res.status(404).json({ error: 'Hog function not found' }) + return + } + + let groupTypes: GroupTypeToColumnIndex | undefined = undefined + + if (await this.organizationManager.hasAvailableFeature(team.id, 'group_analytics')) { + // If the organization has group analytics enabled then we enrich the event with group data + groupTypes = await this.groupTypeManager.fetchGroupTypes(team.id) + } + + const globals = convertToHogFunctionInvocationGlobals( + event, + team, + this.config.SITE_URL ?? 'http://localhost:8000', + groupTypes + ) + + globals.source = { + name: hogFunction.name ?? `Hog function: ${hogFunction.id}`, + url: `${globals.project.url}/pipeline/destinations/hog-${hogFunction.id}/configuration/`, + } + + const invocation: HogFunctionInvocation = { + id, + globals: globals, + teamId: team.id, + hogFunctionId: id, + logs: [], + timings: [], + } + + // We use the provided config if given, otherwise the function's config + const functionConfiguration: HogFunctionType = configuration ?? hogFunction + + let response = this.hogExecutor.execute(functionConfiguration, invocation) + + while (response.asyncFunctionRequest) { + const asyncFunctionRequest = response.asyncFunctionRequest + + if (mock_async_functions || asyncFunctionRequest.name !== 'fetch') { + addLog(response, 'info', `Async function '${asyncFunctionRequest.name}' was mocked`) + + // Add the state, simulating what executeAsyncResponse would do + asyncFunctionRequest.vmState.stack.push(convertJSToHog({ status: 200, body: {} })) + } else { + const asyncRes = await this.asyncFunctionExecutor!.execute(response, { + sync: true, + }) + + if (!asyncRes || asyncRes.asyncFunctionResponse.error) { + addLog(response, 'error', 'Failed to execute async function') + } + asyncFunctionRequest.vmState.stack.push( + convertJSToHog(asyncRes?.asyncFunctionResponse.vmResponse ?? null) + ) + response.timings.push(...(asyncRes?.asyncFunctionResponse.timings ?? [])) + } + + // Clear it so we can't ever end up in a loop + delete response.asyncFunctionRequest + + response = this.hogExecutor.execute(functionConfiguration, response, asyncFunctionRequest.vmState) + } + + res.json({ + status: response.finished ? 
'success' : 'error', + error: String(response.error), + logs: response.logs, + }) + } catch (e) { + console.error(e) + res.status(500).json({ error: e.message }) + } + }) + } } diff --git a/plugin-server/src/cdp/hog-executor.ts b/plugin-server/src/cdp/hog-executor.ts index 9de9051ee3a18..f86ed7aba528b 100644 --- a/plugin-server/src/cdp/hog-executor.ts +++ b/plugin-server/src/cdp/hog-executor.ts @@ -1,9 +1,9 @@ -import { convertHogToJS, convertJSToHog, exec, VMState } from '@posthog/hogvm' +import { convertHogToJS, convertJSToHog, exec, ExecResult, VMState } from '@posthog/hogvm' import { DateTime } from 'luxon' -import { PluginsServerConfig, TimestampFormat } from '../types' +import { PluginsServerConfig } from '../types' import { status } from '../utils/status' -import { castTimestampOrNow, UUIDT } from '../utils/utils' +import { UUIDT } from '../utils/utils' import { HogFunctionManager } from './hog-function-manager' import { HogFunctionInvocation, @@ -45,6 +45,27 @@ export const formatInput = (bytecode: any, globals: HogFunctionInvocation['globa } } +export const addLog = (result: HogFunctionInvocationResult, level: HogFunctionLogEntryLevel, message: string) => { + const lastLog = result.logs[result.logs.length - 1] + // TRICKY: The log entries table is de-duped by timestamp, so we need to ensure that the timestamps are unique + // It is unclear how this affects parallel execution environments + let now = DateTime.now() + if (lastLog && now <= lastLog.timestamp) { + // Ensure that the timestamps are unique + now = lastLog.timestamp.plus(1) + } + + result.logs.push({ + team_id: result.teamId, + log_source: 'hog_function', + log_source_id: result.hogFunctionId, + instance_id: result.id, + timestamp: now, + level, + message, + }) +} + export class HogExecutor { constructor(private serverConfig: PluginsServerConfig, private hogFunctionManager: HogFunctionManager) {} @@ -118,6 +139,10 @@ export class HogExecutor { const result = this.execute(hogFunction, { id: new UUIDT().toString(), globals: modifiedGlobals, + teamId: hogFunction.team_id, + hogFunctionId: hogFunction.id, + logs: [], + timings: [], }) results.push(result) @@ -144,28 +169,34 @@ export class HogExecutor { const baseInvocation: HogFunctionInvocation = { id: invocation.id, globals: invocation.globals, + teamId: invocation.teamId, + hogFunctionId: invocation.hogFunctionId, + timings: invocation.asyncFunctionResponse.timings, + // Logs we always reset as we don't want to carry over logs between calls + logs: [], } const errorRes = (error = 'Something went wrong'): HogFunctionInvocationResult => ({ ...baseInvocation, - hogFunctionId: invocation.hogFunctionId, - teamId: invocation.teamId, - success: false, + finished: false, error, - // TODO: Probably useful to save a log as well? - logs: [], }) if (!hogFunction) { return errorRes(`Hog Function with ID ${invocation.hogFunctionId} not found`) } - if (!invocation.vmState || invocation.error) { + const { vmState } = invocation.asyncFunctionRequest ?? {} + const { asyncFunctionResponse } = invocation + + if (!vmState || !asyncFunctionResponse.vmResponse || asyncFunctionResponse.error) { return errorRes(invocation.error ?? 'No VM state provided for async response') } - invocation.vmState.stack.push(convertJSToHog(invocation.vmResponse ?? null)) - return this.execute(hogFunction, baseInvocation, invocation.vmState) + // Add the response to the stack to continue execution + vmState.stack.push(convertJSToHog(asyncFunctionResponse.vmResponse ?? 
null)) + + return this.execute(hogFunction, baseInvocation, vmState) } execute( @@ -181,97 +212,92 @@ export class HogExecutor { status.info('🦔', `[HogExecutor] Executing function`, loggingContext) - let lastTimestamp = DateTime.now() - const result: HogFunctionInvocationResult = { ...invocation, - teamId: hogFunction.team_id, - hogFunctionId: hogFunction.id, - success: false, - logs: [], - } - - const log = (level: HogFunctionLogEntryLevel, message: string) => { - // TRICKY: The log entries table is de-duped by timestamp, so we need to ensure that the timestamps are unique - // It is unclear how this affects parallel execution environments - let now = DateTime.now() - if (now <= lastTimestamp) { - // Ensure that the timestamps are unique - now = lastTimestamp.plus(1) - } - lastTimestamp = now - - result.logs.push({ - team_id: hogFunction.team_id, - log_source: 'hog_function', - log_source_id: hogFunction.id, - instance_id: invocation.id, - timestamp: castTimestampOrNow(now, TimestampFormat.ClickHouse), - level, - message, - }) + asyncFunctionRequest: undefined, + finished: false, } if (!state) { - log('debug', `Executing function`) + addLog(result, 'debug', `Executing function`) } else { // NOTE: We do our own check here for async steps as it saves executing Hog and is easier to handle if (state.asyncSteps >= MAX_ASYNC_STEPS) { - log('error', `Function exceeded maximum async steps`) + addLog(result, 'error', `Function exceeded maximum async steps`) result.error = 'Function exceeded maximum async steps' return result } - log('debug', `Resuming function`) + addLog(result, 'debug', `Resuming function`) } try { - const globals = this.buildHogFunctionGlobals(hogFunction, invocation) - - const res = exec(state ?? hogFunction.bytecode, { - globals, - timeout: 100, // NOTE: This will likely be configurable in the future - maxAsyncSteps: MAX_ASYNC_STEPS, // NOTE: This will likely be configurable in the future - asyncFunctions: { - // We need to pass these in but they don't actually do anything as it is a sync exec - fetch: async () => Promise.resolve(), - }, - functions: { - print: (...args) => { - const message = args - .map((arg) => (typeof arg !== 'string' ? JSON.stringify(arg) : arg)) - .join(', ') - log('info', message) + const start = performance.now() + let globals: Record | undefined = undefined + let execRes: ExecResult | undefined = undefined + + try { + globals = this.buildHogFunctionGlobals(hogFunction, invocation) + } catch (e) { + addLog(result, 'error', `Error building inputs: ${e}`) + throw e + } + + try { + execRes = exec(state ?? hogFunction.bytecode, { + globals, + timeout: 100, // NOTE: This will likely be configurable in the future + maxAsyncSteps: MAX_ASYNC_STEPS, // NOTE: This will likely be configurable in the future + asyncFunctions: { + // We need to pass these in but they don't actually do anything as it is a sync exec + fetch: async () => Promise.resolve(), }, - }, + functions: { + print: (...args) => { + const message = args + .map((arg) => (typeof arg !== 'string' ? 
JSON.stringify(arg) : arg)) + .join(', ') + addLog(result, 'info', message) + }, + }, + }) + } catch (e) { + addLog(result, 'error', `Error executing function: ${e}`) + throw e + } + + const duration = performance.now() - start + + result.finished = execRes.finished + result.timings.push({ + kind: 'hog', + duration_ms: duration, }) - if (!res.finished) { - log('debug', `Suspending function due to async function call '${res.asyncFunctionName}'`) - status.info('🦔', `[HogExecutor] Function returned not finished. Executing async function`, { - ...loggingContext, - asyncFunctionName: res.asyncFunctionName, - }) + if (!execRes.finished) { + addLog(result, 'debug', `Suspending function due to async function call '${execRes.asyncFunctionName}'`) - const args = (res.asyncFunctionArgs ?? []).map((arg) => convertHogToJS(arg)) + const args = (execRes.asyncFunctionArgs ?? []).map((arg) => convertHogToJS(arg)) - if (res.asyncFunctionName) { - result.asyncFunction = { - ...invocation, - teamId: hogFunction.team_id, - hogFunctionId: hogFunction.id, - asyncFunctionName: res.asyncFunctionName, - asyncFunctionArgs: args, - vmState: res.state, + if (!execRes.state) { + // NOTE: This shouldn't be possible so is more of a type sanity check + throw new Error('State should be provided for async function') + } + if (execRes.asyncFunctionName) { + result.asyncFunctionRequest = { + name: execRes.asyncFunctionName, + args: args, + vmState: execRes.state, } } else { - log('warn', `Function was not finished but also had no async function to execute.`) + addLog(result, 'warn', `Function was not finished but also had no async function to execute.`) } } else { - log('debug', `Function completed`) + const totalDuration = result.timings.reduce((acc, timing) => acc + timing.duration_ms, 0) + + addLog(result, 'debug', `Function completed. 
Processing time ${totalDuration}ms`)
             }
-            result.success = true
         } catch (err) {
-            result.error = err
+            result.error = err.message
             status.error('🦔', `[HogExecutor] Error executing function ${hogFunction.id} - ${hogFunction.name}`, err)
         }
@@ -282,7 +308,6 @@
         const builtInputs: Record<string, any> = {}

         Object.entries(hogFunction.inputs).forEach(([key, item]) => {
-            // TODO: Replace this with iterator
             builtInputs[key] = item.value

             if (item.bytecode) {
diff --git a/plugin-server/src/cdp/hog-function-manager.ts b/plugin-server/src/cdp/hog-function-manager.ts
index 5b805c2224f91..4adbec4ab81cc 100644
--- a/plugin-server/src/cdp/hog-function-manager.ts
+++ b/plugin-server/src/cdp/hog-function-manager.ts
@@ -73,7 +73,7 @@ export class HogFunctionManager {
     public async reloadHogFunctions(teamId: Team['id'], ids: HogFunctionType['id'][]): Promise<void> {
         status.info('🍿', `Reloading hog functions ${ids} from DB`)

-        const items = await fetchHogFunctions(this.postgres, ids)
+        const items = await fetchEnabledHogFunctions(this.postgres, ids)

         if (!this.cache[teamId]) {
             this.cache[teamId] = {}
@@ -88,11 +88,15 @@ export class HogFunctionManager {
             this.cache[teamId][item.id] = item
         }
     }
+
+    public fetchHogFunction(id: HogFunctionType['id']): Promise<HogFunctionType | null> {
+        return fetchHogFunction(this.postgres, id)
+    }
 }

 const HOG_FUNCTION_FIELDS = ['id', 'team_id', 'name', 'enabled', 'inputs', 'filters', 'bytecode']

-export async function fetchAllHogFunctionsGroupedByTeam(client: PostgresRouter): Promise<HogFunctionCache> {
+async function fetchAllHogFunctionsGroupedByTeam(client: PostgresRouter): Promise<HogFunctionCache> {
     const items = (
         await client.query(
             PostgresUse.COMMON_READ,
@@ -118,7 +122,7 @@
     return cache
 }

-export async function fetchHogFunctions(
+async function fetchEnabledHogFunctions(
     client: PostgresRouter,
     ids: HogFunctionType['id'][]
 ): Promise<HogFunctionType[]> {
     const items: HogFunctionType[] = (
         await client.query(
             PostgresUse.COMMON_READ,
             `SELECT ${HOG_FUNCTION_FIELDS.join(', ')}
         FROM posthog_hogfunction
         WHERE id = ANY($1) AND deleted = FALSE AND enabled = TRUE`,
             [ids],
-            'fetchHogFunctions'
+            'fetchEnabledHogFunctions'
         )
     ).rows

     return items
 }
+
+async function fetchHogFunction(client: PostgresRouter, id: HogFunctionType['id']): Promise<HogFunctionType | null> {
+    const items: HogFunctionType[] = (
+        await client.query(
+            PostgresUse.COMMON_READ,
+            `SELECT ${HOG_FUNCTION_FIELDS.join(', ')}
+        FROM posthog_hogfunction
+        WHERE id = $1 AND deleted = FALSE`,
+            [id],
+            'fetchHogFunction'
+        )
+    ).rows
+    return items[0] ?? null
+}
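Given the new HTTP surface on the callback consumer (the addApiRoutes handler above), an end-to-end exercise of the route looks roughly like this with the newly added supertest dev dependency (the team id, function id, and event are invented; it assumes a started CdpFunctionCallbackConsumer has registered its routes on expressApp):

    import supertest from 'supertest'

    import { expressApp } from '../../src/main/services/http-server'

    const res = await supertest(expressApp)
        .post(`/api/projects/2/hog_functions/${hogFunction.id}/invocations`)
        .send({ event: { event: '$pageview', properties: {} }, mock_async_functions: true })

    expect(res.status).toEqual(200)
    expect(res.body.status).toEqual('success')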
diff --git a/plugin-server/src/cdp/types.ts b/plugin-server/src/cdp/types.ts
index b5ef75064abc8..83c30a4344d14 100644
--- a/plugin-server/src/cdp/types.ts
+++ b/plugin-server/src/cdp/types.ts
@@ -1,4 +1,5 @@
 import { VMState } from '@posthog/hogvm'
+import { DateTime } from 'luxon'

 import { ElementPropertyFilter, EventPropertyFilter, PersonPropertyFilter } from '../types'
@@ -34,6 +35,30 @@ export interface HogFunctionFilters {
     bytecode?: HogBytecode
 }

+// We have a "parsed" clickhouse event type to make it easier to work with calls from kafka as well as those from the frontend
+export interface ParsedClickhouseEvent {
+    uuid: string
+    event: string
+    team_id: number
+    distinct_id: string
+    person_id?: string
+    timestamp: string
+    created_at: string
+    properties: Record<string, any>
+    person_created_at?: string
+    person_properties: Record<string, any>
+    group0_properties: Record<string, any>
+    group1_properties: Record<string, any>
+    group2_properties: Record<string, any>
+    group3_properties: Record<string, any>
+    group4_properties: Record<string, any>
+    group0_created_at?: string
+    group1_created_at?: string
+    group2_created_at?: string
+    group3_created_at?: string
+    group4_created_at?: string
+}
+
 export type HogFunctionInvocationGlobals = {
     project: {
         id: number
@@ -106,38 +131,47 @@ export interface HogFunctionLogEntry {
     log_source: string // The kind of source (hog_function)
     log_source_id: string // The id of the hog function
     instance_id: string // The id of the specific invocation
-    timestamp: string
+    timestamp: DateTime
     level: HogFunctionLogEntryLevel
     message: string
 }

+export interface HogFunctionTiming {
+    kind: 'hog' | 'async_function'
+    duration_ms: number
+}
+
 export type HogFunctionInvocation = {
     id: string
     globals: HogFunctionInvocationGlobals
-}
-
-export type HogFunctionInvocationResult = HogFunctionInvocation & {
     teamId: number
     hogFunctionId: HogFunctionType['id']
-    success: boolean
-    error?: any
+    // Logs and timings _could_ be passed in from the async function service
     logs: HogFunctionLogEntry[]
-    asyncFunction?: HogFunctionInvocationAsyncRequest
+    timings: HogFunctionTiming[]
 }

-export type HogFunctionInvocationAsyncRequest = HogFunctionInvocation & {
-    teamId: number
-    hogFunctionId: HogFunctionType['id']
-    vmState?: VMState
-    asyncFunctionName: string // TODO: Type this all more strongly
-    asyncFunctionArgs?: any[]
+export type HogFunctionInvocationResult = HogFunctionInvocation & {
+    finished: boolean
+    error?: any
+    logs: HogFunctionLogEntry[]
+    timings: HogFunctionTiming[]
+    asyncFunctionRequest?: {
+        name: string
+        args: any[]
+        vmState: VMState
+    }
 }

-export type HogFunctionInvocationAsyncResponse = HogFunctionInvocationAsyncRequest & {
-    /** An error message to indicate something went wrong and the invocation should be stopped */
-    error?: any
-    /** The data to be passed to the Hog function from the response */
-    vmResponse?: any
+export type HogFunctionInvocationAsyncResponse = HogFunctionInvocationResult & {
+    // FOLLOWUP: do we want to type this more strictly?
+ asyncFunctionResponse: { + /** An error message to indicate something went wrong and the invocation should be stopped */ + error?: any + /** The data to be passed to the Hog function from the response */ + vmResponse?: any + timings: HogFunctionTiming[] + } } export type HogFunctionMessageToQueue = { diff --git a/plugin-server/src/cdp/utils.ts b/plugin-server/src/cdp/utils.ts index f8fe9c6dc075b..9ab4b5b248eb5 100644 --- a/plugin-server/src/cdp/utils.ts +++ b/plugin-server/src/cdp/utils.ts @@ -2,7 +2,7 @@ import { GroupTypeToColumnIndex, RawClickHouseEvent, Team } from '../types' import { clickHouseTimestampToISO } from '../utils/utils' -import { HogFunctionFilterGlobals, HogFunctionInvocationGlobals } from './types' +import { HogFunctionFilterGlobals, HogFunctionInvocationGlobals, ParsedClickhouseEvent } from './types' export const PERSON_DEFAULT_DISPLAY_NAME_PROPERTIES = [ 'email', @@ -25,24 +25,50 @@ const getPersonDisplayName = (team: Team, distinctId: string, properties: Record return (customIdentifier || distinctId)?.trim() } +export function convertToParsedClickhouseEvent(event: RawClickHouseEvent): ParsedClickhouseEvent { + const properties = event.properties ? JSON.parse(event.properties) : {} + if (event.elements_chain) { + properties['$elements_chain'] = event.elements_chain + } + + return { + uuid: event.uuid, + event: event.event, + team_id: event.team_id, + distinct_id: event.distinct_id, + person_id: event.person_id, + timestamp: clickHouseTimestampToISO(event.timestamp), + created_at: clickHouseTimestampToISO(event.created_at), + properties: properties, + person_created_at: event.person_created_at ? clickHouseTimestampToISO(event.person_created_at) : undefined, + person_properties: event.person_properties ? JSON.parse(event.person_properties) : {}, + group0_properties: event.group0_properties ? JSON.parse(event.group0_properties) : {}, + group1_properties: event.group1_properties ? JSON.parse(event.group1_properties) : {}, + group2_properties: event.group2_properties ? JSON.parse(event.group2_properties) : {}, + group3_properties: event.group3_properties ? JSON.parse(event.group3_properties) : {}, + group4_properties: event.group4_properties ? JSON.parse(event.group4_properties) : {}, + group0_created_at: event.group0_created_at ? clickHouseTimestampToISO(event.group0_created_at) : undefined, + group1_created_at: event.group1_created_at ? clickHouseTimestampToISO(event.group1_created_at) : undefined, + group2_created_at: event.group2_created_at ? clickHouseTimestampToISO(event.group2_created_at) : undefined, + group3_created_at: event.group3_created_at ? clickHouseTimestampToISO(event.group3_created_at) : undefined, + group4_created_at: event.group4_created_at ? clickHouseTimestampToISO(event.group4_created_at) : undefined, + } +} + // that we can keep to as a contract export function convertToHogFunctionInvocationGlobals( - event: RawClickHouseEvent, + event: ParsedClickhouseEvent, team: Team, siteUrl: string, groupTypes?: GroupTypeToColumnIndex ): HogFunctionInvocationGlobals { const projectUrl = `${siteUrl}/project/${team.id}` - - const properties = event.properties ? JSON.parse(event.properties) : {} - if (event.elements_chain) { - properties['$elements_chain'] = event.elements_chain - } + const properties = event.properties let person: HogFunctionInvocationGlobals['person'] if (event.person_id) { - const personProperties = event.person_properties ? 
JSON.parse(event.person_properties) : {} + const personProperties = event.person_properties const personDisplayName = getPersonDisplayName(team, event.distinct_id, personProperties) person = { @@ -64,7 +90,7 @@ export function convertToHogFunctionInvocationGlobals( // TODO: Check that groupProperties always exist if the event is in that group if (groupKey && groupProperties) { - const properties = JSON.parse(groupProperties) + const properties = groupProperties groups[groupType] = { id: groupKey, @@ -83,16 +109,12 @@ export function convertToHogFunctionInvocationGlobals( url: projectUrl, }, event: { - // TODO: Element chain! uuid: event.uuid, name: event.event!, distinct_id: event.distinct_id, properties, - timestamp: clickHouseTimestampToISO(event.timestamp), - // TODO: generate url - url: `${projectUrl}/events/${encodeURIComponent(event.uuid)}/${encodeURIComponent( - clickHouseTimestampToISO(event.timestamp) - )}`, + timestamp: event.timestamp, + url: `${projectUrl}/events/${encodeURIComponent(event.uuid)}/${encodeURIComponent(event.timestamp)}`, }, person, groups, diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index fea82eab88af5..b703db24abad3 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -10,7 +10,7 @@ import { Counter } from 'prom-client' import v8Profiler from 'v8-profiler-next' import { getPluginServerCapabilities } from '../capabilities' -import { CdpFunctionCallbackConsumer, CdpProcessedEventsConsumer } from '../cdp/cdp-processed-events-consumer' +import { CdpFunctionCallbackConsumer, CdpProcessedEventsConsumer } from '../cdp/cdp-consumers' import { defaultConfig, sessionRecordingConsumerConfig } from '../config/config' import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types' import { createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub' @@ -43,7 +43,7 @@ import { } from './ingestion-queues/on-event-handler-consumer' import { startScheduledTasksConsumer } from './ingestion-queues/scheduled-tasks-consumer' import { SessionRecordingIngester } from './ingestion-queues/session-recording/session-recordings-consumer' -import { setupCommonRoutes } from './services/http-server' +import { expressApp, setupCommonRoutes } from './services/http-server' import { getObjectStorage } from './services/object_storage' CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec @@ -519,6 +519,11 @@ export async function startPluginsServer( await consumer.stop() }) healthChecks['cdp-function-callbacks'] = () => consumer.isHealthy() ?? 
+
+        // NOTE: The function callback service is mostly idle, so it can also serve HTTP requests
+        if (capabilities.http) {
+            consumer.addApiRoutes(expressApp)
+        }
     }
 
     if (capabilities.personOverrides) {
diff --git a/plugin-server/src/main/services/http-server.ts b/plugin-server/src/main/services/http-server.ts
index 85c154dab66e7..8889f96f22032 100644
--- a/plugin-server/src/main/services/http-server.ts
+++ b/plugin-server/src/main/services/http-server.ts
@@ -12,6 +12,8 @@ v8Profiler.setGenerateType(1)
 
 export const expressApp: express.Application = express()
 
+expressApp.use(express.json())
+
 export function setupCommonRoutes(
     healthChecks: { [service: string]: () => Promise<boolean> | boolean },
     analyticsEventsIngestionConsumer?: KafkaJSIngestionConsumer | IngestionConsumer
diff --git a/plugin-server/tests/cdp/cdp-function-callbacks-consumer.test.ts b/plugin-server/tests/cdp/cdp-function-callbacks-consumer.test.ts
new file mode 100644
index 0000000000000..d15c8344502d5
--- /dev/null
+++ b/plugin-server/tests/cdp/cdp-function-callbacks-consumer.test.ts
@@ -0,0 +1,415 @@
+import express from 'express'
+import supertest from 'supertest'
+
+import { CdpFunctionCallbackConsumer } from '../../src/cdp/cdp-consumers'
+import { HogFunctionType } from '../../src/cdp/types'
+import { defaultConfig } from '../../src/config/config'
+import { Hub, PluginsServerConfig, Team } from '../../src/types'
+import { createHub } from '../../src/utils/db/hub'
+import { getFirstTeam, resetTestDatabase } from '../helpers/sql'
+import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples'
+import { insertHogFunction as _insertHogFunction } from './fixtures'
+
+const config: PluginsServerConfig = {
+    ...defaultConfig,
+}
+
+const mockConsumer = {
+    on: jest.fn(),
+    commitSync: jest.fn(),
+    commit: jest.fn(),
+    queryWatermarkOffsets: jest.fn(),
+    committed: jest.fn(),
+    assignments: jest.fn(),
+    isConnected: jest.fn(() => true),
+    getMetadata: jest.fn(),
+}
+
+jest.mock('../../src/kafka/batch-consumer', () => {
+    return {
+        startBatchConsumer: jest.fn(() =>
+            Promise.resolve({
+                join: () => ({
+                    finally: jest.fn(),
+                }),
+                stop: jest.fn(),
+                consumer: mockConsumer,
+            })
+        ),
+    }
+})
+
+jest.mock('../../src/utils/fetch', () => {
+    return {
+        trackedFetch: jest.fn(() =>
+            Promise.resolve({
+                status: 200,
+                text: () => Promise.resolve(JSON.stringify({ success: true })),
+                json: () => Promise.resolve({ success: true }),
+            })
+        ),
+    }
+})
+
+jest.mock('../../src/utils/db/kafka-producer-wrapper', () => {
+    const mockKafkaProducer = {
+        producer: {
+            connect: jest.fn(),
+        },
+        disconnect: jest.fn(),
+        produce: jest.fn(),
+    }
+    return {
+        KafkaProducerWrapper: jest.fn(() => mockKafkaProducer),
+    }
+})
+
+const mockFetch: jest.Mock = require('../../src/utils/fetch').trackedFetch
+
+jest.setTimeout(1000)
+
+describe('CDP Processed Events Consumer', () => {
+    let processor: CdpFunctionCallbackConsumer
+    let hub: Hub
+    let closeHub: () => Promise<void>
+    let team: Team
+
+    const insertHogFunction = async (hogFunction: Partial<HogFunctionType>) => {
+        const item = await _insertHogFunction(hub.postgres, team.id, hogFunction)
+        // Trigger the reload that django would do
+        await processor.hogFunctionManager.reloadAllHogFunctions()
+        return item
+    }
+
+    beforeEach(async () => {
+        await resetTestDatabase()
+        ;[hub, closeHub] = await createHub()
+        team = await getFirstTeam(hub)
+
+        processor = new CdpFunctionCallbackConsumer(config, hub)
+        await processor.start()
+
+        mockFetch.mockClear()
+    })
+
+    afterEach(async () => {
+        jest.setTimeout(10000)
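+        // Teardown below (stopping the consumer, closing the hub) can outrun the global
+        // 1s test timeout, hence the higher limit set on the line above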
+ await processor.stop() + await closeHub() + }) + + afterAll(() => { + jest.useRealTimers() + }) + + // describe('general event processing', () => { + // /** + // * Tests here are somewhat expensive so should mostly simulate happy paths and the more e2e scenarios + // */ + // it('can parse incoming messages correctly', async () => { + // await insertHogFunction({ + // ...HOG_EXAMPLES.simple_fetch, + // ...HOG_INPUTS_EXAMPLES.simple_fetch, + // ...HOG_FILTERS_EXAMPLES.no_filters, + // }) + // // Create a message that should be processed by this function + // // Run the function and check that it was executed + // await processor.handleEachBatch( + // [ + // createMessage( + // createIncomingEvent(team.id, { + // uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', + // event: '$pageview', + // properties: JSON.stringify({ + // $lib_version: '1.0.0', + // }), + // }) + // ), + // ], + // noop + // ) + + // expect(mockFetch).toHaveBeenCalledTimes(1) + // expect(mockFetch.mock.calls[0]).toMatchInlineSnapshot(` + // Array [ + // "https://example.com/posthog-webhook", + // Object { + // "body": "{ + // \\"event\\": { + // \\"uuid\\": \\"b3a1fe86-b10c-43cc-acaf-d208977608d0\\", + // \\"name\\": \\"$pageview\\", + // \\"distinct_id\\": \\"distinct_id_1\\", + // \\"properties\\": { + // \\"$lib_version\\": \\"1.0.0\\", + // \\"$elements_chain\\": \\"[]\\" + // }, + // \\"timestamp\\": null, + // \\"url\\": \\"http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null\\" + // }, + // \\"groups\\": null, + // \\"nested\\": { + // \\"foo\\": \\"http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null\\" + // }, + // \\"person\\": null, + // \\"event_url\\": \\"http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null-test\\" + // }", + // "headers": Object { + // "version": "v=1.0.0", + // }, + // "method": "POST", + // "timeout": 10000, + // }, + // ] + // `) + // }) + + // it('generates logs and produces them to kafka', async () => { + // await insertHogFunction({ + // ...HOG_EXAMPLES.simple_fetch, + // ...HOG_INPUTS_EXAMPLES.simple_fetch, + // ...HOG_FILTERS_EXAMPLES.no_filters, + // }) + + // // Create a message that should be processed by this function + // // Run the function and check that it was executed + // await processor.handleEachBatch( + // [ + // createMessage( + // createIncomingEvent(team.id, { + // uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', + // event: '$pageview', + // properties: JSON.stringify({ + // $lib_version: '1.0.0', + // }), + // }) + // ), + // ], + // noop + // ) + + // expect(mockFetch).toHaveBeenCalledTimes(1) + // // Once for the async callback, twice for the logs + // expect(mockProducer.produce).toHaveBeenCalledTimes(3) + + // expect(decodeKafkaMessage(mockProducer.produce.mock.calls[0][0])).toMatchObject({ + // key: expect.any(String), + // topic: 'log_entries_test', + // value: { + // instance_id: expect.any(String), + // level: 'debug', + // log_source: 'hog_function', + // log_source_id: expect.any(String), + // message: 'Executing function', + // team_id: 2, + // timestamp: expect.any(String), + // }, + // waitForAck: true, + // }) + + // expect(decodeKafkaMessage(mockProducer.produce.mock.calls[1][0])).toMatchObject({ + // topic: 'log_entries_test', + // value: { + // log_source: 'hog_function', + // message: "Suspending function due to async function call 'fetch'", + // team_id: 2, + // }, + // }) + + // expect(decodeKafkaMessage(mockProducer.produce.mock.calls[2][0])).toEqual({ + // key: 
expect.any(String), + // topic: 'cdp_function_callbacks_test', + // value: { + // id: expect.any(String), + // globals: expect.objectContaining({ + // project: { id: 2, name: 'TEST PROJECT', url: 'http://localhost:8000/project/2' }, + // // We assume the rest is correct + // }), + // teamId: 2, + // hogFunctionId: expect.any(String), + // finished: false, + // logs: [], + // timings: [ + // { + // kind: 'hog', + // duration_ms: expect.any(Number), + // }, + // ], + // asyncFunctionRequest: { + // name: 'fetch', + // args: [ + // 'https://example.com/posthog-webhook', + // { + // headers: { version: 'v=1.0.0' }, + // body: { + // event: { + // uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', + // name: '$pageview', + // distinct_id: 'distinct_id_1', + // properties: { $lib_version: '1.0.0', $elements_chain: '[]' }, + // timestamp: null, + // url: 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null', + // }, + // event_url: + // 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null-test', + // groups: null, + // nested: { + // foo: 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null', + // }, + // person: null, + // }, + // method: 'POST', + // }, + // ], + // vmState: expect.any(Object), + // }, + // asyncFunctionResponse: { + // vmResponse: { + // status: 200, + // body: { success: true }, + // }, + // timings: [ + // { + // kind: 'async_function', + // duration_ms: expect.any(Number), + // }, + // ], + // }, + // }, + // waitForAck: true, + // }) + // }) + // }) + + describe('API invocation', () => { + let app: express.Express + let hogFunction: HogFunctionType + + const event = { + uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', + event: '$pageview', + properties: { + $lib_version: '1.0.0', + }, + } + + beforeEach(async () => { + app = express() + app.use(express.json()) + processor.addApiRoutes(app) + + hogFunction = await insertHogFunction({ + ...HOG_EXAMPLES.simple_fetch, + ...HOG_INPUTS_EXAMPLES.simple_fetch, + ...HOG_FILTERS_EXAMPLES.no_filters, + }) + }) + + it('errors if missing hog function or team', async () => { + const res = await supertest(app) + .post(`/api/projects/${hogFunction.team_id}/hog_functions/missing/invocations`) + .send({ event }) + + expect(res.status).toEqual(404) + }) + + it('errors if missing values', async () => { + const res = await supertest(app) + .post(`/api/projects/${hogFunction.team_id}/hog_functions/${hogFunction.id}/invocations`) + .send({}) + + expect(res.status).toEqual(400) + expect(res.body).toEqual({ + error: 'Missing event', + }) + }) + + it('can invoke a function via the API with mocks', async () => { + const res = await supertest(app) + .post(`/api/projects/${hogFunction.team_id}/hog_functions/${hogFunction.id}/invocations`) + .send({ event, mock_async_functions: true }) + + expect(res.status).toEqual(200) + expect(res.body).toMatchObject({ + status: 'success', + error: 'undefined', + logs: [ + { + log_source: 'hog_function', + level: 'debug', + message: 'Executing function', + }, + { + log_source: 'hog_function', + level: 'debug', + message: "Suspending function due to async function call 'fetch'", + }, + { + log_source: 'hog_function', + level: 'info', + message: "Async function 'fetch' was mocked", + }, + { + log_source: 'hog_function', + level: 'debug', + message: 'Resuming function', + }, + { + log_source: 'hog_function', + level: 'info', + message: 'Fetch response:, {"status":200,"body":{}}', + }, + { + log_source: 'hog_function', + level: 'debug', + 
message: expect.stringContaining('Function completed. Processing time'), + }, + ], + }) + }) + + it('can invoke a function via the API with real fetch', async () => { + mockFetch.mockImplementationOnce(() => + Promise.resolve({ + status: 201, + text: () => Promise.resolve(JSON.stringify({ real: true })), + }) + ) + const res = await supertest(app) + .post(`/api/projects/${hogFunction.team_id}/hog_functions/${hogFunction.id}/invocations`) + .send({ event, mock_async_functions: false }) + + expect(res.status).toEqual(200) + expect(res.body).toMatchObject({ + status: 'success', + error: 'undefined', + logs: [ + { + log_source: 'hog_function', + level: 'debug', + message: 'Executing function', + }, + { + log_source: 'hog_function', + level: 'debug', + message: "Suspending function due to async function call 'fetch'", + }, + { + log_source: 'hog_function', + level: 'debug', + message: 'Resuming function', + }, + { + log_source: 'hog_function', + level: 'info', + message: 'Fetch response:, {"status":201,"body":{"real":true}}', + }, + { + log_source: 'hog_function', + level: 'debug', + message: expect.stringContaining('Function completed. Processing time'), + }, + ], + }) + }) + }) +}) diff --git a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts index 8cd9709b74eb8..6b650b05edc17 100644 --- a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts +++ b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts @@ -1,4 +1,4 @@ -import { CdpProcessedEventsConsumer } from '../../src/cdp/cdp-processed-events-consumer' +import { CdpProcessedEventsConsumer } from '../../src/cdp/cdp-consumers' import { HogFunctionType } from '../../src/cdp/types' import { defaultConfig } from '../../src/config/config' import { Hub, PluginsServerConfig, Team } from '../../src/types' @@ -234,35 +234,53 @@ describe('CDP Processed Events Consuner', () => { }), teamId: 2, hogFunctionId: expect.any(String), - asyncFunctionName: 'fetch', - asyncFunctionArgs: [ - 'https://example.com/posthog-webhook', + finished: false, + logs: [], + timings: [ { - headers: { version: 'v=1.0.0' }, - body: { - event: { - uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', - name: '$pageview', - distinct_id: 'distinct_id_1', - properties: { $lib_version: '1.0.0', $elements_chain: '[]' }, - timestamp: null, - url: 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null', - }, - event_url: - 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null-test', - groups: null, - nested: { - foo: 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null', + kind: 'hog', + duration_ms: expect.any(Number), + }, + ], + asyncFunctionRequest: { + name: 'fetch', + args: [ + 'https://example.com/posthog-webhook', + { + headers: { version: 'v=1.0.0' }, + body: { + event: { + uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', + name: '$pageview', + distinct_id: 'distinct_id_1', + properties: { $lib_version: '1.0.0', $elements_chain: '[]' }, + timestamp: null, + url: 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null', + }, + event_url: + 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null-test', + groups: null, + nested: { + foo: 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null', + }, + person: null, }, - person: null, + method: 'POST', }, - method: 'POST', + ], + vmState: expect.any(Object), + }, + asyncFunctionResponse: { 
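+                // The VM's fetch result is now nested here next to its timing metadata,
+                // rather than being exposed as a top-level vmResponse on the invocation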
+                vmResponse: {
+                    status: 200,
+                    body: { success: true },
                 },
-            ],
-            vmState: expect.any(Object),
-            vmResponse: {
-                status: 200,
-                body: { success: true },
+                timings: [
+                    {
+                        kind: 'async_function',
+                        duration_ms: expect.any(Number),
+                    },
+                ],
             },
         },
         waitForAck: true,
diff --git a/plugin-server/tests/cdp/fixtures.ts b/plugin-server/tests/cdp/fixtures.ts
index 3cfb043434df1..9147d043a7468 100644
--- a/plugin-server/tests/cdp/fixtures.ts
+++ b/plugin-server/tests/cdp/fixtures.ts
@@ -55,7 +55,7 @@ export const insertHogFunction = async (
     postgres: PostgresRouter,
     team_id: Team['id'],
     hogFunction: Partial<HogFunctionType> = {}
-) => {
+): Promise<HogFunctionType> => {
     const res = await insertRow(
         postgres,
         'posthog_hogfunction',
diff --git a/plugin-server/tests/cdp/hog-executor.test.ts b/plugin-server/tests/cdp/hog-executor.test.ts
index 2cf34a5041626..a0cf212a3e906 100644
--- a/plugin-server/tests/cdp/hog-executor.test.ts
+++ b/plugin-server/tests/cdp/hog-executor.test.ts
@@ -1,8 +1,16 @@
+import { DateTime } from 'luxon'
+
 import { HogExecutor } from '../../src/cdp/hog-executor'
 import { HogFunctionManager } from '../../src/cdp/hog-function-manager'
-import { HogFunctionLogEntry, HogFunctionType } from '../../src/cdp/types'
+import {
+    HogFunctionInvocationAsyncResponse,
+    HogFunctionInvocationResult,
+    HogFunctionLogEntry,
+    HogFunctionType,
+} from '../../src/cdp/types'
 import { defaultConfig } from '../../src/config/config'
-import { PluginsServerConfig } from '../../src/types'
+import { PluginsServerConfig, TimestampFormat } from '../../src/types'
+import { castTimestampOrNow } from '../../src/utils/utils'
 import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples'
 import { createHogExecutionGlobals, createHogFunction, insertHogFunction as _insertHogFunction } from './fixtures'
@@ -10,6 +18,24 @@ const config: PluginsServerConfig = {
     ...defaultConfig,
 }
 
+const simulateMockFetchAsyncResponse = (result: HogFunctionInvocationResult): HogFunctionInvocationAsyncResponse => {
+    return {
+        ...result,
+        asyncFunctionResponse: {
+            timings: [
+                {
+                    kind: 'async_function',
+                    duration_ms: 100,
+                },
+            ],
+            vmResponse: {
+                status: 200,
+                body: 'success',
+            },
+        },
+    }
+}
+
 describe('Hog Executor', () => {
     jest.setTimeout(1000)
     let executor: HogExecutor
@@ -57,7 +83,7 @@ describe('Hog Executor', () => {
                 log_source: 'hog_function',
                 log_source_id: hogFunction.id,
                 instance_id: results[0].id,
-                timestamp: '2024-06-07 12:00:00.001',
+                timestamp: expect.any(DateTime),
                 level: 'debug',
                 message: 'Executing function',
             },
@@ -66,16 +92,24 @@ describe('Hog Executor', () => {
                 log_source: 'hog_function',
                 log_source_id: hogFunction.id,
                 instance_id: results[0].id,
-                timestamp: '2024-06-07 12:00:00.002',
+                timestamp: expect.any(DateTime),
                 level: 'debug',
                 message: "Suspending function due to async function call 'fetch'",
             },
         ])
+
+        expect(castTimestampOrNow(results[0].logs[0].timestamp, TimestampFormat.ClickHouse)).toEqual(
+            '2024-06-07 12:00:00.000'
+        )
+        // Ensure the second log is one millisecond later
+        expect(castTimestampOrNow(results[0].logs[1].timestamp, TimestampFormat.ClickHouse)).toEqual(
+            '2024-06-07 12:00:00.001'
+        )
     })
 
     it('queues up an async function call', () => {
         const results = executor.executeMatchingFunctions(createHogExecutionGlobals())
-        expect(results[0].asyncFunction).toMatchObject({
+        expect(results[0]).toMatchObject({
             id: results[0].id,
             globals: {
                 project: { id: 1, name: 'test', url: 'http://localhost:8000/projects/1' },
@@ -94,50 +128,57 @@ describe('Hog Executor', () => {
             },
             teamId: 1,
             hogFunctionId: hogFunction.id,
- asyncFunctionName: 'fetch', - asyncFunctionArgs: [ - 'https://example.com/posthog-webhook', - { - headers: { version: 'v=1.2.3' }, - body: { - event: { - uuid: 'uuid', - name: 'test', - distinct_id: 'distinct_id', - url: 'http://localhost:8000/events/1', - properties: { $lib_version: '1.2.3' }, - timestamp: '2024-06-07T12:00:00.000Z', + asyncFunctionRequest: { + name: 'fetch', + args: [ + 'https://example.com/posthog-webhook', + { + headers: { version: 'v=1.2.3' }, + body: { + event: { + uuid: 'uuid', + name: 'test', + distinct_id: 'distinct_id', + url: 'http://localhost:8000/events/1', + properties: { $lib_version: '1.2.3' }, + timestamp: '2024-06-07T12:00:00.000Z', + }, + groups: null, + nested: { foo: 'http://localhost:8000/events/1' }, + person: null, + event_url: 'http://localhost:8000/events/1-test', }, - groups: null, - nested: { foo: 'http://localhost:8000/events/1' }, - person: null, - event_url: 'http://localhost:8000/events/1-test', + method: 'POST', }, - method: 'POST', + ], + vmState: expect.any(Object), + }, + timings: [ + { + kind: 'hog', + duration_ms: 0, }, ], - vmState: expect.any(Object), }) }) it('executes the full function in a loop', () => { const logs: HogFunctionLogEntry[] = [] const results = executor.executeMatchingFunctions(createHogExecutionGlobals()) - logs.push(...results[0].logs) - const asyncExecResult = executor.executeAsyncResponse({ - ...results[0].asyncFunction!, - vmResponse: { status: 200, body: 'success' }, - }) + const splicedLogs = results[0].logs.splice(0, 100) + logs.push(...splicedLogs) + + const asyncExecResult = executor.executeAsyncResponse(simulateMockFetchAsyncResponse(results[0])) logs.push(...asyncExecResult.logs) expect(asyncExecResult.error).toBeUndefined() - expect(asyncExecResult.success).toBe(true) + expect(asyncExecResult.finished).toBe(true) expect(logs.map((log) => log.message)).toEqual([ 'Executing function', "Suspending function due to async function call 'fetch'", 'Resuming function', 'Fetch response:, {"status":200,"body":"success"}', - 'Function completed', + 'Function completed. 
Processing time 100ms', ]) }) }) @@ -188,20 +229,15 @@ describe('Hog Executor', () => { expect(results).toHaveLength(1) // Run the result one time simulating a successful fetch - const asyncResult1 = executor.executeAsyncResponse({ - ...results[0].asyncFunction!, - vmResponse: { status: 200, body: 'success' }, - }) - expect(asyncResult1.success).toBe(true) - expect(asyncResult1.asyncFunction).toBeDefined() + const asyncResult1 = executor.executeAsyncResponse(simulateMockFetchAsyncResponse(results[0])) + expect(asyncResult1.finished).toBe(false) + expect(asyncResult1.error).toBe(undefined) + expect(asyncResult1.asyncFunctionRequest).toBeDefined() // Run the result one more time simulating a second successful fetch - const asyncResult2 = executor.executeAsyncResponse({ - ...asyncResult1.asyncFunction!, - vmResponse: { status: 200, body: 'success' }, - }) + const asyncResult2 = executor.executeAsyncResponse(simulateMockFetchAsyncResponse(asyncResult1)) // This time we should see an error for hitting the loop limit - expect(asyncResult2.success).toBe(false) + expect(asyncResult2.finished).toBe(false) expect(asyncResult2.error).toEqual('Function exceeded maximum async steps') expect(asyncResult2.logs.map((log) => log.message)).toEqual(['Function exceeded maximum async steps']) }) diff --git a/posthog/api/hog_function.py b/posthog/api/hog_function.py index 9ddc46cc3b45f..fc1789579bfb6 100644 --- a/posthog/api/hog_function.py +++ b/posthog/api/hog_function.py @@ -18,6 +18,7 @@ from posthog.cdp.validation import compile_hog, validate_inputs, validate_inputs_schema from posthog.models.hog_functions.hog_function import HogFunction from posthog.permissions import PostHogFeatureFlagPermission +from posthog.plugins.plugin_server_api import create_hog_invocation_test logger = structlog.get_logger(__name__) @@ -105,6 +106,14 @@ def create(self, validated_data: dict, *args, **kwargs) -> HogFunction: return super().create(validated_data=validated_data) +class HogFunctionInvocationSerializer(serializers.Serializer): + configuration = HogFunctionSerializer(write_only=True) + event = serializers.DictField(write_only=True) + mock_async_functions = serializers.BooleanField(default=True, write_only=True) + status = serializers.CharField(read_only=True) + logs = serializers.ListField(read_only=True) + + class HogFunctionViewSet(TeamAndOrgViewSetMixin, LogEntryMixin, ForbidDestroyModel, viewsets.ModelViewSet): scope_object = "INTERNAL" # Keep internal until we are happy to release this GA queryset = HogFunction.objects.all() @@ -143,3 +152,30 @@ def icon(self, request: Request, *args, **kwargs): icon_service = CDPIconsService() return icon_service.get_icon_http_response(id) + + @action(detail=True, methods=["POST"]) + def invocations(self, request: Request, *args, **kwargs): + hog_function = self.get_object() + serializer = HogFunctionInvocationSerializer(data=request.data, context=self.get_serializer_context()) + if not serializer.is_valid(): + return Response(serializer.errors, status=400) + + configuration = serializer.validated_data["configuration"] + # Remove the team from the config + configuration.pop("team") + + event = serializer.validated_data["event"] + mock_async_functions = serializer.validated_data["mock_async_functions"] + + res = create_hog_invocation_test( + team_id=hog_function.team_id, + hog_function_id=hog_function.id, + event=event, + configuration=configuration, + mock_async_functions=mock_async_functions, + ) + + if res.status_code != 200: + return Response({"status": "error"}, 
status=res.status_code) + + return Response(res.json()) diff --git a/posthog/api/plugin.py b/posthog/api/plugin.py index eb147a70d6d51..47a5ab5b3bb80 100644 --- a/posthog/api/plugin.py +++ b/posthog/api/plugin.py @@ -44,7 +44,7 @@ from posthog.permissions import APIScopePermission from posthog.plugins import can_configure_plugins, can_install_plugins, parse_url from posthog.plugins.access import can_globally_manage_plugins, has_plugin_access_level -from posthog.plugins.reload import populate_plugin_capabilities_on_workers +from posthog.plugins.plugin_server_api import populate_plugin_capabilities_on_workers from posthog.queries.app_metrics.app_metrics import TeamPluginsDeliveryRateQuery from posthog.utils import format_query_params_absolute_url diff --git a/posthog/models/hog_functions/utils.py b/posthog/cdp/filters.py similarity index 60% rename from posthog/models/hog_functions/utils.py rename to posthog/cdp/filters.py index 5ec265487d2e3..e24dd6bf120d8 100644 --- a/posthog/models/hog_functions/utils.py +++ b/posthog/cdp/filters.py @@ -1,7 +1,7 @@ -from typing import Any +from typing import Optional from posthog.models.action.action import Action from posthog.hogql.bytecode import create_bytecode -from posthog.hogql.parser import parse_expr, parse_string_template +from posthog.hogql.parser import parse_expr from posthog.hogql.property import action_to_expr, property_to_expr, ast from posthog.models.team.team import Team @@ -51,16 +51,35 @@ def hog_function_filters_to_expr(filters: dict, team: Team, actions: dict[int, A return ast.Constant(value=True) -def generate_template_bytecode(obj: Any) -> Any: - """ - Clones an object, compiling any string values to bytecode templates - """ +def filter_action_ids(filters: Optional[dict]) -> list[int]: + if not filters: + return [] + try: + return [int(action["id"]) for action in filters.get("actions", [])] + except KeyError: + return [] - if isinstance(obj, dict): - return {key: generate_template_bytecode(value) for key, value in obj.items()} - elif isinstance(obj, list): - return [generate_template_bytecode(item) for item in obj] - elif isinstance(obj, str): - return create_bytecode(parse_string_template(obj)) - else: - return obj + +def compile_filters_expr(filters: Optional[dict], team: Team, actions: Optional[dict[int, Action]] = None) -> ast.Expr: + filters = filters or {} + + if actions is None: + # If not provided as an optimization we fetch all actions + actions_list = ( + Action.objects.select_related("team").filter(team_id=team.id).filter(id__in=filter_action_ids(filters)) + ) + actions = {action.id: action for action in actions_list} + + return hog_function_filters_to_expr(filters, team, actions) + + +def compile_filters_bytecode(filters: Optional[dict], team: Team, actions: Optional[dict[int, Action]] = None) -> dict: + filters = filters or {} + try: + filters["bytecode"] = create_bytecode(compile_filters_expr(filters, team, actions)) + except Exception as e: + # TODO: Better reporting of this issue + filters["bytecode"] = None + filters["bytecode_error"] = str(e) + + return filters diff --git a/posthog/cdp/validation.py b/posthog/cdp/validation.py index 93dad3f8e6501..4a38523947de7 100644 --- a/posthog/cdp/validation.py +++ b/posthog/cdp/validation.py @@ -3,12 +3,26 @@ from rest_framework import serializers from posthog.hogql.bytecode import create_bytecode -from posthog.hogql.parser import parse_program -from posthog.models.hog_functions.utils import generate_template_bytecode +from posthog.hogql.parser import parse_program, 
parse_string_template
 
 logger = logging.getLogger(__name__)
 
 
+def generate_template_bytecode(obj: Any) -> Any:
+    """
+    Clones an object, compiling any string values to bytecode templates
+    """
+
+    if isinstance(obj, dict):
+        return {key: generate_template_bytecode(value) for key, value in obj.items()}
+    elif isinstance(obj, list):
+        return [generate_template_bytecode(item) for item in obj]
+    elif isinstance(obj, str):
+        return create_bytecode(parse_string_template(obj))
+    else:
+        return obj
+
+
 class InputsSchemaItemSerializer(serializers.Serializer):
     type = serializers.ChoiceField(choices=["string", "boolean", "dictionary", "choice", "json"])
     key = serializers.CharField()
@@ -38,26 +52,25 @@ def validate(self, attrs):
         schema = self.context["schema"]
         value = attrs.get("value")
 
+        name: str = schema["key"]
+        item_type = schema["type"]
+
         if schema.get("required") and not value:
-            raise serializers.ValidationError("This field is required.")
+            raise serializers.ValidationError({"inputs": {name: "This field is required."}})
 
         if not value:
             return attrs
 
-        name: str = schema["key"]
-        item_type = schema["type"]
-        value = attrs["value"]
-
         # Validate each type
         if item_type == "string":
             if not isinstance(value, str):
-                raise serializers.ValidationError("Value must be a string.")
+                raise serializers.ValidationError({"inputs": {name: "Value must be a string."}})
         elif item_type == "boolean":
             if not isinstance(value, bool):
-                raise serializers.ValidationError("Value must be a boolean.")
+                raise serializers.ValidationError({"inputs": {name: "Value must be a boolean."}})
         elif item_type == "dictionary":
             if not isinstance(value, dict):
-                raise serializers.ValidationError("Value must be a dictionary.")
+                raise serializers.ValidationError({"inputs": {name: "Value must be a dictionary."}})
 
         try:
             if value:
@@ -89,8 +102,7 @@ def validate_inputs(inputs_schema: list, inputs: dict) -> dict:
         serializer = InputsItemSerializer(data=value, context={"schema": schema})
 
         if not serializer.is_valid():
-            first_error = next(iter(serializer.errors.values()))[0]
-            raise serializers.ValidationError({"inputs": {schema["key"]: first_error}})
+            raise serializers.ValidationError(serializer.errors)
 
         validated_inputs[schema["key"]] = serializer.validated_data
 
@@ -102,5 +114,6 @@ def compile_hog(hog: str, supported_functions: Optional[set[str]] = None) -> lis
     try:
         program = parse_program(hog)
         return create_bytecode(program, supported_functions=supported_functions or {"fetch"})
-    except Exception:
+    except Exception as e:
+        logger.error(f"Failed to compile hog: {e}", exc_info=True)
         raise serializers.ValidationError({"hog": "Hog code has errors."})
diff --git a/posthog/models/action/action.py b/posthog/models/action/action.py
index 1d89c3012578c..929f2753bc02c 100644
--- a/posthog/models/action/action.py
+++ b/posthog/models/action/action.py
@@ -8,7 +8,7 @@
 from posthog.hogql.errors import BaseHogQLError
 from posthog.models.signals import mutable_receiver
-from posthog.plugins.reload import drop_action_on_workers, reload_action_on_workers
+from posthog.plugins.plugin_server_api import drop_action_on_workers, reload_action_on_workers
 
 ActionStepMatching = Literal["contains", "regex", "exact"]
diff --git a/posthog/models/hog_functions/hog_function.py b/posthog/models/hog_functions/hog_function.py
index 0307178e2bd33..5842f646dd12f 100644
--- a/posthog/models/hog_functions/hog_function.py
+++ b/posthog/models/hog_functions/hog_function.py
@@ -8,7 +8,7 @@
 from posthog.models.action.action import Action
 from posthog.models.team.team import Team
from posthog.models.utils import UUIDModel -from posthog.plugins.reload import reload_hog_functions_on_workers +from posthog.plugins.plugin_server_api import reload_hog_functions_on_workers class HogFunction(UUIDModel): @@ -44,28 +44,10 @@ def filter_action_ids(self) -> list[int]: except KeyError: return [] - def compile_filters_bytecode(self, actions: Optional[dict[int, Action]] = None): - from .utils import hog_function_filters_to_expr - from posthog.hogql.bytecode import create_bytecode - - self.filters = self.filters or {} - - if actions is None: - # If not provided as an optimization we fetch all actions - actions_list = ( - Action.objects.select_related("team").filter(team_id=self.team_id).filter(id__in=self.filter_action_ids) - ) - actions = {action.id: action for action in actions_list} - - try: - self.filters["bytecode"] = create_bytecode(hog_function_filters_to_expr(self.filters, self.team, actions)) - except Exception as e: - # TODO: Better reporting of this issue - self.filters["bytecode"] = None - self.filters["bytecode_error"] = str(e) - def save(self, *args, **kwargs): - self.compile_filters_bytecode() + from posthog.cdp.filters import compile_filters_bytecode + + self.filters = compile_filters_bytecode(self.filters, self.team) return super().save(*args, **kwargs) def __str__(self): diff --git a/posthog/models/organization.py b/posthog/models/organization.py index cc1c5ce669457..cc9656eb7d757 100644 --- a/posthog/models/organization.py +++ b/posthog/models/organization.py @@ -20,7 +20,7 @@ create_with_slug, sane_repr, ) -from posthog.plugins.reload import reset_available_product_features_cache_on_workers +from posthog.plugins.plugin_server_api import reset_available_product_features_cache_on_workers from posthog.utils import absolute_uri if TYPE_CHECKING: diff --git a/posthog/models/plugin.py b/posthog/models/plugin.py index d2ecd0d799c92..46ddfb9177f4c 100644 --- a/posthog/models/plugin.py +++ b/posthog/models/plugin.py @@ -20,7 +20,7 @@ from posthog.models.signals import mutable_receiver from posthog.models.team import Team from posthog.plugins.access import can_configure_plugins, can_install_plugins -from posthog.plugins.reload import populate_plugin_capabilities_on_workers, reload_plugins_on_workers +from posthog.plugins.plugin_server_api import populate_plugin_capabilities_on_workers, reload_plugins_on_workers from posthog.plugins.site import get_decide_site_apps from posthog.plugins.utils import ( download_plugin_archive, diff --git a/posthog/plugins/__init__.py b/posthog/plugins/__init__.py index 6ada31b398734..6b692c334f644 100644 --- a/posthog/plugins/__init__.py +++ b/posthog/plugins/__init__.py @@ -1,4 +1,3 @@ # flake8: noqa from .access import can_configure_plugins, can_install_plugins -from .reload import reload_plugins_on_workers from .utils import download_plugin_archive, get_file_from_archive, parse_url diff --git a/posthog/plugins/reload.py b/posthog/plugins/plugin_server_api.py similarity index 66% rename from posthog/plugins/reload.py rename to posthog/plugins/plugin_server_api.py index 7eda90c5ba3f4..1508c2a4c00c6 100644 --- a/posthog/plugins/reload.py +++ b/posthog/plugins/plugin_server_api.py @@ -1,9 +1,9 @@ import json from typing import Union +import requests import structlog -from django.conf import settings - from posthog.redis import get_client +from posthog.settings import CDP_FUNCTION_EXECUTOR_API_URL, PLUGINS_RELOAD_PUBSUB_CHANNEL, PLUGINS_RELOAD_REDIS_URL logger = structlog.get_logger(__name__) @@ -13,12 +13,13 @@ def publish_message(channel: 
str, payload: Union[dict, str]):
     message = json.dumps(payload) if not isinstance(payload, str) else payload
-    get_client(settings.PLUGINS_RELOAD_REDIS_URL).publish(channel, message)
+    get_client(PLUGINS_RELOAD_REDIS_URL).publish(channel, message)
 
 
 def reload_plugins_on_workers():
     logger.info("Reloading plugins on workers")
-    publish_message(settings.PLUGINS_RELOAD_PUBSUB_CHANNEL, "reload!")
+
+    publish_message(PLUGINS_RELOAD_PUBSUB_CHANNEL, "reload!")
 
 
 def reload_action_on_workers(team_id: int, action_id: int):
@@ -47,3 +48,21 @@ def reset_available_product_features_cache_on_workers(organization_id: str):
 def populate_plugin_capabilities_on_workers(plugin_id: str):
     logger.info(f"Populating plugin capabilities for plugin {plugin_id} on workers")
     publish_message("populate-plugin-capabilities", {"plugin_id": plugin_id})
+
+
+def create_hog_invocation_test(
+    team_id: int,
+    hog_function_id: str,
+    event: dict,
+    configuration: dict,
+    mock_async_functions: bool,
+) -> requests.Response:
+    logger.info(f"Creating hog invocation test for hog function {hog_function_id}")
+    return requests.post(
+        CDP_FUNCTION_EXECUTOR_API_URL + f"/api/projects/{team_id}/hog_functions/{hog_function_id}/invocations",
+        json={
+            "event": event,
+            "configuration": configuration,
+            "mock_async_functions": mock_async_functions,
+        },
+    )
diff --git a/posthog/settings/data_stores.py b/posthog/settings/data_stores.py
index 5606ad4f423fc..45b0e93dd9822 100644
--- a/posthog/settings/data_stores.py
+++ b/posthog/settings/data_stores.py
@@ -305,6 +305,14 @@ def _parse_kafka_hosts(hosts_string: str) -> list[str]:
 # We should move away to a different communication channel and remove this.
 PLUGINS_RELOAD_REDIS_URL = os.getenv("PLUGINS_RELOAD_REDIS_URL", REDIS_URL)
 
+
+CDP_FUNCTION_EXECUTOR_API_URL = get_from_env("CDP_FUNCTION_EXECUTOR_API_URL", "")
+
+if not CDP_FUNCTION_EXECUTOR_API_URL:
+    CDP_FUNCTION_EXECUTOR_API_URL = (
+        "http://localhost:6738" if DEBUG else "http://ingestion-cdp-function-callbacks.posthog.svc.cluster.local"
+    )
+
 CACHES = {
     "default": {
         "BACKEND": "django_redis.cache.RedisCache",
diff --git a/posthog/tasks/hog_functions.py b/posthog/tasks/hog_functions.py
index 9304aff26a243..54b23761a71f0 100644
--- a/posthog/tasks/hog_functions.py
+++ b/posthog/tasks/hog_functions.py
@@ -2,8 +2,9 @@
 
 from celery import shared_task
 
+from posthog.cdp.filters import compile_filters_bytecode
 from posthog.models.action.action import Action
-from posthog.plugins.reload import reload_hog_functions_on_workers
+from posthog.plugins.plugin_server_api import reload_hog_functions_on_workers
 from posthog.tasks.utils import CeleryQueue
 
 
@@ -47,7 +48,7 @@ def refresh_affected_hog_functions(team_id: Optional[int] = None, action_id: Opt
     actions_by_id = {action.id: action for action in all_related_actions}
 
     for hog_function in affected_hog_functions:
-        hog_function.compile_filters_bytecode(actions=actions_by_id)
+        hog_function.filters = compile_filters_bytecode(hog_function.filters, hog_function.team, actions_by_id)
 
     updates = HogFunction.objects.bulk_update(affected_hog_functions, ["filters"])
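Taken together, Django's `create_hog_invocation_test` helper and the plugin server's new `invocations` route define a single HTTP contract for test runs: POST the event (plus optional configuration overrides and a `mock_async_functions` flag) and receive `{ status, error, logs }` back. The sketch below illustrates that contract from a client's point of view; it is an illustration only — the base URL mirrors the new Django setting, the team/function ids and event payload are placeholders, and Node 18's global `fetch` stands in for a real HTTP client (this is not the PR's `trackedFetch`).

```ts
// Sketch: exercising the test-invocation route added in this PR.
// CDP_FUNCTION_EXECUTOR_API_URL and all ids below are placeholder assumptions.
const CDP_FUNCTION_EXECUTOR_API_URL = 'http://localhost:6738'

interface InvocationLog {
    level: string
    message: string
}

async function runHogFunctionTest(teamId: number, hogFunctionId: string): Promise<void> {
    const res = await fetch(
        `${CDP_FUNCTION_EXECUTOR_API_URL}/api/projects/${teamId}/hog_functions/${hogFunctionId}/invocations`,
        {
            method: 'POST',
            // Parsed server-side by the express.json() middleware registered above
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({
                event: { uuid: 'test-uuid', event: '$pageview', properties: { $lib_version: '1.0.0' } },
                // When true, async calls such as fetch are stubbed and logged instead of executed
                mock_async_functions: true,
            }),
        }
    )

    if (!res.ok) {
        throw new Error(`Invocation failed with status ${res.status}`)
    }

    // Per the tests above, the body carries { status, error, logs }
    const body: { status: string; error?: string; logs: InvocationLog[] } = await res.json()
    for (const log of body.logs) {
        console.log(`[${log.level}] ${log.message}`)
    }
}
```

The route deliberately returns the full log stream rather than just a pass/fail flag, which is what lets the frontend's `HogFunctionTest` panel replay a function's execution step by step.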