diff --git a/frontend/__snapshots__/components-playlist--default--dark.png b/frontend/__snapshots__/components-playlist--default--dark.png new file mode 100644 index 0000000000000..88b23fc64f8e8 Binary files /dev/null and b/frontend/__snapshots__/components-playlist--default--dark.png differ diff --git a/frontend/__snapshots__/components-playlist--default--light.png b/frontend/__snapshots__/components-playlist--default--light.png new file mode 100644 index 0000000000000..8a46d7ab3637f Binary files /dev/null and b/frontend/__snapshots__/components-playlist--default--light.png differ diff --git a/frontend/__snapshots__/components-playlist--multiple-sections--dark.png b/frontend/__snapshots__/components-playlist--multiple-sections--dark.png new file mode 100644 index 0000000000000..c32bc9749df03 Binary files /dev/null and b/frontend/__snapshots__/components-playlist--multiple-sections--dark.png differ diff --git a/frontend/__snapshots__/components-playlist--multiple-sections--light.png b/frontend/__snapshots__/components-playlist--multiple-sections--light.png new file mode 100644 index 0000000000000..a5efa06274e97 Binary files /dev/null and b/frontend/__snapshots__/components-playlist--multiple-sections--light.png differ diff --git a/frontend/__snapshots__/components-playlist--with-footer--dark.png b/frontend/__snapshots__/components-playlist--with-footer--dark.png new file mode 100644 index 0000000000000..88b23fc64f8e8 Binary files /dev/null and b/frontend/__snapshots__/components-playlist--with-footer--dark.png differ diff --git a/frontend/__snapshots__/components-playlist--with-footer--light.png b/frontend/__snapshots__/components-playlist--with-footer--light.png new file mode 100644 index 0000000000000..8a46d7ab3637f Binary files /dev/null and b/frontend/__snapshots__/components-playlist--with-footer--light.png differ diff --git a/frontend/__snapshots__/exporter-exporter--funnel-historical-trends-insight--dark.png b/frontend/__snapshots__/exporter-exporter--funnel-historical-trends-insight--dark.png deleted file mode 100644 index a820c399ecbf2..0000000000000 Binary files a/frontend/__snapshots__/exporter-exporter--funnel-historical-trends-insight--dark.png and /dev/null differ diff --git a/frontend/__snapshots__/exporter-exporter--funnel-historical-trends-insight--light.png b/frontend/__snapshots__/exporter-exporter--funnel-historical-trends-insight--light.png deleted file mode 100644 index 1276a2564f958..0000000000000 Binary files a/frontend/__snapshots__/exporter-exporter--funnel-historical-trends-insight--light.png and /dev/null differ diff --git a/frontend/__snapshots__/replay-player-failure--recent-recordings-404--dark.png b/frontend/__snapshots__/replay-player-failure--recent-recordings-404--dark.png index 688871da10a15..7c36f82f3c19e 100644 Binary files a/frontend/__snapshots__/replay-player-failure--recent-recordings-404--dark.png and b/frontend/__snapshots__/replay-player-failure--recent-recordings-404--dark.png differ diff --git a/frontend/__snapshots__/replay-player-failure--recent-recordings-404--light.png b/frontend/__snapshots__/replay-player-failure--recent-recordings-404--light.png index 1b812d9edd438..e07d3b8776663 100644 Binary files a/frontend/__snapshots__/replay-player-failure--recent-recordings-404--light.png and b/frontend/__snapshots__/replay-player-failure--recent-recordings-404--light.png differ diff --git a/frontend/src/exporter/Exporter.stories.tsx b/frontend/src/exporter/Exporter.stories.tsx index cc64aac00d502..b2be152974a2b 100644 --- a/frontend/src/exporter/Exporter.stories.tsx +++ b/frontend/src/exporter/Exporter.stories.tsx @@ -138,6 +138,7 @@ FunnelTopToBottomBreakdownInsight.args = { } export const FunnelHistoricalTrendsInsight: Story = Template.bind({}) +FunnelHistoricalTrendsInsight.tags = ['autodocs', 'test-skip'] FunnelHistoricalTrendsInsight.args = { insight: require('../mocks/fixtures/api/projects/team_id/insights/funnelHistoricalTrends.json'), } diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index bded3aa4f319d..85909b193e4b7 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -5,7 +5,6 @@ import { ActivityLogItem } from 'lib/components/ActivityLog/humanizeActivity' import { apiStatusLogic } from 'lib/logic/apiStatusLogic' import { objectClean, toParams } from 'lib/utils' import posthog from 'posthog-js' -import { LogEntry } from 'scenes/pipeline/pipelineNodeLogsLogic' import { SavedSessionRecordingPlaylistsResult } from 'scenes/session-recordings/saved-playlists/savedSessionRecordingPlaylistsLogic' import { getCurrentExporterData } from '~/exporter/exporterViewLogic' @@ -50,6 +49,7 @@ import { InsightModel, IntegrationType, ListOrganizationMembersParams, + LogEntry, MediaUploadResponse, NewEarlyAccessFeatureType, NotebookListItemType, @@ -1685,6 +1685,17 @@ const api = { async listIcons(params: { query?: string } = {}): Promise { return await new ApiRequest().hogFunctions().withAction('icons').withQueryString(params).get() }, + + async createTestInvocation( + id: HogFunctionType['id'], + data: { + configuration: Partial + mock_async_functions: boolean + event: any + } + ): Promise { + return await new ApiRequest().hogFunction(id).withAction('invocations').create({ data }) + }, }, annotations: { diff --git a/frontend/src/lib/components/CodeEditors.tsx b/frontend/src/lib/components/CodeEditors.tsx index 26f74b3f7153a..d6d195d025cc8 100644 --- a/frontend/src/lib/components/CodeEditors.tsx +++ b/frontend/src/lib/components/CodeEditors.tsx @@ -74,7 +74,7 @@ export function CodeEditor({ options, onMount, ...editorProps }: CodeEditorProps } export function CodeEditorResizeable({ - height: defaultHeight = 200, + height: defaultHeight, minHeight = '5rem', maxHeight = '90vh', ...props @@ -84,7 +84,7 @@ export function CodeEditorResizeable({ maxHeight?: string | number }): JSX.Element { const [height, setHeight] = useState(defaultHeight) - const [manualHeight, setManualHeight] = useState() + const [manualHeight, setManualHeight] = useState(defaultHeight) const ref = useRef(null) diff --git a/frontend/src/scenes/session-recordings/playlist/SessionRecordingsPlaylist.scss b/frontend/src/lib/components/Playlist/Playlist.scss similarity index 75% rename from frontend/src/scenes/session-recordings/playlist/SessionRecordingsPlaylist.scss rename to frontend/src/lib/components/Playlist/Playlist.scss index a7df986ce52db..292fe2f7b3629 100644 --- a/frontend/src/scenes/session-recordings/playlist/SessionRecordingsPlaylist.scss +++ b/frontend/src/lib/components/Playlist/Playlist.scss @@ -1,7 +1,7 @@ @import '../../../styles/mixins'; @import '../../../styles/vars'; -.SessionRecordingsPlaylist { +.Playlist { display: flex; flex-direction: row; align-items: flex-start; @@ -11,7 +11,7 @@ border: 1px solid var(--border); border-radius: var(--radius); - .SessionRecordingsPlaylist__list { + .Playlist__list { position: relative; display: flex; flex-direction: column; @@ -19,7 +19,7 @@ height: 100%; overflow: hidden; - &:not(.SessionRecordingsPlaylist__list--collapsed) { + &:not(.Playlist__list--collapsed) { width: 25%; min-width: 305px; max-width: 350px; @@ -30,18 +30,11 @@ } } - .SessionRecordingsPlaylist__player { + .Playlist__main { flex: 1; width: 100%; height: 100%; overflow: hidden; - - .SessionRecordingsPlaylist__loading { - display: flex; - align-items: center; - justify-content: center; - margin-top: 10rem; - } } &--embedded { @@ -49,7 +42,7 @@ } &--wide { - .SessionRecordingsPlaylist__player { + .Playlist__main { flex: 1; height: 100%; } diff --git a/frontend/src/lib/components/Playlist/Playlist.stories.tsx b/frontend/src/lib/components/Playlist/Playlist.stories.tsx new file mode 100644 index 0000000000000..0fdc98cd1d4bc --- /dev/null +++ b/frontend/src/lib/components/Playlist/Playlist.stories.tsx @@ -0,0 +1,79 @@ +import { Meta, StoryFn, StoryObj } from '@storybook/react' +import { range } from 'lib/utils' + +import { Playlist, PlaylistProps } from './Playlist' + +type Story = StoryObj +const meta: Meta = { + title: 'Components/Playlist', + component: Playlist, +} +export default meta + +type ObjectType = { id: string | number } + +const ListItem = ({ item }: { item: ObjectType }): JSX.Element =>
Object {item.id}
+ +const Template: StoryFn = (props: Partial>) => { + const mainContent = ({ activeItem }: { activeItem: ObjectType }): JSX.Element => ( +
+ {activeItem ? `Object ${activeItem.id} selected` : 'Select an item from the list'} +
+ ) + + return ( +
+ No items
} + content={mainContent} + {...props} + /> + + ) +} + +export const Default: Story = Template.bind({}) +Default.args = { + sections: [ + { + key: 'default', + title: 'Default section', + items: range(0, 100).map((idx) => ({ id: idx })), + render: ListItem, + }, + ], +} + +export const MultipleSections: Story = Template.bind({}) +MultipleSections.args = { + sections: [ + { + key: 'one', + title: 'First section', + items: range(0, 5).map((idx) => ({ id: idx })), + render: ListItem, + initiallyOpen: true, + }, + { + key: 'two', + title: 'Second section', + items: range(0, 5).map((idx) => ({ id: idx })), + render: ListItem, + }, + ], +} + +export const WithFooter: Story = Template.bind({}) +WithFooter.args = { + sections: [ + { + key: 'default', + title: 'Section with footer', + items: range(0, 100).map((idx) => ({ id: idx })), + render: ListItem, + footer:
Section footer
, + }, + ], +} diff --git a/frontend/src/lib/components/Playlist/Playlist.tsx b/frontend/src/lib/components/Playlist/Playlist.tsx new file mode 100644 index 0000000000000..42acac30276b0 --- /dev/null +++ b/frontend/src/lib/components/Playlist/Playlist.tsx @@ -0,0 +1,309 @@ +import './Playlist.scss' + +import { IconCollapse } from '@posthog/icons' +import { LemonButton, LemonButtonProps, LemonCollapse, LemonSkeleton, Tooltip } from '@posthog/lemon-ui' +import clsx from 'clsx' +import { useResizeBreakpoints } from 'lib/hooks/useResizeObserver' +import { IconChevronRight } from 'lib/lemon-ui/icons' +import { LemonTableLoader } from 'lib/lemon-ui/LemonTable/LemonTableLoader' +import { range } from 'lib/utils' +import { useEffect, useRef, useState } from 'react' +import { DraggableToNotebook } from 'scenes/notebooks/AddToNotebook/DraggableToNotebook' + +import { Resizer } from '../Resizer/Resizer' + +const SCROLL_TRIGGER_OFFSET = 100 + +export type PlaylistSection = { + key: string + title?: string + items: T[] + render: ({ item, isActive }: { item: T; isActive: boolean }) => JSX.Element + initiallyOpen?: boolean + footer?: JSX.Element +} + +type PlaylistHeaderAction = Pick & { + key: string + content: React.ReactNode +} + +export type PlaylistProps = { + sections: PlaylistSection[] + listEmptyState: JSX.Element + content: ({ activeItem }: { activeItem: T | null }) => JSX.Element + title?: string + notebooksHref?: string + embedded?: boolean + loading?: boolean + headerActions?: PlaylistHeaderAction[] + onScrollListEdge?: (edge: 'top' | 'bottom') => void + onSelect?: (item: T) => void + 'data-attr'?: string + activeItemId?: string +} + +const CounterBadge = ({ children }: { children: React.ReactNode }): JSX.Element => ( + {children} +) + +export function Playlist< + T extends { + id: string | number // accepts any object as long as it conforms to the interface of having an `id` + [key: string]: any + } +>({ + title, + notebooksHref, + loading, + embedded = false, + activeItemId: propsActiveItemId, + content, + sections, + headerActions = [], + onScrollListEdge, + listEmptyState, + onSelect, + 'data-attr': dataAttr, +}: PlaylistProps): JSX.Element { + const [controlledActiveItemId, setControlledActiveItemId] = useState(null) + const [listCollapsed, setListCollapsed] = useState(false) + const playlistListRef = useRef(null) + const { ref: playlistRef, size } = useResizeBreakpoints({ + 0: 'small', + 750: 'medium', + }) + + const onChangeActiveItem = (item: T): void => { + setControlledActiveItemId(item.id) + onSelect?.(item) + } + + const activeItemId = propsActiveItemId === undefined ? controlledActiveItemId : propsActiveItemId + + const activeItem = sections.flatMap((s) => s.items).find((i) => i.id === activeItemId) || null + + return ( +
+
+ {listCollapsed ? ( + setListCollapsed(false)} /> + ) : ( + setListCollapsed(true)} + activeItemId={activeItemId} + setActiveItemId={onChangeActiveItem} + emptyState={listEmptyState} + /> + )} + setListCollapsed(value)} + onDoubleClick={() => setListCollapsed(!listCollapsed)} + /> +
+
{content({ activeItem })}
+
+ ) +} + +const CollapsedList = ({ onClickOpen }: { onClickOpen: () => void }): JSX.Element => ( +
+ } onClick={onClickOpen} /> +
+) + +function List< + T extends { + id: string | number + [key: string]: any + } +>({ + title, + notebooksHref, + onClickCollapse, + setActiveItemId, + headerActions = [], + sections, + activeItemId, + onScrollListEdge, + loading, + emptyState, +}: { + title: PlaylistProps['title'] + notebooksHref: PlaylistProps['notebooksHref'] + onClickCollapse: () => void + activeItemId: T['id'] | null + setActiveItemId: (item: T) => void + headerActions: PlaylistProps['headerActions'] + sections: PlaylistProps['sections'] + onScrollListEdge: PlaylistProps['onScrollListEdge'] + loading: PlaylistProps['loading'] + emptyState: PlaylistProps['listEmptyState'] +}): JSX.Element { + const [activeHeaderActionKey, setActiveHeaderActionKey] = useState(null) + const lastScrollPositionRef = useRef(0) + const contentRef = useRef(null) + + useEffect(() => { + if (contentRef.current) { + contentRef.current.scrollTop = 0 + } + }, [activeHeaderActionKey]) + + const handleScroll = (e: React.UIEvent): void => { + // If we are scrolling down then check if we are at the bottom of the list + if (e.currentTarget.scrollTop > lastScrollPositionRef.current) { + const scrollPosition = e.currentTarget.scrollTop + e.currentTarget.clientHeight + if (e.currentTarget.scrollHeight - scrollPosition < SCROLL_TRIGGER_OFFSET) { + onScrollListEdge?.('bottom') + } + } + + // Same again but if scrolling to the top + if (e.currentTarget.scrollTop < lastScrollPositionRef.current) { + if (e.currentTarget.scrollTop < SCROLL_TRIGGER_OFFSET) { + onScrollListEdge?.('top') + } + } + + lastScrollPositionRef.current = e.currentTarget.scrollTop + } + + const itemsCount = sections.flatMap((s) => s.items).length + const actionContent = headerActions?.find((a) => activeHeaderActionKey === a.key)?.content + const initiallyOpenSections = sections.filter((s) => s.initiallyOpen).map((s) => s.key) + + return ( +
+ +
+ } onClick={onClickCollapse} /> + + {title ? ( + + {title} + + ) : null} + + Showing {itemsCount} results. +
+ Scrolling to the bottom or the top of the list will load older or newer results + respectively. + + } + > + + {Math.min(999, itemsCount)}+ + +
+
+ {headerActions.map(({ key, icon, tooltip, children }) => ( + setActiveHeaderActionKey(activeHeaderActionKey === key ? null : key)} + > + {children} + + ))} + +
+
+ +
+ {actionContent &&
{actionContent}
} + + {sections.flatMap((s) => s.items).length ? ( + <> + {sections.length > 1 ? ( + ({ + key: s.key, + header: s.title, + content: ( + + ), + className: 'p-0', + }))} + multiple + embedded + size="small" + /> + ) : ( + + )} + + ) : loading ? ( + + ) : ( + emptyState + )} +
+
+ ) +} + +export function ListSection< + T extends { + id: string | number + [key: string]: any + } +>({ + items, + render, + footer, + onClick, + activeItemId, +}: PlaylistSection & { + onClick: (item: T) => void + activeItemId: T['id'] | null +}): JSX.Element { + return ( + <> + {items.length && + items.map((item) => ( +
onClick(item)}> + {render({ item, isActive: item.id === activeItemId })} +
+ ))} + {footer} + + ) +} + +const LoadingState = (): JSX.Element => { + return ( + <> + {range(20).map((i) => ( +
+ + +
+ ))} + + ) +} diff --git a/frontend/src/lib/lemon-ui/LemonCollapse/LemonCollapse.scss b/frontend/src/lib/lemon-ui/LemonCollapse/LemonCollapse.scss index 01aa63775fcf9..6e7dfba7a766c 100644 --- a/frontend/src/lib/lemon-ui/LemonCollapse/LemonCollapse.scss +++ b/frontend/src/lib/lemon-ui/LemonCollapse/LemonCollapse.scss @@ -5,6 +5,11 @@ overflow: hidden; border: 1px solid var(--border); border-radius: var(--radius); + + &--embedded { + border: none; + border-radius: 0; + } } .LemonCollapsePanel { diff --git a/frontend/src/lib/lemon-ui/LemonCollapse/LemonCollapse.tsx b/frontend/src/lib/lemon-ui/LemonCollapse/LemonCollapse.tsx index 55b9743c907bd..96e8105c60133 100644 --- a/frontend/src/lib/lemon-ui/LemonCollapse/LemonCollapse.tsx +++ b/frontend/src/lib/lemon-ui/LemonCollapse/LemonCollapse.tsx @@ -21,6 +21,7 @@ interface LemonCollapsePropsBase { panels: (LemonCollapsePanel | null | false)[] className?: string size?: LemonButtonProps['size'] + embedded?: boolean } interface LemonCollapsePropsSingle extends LemonCollapsePropsBase { @@ -43,6 +44,7 @@ export function LemonCollapse({ panels, className, size, + embedded, ...props }: LemonCollapseProps): JSX.Element { let isPanelExpanded: (key: K) => boolean @@ -72,7 +74,7 @@ export function LemonCollapse({ } return ( -
+
{(panels.filter(Boolean) as LemonCollapsePanel[]).map(({ key, ...panel }) => ( - {({ value, onChange }) => { - return ( - <> -
- {schema.label || schema.key} - {showSource ? ( - } - onClick={() => setEditing(true)} - /> - ) : null} -
- onChange({ value: val })} - /> - - ) - }} - - ) + return ( +
+ {!editing ? ( + + {({ value, onChange }) => { + return ( + <> +
+ + {schema.label || schema.key} + + {showSource ? ( + <> + + inputs.{schema.key} + +
+ } + onClick={() => setEditing(true)} + /> + + ) : null} +
+ onChange({ value: val })} + /> + + ) + }} + + ) : ( +
+ setEditing(false)} + /> +
+ )} +
+ ) +} + +export function HogFunctionInputs(): JSX.Element { + const { showSource, configuration } = useValues(pipelineHogFunctionConfigurationLogic) + const { setConfigurationValue } = useActions(pipelineHogFunctionConfigurationLogic) + + if (!configuration?.inputs_schema?.length) { + return This function does not require any input variables. } + const inputSchemas = configuration.inputs_schema + const inputSchemaIds = inputSchemas.map((schema) => schema.key) + return ( -
- setEditing(false)} /> -
+ <> + { + if (over && active.id !== over.id) { + const oldIndex = inputSchemaIds.indexOf(active.id as string) + const newIndex = inputSchemaIds.indexOf(over.id as string) + + setConfigurationValue('inputs_schema', arrayMove(inputSchemas, oldIndex, newIndex)) + } + }} + > + + {configuration.inputs_schema?.map((schema) => { + return + })} + + + ) } diff --git a/frontend/src/scenes/pipeline/hogfunctions/HogFunctionTest.tsx b/frontend/src/scenes/pipeline/hogfunctions/HogFunctionTest.tsx new file mode 100644 index 0000000000000..2a681ce4ae47c --- /dev/null +++ b/frontend/src/scenes/pipeline/hogfunctions/HogFunctionTest.tsx @@ -0,0 +1,188 @@ +import { TZLabel } from '@posthog/apps-common' +import { IconInfo, IconX } from '@posthog/icons' +import { LemonButton, LemonLabel, LemonSwitch, LemonTable, LemonTag, Tooltip } from '@posthog/lemon-ui' +import clsx from 'clsx' +import { useActions, useValues } from 'kea' +import { Form } from 'kea-forms' +import { CodeEditorResizeable } from 'lib/components/CodeEditors' +import { LemonField } from 'lib/lemon-ui/LemonField' + +import { hogFunctionTestLogic, HogFunctionTestLogicProps } from './hogFunctionTestLogic' + +const HogFunctionTestEditor = ({ + value, + onChange, +}: { + value: string + onChange?: (value?: string) => void +}): JSX.Element => { + return ( + + ) +} + +export function HogFunctionTestPlaceholder(): JSX.Element { + return ( +
+

Testing

+

Save your configuration to enable testing

+
+ ) +} + +export function HogFunctionTest(props: HogFunctionTestLogicProps): JSX.Element { + const { isTestInvocationSubmitting, testResult, expanded } = useValues(hogFunctionTestLogic(props)) + const { submitTestInvocation, setTestResult, toggleExpanded } = useActions(hogFunctionTestLogic(props)) + + return ( +
+
+
+ {!expanded ? ( + toggleExpanded()}> +

Testing

+
+ ) : ( +

Testing

+ )} + + {expanded && ( + <> + {testResult ? ( + setTestResult(null)} + loading={isTestInvocationSubmitting} + > + Clear test result + + ) : ( + <> + + {({ value, onChange }) => ( + + When selected, async functions such as `fetch` will not + actually be called but instead will be mocked out with + the fetch content logged instead + + } + > + + Mock out async functions + + + + } + /> + )} + + + Test function + + + )} + + } onClick={() => toggleExpanded()} tooltip="Hide testing" /> + + )} +
+ + {expanded && ( + <> + {testResult ? ( +
+
+ Test invocation result + + {testResult.status} + +
+ + , + width: 0, + }, + { + width: 100, + title: 'Level', + key: 'level', + dataIndex: 'level', + }, + { + title: 'Message', + key: 'message', + dataIndex: 'message', + render: (message) => {message}, + }, + ]} + className="ph-no-capture" + rowKey="timestamp" + pagination={{ pageSize: 200, hideOnSinglePage: true }} + /> +
+ ) : ( +
+ + {({ value, onChange }) => ( + <> +
+

+ The globals object is the context in which your function will be + tested. It should contain all the data that your function will need + to run +

+
+ + + + )} +
+
+ )} + + )} +
+
+ ) +} diff --git a/frontend/src/scenes/pipeline/hogfunctions/PipelineHogFunctionConfiguration.tsx b/frontend/src/scenes/pipeline/hogfunctions/PipelineHogFunctionConfiguration.tsx index 68e50c49db023..71d1dd694a1e6 100644 --- a/frontend/src/scenes/pipeline/hogfunctions/PipelineHogFunctionConfiguration.tsx +++ b/frontend/src/scenes/pipeline/hogfunctions/PipelineHogFunctionConfiguration.tsx @@ -25,7 +25,8 @@ import { groupsModel } from '~/models/groupsModel' import { EntityTypes } from '~/types' import { HogFunctionIconEditable } from './HogFunctionIcon' -import { HogFunctionInputWithSchema } from './HogFunctionInputs' +import { HogFunctionInputs } from './HogFunctionInputs' +import { HogFunctionTest, HogFunctionTestPlaceholder } from './HogFunctionTest' import { pipelineHogFunctionConfigurationLogic } from './pipelineHogFunctionConfigurationLogic' export function PipelineHogFunctionConfiguration({ @@ -262,15 +263,7 @@ export function PipelineHogFunctionConfiguration({
- {configuration?.inputs_schema?.length ? ( - configuration?.inputs_schema.map((schema, index) => { - return - }) - ) : ( - - This function does not require any input variables. - - )} + {showSource ? ( <> @@ -343,6 +336,8 @@ export function PipelineHogFunctionConfiguration({ )}
+ + {id ? : }
{saveButtons}
diff --git a/frontend/src/scenes/pipeline/hogfunctions/hogFunctionTestLogic.tsx b/frontend/src/scenes/pipeline/hogfunctions/hogFunctionTestLogic.tsx new file mode 100644 index 0000000000000..412eb59c3ddb9 --- /dev/null +++ b/frontend/src/scenes/pipeline/hogfunctions/hogFunctionTestLogic.tsx @@ -0,0 +1,95 @@ +import { lemonToast } from '@posthog/lemon-ui' +import { actions, afterMount, connect, kea, key, path, props, reducers } from 'kea' +import { forms } from 'kea-forms' +import api from 'lib/api' +import { tryJsonParse } from 'lib/utils' + +import { LogEntry } from '~/types' + +import type { hogFunctionTestLogicType } from './hogFunctionTestLogicType' +import { pipelineHogFunctionConfigurationLogic, sanitizeConfiguration } from './pipelineHogFunctionConfigurationLogic' +import { createExampleEvent } from './utils/event-conversion' + +export interface HogFunctionTestLogicProps { + id: string +} + +export type HogFunctionTestInvocationForm = { + globals: string // HogFunctionInvocationGlobals + mock_async_functions: boolean +} + +export type HogFunctionTestInvocationResult = { + status: 'success' | 'error' + logs: LogEntry[] +} + +export const hogFunctionTestLogic = kea([ + props({} as HogFunctionTestLogicProps), + key((props) => props.id), + path((id) => ['scenes', 'pipeline', 'hogfunctions', 'hogFunctionTestLogic', id]), + connect((props: HogFunctionTestLogicProps) => ({ + values: [pipelineHogFunctionConfigurationLogic({ id: props.id }), ['configuration', 'configurationHasErrors']], + actions: [pipelineHogFunctionConfigurationLogic({ id: props.id }), ['touchConfigurationField']], + })), + actions({ + setTestResult: (result: HogFunctionTestInvocationResult | null) => ({ result }), + toggleExpanded: (expanded?: boolean) => ({ expanded }), + }), + reducers({ + expanded: [ + false as boolean, + { + toggleExpanded: (_, { expanded }) => (expanded === undefined ? !_ : expanded), + }, + ], + + testResult: [ + null as HogFunctionTestInvocationResult | null, + { + setTestResult: (_, { result }) => result, + }, + ], + }), + forms(({ props, actions, values }) => ({ + testInvocation: { + defaults: { + mock_async_functions: true, + } as HogFunctionTestInvocationForm, + alwaysShowErrors: true, + errors: ({ globals }) => { + return { + globals: !globals ? 'Required' : tryJsonParse(globals) ? undefined : 'Invalid JSON', + } + }, + submit: async (data) => { + // Submit the test invocation + // Set the response somewhere + + if (values.configurationHasErrors) { + lemonToast.error('Please fix the configuration errors before testing.') + // TODO: How to get the form to show errors without submitting? + return + } + + const event = tryJsonParse(data.globals) + const configuration = sanitizeConfiguration(values.configuration) + + try { + const res = await api.hogFunctions.createTestInvocation(props.id, { + event, + mock_async_functions: data.mock_async_functions, + configuration, + }) + + actions.setTestResult(res) + } catch (e) { + lemonToast.error(`An unexpected serror occurred while trying to testing the function. ${e}`) + } + }, + }, + })), + afterMount(({ actions }) => { + actions.setTestInvocationValue('globals', JSON.stringify(createExampleEvent(), null, 2)) + }), +]) diff --git a/frontend/src/scenes/pipeline/hogfunctions/pipelineHogFunctionConfigurationLogic.tsx b/frontend/src/scenes/pipeline/hogfunctions/pipelineHogFunctionConfigurationLogic.tsx index 5937c80dc4eb7..d0ebcbe5e6ad1 100644 --- a/frontend/src/scenes/pipeline/hogfunctions/pipelineHogFunctionConfigurationLogic.tsx +++ b/frontend/src/scenes/pipeline/hogfunctions/pipelineHogFunctionConfigurationLogic.tsx @@ -9,6 +9,7 @@ import { urls } from 'scenes/urls' import { FilterType, + HogFunctionConfigurationType, HogFunctionTemplateType, HogFunctionType, PipelineNodeTab, @@ -24,8 +25,6 @@ export interface PipelineHogFunctionConfigurationLogicProps { id?: string } -export type HogFunctionConfigurationType = Omit - const NEW_FUNCTION_TEMPLATE: HogFunctionTemplateType = { id: 'new', name: '', @@ -68,7 +67,37 @@ function sanitizeFilters(filters?: FilterType): PluginConfigTypeNew['filters'] { return Object.keys(sanitized).length > 0 ? sanitized : undefined } -// Should likely be somewhat similar to pipelineBatchExportConfigurationLogic +export function sanitizeConfiguration(data: HogFunctionConfigurationType): HogFunctionConfigurationType { + const sanitizedInputs = {} + + data.inputs_schema?.forEach((input) => { + const value = data.inputs?.[input.key]?.value + + if (input.type === 'json' && typeof value === 'string') { + try { + sanitizedInputs[input.key] = { + value: JSON.parse(value), + } + } catch (e) { + // Ignore + } + } else { + sanitizedInputs[input.key] = { + value: value, + } + } + }) + + const payload: HogFunctionConfigurationType = { + ...data, + filters: data.filters ? sanitizeFilters(data.filters) : null, + inputs: sanitizedInputs, + icon_url: data.icon_url?.replace('&temp=true', ''), // Remove temp=true so it doesn't try and suggest new options next time + } + + return payload +} + export const pipelineHogFunctionConfigurationLogic = kea([ props({} as PipelineHogFunctionConfigurationLogicProps), key(({ id, templateId }: PipelineHogFunctionConfigurationLogicProps) => { @@ -140,42 +169,20 @@ export const pipelineHogFunctionConfigurationLogic = kea { try { - const sanitizedInputs = {} - - data.inputs_schema?.forEach((input) => { - const value = data.inputs?.[input.key]?.value - - if (input.type === 'json' && typeof value === 'string') { - try { - sanitizedInputs[input.key] = { - value: JSON.parse(value), - } - } catch (e) { - // Ignore - } - } else { - sanitizedInputs[input.key] = { - value: value, - } - } - }) - - const payload: HogFunctionConfigurationType = { - ...data, - filters: data.filters ? sanitizeFilters(data.filters) : null, - inputs: sanitizedInputs, - icon_url: data.icon_url?.replace('&temp=true', ''), // Remove temp=true so it doesn't try and suggest new options next time - } + const payload = sanitizeConfiguration(data) if (props.templateId) { // Only sent on create ;(payload as any).template_id = props.templateId } - if (!props.id) { - return await api.hogFunctions.create(payload) - } - return await api.hogFunctions.update(props.id, payload) + const res = props.id + ? await api.hogFunctions.update(props.id, payload) + : await api.hogFunctions.create(payload) + + lemonToast.success('Configuration saved') + + return res } catch (e) { const maybeValidationError = (e as any).data if (maybeValidationError?.type === 'validation_error') { @@ -214,15 +221,17 @@ export const pipelineHogFunctionConfigurationLogic = kea { - if (input.required && !inputs[input.key]) { - inputErrors[input.key] = 'This field is required' + const key = input.key + const value = inputs[key]?.value + if (input.required && !value) { + inputErrors[key] = 'This field is required' } - if (input.type === 'json' && typeof inputs[input.key] === 'string') { + if (input.type === 'json' && typeof value === 'string') { try { - JSON.parse(inputs[input.key].value) + JSON.parse(value) } catch (e) { - inputErrors[input.key] = 'Invalid JSON' + inputErrors[key] = 'Invalid JSON' } } }) @@ -313,6 +322,10 @@ export const pipelineHogFunctionConfigurationLogic = kea { + // Clear the manually set errors otherwise the submission won't work + actions.setConfigurationManualErrors({}) + }, })), afterMount(({ props, actions, cache }) => { if (props.templateId) { diff --git a/frontend/src/scenes/pipeline/hogfunctions/utils/event-conversion.ts b/frontend/src/scenes/pipeline/hogfunctions/utils/event-conversion.ts new file mode 100644 index 0000000000000..b806ac03c2b4a --- /dev/null +++ b/frontend/src/scenes/pipeline/hogfunctions/utils/event-conversion.ts @@ -0,0 +1,24 @@ +import { dayjs } from 'lib/dayjs' +import { uuid } from 'lib/utils' + +// NOTE: This is just for testing - it technically returns ParsedClickhouseEvent but not worth it to import that type +export const createExampleEvent = (): any => ({ + uuid: uuid(), + event: '$pageview', + distinct_id: '12345', + properties: { + $browser: 'Chrome', + $device_type: 'Desktop', + $current_url: `${window.location.origin}/project/1/activity/explore`, + $pathname: '/project/1/activity/explore', + $browser_version: 125, + }, + timestamp: dayjs().toISOString(), + created_at: dayjs().toISOString(), + url: `${window.location.origin}/project/1/activity/explore`, + person_id: uuid(), + person_created_at: dayjs().toISOString(), + person_properties: { + email: 'user@example.com', + }, +}) diff --git a/frontend/src/scenes/pipeline/pipelineNodeLogsLogic.tsx b/frontend/src/scenes/pipeline/pipelineNodeLogsLogic.tsx index 0e5c3466e83f9..fdf15ae7b39aa 100644 --- a/frontend/src/scenes/pipeline/pipelineNodeLogsLogic.tsx +++ b/frontend/src/scenes/pipeline/pipelineNodeLogsLogic.tsx @@ -7,14 +7,14 @@ import { dayjs } from 'lib/dayjs' import { pipelineNodeLogic, PipelineNodeLogicProps } from 'scenes/pipeline/pipelineNodeLogic' import api from '~/lib/api' -import { BatchExportLogEntry, PluginLogEntry } from '~/types' +import { BatchExportLogEntry, LogEntry, PluginLogEntry } from '~/types' import { teamLogic } from '../teamLogic' import type { pipelineNodeLogsLogicType } from './pipelineNodeLogsLogicType' import { PipelineBackend } from './types' import { LogLevelDisplay, logLevelsToTypeFilters, LogTypeDisplay } from './utils' -export type LogEntry = BatchExportLogEntry | PluginLogEntry +export type PipelineNodeLogEntry = BatchExportLogEntry | PluginLogEntry | LogEntry export enum PipelineLogLevel { Debug = 'DEBUG', @@ -42,10 +42,10 @@ export const pipelineNodeLogsLogic = kea([ }), loaders(({ props: { id }, values, actions, cache }) => ({ logs: [ - [] as LogEntry[], + [] as PipelineNodeLogEntry[], { loadLogs: async () => { - let results: LogEntry[] + let results: PipelineNodeLogEntry[] if (values.node.backend === PipelineBackend.BatchExport) { results = await api.batchExportLogs.search( values.node.id, @@ -76,7 +76,7 @@ export const pipelineNodeLogsLogic = kea([ return results }, loadMoreLogs: async () => { - let results: LogEntry[] + let results: PipelineNodeLogEntry[] if (values.node.backend === PipelineBackend.BatchExport) { results = await api.batchExportLogs.search( id as string, @@ -116,7 +116,7 @@ export const pipelineNodeLogsLogic = kea([ }, ], backgroundLogs: [ - [] as LogEntry[], + [] as PipelineNodeLogEntry[], { pollBackgroundLogs: async () => { // we fetch new logs in the background and allow the user to expand @@ -125,7 +125,7 @@ export const pipelineNodeLogsLogic = kea([ return values.backgroundLogs } - let results: LogEntry[] + let results: PipelineNodeLogEntry[] if (values.node.backend === PipelineBackend.BatchExport) { results = await api.batchExportLogs.search( id as string, @@ -167,7 +167,7 @@ export const pipelineNodeLogsLogic = kea([ }, ], backgroundLogs: [ - [] as LogEntry[], + [] as PipelineNodeLogEntry[], { clearBackgroundLogs: () => [], }, @@ -195,7 +195,7 @@ export const pipelineNodeLogsLogic = kea([ selectors(({ actions }) => ({ leadingEntry: [ (s) => [s.logs, s.backgroundLogs], - (logs: LogEntry[], backgroundLogs: LogEntry[]): LogEntry | null => { + (logs: PipelineNodeLogEntry[], backgroundLogs: PipelineNodeLogEntry[]): PipelineNodeLogEntry | null => { if (backgroundLogs.length) { return backgroundLogs[0] } @@ -207,7 +207,7 @@ export const pipelineNodeLogsLogic = kea([ ], trailingEntry: [ (s) => [s.logs, s.backgroundLogs], - (logs: LogEntry[], backgroundLogs: LogEntry[]): LogEntry | null => { + (logs: PipelineNodeLogEntry[], backgroundLogs: PipelineNodeLogEntry[]): PipelineNodeLogEntry | null => { if (logs.length) { return logs[logs.length - 1] } @@ -219,13 +219,14 @@ export const pipelineNodeLogsLogic = kea([ ], columns: [ (s) => [s.node], - (node): LemonTableColumns => { + (node): LemonTableColumns => { return [ { title: 'Timestamp', key: 'timestamp', dataIndex: 'timestamp', - sorter: (a: LogEntry, b: LogEntry) => dayjs(a.timestamp).unix() - dayjs(b.timestamp).unix(), + sorter: (a: PipelineNodeLogEntry, b: PipelineNodeLogEntry) => + dayjs(a.timestamp).unix() - dayjs(b.timestamp).unix(), render: (timestamp: string) => , width: 0, }, @@ -295,7 +296,7 @@ export const pipelineNodeLogsLogic = kea([ dataIndex: 'message', render: (message: string) => {message}, }, - ] as LemonTableColumns + ] as LemonTableColumns }, ], })), diff --git a/frontend/src/scenes/session-recordings/playlist/SessionRecordingsPlaylist.tsx b/frontend/src/scenes/session-recordings/playlist/SessionRecordingsPlaylist.tsx index 95d80a623be5c..01a34e228013c 100644 --- a/frontend/src/scenes/session-recordings/playlist/SessionRecordingsPlaylist.tsx +++ b/frontend/src/scenes/session-recordings/playlist/SessionRecordingsPlaylist.tsx @@ -1,23 +1,13 @@ -import './SessionRecordingsPlaylist.scss' - -import { IconCollapse, IconFilter, IconGear } from '@posthog/icons' -import { LemonButton, Link } from '@posthog/lemon-ui' -import clsx from 'clsx' -import { range } from 'd3' +import { IconFilter, IconGear } from '@posthog/icons' +import { LemonButton, Link, Spinner } from '@posthog/lemon-ui' import { BindLogic, useActions, useValues } from 'kea' import { EmptyMessage } from 'lib/components/EmptyMessage/EmptyMessage' +import { Playlist, PlaylistSection } from 'lib/components/Playlist/Playlist' import { PropertyKeyInfo } from 'lib/components/PropertyKeyInfo' -import { Resizer } from 'lib/components/Resizer/Resizer' import { FEATURE_FLAGS } from 'lib/constants' -import { useResizeBreakpoints } from 'lib/hooks/useResizeObserver' -import { IconChevronRight, IconWithCount } from 'lib/lemon-ui/icons' +import { IconWithCount } from 'lib/lemon-ui/icons' import { LemonBanner } from 'lib/lemon-ui/LemonBanner' -import { LemonTableLoader } from 'lib/lemon-ui/LemonTable/LemonTableLoader' -import { Spinner } from 'lib/lemon-ui/Spinner' -import { Tooltip } from 'lib/lemon-ui/Tooltip' import { featureFlagLogic } from 'lib/logic/featureFlagLogic' -import React, { useEffect, useRef } from 'react' -import { DraggableToNotebook } from 'scenes/notebooks/AddToNotebook/DraggableToNotebook' import { useNotebookNode } from 'scenes/notebooks/Nodes/NotebookNodeContext' import { urls } from 'scenes/urls' @@ -26,410 +16,245 @@ import { ReplayTabs, SessionRecordingType } from '~/types' import { RecordingsUniversalFilters } from '../filters/RecordingsUniversalFilters' import { SessionRecordingsFilters } from '../filters/SessionRecordingsFilters' import { SessionRecordingPlayer } from '../player/SessionRecordingPlayer' -import { SessionRecordingPreview, SessionRecordingPreviewSkeleton } from './SessionRecordingPreview' +import { SessionRecordingPreview } from './SessionRecordingPreview' import { DEFAULT_RECORDING_FILTERS, - RECORDINGS_LIMIT, SessionRecordingPlaylistLogicProps, sessionRecordingsPlaylistLogic, } from './sessionRecordingsPlaylistLogic' import { SessionRecordingsPlaylistSettings } from './SessionRecordingsPlaylistSettings' import { SessionRecordingsPlaylistTroubleshooting } from './SessionRecordingsPlaylistTroubleshooting' -const SCROLL_TRIGGER_OFFSET = 100 - -const CounterBadge = ({ children }: { children: React.ReactNode }): JSX.Element => ( - {children} -) - -function UnusableEventsWarning(props: { unusableEventsInFilter: string[] }): JSX.Element { - // TODO add docs on how to enrich custom events with session_id and link to it from here - return ( - -

Cannot use these events to filter for session recordings:

-
  • - {props.unusableEventsInFilter.map((event) => ( - "{event}" - ))} -
  • -

    - Events have to have a to be used to filter recordings. This is - added automatically by{' '} - - the Web SDK - - ,{' '} - - the Android SDK - -

    -
    - ) -} - -function PinnedRecordingsList(): JSX.Element | null { - const { setSelectedRecordingId } = useActions(sessionRecordingsPlaylistLogic) - const { activeSessionRecordingId, pinnedRecordings } = useValues(sessionRecordingsPlaylistLogic) - - const { featureFlags } = useValues(featureFlagLogic) - const isTestingSaved = featureFlags[FEATURE_FLAGS.SAVED_NOT_PINNED] === 'test' - - const description = isTestingSaved ? 'Saved' : 'Pinned' - - if (!pinnedRecordings.length) { - return null +export function SessionRecordingsPlaylist(props: SessionRecordingPlaylistLogicProps): JSX.Element { + const logicProps: SessionRecordingPlaylistLogicProps = { + ...props, + autoPlay: props.autoPlay ?? true, } - - return ( - <> -
    - {description} recordings -
    - {pinnedRecordings.map((rec) => ( -
    - setSelectedRecordingId(rec.id)} - isActive={activeSessionRecordingId === rec.id} - pinned={true} - /> -
    - ))} - - ) -} - -function RecordingsLists(): JSX.Element { + const logic = sessionRecordingsPlaylistLogic(logicProps) const { filters, - advancedFilters, - simpleFilters, - hasNext, pinnedRecordings, - otherRecordings, - sessionRecordingsResponseLoading, - activeSessionRecordingId, - showFilters, - showSettings, totalFiltersCount, - sessionRecordingsAPIErrored, - unusableEventsInFilter, - logicProps, - showOtherRecordings, - recordingsCount, - isRecordingsListCollapsed, - sessionSummaryLoading, useUniversalFiltering, - } = useValues(sessionRecordingsPlaylistLogic) + matchingEventsMatchType, + sessionRecordingsResponseLoading, + otherRecordings, + sessionSummaryLoading, + advancedFilters, + simpleFilters, + activeSessionRecordingId, + hasNext, + } = useValues(logic) const { + maybeLoadSessionRecordings, + summarizeSession, setSelectedRecordingId, setAdvancedFilters, setSimpleFilters, - maybeLoadSessionRecordings, - setShowFilters, - setShowSettings, resetFilters, - toggleShowOtherRecordings, - toggleRecordingsListCollapsed, - summarizeSession, - } = useActions(sessionRecordingsPlaylistLogic) - - const onRecordingClick = (recording: SessionRecordingType): void => { - setSelectedRecordingId(recording.id) - } - - const onSummarizeClick = (recording: SessionRecordingType): void => { - summarizeSession(recording.id) - } - - const lastScrollPositionRef = useRef(0) - const contentRef = useRef(null) - - const handleScroll = (e: React.UIEvent): void => { - // If we are scrolling down then check if we are at the bottom of the list - if (e.currentTarget.scrollTop > lastScrollPositionRef.current) { - const scrollPosition = e.currentTarget.scrollTop + e.currentTarget.clientHeight - if (e.currentTarget.scrollHeight - scrollPosition < SCROLL_TRIGGER_OFFSET) { - maybeLoadSessionRecordings('older') - } - } - - // Same again but if scrolling to the top - if (e.currentTarget.scrollTop < lastScrollPositionRef.current) { - if (e.currentTarget.scrollTop < SCROLL_TRIGGER_OFFSET) { - maybeLoadSessionRecordings('newer') - } - } + } = useActions(logic) - lastScrollPositionRef.current = e.currentTarget.scrollTop - } + const { featureFlags } = useValues(featureFlagLogic) + const isTestingSaved = featureFlags[FEATURE_FLAGS.SAVED_NOT_PINNED] === 'test' - useEffect(() => { - if (contentRef.current) { - contentRef.current.scrollTop = 0 - } - }, [showFilters, showSettings]) + const pinnedDescription = isTestingSaved ? 'Saved' : 'Pinned' const notebookNode = useNotebookNode() - return isRecordingsListCollapsed ? ( -
    - } onClick={() => toggleRecordingsListCollapsed()} /> -
    - ) : ( -
    - -
    - } - onClick={() => toggleRecordingsListCollapsed()} - /> - - {!notebookNode ? ( - - Recordings - - ) : null} - - Showing {recordingsCount} results. -
    - Scrolling to the bottom or the top of the list will load older or newer recordings - respectively. - - } - > - - {Math.min(999, recordingsCount)}+ - -
    -
    - {(!useUniversalFiltering || notebookNode) && ( - - - - } - onClick={() => { - if (notebookNode) { - notebookNode.actions.toggleEditing() - } else { - setShowFilters(!showFilters) - } - }} - > - Filter - - )} - } - onClick={() => setShowSettings(!showSettings)} - /> - -
    -
    + const sections: PlaylistSection[] = [] + const headerActions = [] -
    - {!notebookNode && showFilters ? ( -
    - -
    - ) : showSettings ? ( - - ) : null} + const onSummarizeClick = (recording: SessionRecordingType): void => { + summarizeSession(recording.id) + } - {pinnedRecordings.length || otherRecordings.length ? ( -
      - + if (!useUniversalFiltering || notebookNode) { + headerActions.push({ + key: 'filters', + tooltip: 'Filter recordings', + content: ( + + ), + icon: ( + + + + ), + children: 'Filter', + }) + } - {pinnedRecordings.length ? ( -
      - Other recordings - toggleShowOtherRecordings()}> - {showOtherRecordings ? 'Hide' : 'Show'} - -
      - ) : null} + headerActions.push({ + key: 'settings', + tooltip: 'Playlist settings', + content: , + icon: , + }) - <> - {showOtherRecordings - ? otherRecordings.map((rec) => ( -
      - onRecordingClick(rec)} - isActive={activeSessionRecordingId === rec.id} - pinned={false} - summariseFn={onSummarizeClick} - sessionSummaryLoading={sessionSummaryLoading} - /> -
      - )) - : null} + if (pinnedRecordings.length) { + sections.push({ + key: 'pinned', + title: `${pinnedDescription} recordings`, + items: pinnedRecordings, + render: ({ item, isActive }) => ( + + ), + initiallyOpen: true, + }) + } -
      - {!showOtherRecordings && totalFiltersCount ? ( - <>Filters do not apply to pinned recordings - ) : sessionRecordingsResponseLoading ? ( - <> - Loading older recordings - - ) : hasNext ? ( - maybeLoadSessionRecordings('older')}> - Load more - - ) : ( - 'No more results' - )} -
      - -
    - ) : sessionRecordingsResponseLoading ? ( + sections.push({ + key: 'other', + title: 'Other recordings', + items: otherRecordings, + render: ({ item, isActive }) => ( + + ), + footer: ( +
    + {sessionRecordingsResponseLoading ? ( <> - {range(RECORDINGS_LIMIT).map((i) => ( - - ))} + Loading older recordings + ) : hasNext ? ( + maybeLoadSessionRecordings('older')}>Load more ) : ( -
    - {sessionRecordingsAPIErrored ? ( - Error while trying to load recordings. - ) : unusableEventsInFilter.length ? ( - - ) : ( -
    - {filters.date_from === DEFAULT_RECORDING_FILTERS.date_from ? ( - <> - No matching recordings found - { - setAdvancedFilters({ - date_from: '-30d', - }) - }} - > - Search over the last 30 days - - - ) : ( - - )} -
    - )} -
    + 'No more results' )}
    -
    - ) -} - -export function SessionRecordingsPlaylist(props: SessionRecordingPlaylistLogicProps): JSX.Element { - const logicProps: SessionRecordingPlaylistLogicProps = { - ...props, - autoPlay: props.autoPlay ?? true, - } - const playlistRecordingsListRef = useRef(null) - const logic = sessionRecordingsPlaylistLogic(logicProps) - const { - activeSessionRecording, - activeSessionRecordingId, - matchingEventsMatchType, - pinnedRecordings, - isRecordingsListCollapsed, - useUniversalFiltering, - } = useValues(logic) - const { toggleRecordingsListCollapsed } = useActions(logic) - - const { ref: playlistRef, size } = useResizeBreakpoints({ - 0: 'small', - 750: 'medium', + ), }) - const notebookNode = useNotebookNode() - return (
    {useUniversalFiltering && } - -
    -
    - - toggleRecordingsListCollapsed(shouldBeClosed)} - onDoubleClick={() => toggleRecordingsListCollapsed()} - /> -
    -
    - {!activeSessionRecordingId ? ( -
    - -
    - ) : ( + notebooksHref={urls.replay(ReplayTabs.Recent, filters)} + title={!notebookNode ? 'Recordings' : undefined} + embedded={!!notebookNode} + sections={sections} + headerActions={headerActions} + loading={sessionRecordingsResponseLoading} + onScrollListEdge={(edge) => { + if (edge === 'top') { + maybeLoadSessionRecordings('newer') + } else { + maybeLoadSessionRecordings('older') + } + }} + listEmptyState={} + onSelect={(item) => setSelectedRecordingId(item.id)} + activeItemId={activeSessionRecordingId} + content={({ activeItem }) => + activeItem ? ( x.id === activeSessionRecordingId)} + pinned={!!pinnedRecordings.find((x) => x.id === activeItem.id)} setPinned={ props.onPinnedChange ? (pinned) => { - if (!activeSessionRecording?.id) { + if (!activeItem.id) { return } - props.onPinnedChange?.(activeSessionRecording, pinned) + props.onPinnedChange?.(activeItem, pinned) } : undefined } /> - )} -
    -
    + ) : ( +
    + +
    + ) + } + />
    ) } + +const ListEmptyState = (): JSX.Element => { + const { filters, sessionRecordingsAPIErrored, unusableEventsInFilter } = useValues(sessionRecordingsPlaylistLogic) + const { setAdvancedFilters } = useActions(sessionRecordingsPlaylistLogic) + + return ( +
    + {sessionRecordingsAPIErrored ? ( + Error while trying to load recordings. + ) : unusableEventsInFilter.length ? ( + + ) : ( +
    + {filters.date_from === DEFAULT_RECORDING_FILTERS.date_from ? ( + <> + No matching recordings found + { + setAdvancedFilters({ + date_from: '-30d', + }) + }} + > + Search over the last 30 days + + + ) : ( + + )} +
    + )} +
    + ) +} + +function UnusableEventsWarning(props: { unusableEventsInFilter: string[] }): JSX.Element { + // TODO add docs on how to enrich custom events with session_id and link to it from here + return ( + +

    Cannot use these events to filter for session recordings:

    +
  • + {props.unusableEventsInFilter.map((event) => ( + "{event}" + ))} +
  • +

    + Events have to have a to be used to filter recordings. This is + added automatically by{' '} + + the Web SDK + + ,{' '} + + the Android SDK + +

    +
    + ) +} diff --git a/frontend/src/scenes/session-recordings/playlist/SessionRecordingsPlaylistScene.tsx b/frontend/src/scenes/session-recordings/playlist/SessionRecordingsPlaylistScene.tsx index b9a504d182821..01be80946d2c8 100644 --- a/frontend/src/scenes/session-recordings/playlist/SessionRecordingsPlaylistScene.tsx +++ b/frontend/src/scenes/session-recordings/playlist/SessionRecordingsPlaylistScene.tsx @@ -1,5 +1,3 @@ -import './SessionRecordingsPlaylist.scss' - import { LemonButton, LemonDivider } from '@posthog/lemon-ui' import { useActions, useValues } from 'kea' import { EditableField } from 'lib/components/EditableField/EditableField' diff --git a/frontend/src/scenes/session-recordings/playlist/SessionRecordingsPlaylistSettings.tsx b/frontend/src/scenes/session-recordings/playlist/SessionRecordingsPlaylistSettings.tsx index be48802d9c7c4..972628dd0032b 100644 --- a/frontend/src/scenes/session-recordings/playlist/SessionRecordingsPlaylistSettings.tsx +++ b/frontend/src/scenes/session-recordings/playlist/SessionRecordingsPlaylistSettings.tsx @@ -12,7 +12,7 @@ export function SessionRecordingsPlaylistSettings(): JSX.Element { const { orderBy } = useValues(sessionRecordingsPlaylistLogic) return ( -
    +
    diff --git a/frontend/src/scenes/session-recordings/playlist/sessionRecordingsPlaylistLogic.ts b/frontend/src/scenes/session-recordings/playlist/sessionRecordingsPlaylistLogic.ts index bf0807e092fac..0b66d4bb34062 100644 --- a/frontend/src/scenes/session-recordings/playlist/sessionRecordingsPlaylistLogic.ts +++ b/frontend/src/scenes/session-recordings/playlist/sessionRecordingsPlaylistLogic.ts @@ -25,7 +25,6 @@ import { RecordingDurationFilter, RecordingFilters, RecordingUniversalFilters, - ReplayTabs, SessionRecordingId, SessionRecordingsResponse, SessionRecordingType, @@ -186,7 +185,6 @@ export interface SessionRecordingPlaylistLogicProps { onFiltersChange?: (filters: RecordingFilters) => void pinnedRecordings?: (SessionRecordingType | string)[] onPinnedChange?: (recording: SessionRecordingType, pinned: boolean) => void - currentTab?: ReplayTabs } export interface SessionSummaryResponse { @@ -201,6 +199,7 @@ export const sessionRecordingsPlaylistLogic = kea `${props.logicKey}-${props.personUUID}-${props.updateSearchParams ? '-with-search' : ''}` ), + connect({ actions: [ eventUsageLogic, @@ -215,6 +214,7 @@ export const sessionRecordingsPlaylistLogic = kea) => ({ filters }), setAdvancedFilters: (filters: Partial) => ({ filters }), @@ -234,7 +234,6 @@ export const sessionRecordingsPlaylistLogic = kea ({ show }), - toggleRecordingsListCollapsed: (override?: boolean) => ({ override }), }), propsChanged(({ actions, props }, oldProps) => { if (!objectsEqual(props.advancedFilters, oldProps.advancedFilters)) { @@ -514,13 +513,6 @@ export const sessionRecordingsPlaylistLogic = kea false, }, ], - isRecordingsListCollapsed: [ - false, - { persist: true }, - { - toggleRecordingsListCollapsed: (state, { override }) => override ?? !state, - }, - ], })), listeners(({ props, actions, values }) => ({ loadAllRecordings: () => { diff --git a/frontend/src/types.ts b/frontend/src/types.ts index e83e07503cf72..0d0888297d166 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -1942,6 +1942,15 @@ export interface PluginErrorType { event?: Record } +// The general log entry format that eventually everything should match +export type LogEntry = { + log_source_id: string + instance_id: string + timestamp: string + level: 'DEBUG' | 'INFO' | 'WARN' | 'ERROR' + message: string +} + export enum PluginLogEntryType { Debug = 'DEBUG', Log = 'LOG', @@ -4207,6 +4216,8 @@ export type HogFunctionType = { template?: HogFunctionTemplateType } +export type HogFunctionConfigurationType = Omit + export type HogFunctionTemplateType = Pick< HogFunctionType, 'id' | 'name' | 'description' | 'hog' | 'inputs_schema' | 'filters' | 'icon_url' diff --git a/hogvm/python/cli.py b/hogvm/python/cli.py index f18c316ce61c9..9a7df552edf76 100644 --- a/hogvm/python/cli.py +++ b/hogvm/python/cli.py @@ -1,3 +1,4 @@ +from datetime import timedelta import sys import json from .execute import execute_bytecode @@ -18,6 +19,6 @@ code = file.read() code = json.loads(code) -response = execute_bytecode(code, globals=None, timeout=5, team=None, debug=debug) +response = execute_bytecode(code, globals=None, timeout=timedelta(seconds=5), team=None, debug=debug) for line in response.stdout: print(line) # noqa: T201 diff --git a/hogvm/python/execute.py b/hogvm/python/execute.py index 2a17447c9bd62..e297976d5a675 100644 --- a/hogvm/python/execute.py +++ b/hogvm/python/execute.py @@ -1,3 +1,4 @@ +from datetime import timedelta import re import time from copy import deepcopy @@ -26,7 +27,7 @@ def execute_bytecode( bytecode: list[Any], globals: Optional[dict[str, Any]] = None, functions: Optional[dict[str, Callable[..., Any]]] = None, - timeout=5, + timeout=timedelta(seconds=5), team: Optional["Team"] = None, debug=False, ) -> BytecodeResult: @@ -60,8 +61,8 @@ def pop_stack(): return BytecodeResult(result=None, stdout=stdout, bytecode=bytecode) def check_timeout(): - if time.time() - start_time > timeout and not debug: - raise HogVMException(f"Execution timed out after {timeout} seconds. Performed {ops} ops.") + if time.time() - start_time > timeout.total_seconds() and not debug: + raise HogVMException(f"Execution timed out after {timeout.total_seconds()} seconds. Performed {ops} ops.") while True: ops += 1 diff --git a/hogvm/typescript/package.json b/hogvm/typescript/package.json index 9232321bd00ac..9a00e51813d4e 100644 --- a/hogvm/typescript/package.json +++ b/hogvm/typescript/package.json @@ -1,6 +1,6 @@ { "name": "@posthog/hogvm", - "version": "1.0.14", + "version": "1.0.15", "description": "PostHog Hog Virtual Machine", "types": "dist/index.d.ts", "main": "dist/index.js", diff --git a/hogvm/typescript/src/execute.ts b/hogvm/typescript/src/execute.ts index 4101d64f69d1b..865f57408a713 100644 --- a/hogvm/typescript/src/execute.ts +++ b/hogvm/typescript/src/execute.ts @@ -3,7 +3,7 @@ import { ASYNC_STL, STL } from './stl/stl' import { convertHogToJS, convertJSToHog, getNestedValue, like, setNestedValue } from './utils' const DEFAULT_MAX_ASYNC_STEPS = 100 -const DEFAULT_TIMEOUT = 5 // seconds +const DEFAULT_TIMEOUT_MS = 5000 // ms export interface VMState { /** Bytecode running in the VM */ @@ -25,10 +25,13 @@ export interface VMState { } export interface ExecOptions { + /** Global variables to be passed into the function */ globals?: Record functions?: Record any> asyncFunctions?: Record Promise> + /** Timeout in milliseconds */ timeout?: number + /** Max number of async function that can happen. When reached the function will throw */ maxAsyncSteps?: number } @@ -66,7 +69,7 @@ export async function execAsync(bytecode: any[], options?: ExecOptions): Promise const result = await ASYNC_STL[response.asyncFunctionName]( response.asyncFunctionArgs, response.asyncFunctionName, - options?.timeout ?? DEFAULT_TIMEOUT + options?.timeout ?? DEFAULT_TIMEOUT_MS ) vmState.stack.push(result) } else { @@ -105,7 +108,7 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult { const declaredFunctions: Record = vmState ? vmState.declaredFunctions : {} let ip = vmState ? vmState.ip : 1 let ops = vmState ? vmState.ops : 0 - const timeout = options?.timeout ?? DEFAULT_TIMEOUT + const timeout = options?.timeout ?? DEFAULT_TIMEOUT_MS const maxAsyncSteps = options?.maxAsyncSteps ?? DEFAULT_MAX_ASYNC_STEPS function popStack(): any { @@ -122,8 +125,8 @@ export function exec(code: any[] | VMState, options?: ExecOptions): ExecResult { return bytecode![++ip] } function checkTimeout(): void { - if (syncDuration + Date.now() - startTime > timeout * 1000) { - throw new Error(`Execution timed out after ${timeout} seconds. Performed ${ops} ops.`) + if (syncDuration + Date.now() - startTime > timeout) { + throw new Error(`Execution timed out after ${timeout / 1000} seconds. Performed ${ops} ops.`) } } diff --git a/mypy-baseline.txt b/mypy-baseline.txt index 274d7c4303d3a..8d95d21224471 100644 --- a/mypy-baseline.txt +++ b/mypy-baseline.txt @@ -63,7 +63,6 @@ posthog/redis.py:0: error: Import cycle from Django settings module prevents typ posthog/plugins/utils.py:0: error: Subclass of "str" and "bytes" cannot exist: would have incompatible method signatures [unreachable] posthog/plugins/utils.py:0: error: Statement is unreachable [unreachable] posthog/clickhouse/kafka_engine.py:0: error: Import cycle from Django settings module prevents type inference for 'KAFKA_HOSTS_FOR_CLICKHOUSE' [misc] -posthog/plugins/reload.py:0: error: Import cycle from Django settings module prevents type inference for 'PLUGINS_RELOAD_REDIS_URL' [misc] posthog/models/project.py:0: error: Incompatible type for "project" of "Team" (got "_T", expected "Project | Combinable") [misc] posthog/models/project.py:0: error: "_T" has no attribute "organization" [attr-defined] posthog/models/project.py:0: error: Incompatible return value type (got "tuple[_T, Team]", expected "tuple[Project, Team]") [return-value] diff --git a/package.json b/package.json index 7b7a450e679da..2327d509d49d0 100644 --- a/package.json +++ b/package.json @@ -146,7 +146,7 @@ "pmtiles": "^2.11.0", "postcss": "^8.4.31", "postcss-preset-env": "^9.3.0", - "posthog-js": "1.139.3", + "posthog-js": "1.139.4", "posthog-js-lite": "3.0.0", "prettier": "^2.8.8", "prop-types": "^15.7.2", diff --git a/plugin-server/package.json b/plugin-server/package.json index 611ebe0c5be78..612aeb27f94da 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -50,7 +50,7 @@ "@google-cloud/storage": "^5.8.5", "@maxmind/geoip2-node": "^3.4.0", "@posthog/clickhouse": "^1.7.0", - "@posthog/hogvm": "^1.0.12", + "@posthog/hogvm": "^1.0.14", "@posthog/plugin-scaffold": "1.4.4", "@sentry/node": "^7.49.0", "@sentry/profiling-node": "^0.3.0", @@ -131,6 +131,7 @@ "parse-prometheus-text-format": "^1.1.1", "pino-pretty": "^9.1.0", "prettier": "^2.8.8", + "supertest": "^7.0.0", "ts-node": "^10.9.1", "typescript": "^4.7.4" }, diff --git a/plugin-server/pnpm-lock.yaml b/plugin-server/pnpm-lock.yaml index 9a7d7d36ee6a8..ba81f39c0ec9b 100644 --- a/plugin-server/pnpm-lock.yaml +++ b/plugin-server/pnpm-lock.yaml @@ -44,8 +44,8 @@ dependencies: specifier: ^1.7.0 version: 1.7.0 '@posthog/hogvm': - specifier: ^1.0.12 - version: 1.0.12 + specifier: ^1.0.14 + version: 1.0.14 '@posthog/plugin-scaffold': specifier: 1.4.4 version: 1.4.4 @@ -282,6 +282,9 @@ devDependencies: prettier: specifier: ^2.8.8 version: 2.8.8 + supertest: + specifier: ^7.0.0 + version: 7.0.0 ts-node: specifier: ^10.9.1 version: 10.9.1(@swc/core@1.3.55)(@types/node@16.18.25)(typescript@4.9.5) @@ -3107,8 +3110,8 @@ packages: engines: {node: '>=12'} dev: false - /@posthog/hogvm@1.0.12: - resolution: {integrity: sha512-S8kO3X3BAfLp3SzluRmmST6aII+G2kYjGXC7373XPHIghGpFNlNq1gpllYvDjjGM2yVQbOBLzi5UvlzK0nG6rw==} + /@posthog/hogvm@1.0.14: + resolution: {integrity: sha512-mIdVcKGnJUqgfwnn/urNLZwkZtWMLIjsEmqtGUOX8Kw++log4QuBIvMf1eYY1yeVI4hC9oldr1GJttltwRAv5g==} dev: false /@posthog/plugin-scaffold@1.4.4: @@ -4232,6 +4235,10 @@ packages: engines: {node: '>=8'} dev: false + /asap@2.0.6: + resolution: {integrity: sha512-BSHWgDSAiKs50o2Re8ppvp3seVHXSRM44cdSsT9FfNEUUZLOGWVCsiWaRPWM1Znn+mqZ1OfVZ3z3DWEzSp7hRA==} + dev: true + /asn1.js@5.4.1: resolution: {integrity: sha512-+I//4cYPccV8LdmBLiX8CYvf9Sp3vQsrqu2QNXRcrbiWvcx/UdlFiqUJJzxRQxgsZmvhXhn4cSKeSmoFjVdupA==} dependencies: @@ -4970,6 +4977,10 @@ packages: engines: {node: '>= 6'} dev: true + /component-emitter@1.3.1: + resolution: {integrity: sha512-T0+barUSQRTUQASh8bx02dl+DhF54GtIDY13Y3m9oWTklKbb3Wv974meRpeZ3lp1JpLVECWWNHC4vaG2XHXouQ==} + dev: true + /compressible@2.0.18: resolution: {integrity: sha512-AF3r7P5dWxL8MxyITRMlORQNaOA2IkAFaTr4k7BUumjPtRpGDTZpl0Pb1XCO6JeDCBdp126Cgs9sMxqSjgYyRg==} engines: {node: '>= 0.6'} @@ -5057,6 +5068,10 @@ packages: engines: {node: '>= 0.6'} dev: false + /cookiejar@2.1.4: + resolution: {integrity: sha512-LDx6oHrK+PhzLKJU9j5S7/Y3jM/mUHvD/DeI1WQmJn652iPC5Y4TBzC9l+5OMOXlyTTA+SmVUPm0HQUwpD5Jqw==} + dev: true + /core-js-compat@3.30.1: resolution: {integrity: sha512-d690npR7MC6P0gq4npTl5n2VQeNAmUrJ90n+MHiKS7W2+xno4o3F5GDEuylSdi6EJ3VssibSGXOa1r3YXD3Mhw==} dependencies: @@ -5410,6 +5425,13 @@ packages: minimist: 1.2.8 dev: true + /dezalgo@1.0.4: + resolution: {integrity: sha512-rXSP0bf+5n0Qonsb+SVVfNfIsimO4HEtmnIpPHY8Q1UCzKlQrDMfdobr8nJOOsRgWCyMRqeSBQzmWUMq7zvVig==} + dependencies: + asap: 2.0.6 + wrappy: 1.0.2 + dev: true + /diff-sequences@28.1.1: resolution: {integrity: sha512-FU0iFaH/E23a+a718l8Qa/19bF9p06kgE0KipMOMadwa3SjnaElKzPaUC0vnibs6/B/9ni97s61mcejk8W1fQw==} engines: {node: ^12.13.0 || ^14.15.0 || ^16.10.0 || >=17.0.0} @@ -6196,6 +6218,23 @@ packages: mime-types: 2.1.35 dev: true + /form-data@4.0.0: + resolution: {integrity: sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==} + engines: {node: '>= 6'} + dependencies: + asynckit: 0.4.0 + combined-stream: 1.0.8 + mime-types: 2.1.35 + dev: true + + /formidable@3.5.1: + resolution: {integrity: sha512-WJWKelbRHN41m5dumb0/k8TeAx7Id/y3a+Z7QfhxP/htI9Js5zYaEDtG8uMgG0vM0lOlqnmjE99/kfpOYi/0Og==} + dependencies: + dezalgo: 1.0.4 + hexoid: 1.0.0 + once: 1.4.0 + dev: true + /forwarded@0.2.0: resolution: {integrity: sha512-buRG0fpBtRHSTCOASe6hD258tEubFoRLb4ZNA6NxMVHNw2gOcwHo9wyablzMzOA5z9xA9L1KNjk/Nt6MT9aYow==} engines: {node: '>= 0.6'} @@ -6667,6 +6706,11 @@ packages: readable-stream: 3.6.2 dev: true + /hexoid@1.0.0: + resolution: {integrity: sha512-QFLV0taWQOZtvIRIAdBChesmogZrtuXvVWsFHZTk2SU+anspqZ2vMnoLg7IE1+Uk16N19APic1BuF8bC8c2m5g==} + engines: {node: '>=8'} + dev: true + /hmac-drbg@1.0.1: resolution: {integrity: sha512-Tti3gMqLdZfhOQY1Mzf/AanLiqh1WTiJgEj26ZuYQ9fbkLomzGchCws4FyrSd4VkpBfiNhaE1On+lOz894jvXg==} dependencies: @@ -8054,7 +8098,6 @@ packages: /methods@1.1.2: resolution: {integrity: sha512-iclAHeNqNm68zFtnZ0e+1L2yUIdvzNoauKU4WBA3VvH/vPFieF7qfRlwUZU+DA9P9bPXIS90ulxoUoCH23sV2w==} engines: {node: '>= 0.6'} - dev: false /micromatch@4.0.5: resolution: {integrity: sha512-DMy+ERcEW2q8Z2Po+WNXuw3c5YaUSFjAO5GsJqfEl7UjvtIuFKO6ZrKvcItdy98dwFI2N1tg3zNIdKaQT+aNdA==} @@ -8088,6 +8131,12 @@ packages: hasBin: true dev: false + /mime@2.6.0: + resolution: {integrity: sha512-USPkMeET31rOMiarsBNIHZKLGgvKc/LrjofAnBlOttf5ajRvqiRA8QsenbcooctK6d6Ts6aqZXBA+XbkKthiQg==} + engines: {node: '>=4.0.0'} + hasBin: true + dev: true + /mime@3.0.0: resolution: {integrity: sha512-jSCU7/VB1loIWBZe14aEYHU/+1UMEHoaO7qxCOVJOw9GgH72VAWppxNcjU+x9a2k3GSIBXNKxXQFqRvvZ7vr3A==} engines: {node: '>=10.0.0'} @@ -9078,7 +9127,6 @@ packages: engines: {node: '>=0.6'} dependencies: side-channel: 1.0.4 - dev: false /querystring-es3@0.2.1: resolution: {integrity: sha512-773xhDQnZBMFobEiztv8LIl70ch5MSF/jUQVlhwFyBILqq96anmoctVIYz+ZRp0qbCKATTn6ev02M3r7Ga5vqA==} @@ -9853,6 +9901,33 @@ packages: minimist: 1.2.8 dev: true + /superagent@9.0.2: + resolution: {integrity: sha512-xuW7dzkUpcJq7QnhOsnNUgtYp3xRwpt2F7abdRYIpCsAt0hhUqia0EdxyXZQQpNmGtsCzYHryaKSV3q3GJnq7w==} + engines: {node: '>=14.18.0'} + dependencies: + component-emitter: 1.3.1 + cookiejar: 2.1.4 + debug: 4.3.4 + fast-safe-stringify: 2.1.1 + form-data: 4.0.0 + formidable: 3.5.1 + methods: 1.1.2 + mime: 2.6.0 + qs: 6.11.0 + transitivePeerDependencies: + - supports-color + dev: true + + /supertest@7.0.0: + resolution: {integrity: sha512-qlsr7fIC0lSddmA3tzojvzubYxvlGtzumcdHgPwbFWMISQwL22MhM2Y3LNt+6w9Yyx7559VW5ab70dgphm8qQA==} + engines: {node: '>=14.18.0'} + dependencies: + methods: 1.1.2 + superagent: 9.0.2 + transitivePeerDependencies: + - supports-color + dev: true + /supports-color@2.0.0: resolution: {integrity: sha512-KKNVtd6pCYgPIKU4cp2733HWYCpplQhddZLBUryaAHou723x+FRzQ5Df824Fj+IyyuiQTRoub4SnIFfIcrp70g==} engines: {node: '>=0.8.0'} diff --git a/plugin-server/src/cdp/async-function-executor.ts b/plugin-server/src/cdp/async-function-executor.ts index 89e6c6c299ba9..41c5e75299894 100644 --- a/plugin-server/src/cdp/async-function-executor.ts +++ b/plugin-server/src/cdp/async-function-executor.ts @@ -1,48 +1,59 @@ import { Webhook } from '@posthog/plugin-scaffold' -import { KAFKA_CDP_FUNCTION_CALLBACKS } from '../config/kafka-topics' import { PluginsServerConfig } from '../types' import { trackedFetch } from '../utils/fetch' import { status } from '../utils/status' import { RustyHook } from '../worker/rusty-hook' -import { - HogFunctionInvocationAsyncRequest, - HogFunctionInvocationAsyncResponse, - HogFunctionMessageToQueue, -} from './types' +import { HogFunctionInvocationAsyncResponse, HogFunctionInvocationResult } from './types' + +export type AsyncFunctionExecutorOptions = { + sync?: boolean +} export class AsyncFunctionExecutor { constructor(private serverConfig: PluginsServerConfig, private rustyHook: RustyHook) {} - async execute(request: HogFunctionInvocationAsyncRequest): Promise { + async execute( + request: HogFunctionInvocationResult, + options: AsyncFunctionExecutorOptions = { sync: false } + ): Promise { + if (!request.asyncFunctionRequest) { + throw new Error('No async function request provided') + } + const loggingContext = { hogFunctionId: request.hogFunctionId, invocationId: request.id, - asyncFunctionName: request.asyncFunctionName, + asyncFunctionName: request.asyncFunctionRequest.name, } status.info('🦔', `[AsyncFunctionExecutor] Executing async function`, loggingContext) - switch (request.asyncFunctionName) { + switch (request.asyncFunctionRequest.name) { case 'fetch': - return await this.asyncFunctionFetch(request) + return await this.asyncFunctionFetch(request, options) default: - status.error('🦔', `[HogExecutor] Unknown async function: ${request.asyncFunctionName}`, loggingContext) + status.error( + '🦔', + `[HogExecutor] Unknown async function: ${request.asyncFunctionRequest.name}`, + loggingContext + ) } } private async asyncFunctionFetch( - request: HogFunctionInvocationAsyncRequest - ): Promise { + request: HogFunctionInvocationResult, + options?: AsyncFunctionExecutorOptions + ): Promise { // TODO: validate the args - const args = request.asyncFunctionArgs ?? [] + const args = request.asyncFunctionRequest!.args ?? [] const url: string = args[0] - const options = args[1] + const fetchOptions = args[1] - const method = options.method || 'POST' - const headers = options.headers || { + const method = fetchOptions.method || 'POST' + const headers = fetchOptions.headers || { 'Content-Type': 'application/json', } - const body = options.body || {} + const body = fetchOptions.body || {} const webhook: Webhook = { url, @@ -51,25 +62,28 @@ export class AsyncFunctionExecutor { body: typeof body === 'string' ? body : JSON.stringify(body, undefined, 4), } - // NOTE: Purposefully disabled for now - once we have callback support we can re-enable - // const SPECIAL_CONFIG_ID = -3 // Hardcoded to mean Hog - // const success = await this.rustyHook.enqueueIfEnabledForTeam({ - // webhook: webhook, - // teamId: hogFunction.team_id, - // pluginId: SPECIAL_CONFIG_ID, - // pluginConfigId: SPECIAL_CONFIG_ID, - // }) - const success = false - // TODO: Temporary test code + if (!options?.sync === false) { + // NOTE: Purposefully disabled for now - once we have callback support we can re-enable + // const SPECIAL_CONFIG_ID = -3 // Hardcoded to mean Hog + // const success = await this.rustyHook.enqueueIfEnabledForTeam({ + // webhook: webhook, + // teamId: hogFunction.team_id, + // pluginId: SPECIAL_CONFIG_ID, + // pluginConfigId: SPECIAL_CONFIG_ID, + // }) + } + if (!success) { status.info('🦔', `[HogExecutor] Webhook not sent via rustyhook, sending directly instead`) - const response: HogFunctionInvocationAsyncResponse = { - ...request, + + const asyncFunctionResponse: HogFunctionInvocationAsyncResponse['asyncFunctionResponse'] = { + timings: [], } try { + const start = performance.now() const fetchResponse = await trackedFetch(url, { method: webhook.method, body: webhook.body, @@ -81,23 +95,31 @@ export class AsyncFunctionExecutor { try { body = JSON.parse(body) } catch (err) { - body + // Ignore } - response.vmResponse = { + const duration = performance.now() - start + + asyncFunctionResponse.timings.push({ + kind: 'async_function', + duration_ms: duration, + }) + + asyncFunctionResponse.vmResponse = { status: fetchResponse.status, body: body, } } catch (err) { status.error('🦔', `[HogExecutor] Error during fetch`, { ...request, error: String(err) }) - response.error = 'Something went wrong with the fetch request.' + asyncFunctionResponse.error = 'Something went wrong with the fetch request.' } - return { - topic: KAFKA_CDP_FUNCTION_CALLBACKS, - value: response, - key: response.id, + const response: HogFunctionInvocationAsyncResponse = { + ...request, + asyncFunctionResponse, } + + return response } } } diff --git a/plugin-server/src/cdp/cdp-processed-events-consumer.ts b/plugin-server/src/cdp/cdp-consumers.ts similarity index 71% rename from plugin-server/src/cdp/cdp-processed-events-consumer.ts rename to plugin-server/src/cdp/cdp-consumers.ts index 04bef4f999772..2a8891807d794 100644 --- a/plugin-server/src/cdp/cdp-processed-events-consumer.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -1,3 +1,5 @@ +import { convertJSToHog } from '@posthog/hogvm' +import express from 'express' import { features, librdkafkaVersion, Message } from 'node-rdkafka' import { Histogram } from 'prom-client' @@ -7,25 +9,28 @@ import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars import { createKafkaProducer } from '../kafka/producer' import { addSentryBreadcrumbsEventListeners } from '../main/ingestion-queues/kafka-metrics' import { runInstrumentedFunction } from '../main/utils' -import { GroupTypeToColumnIndex, Hub, PluginsServerConfig, RawClickHouseEvent, TeamId } from '../types' +import { GroupTypeToColumnIndex, Hub, PluginsServerConfig, RawClickHouseEvent, TeamId, TimestampFormat } from '../types' import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper' import { PostgresRouter } from '../utils/db/postgres' import { status } from '../utils/status' +import { castTimestampOrNow } from '../utils/utils' import { AppMetrics } from '../worker/ingestion/app-metrics' import { GroupTypeManager } from '../worker/ingestion/group-type-manager' import { OrganizationManager } from '../worker/ingestion/organization-manager' import { TeamManager } from '../worker/ingestion/team-manager' import { RustyHook } from '../worker/rusty-hook' import { AsyncFunctionExecutor } from './async-function-executor' -import { HogExecutor } from './hog-executor' +import { addLog, HogExecutor } from './hog-executor' import { HogFunctionManager } from './hog-function-manager' import { + HogFunctionInvocation, HogFunctionInvocationAsyncResponse, HogFunctionInvocationGlobals, HogFunctionInvocationResult, HogFunctionMessageToQueue, + HogFunctionType, } from './types' -import { convertToHogFunctionInvocationGlobals } from './utils' +import { convertToHogFunctionInvocationGlobals, convertToParsedClickhouseEvent } from './utils' // Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals require('@sentry/tracing') @@ -89,19 +94,32 @@ abstract class CdpConsumerBase { await Promise.all( results.map(async (result) => { - result.logs.forEach((x) => { + // Tricky: We want to pull all the logs out as we don't want them to be passed around to any subsequent functions + const logs = result.logs + result.logs = [] + + logs.forEach((x) => { + const sanitized = { + ...x, + timestamp: castTimestampOrNow(x.timestamp, TimestampFormat.ClickHouse), + } + // Convert timestamps to ISO strings messagesToProduce.push({ topic: KAFKA_LOG_ENTRIES, - value: x, + value: sanitized, key: x.instance_id, }) }) - if (result.asyncFunction) { - const res = await this.asyncFunctionExecutor!.execute(result.asyncFunction) + if (result.asyncFunctionRequest) { + const res = await this.asyncFunctionExecutor!.execute(result) if (res) { - messagesToProduce.push(res) + messagesToProduce.push({ + topic: KAFKA_CDP_FUNCTION_CALLBACKS, + value: res, + key: res.id, + }) } } }) @@ -282,7 +300,7 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase { } events.push( convertToHogFunctionInvocationGlobals( - clickHouseEvent, + convertToParsedClickhouseEvent(clickHouseEvent), team, this.config.SITE_URL ?? 'http://localhost:8000', groupTypes @@ -355,4 +373,101 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase { return events } + + public addApiRoutes(app: express.Application) { + app.post('/api/projects/:team_id/hog_functions/:id/invocations', async (req, res): Promise => { + try { + const { id, team_id } = req.params + const { event, mock_async_functions, configuration } = req.body + + status.info('⚡️', 'Received invocation', { id, team_id, body: req.body }) + + if (!event) { + res.status(400).json({ error: 'Missing event' }) + return + } + + const [hogFunction, team] = await Promise.all([ + this.hogFunctionManager.fetchHogFunction(req.params.id), + this.teamManager.fetchTeam(parseInt(team_id)), + ]).catch(() => { + return [null, null] + }) + if (!hogFunction || !team || hogFunction.team_id !== team.id) { + res.status(404).json({ error: 'Hog function not found' }) + return + } + + let groupTypes: GroupTypeToColumnIndex | undefined = undefined + + if (await this.organizationManager.hasAvailableFeature(team.id, 'group_analytics')) { + // If the organization has group analytics enabled then we enrich the event with group data + groupTypes = await this.groupTypeManager.fetchGroupTypes(team.id) + } + + const globals = convertToHogFunctionInvocationGlobals( + event, + team, + this.config.SITE_URL ?? 'http://localhost:8000', + groupTypes + ) + + globals.source = { + name: hogFunction.name ?? `Hog function: ${hogFunction.id}`, + url: `${globals.project.url}/pipeline/destinations/hog-${hogFunction.id}/configuration/`, + } + + const invocation: HogFunctionInvocation = { + id, + globals: globals, + teamId: team.id, + hogFunctionId: id, + logs: [], + timings: [], + } + + // We use the provided config if given, otherwise the function's config + const functionConfiguration: HogFunctionType = configuration ?? hogFunction + + let response = this.hogExecutor.execute(functionConfiguration, invocation) + + while (response.asyncFunctionRequest) { + const asyncFunctionRequest = response.asyncFunctionRequest + + if (mock_async_functions || asyncFunctionRequest.name !== 'fetch') { + addLog(response, 'info', `Async function '${asyncFunctionRequest.name}' was mocked`) + + // Add the state, simulating what executeAsyncResponse would do + asyncFunctionRequest.vmState.stack.push(convertJSToHog({ status: 200, body: {} })) + } else { + const asyncRes = await this.asyncFunctionExecutor!.execute(response, { + sync: true, + }) + + if (!asyncRes || asyncRes.asyncFunctionResponse.error) { + addLog(response, 'error', 'Failed to execute async function') + } + asyncFunctionRequest.vmState.stack.push( + convertJSToHog(asyncRes?.asyncFunctionResponse.vmResponse ?? null) + ) + response.timings.push(...(asyncRes?.asyncFunctionResponse.timings ?? [])) + } + + // Clear it so we can't ever end up in a loop + delete response.asyncFunctionRequest + + response = this.hogExecutor.execute(functionConfiguration, response, asyncFunctionRequest.vmState) + } + + res.json({ + status: response.finished ? 'success' : 'error', + error: String(response.error), + logs: response.logs, + }) + } catch (e) { + console.error(e) + res.status(500).json({ error: e.message }) + } + }) + } } diff --git a/plugin-server/src/cdp/hog-executor.ts b/plugin-server/src/cdp/hog-executor.ts index 9de9051ee3a18..f86ed7aba528b 100644 --- a/plugin-server/src/cdp/hog-executor.ts +++ b/plugin-server/src/cdp/hog-executor.ts @@ -1,9 +1,9 @@ -import { convertHogToJS, convertJSToHog, exec, VMState } from '@posthog/hogvm' +import { convertHogToJS, convertJSToHog, exec, ExecResult, VMState } from '@posthog/hogvm' import { DateTime } from 'luxon' -import { PluginsServerConfig, TimestampFormat } from '../types' +import { PluginsServerConfig } from '../types' import { status } from '../utils/status' -import { castTimestampOrNow, UUIDT } from '../utils/utils' +import { UUIDT } from '../utils/utils' import { HogFunctionManager } from './hog-function-manager' import { HogFunctionInvocation, @@ -45,6 +45,27 @@ export const formatInput = (bytecode: any, globals: HogFunctionInvocation['globa } } +export const addLog = (result: HogFunctionInvocationResult, level: HogFunctionLogEntryLevel, message: string) => { + const lastLog = result.logs[result.logs.length - 1] + // TRICKY: The log entries table is de-duped by timestamp, so we need to ensure that the timestamps are unique + // It is unclear how this affects parallel execution environments + let now = DateTime.now() + if (lastLog && now <= lastLog.timestamp) { + // Ensure that the timestamps are unique + now = lastLog.timestamp.plus(1) + } + + result.logs.push({ + team_id: result.teamId, + log_source: 'hog_function', + log_source_id: result.hogFunctionId, + instance_id: result.id, + timestamp: now, + level, + message, + }) +} + export class HogExecutor { constructor(private serverConfig: PluginsServerConfig, private hogFunctionManager: HogFunctionManager) {} @@ -118,6 +139,10 @@ export class HogExecutor { const result = this.execute(hogFunction, { id: new UUIDT().toString(), globals: modifiedGlobals, + teamId: hogFunction.team_id, + hogFunctionId: hogFunction.id, + logs: [], + timings: [], }) results.push(result) @@ -144,28 +169,34 @@ export class HogExecutor { const baseInvocation: HogFunctionInvocation = { id: invocation.id, globals: invocation.globals, + teamId: invocation.teamId, + hogFunctionId: invocation.hogFunctionId, + timings: invocation.asyncFunctionResponse.timings, + // Logs we always reset as we don't want to carry over logs between calls + logs: [], } const errorRes = (error = 'Something went wrong'): HogFunctionInvocationResult => ({ ...baseInvocation, - hogFunctionId: invocation.hogFunctionId, - teamId: invocation.teamId, - success: false, + finished: false, error, - // TODO: Probably useful to save a log as well? - logs: [], }) if (!hogFunction) { return errorRes(`Hog Function with ID ${invocation.hogFunctionId} not found`) } - if (!invocation.vmState || invocation.error) { + const { vmState } = invocation.asyncFunctionRequest ?? {} + const { asyncFunctionResponse } = invocation + + if (!vmState || !asyncFunctionResponse.vmResponse || asyncFunctionResponse.error) { return errorRes(invocation.error ?? 'No VM state provided for async response') } - invocation.vmState.stack.push(convertJSToHog(invocation.vmResponse ?? null)) - return this.execute(hogFunction, baseInvocation, invocation.vmState) + // Add the response to the stack to continue execution + vmState.stack.push(convertJSToHog(asyncFunctionResponse.vmResponse ?? null)) + + return this.execute(hogFunction, baseInvocation, vmState) } execute( @@ -181,97 +212,92 @@ export class HogExecutor { status.info('🦔', `[HogExecutor] Executing function`, loggingContext) - let lastTimestamp = DateTime.now() - const result: HogFunctionInvocationResult = { ...invocation, - teamId: hogFunction.team_id, - hogFunctionId: hogFunction.id, - success: false, - logs: [], - } - - const log = (level: HogFunctionLogEntryLevel, message: string) => { - // TRICKY: The log entries table is de-duped by timestamp, so we need to ensure that the timestamps are unique - // It is unclear how this affects parallel execution environments - let now = DateTime.now() - if (now <= lastTimestamp) { - // Ensure that the timestamps are unique - now = lastTimestamp.plus(1) - } - lastTimestamp = now - - result.logs.push({ - team_id: hogFunction.team_id, - log_source: 'hog_function', - log_source_id: hogFunction.id, - instance_id: invocation.id, - timestamp: castTimestampOrNow(now, TimestampFormat.ClickHouse), - level, - message, - }) + asyncFunctionRequest: undefined, + finished: false, } if (!state) { - log('debug', `Executing function`) + addLog(result, 'debug', `Executing function`) } else { // NOTE: We do our own check here for async steps as it saves executing Hog and is easier to handle if (state.asyncSteps >= MAX_ASYNC_STEPS) { - log('error', `Function exceeded maximum async steps`) + addLog(result, 'error', `Function exceeded maximum async steps`) result.error = 'Function exceeded maximum async steps' return result } - log('debug', `Resuming function`) + addLog(result, 'debug', `Resuming function`) } try { - const globals = this.buildHogFunctionGlobals(hogFunction, invocation) - - const res = exec(state ?? hogFunction.bytecode, { - globals, - timeout: 100, // NOTE: This will likely be configurable in the future - maxAsyncSteps: MAX_ASYNC_STEPS, // NOTE: This will likely be configurable in the future - asyncFunctions: { - // We need to pass these in but they don't actually do anything as it is a sync exec - fetch: async () => Promise.resolve(), - }, - functions: { - print: (...args) => { - const message = args - .map((arg) => (typeof arg !== 'string' ? JSON.stringify(arg) : arg)) - .join(', ') - log('info', message) + const start = performance.now() + let globals: Record | undefined = undefined + let execRes: ExecResult | undefined = undefined + + try { + globals = this.buildHogFunctionGlobals(hogFunction, invocation) + } catch (e) { + addLog(result, 'error', `Error building inputs: ${e}`) + throw e + } + + try { + execRes = exec(state ?? hogFunction.bytecode, { + globals, + timeout: 100, // NOTE: This will likely be configurable in the future + maxAsyncSteps: MAX_ASYNC_STEPS, // NOTE: This will likely be configurable in the future + asyncFunctions: { + // We need to pass these in but they don't actually do anything as it is a sync exec + fetch: async () => Promise.resolve(), }, - }, + functions: { + print: (...args) => { + const message = args + .map((arg) => (typeof arg !== 'string' ? JSON.stringify(arg) : arg)) + .join(', ') + addLog(result, 'info', message) + }, + }, + }) + } catch (e) { + addLog(result, 'error', `Error executing function: ${e}`) + throw e + } + + const duration = performance.now() - start + + result.finished = execRes.finished + result.timings.push({ + kind: 'hog', + duration_ms: duration, }) - if (!res.finished) { - log('debug', `Suspending function due to async function call '${res.asyncFunctionName}'`) - status.info('🦔', `[HogExecutor] Function returned not finished. Executing async function`, { - ...loggingContext, - asyncFunctionName: res.asyncFunctionName, - }) + if (!execRes.finished) { + addLog(result, 'debug', `Suspending function due to async function call '${execRes.asyncFunctionName}'`) - const args = (res.asyncFunctionArgs ?? []).map((arg) => convertHogToJS(arg)) + const args = (execRes.asyncFunctionArgs ?? []).map((arg) => convertHogToJS(arg)) - if (res.asyncFunctionName) { - result.asyncFunction = { - ...invocation, - teamId: hogFunction.team_id, - hogFunctionId: hogFunction.id, - asyncFunctionName: res.asyncFunctionName, - asyncFunctionArgs: args, - vmState: res.state, + if (!execRes.state) { + // NOTE: This shouldn't be possible so is more of a type sanity check + throw new Error('State should be provided for async function') + } + if (execRes.asyncFunctionName) { + result.asyncFunctionRequest = { + name: execRes.asyncFunctionName, + args: args, + vmState: execRes.state, } } else { - log('warn', `Function was not finished but also had no async function to execute.`) + addLog(result, 'warn', `Function was not finished but also had no async function to execute.`) } } else { - log('debug', `Function completed`) + const totalDuration = result.timings.reduce((acc, timing) => acc + timing.duration_ms, 0) + + addLog(result, 'debug', `Function completed. Processing time ${totalDuration}ms`) } - result.success = true } catch (err) { - result.error = err + result.error = err.message status.error('🦔', `[HogExecutor] Error executing function ${hogFunction.id} - ${hogFunction.name}`, err) } @@ -282,7 +308,6 @@ export class HogExecutor { const builtInputs: Record = {} Object.entries(hogFunction.inputs).forEach(([key, item]) => { - // TODO: Replace this with iterator builtInputs[key] = item.value if (item.bytecode) { diff --git a/plugin-server/src/cdp/hog-function-manager.ts b/plugin-server/src/cdp/hog-function-manager.ts index 5b805c2224f91..4adbec4ab81cc 100644 --- a/plugin-server/src/cdp/hog-function-manager.ts +++ b/plugin-server/src/cdp/hog-function-manager.ts @@ -73,7 +73,7 @@ export class HogFunctionManager { public async reloadHogFunctions(teamId: Team['id'], ids: HogFunctionType['id'][]): Promise { status.info('🍿', `Reloading hog functions ${ids} from DB`) - const items = await fetchHogFunctions(this.postgres, ids) + const items = await fetchEnabledHogFunctions(this.postgres, ids) if (!this.cache[teamId]) { this.cache[teamId] = {} @@ -88,11 +88,15 @@ export class HogFunctionManager { this.cache[teamId][item.id] = item } } + + public fetchHogFunction(id: HogFunctionType['id']): Promise { + return fetchHogFunction(this.postgres, id) + } } const HOG_FUNCTION_FIELDS = ['id', 'team_id', 'name', 'enabled', 'inputs', 'filters', 'bytecode'] -export async function fetchAllHogFunctionsGroupedByTeam(client: PostgresRouter): Promise { +async function fetchAllHogFunctionsGroupedByTeam(client: PostgresRouter): Promise { const items = ( await client.query( PostgresUse.COMMON_READ, @@ -118,7 +122,7 @@ export async function fetchAllHogFunctionsGroupedByTeam(client: PostgresRouter): return cache } -export async function fetchHogFunctions( +async function fetchEnabledHogFunctions( client: PostgresRouter, ids: HogFunctionType['id'][] ): Promise { @@ -129,8 +133,22 @@ export async function fetchHogFunctions( FROM posthog_hogfunction WHERE id = ANY($1) AND deleted = FALSE AND enabled = TRUE`, [ids], - 'fetchHogFunctions' + 'fetchEnabledHogFunctions' ) ).rows return items } + +async function fetchHogFunction(client: PostgresRouter, id: HogFunctionType['id']): Promise { + const items: HogFunctionType[] = ( + await client.query( + PostgresUse.COMMON_READ, + `SELECT ${HOG_FUNCTION_FIELDS.join(', ')} + FROM posthog_hogfunction + WHERE id = $1 AND deleted = FALSE`, + [id], + 'fetchHogFunction' + ) + ).rows + return items[0] ?? null +} diff --git a/plugin-server/src/cdp/types.ts b/plugin-server/src/cdp/types.ts index b5ef75064abc8..83c30a4344d14 100644 --- a/plugin-server/src/cdp/types.ts +++ b/plugin-server/src/cdp/types.ts @@ -1,4 +1,5 @@ import { VMState } from '@posthog/hogvm' +import { DateTime } from 'luxon' import { ElementPropertyFilter, EventPropertyFilter, PersonPropertyFilter } from '../types' @@ -34,6 +35,30 @@ export interface HogFunctionFilters { bytecode?: HogBytecode } +// We have a "parsed" clickhous event type to make it easier to work with calls from kafka as well as those from the frontend +export interface ParsedClickhouseEvent { + uuid: string + event: string + team_id: number + distinct_id: string + person_id?: string + timestamp: string + created_at: string + properties: Record + person_created_at?: string + person_properties: Record + group0_properties: Record + group1_properties: Record + group2_properties: Record + group3_properties: Record + group4_properties: Record + group0_created_at?: string + group1_created_at?: string + group2_created_at?: string + group3_created_at?: string + group4_created_at?: string +} + export type HogFunctionInvocationGlobals = { project: { id: number @@ -106,38 +131,47 @@ export interface HogFunctionLogEntry { log_source: string // The kind of source (hog_function) log_source_id: string // The id of the hog function instance_id: string // The id of the specific invocation - timestamp: string + timestamp: DateTime level: HogFunctionLogEntryLevel message: string } +export interface HogFunctionTiming { + kind: 'hog' | 'async_function' + duration_ms: number +} + export type HogFunctionInvocation = { id: string globals: HogFunctionInvocationGlobals -} - -export type HogFunctionInvocationResult = HogFunctionInvocation & { teamId: number hogFunctionId: HogFunctionType['id'] - success: boolean - error?: any + // Logs and timings _could_ be passed in from the async function service logs: HogFunctionLogEntry[] - asyncFunction?: HogFunctionInvocationAsyncRequest + timings: HogFunctionTiming[] } -export type HogFunctionInvocationAsyncRequest = HogFunctionInvocation & { - teamId: number - hogFunctionId: HogFunctionType['id'] - vmState?: VMState - asyncFunctionName: string // TODO: Type this all more strongly - asyncFunctionArgs?: any[] +export type HogFunctionInvocationResult = HogFunctionInvocation & { + finished: boolean + error?: any + logs: HogFunctionLogEntry[] + timings: HogFunctionTiming[] + asyncFunctionRequest?: { + name: string + args: any[] + vmState: VMState + } } -export type HogFunctionInvocationAsyncResponse = HogFunctionInvocationAsyncRequest & { - /** An error message to indicate something went wrong and the invocation should be stopped */ - error?: any - /** The data to be passed to the Hog function from the response */ - vmResponse?: any +export type HogFunctionInvocationAsyncResponse = HogFunctionInvocationResult & { + // FOLLOWUP: do we want to type this more strictly? + asyncFunctionResponse: { + /** An error message to indicate something went wrong and the invocation should be stopped */ + error?: any + /** The data to be passed to the Hog function from the response */ + vmResponse?: any + timings: HogFunctionTiming[] + } } export type HogFunctionMessageToQueue = { diff --git a/plugin-server/src/cdp/utils.ts b/plugin-server/src/cdp/utils.ts index f8fe9c6dc075b..9ab4b5b248eb5 100644 --- a/plugin-server/src/cdp/utils.ts +++ b/plugin-server/src/cdp/utils.ts @@ -2,7 +2,7 @@ import { GroupTypeToColumnIndex, RawClickHouseEvent, Team } from '../types' import { clickHouseTimestampToISO } from '../utils/utils' -import { HogFunctionFilterGlobals, HogFunctionInvocationGlobals } from './types' +import { HogFunctionFilterGlobals, HogFunctionInvocationGlobals, ParsedClickhouseEvent } from './types' export const PERSON_DEFAULT_DISPLAY_NAME_PROPERTIES = [ 'email', @@ -25,24 +25,50 @@ const getPersonDisplayName = (team: Team, distinctId: string, properties: Record return (customIdentifier || distinctId)?.trim() } +export function convertToParsedClickhouseEvent(event: RawClickHouseEvent): ParsedClickhouseEvent { + const properties = event.properties ? JSON.parse(event.properties) : {} + if (event.elements_chain) { + properties['$elements_chain'] = event.elements_chain + } + + return { + uuid: event.uuid, + event: event.event, + team_id: event.team_id, + distinct_id: event.distinct_id, + person_id: event.person_id, + timestamp: clickHouseTimestampToISO(event.timestamp), + created_at: clickHouseTimestampToISO(event.created_at), + properties: properties, + person_created_at: event.person_created_at ? clickHouseTimestampToISO(event.person_created_at) : undefined, + person_properties: event.person_properties ? JSON.parse(event.person_properties) : {}, + group0_properties: event.group0_properties ? JSON.parse(event.group0_properties) : {}, + group1_properties: event.group1_properties ? JSON.parse(event.group1_properties) : {}, + group2_properties: event.group2_properties ? JSON.parse(event.group2_properties) : {}, + group3_properties: event.group3_properties ? JSON.parse(event.group3_properties) : {}, + group4_properties: event.group4_properties ? JSON.parse(event.group4_properties) : {}, + group0_created_at: event.group0_created_at ? clickHouseTimestampToISO(event.group0_created_at) : undefined, + group1_created_at: event.group1_created_at ? clickHouseTimestampToISO(event.group1_created_at) : undefined, + group2_created_at: event.group2_created_at ? clickHouseTimestampToISO(event.group2_created_at) : undefined, + group3_created_at: event.group3_created_at ? clickHouseTimestampToISO(event.group3_created_at) : undefined, + group4_created_at: event.group4_created_at ? clickHouseTimestampToISO(event.group4_created_at) : undefined, + } +} + // that we can keep to as a contract export function convertToHogFunctionInvocationGlobals( - event: RawClickHouseEvent, + event: ParsedClickhouseEvent, team: Team, siteUrl: string, groupTypes?: GroupTypeToColumnIndex ): HogFunctionInvocationGlobals { const projectUrl = `${siteUrl}/project/${team.id}` - - const properties = event.properties ? JSON.parse(event.properties) : {} - if (event.elements_chain) { - properties['$elements_chain'] = event.elements_chain - } + const properties = event.properties let person: HogFunctionInvocationGlobals['person'] if (event.person_id) { - const personProperties = event.person_properties ? JSON.parse(event.person_properties) : {} + const personProperties = event.person_properties const personDisplayName = getPersonDisplayName(team, event.distinct_id, personProperties) person = { @@ -64,7 +90,7 @@ export function convertToHogFunctionInvocationGlobals( // TODO: Check that groupProperties always exist if the event is in that group if (groupKey && groupProperties) { - const properties = JSON.parse(groupProperties) + const properties = groupProperties groups[groupType] = { id: groupKey, @@ -83,16 +109,12 @@ export function convertToHogFunctionInvocationGlobals( url: projectUrl, }, event: { - // TODO: Element chain! uuid: event.uuid, name: event.event!, distinct_id: event.distinct_id, properties, - timestamp: clickHouseTimestampToISO(event.timestamp), - // TODO: generate url - url: `${projectUrl}/events/${encodeURIComponent(event.uuid)}/${encodeURIComponent( - clickHouseTimestampToISO(event.timestamp) - )}`, + timestamp: event.timestamp, + url: `${projectUrl}/events/${encodeURIComponent(event.uuid)}/${encodeURIComponent(event.timestamp)}`, }, person, groups, diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index fea82eab88af5..b703db24abad3 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -10,7 +10,7 @@ import { Counter } from 'prom-client' import v8Profiler from 'v8-profiler-next' import { getPluginServerCapabilities } from '../capabilities' -import { CdpFunctionCallbackConsumer, CdpProcessedEventsConsumer } from '../cdp/cdp-processed-events-consumer' +import { CdpFunctionCallbackConsumer, CdpProcessedEventsConsumer } from '../cdp/cdp-consumers' import { defaultConfig, sessionRecordingConsumerConfig } from '../config/config' import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types' import { createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub' @@ -43,7 +43,7 @@ import { } from './ingestion-queues/on-event-handler-consumer' import { startScheduledTasksConsumer } from './ingestion-queues/scheduled-tasks-consumer' import { SessionRecordingIngester } from './ingestion-queues/session-recording/session-recordings-consumer' -import { setupCommonRoutes } from './services/http-server' +import { expressApp, setupCommonRoutes } from './services/http-server' import { getObjectStorage } from './services/object_storage' CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec @@ -519,6 +519,11 @@ export async function startPluginsServer( await consumer.stop() }) healthChecks['cdp-function-callbacks'] = () => consumer.isHealthy() ?? false + + // NOTE: The function callback service is more idle so can handle http requests as well + if (capabilities.http) { + consumer.addApiRoutes(expressApp) + } } if (capabilities.personOverrides) { diff --git a/plugin-server/src/main/services/http-server.ts b/plugin-server/src/main/services/http-server.ts index 85c154dab66e7..8889f96f22032 100644 --- a/plugin-server/src/main/services/http-server.ts +++ b/plugin-server/src/main/services/http-server.ts @@ -12,6 +12,8 @@ v8Profiler.setGenerateType(1) export const expressApp: express.Application = express() +expressApp.use(express.json()) + export function setupCommonRoutes( healthChecks: { [service: string]: () => Promise | boolean }, analyticsEventsIngestionConsumer?: KafkaJSIngestionConsumer | IngestionConsumer diff --git a/plugin-server/tests/cdp/cdp-function-callbacks-consumer.test.ts b/plugin-server/tests/cdp/cdp-function-callbacks-consumer.test.ts new file mode 100644 index 0000000000000..d15c8344502d5 --- /dev/null +++ b/plugin-server/tests/cdp/cdp-function-callbacks-consumer.test.ts @@ -0,0 +1,415 @@ +import express from 'express' +import supertest from 'supertest' + +import { CdpFunctionCallbackConsumer } from '../../src/cdp/cdp-consumers' +import { HogFunctionType } from '../../src/cdp/types' +import { defaultConfig } from '../../src/config/config' +import { Hub, PluginsServerConfig, Team } from '../../src/types' +import { createHub } from '../../src/utils/db/hub' +import { getFirstTeam, resetTestDatabase } from '../helpers/sql' +import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples' +import { insertHogFunction as _insertHogFunction } from './fixtures' + +const config: PluginsServerConfig = { + ...defaultConfig, +} + +const mockConsumer = { + on: jest.fn(), + commitSync: jest.fn(), + commit: jest.fn(), + queryWatermarkOffsets: jest.fn(), + committed: jest.fn(), + assignments: jest.fn(), + isConnected: jest.fn(() => true), + getMetadata: jest.fn(), +} + +jest.mock('../../src/kafka/batch-consumer', () => { + return { + startBatchConsumer: jest.fn(() => + Promise.resolve({ + join: () => ({ + finally: jest.fn(), + }), + stop: jest.fn(), + consumer: mockConsumer, + }) + ), + } +}) + +jest.mock('../../src/utils/fetch', () => { + return { + trackedFetch: jest.fn(() => + Promise.resolve({ + status: 200, + text: () => Promise.resolve(JSON.stringify({ success: true })), + json: () => Promise.resolve({ success: true }), + }) + ), + } +}) + +jest.mock('../../src/utils/db/kafka-producer-wrapper', () => { + const mockKafkaProducer = { + producer: { + connect: jest.fn(), + }, + disconnect: jest.fn(), + produce: jest.fn(), + } + return { + KafkaProducerWrapper: jest.fn(() => mockKafkaProducer), + } +}) + +const mockFetch: jest.Mock = require('../../src/utils/fetch').trackedFetch + +jest.setTimeout(1000) + +describe('CDP Processed Events Consuner', () => { + let processor: CdpFunctionCallbackConsumer + let hub: Hub + let closeHub: () => Promise + let team: Team + + const insertHogFunction = async (hogFunction: Partial) => { + const item = await _insertHogFunction(hub.postgres, team.id, hogFunction) + // Trigger the reload that django would do + await processor.hogFunctionManager.reloadAllHogFunctions() + return item + } + + beforeEach(async () => { + await resetTestDatabase() + ;[hub, closeHub] = await createHub() + team = await getFirstTeam(hub) + + processor = new CdpFunctionCallbackConsumer(config, hub) + await processor.start() + + mockFetch.mockClear() + }) + + afterEach(async () => { + jest.setTimeout(10000) + await processor.stop() + await closeHub() + }) + + afterAll(() => { + jest.useRealTimers() + }) + + // describe('general event processing', () => { + // /** + // * Tests here are somewhat expensive so should mostly simulate happy paths and the more e2e scenarios + // */ + // it('can parse incoming messages correctly', async () => { + // await insertHogFunction({ + // ...HOG_EXAMPLES.simple_fetch, + // ...HOG_INPUTS_EXAMPLES.simple_fetch, + // ...HOG_FILTERS_EXAMPLES.no_filters, + // }) + // // Create a message that should be processed by this function + // // Run the function and check that it was executed + // await processor.handleEachBatch( + // [ + // createMessage( + // createIncomingEvent(team.id, { + // uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', + // event: '$pageview', + // properties: JSON.stringify({ + // $lib_version: '1.0.0', + // }), + // }) + // ), + // ], + // noop + // ) + + // expect(mockFetch).toHaveBeenCalledTimes(1) + // expect(mockFetch.mock.calls[0]).toMatchInlineSnapshot(` + // Array [ + // "https://example.com/posthog-webhook", + // Object { + // "body": "{ + // \\"event\\": { + // \\"uuid\\": \\"b3a1fe86-b10c-43cc-acaf-d208977608d0\\", + // \\"name\\": \\"$pageview\\", + // \\"distinct_id\\": \\"distinct_id_1\\", + // \\"properties\\": { + // \\"$lib_version\\": \\"1.0.0\\", + // \\"$elements_chain\\": \\"[]\\" + // }, + // \\"timestamp\\": null, + // \\"url\\": \\"http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null\\" + // }, + // \\"groups\\": null, + // \\"nested\\": { + // \\"foo\\": \\"http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null\\" + // }, + // \\"person\\": null, + // \\"event_url\\": \\"http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null-test\\" + // }", + // "headers": Object { + // "version": "v=1.0.0", + // }, + // "method": "POST", + // "timeout": 10000, + // }, + // ] + // `) + // }) + + // it('generates logs and produces them to kafka', async () => { + // await insertHogFunction({ + // ...HOG_EXAMPLES.simple_fetch, + // ...HOG_INPUTS_EXAMPLES.simple_fetch, + // ...HOG_FILTERS_EXAMPLES.no_filters, + // }) + + // // Create a message that should be processed by this function + // // Run the function and check that it was executed + // await processor.handleEachBatch( + // [ + // createMessage( + // createIncomingEvent(team.id, { + // uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', + // event: '$pageview', + // properties: JSON.stringify({ + // $lib_version: '1.0.0', + // }), + // }) + // ), + // ], + // noop + // ) + + // expect(mockFetch).toHaveBeenCalledTimes(1) + // // Once for the async callback, twice for the logs + // expect(mockProducer.produce).toHaveBeenCalledTimes(3) + + // expect(decodeKafkaMessage(mockProducer.produce.mock.calls[0][0])).toMatchObject({ + // key: expect.any(String), + // topic: 'log_entries_test', + // value: { + // instance_id: expect.any(String), + // level: 'debug', + // log_source: 'hog_function', + // log_source_id: expect.any(String), + // message: 'Executing function', + // team_id: 2, + // timestamp: expect.any(String), + // }, + // waitForAck: true, + // }) + + // expect(decodeKafkaMessage(mockProducer.produce.mock.calls[1][0])).toMatchObject({ + // topic: 'log_entries_test', + // value: { + // log_source: 'hog_function', + // message: "Suspending function due to async function call 'fetch'", + // team_id: 2, + // }, + // }) + + // expect(decodeKafkaMessage(mockProducer.produce.mock.calls[2][0])).toEqual({ + // key: expect.any(String), + // topic: 'cdp_function_callbacks_test', + // value: { + // id: expect.any(String), + // globals: expect.objectContaining({ + // project: { id: 2, name: 'TEST PROJECT', url: 'http://localhost:8000/project/2' }, + // // We assume the rest is correct + // }), + // teamId: 2, + // hogFunctionId: expect.any(String), + // finished: false, + // logs: [], + // timings: [ + // { + // kind: 'hog', + // duration_ms: expect.any(Number), + // }, + // ], + // asyncFunctionRequest: { + // name: 'fetch', + // args: [ + // 'https://example.com/posthog-webhook', + // { + // headers: { version: 'v=1.0.0' }, + // body: { + // event: { + // uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', + // name: '$pageview', + // distinct_id: 'distinct_id_1', + // properties: { $lib_version: '1.0.0', $elements_chain: '[]' }, + // timestamp: null, + // url: 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null', + // }, + // event_url: + // 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null-test', + // groups: null, + // nested: { + // foo: 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null', + // }, + // person: null, + // }, + // method: 'POST', + // }, + // ], + // vmState: expect.any(Object), + // }, + // asyncFunctionResponse: { + // vmResponse: { + // status: 200, + // body: { success: true }, + // }, + // timings: [ + // { + // kind: 'async_function', + // duration_ms: expect.any(Number), + // }, + // ], + // }, + // }, + // waitForAck: true, + // }) + // }) + // }) + + describe('API invocation', () => { + let app: express.Express + let hogFunction: HogFunctionType + + const event = { + uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', + event: '$pageview', + properties: { + $lib_version: '1.0.0', + }, + } + + beforeEach(async () => { + app = express() + app.use(express.json()) + processor.addApiRoutes(app) + + hogFunction = await insertHogFunction({ + ...HOG_EXAMPLES.simple_fetch, + ...HOG_INPUTS_EXAMPLES.simple_fetch, + ...HOG_FILTERS_EXAMPLES.no_filters, + }) + }) + + it('errors if missing hog function or team', async () => { + const res = await supertest(app) + .post(`/api/projects/${hogFunction.team_id}/hog_functions/missing/invocations`) + .send({ event }) + + expect(res.status).toEqual(404) + }) + + it('errors if missing values', async () => { + const res = await supertest(app) + .post(`/api/projects/${hogFunction.team_id}/hog_functions/${hogFunction.id}/invocations`) + .send({}) + + expect(res.status).toEqual(400) + expect(res.body).toEqual({ + error: 'Missing event', + }) + }) + + it('can invoke a function via the API with mocks', async () => { + const res = await supertest(app) + .post(`/api/projects/${hogFunction.team_id}/hog_functions/${hogFunction.id}/invocations`) + .send({ event, mock_async_functions: true }) + + expect(res.status).toEqual(200) + expect(res.body).toMatchObject({ + status: 'success', + error: 'undefined', + logs: [ + { + log_source: 'hog_function', + level: 'debug', + message: 'Executing function', + }, + { + log_source: 'hog_function', + level: 'debug', + message: "Suspending function due to async function call 'fetch'", + }, + { + log_source: 'hog_function', + level: 'info', + message: "Async function 'fetch' was mocked", + }, + { + log_source: 'hog_function', + level: 'debug', + message: 'Resuming function', + }, + { + log_source: 'hog_function', + level: 'info', + message: 'Fetch response:, {"status":200,"body":{}}', + }, + { + log_source: 'hog_function', + level: 'debug', + message: expect.stringContaining('Function completed. Processing time'), + }, + ], + }) + }) + + it('can invoke a function via the API with real fetch', async () => { + mockFetch.mockImplementationOnce(() => + Promise.resolve({ + status: 201, + text: () => Promise.resolve(JSON.stringify({ real: true })), + }) + ) + const res = await supertest(app) + .post(`/api/projects/${hogFunction.team_id}/hog_functions/${hogFunction.id}/invocations`) + .send({ event, mock_async_functions: false }) + + expect(res.status).toEqual(200) + expect(res.body).toMatchObject({ + status: 'success', + error: 'undefined', + logs: [ + { + log_source: 'hog_function', + level: 'debug', + message: 'Executing function', + }, + { + log_source: 'hog_function', + level: 'debug', + message: "Suspending function due to async function call 'fetch'", + }, + { + log_source: 'hog_function', + level: 'debug', + message: 'Resuming function', + }, + { + log_source: 'hog_function', + level: 'info', + message: 'Fetch response:, {"status":201,"body":{"real":true}}', + }, + { + log_source: 'hog_function', + level: 'debug', + message: expect.stringContaining('Function completed. Processing time'), + }, + ], + }) + }) + }) +}) diff --git a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts index 8cd9709b74eb8..6b650b05edc17 100644 --- a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts +++ b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts @@ -1,4 +1,4 @@ -import { CdpProcessedEventsConsumer } from '../../src/cdp/cdp-processed-events-consumer' +import { CdpProcessedEventsConsumer } from '../../src/cdp/cdp-consumers' import { HogFunctionType } from '../../src/cdp/types' import { defaultConfig } from '../../src/config/config' import { Hub, PluginsServerConfig, Team } from '../../src/types' @@ -234,35 +234,53 @@ describe('CDP Processed Events Consuner', () => { }), teamId: 2, hogFunctionId: expect.any(String), - asyncFunctionName: 'fetch', - asyncFunctionArgs: [ - 'https://example.com/posthog-webhook', + finished: false, + logs: [], + timings: [ { - headers: { version: 'v=1.0.0' }, - body: { - event: { - uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', - name: '$pageview', - distinct_id: 'distinct_id_1', - properties: { $lib_version: '1.0.0', $elements_chain: '[]' }, - timestamp: null, - url: 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null', - }, - event_url: - 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null-test', - groups: null, - nested: { - foo: 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null', + kind: 'hog', + duration_ms: expect.any(Number), + }, + ], + asyncFunctionRequest: { + name: 'fetch', + args: [ + 'https://example.com/posthog-webhook', + { + headers: { version: 'v=1.0.0' }, + body: { + event: { + uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', + name: '$pageview', + distinct_id: 'distinct_id_1', + properties: { $lib_version: '1.0.0', $elements_chain: '[]' }, + timestamp: null, + url: 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null', + }, + event_url: + 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null-test', + groups: null, + nested: { + foo: 'http://localhost:8000/project/2/events/b3a1fe86-b10c-43cc-acaf-d208977608d0/null', + }, + person: null, }, - person: null, + method: 'POST', }, - method: 'POST', + ], + vmState: expect.any(Object), + }, + asyncFunctionResponse: { + vmResponse: { + status: 200, + body: { success: true }, }, - ], - vmState: expect.any(Object), - vmResponse: { - status: 200, - body: { success: true }, + timings: [ + { + kind: 'async_function', + duration_ms: expect.any(Number), + }, + ], }, }, waitForAck: true, diff --git a/plugin-server/tests/cdp/fixtures.ts b/plugin-server/tests/cdp/fixtures.ts index 3cfb043434df1..9147d043a7468 100644 --- a/plugin-server/tests/cdp/fixtures.ts +++ b/plugin-server/tests/cdp/fixtures.ts @@ -55,7 +55,7 @@ export const insertHogFunction = async ( postgres: PostgresRouter, team_id: Team['id'], hogFunction: Partial = {} -) => { +): Promise => { const res = await insertRow( postgres, 'posthog_hogfunction', diff --git a/plugin-server/tests/cdp/hog-executor.test.ts b/plugin-server/tests/cdp/hog-executor.test.ts index 2cf34a5041626..a0cf212a3e906 100644 --- a/plugin-server/tests/cdp/hog-executor.test.ts +++ b/plugin-server/tests/cdp/hog-executor.test.ts @@ -1,8 +1,16 @@ +import { DateTime } from 'luxon' + import { HogExecutor } from '../../src/cdp/hog-executor' import { HogFunctionManager } from '../../src/cdp/hog-function-manager' -import { HogFunctionLogEntry, HogFunctionType } from '../../src/cdp/types' +import { + HogFunctionInvocationAsyncResponse, + HogFunctionInvocationResult, + HogFunctionLogEntry, + HogFunctionType, +} from '../../src/cdp/types' import { defaultConfig } from '../../src/config/config' -import { PluginsServerConfig } from '../../src/types' +import { PluginsServerConfig, TimestampFormat } from '../../src/types' +import { castTimestampOrNow } from '../../src/utils/utils' import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples' import { createHogExecutionGlobals, createHogFunction, insertHogFunction as _insertHogFunction } from './fixtures' @@ -10,6 +18,24 @@ const config: PluginsServerConfig = { ...defaultConfig, } +const simulateMockFetchAsyncResponse = (result: HogFunctionInvocationResult): HogFunctionInvocationAsyncResponse => { + return { + ...result, + asyncFunctionResponse: { + timings: [ + { + kind: 'async_function', + duration_ms: 100, + }, + ], + vmResponse: { + status: 200, + body: 'success', + }, + }, + } +} + describe('Hog Executor', () => { jest.setTimeout(1000) let executor: HogExecutor @@ -57,7 +83,7 @@ describe('Hog Executor', () => { log_source: 'hog_function', log_source_id: hogFunction.id, instance_id: results[0].id, - timestamp: '2024-06-07 12:00:00.001', + timestamp: expect.any(DateTime), level: 'debug', message: 'Executing function', }, @@ -66,16 +92,24 @@ describe('Hog Executor', () => { log_source: 'hog_function', log_source_id: hogFunction.id, instance_id: results[0].id, - timestamp: '2024-06-07 12:00:00.002', + timestamp: expect.any(DateTime), level: 'debug', message: "Suspending function due to async function call 'fetch'", }, ]) + + expect(castTimestampOrNow(results[0].logs[0].timestamp, TimestampFormat.ClickHouse)).toEqual( + '2024-06-07 12:00:00.000' + ) + // Ensure the second log is one more + expect(castTimestampOrNow(results[0].logs[1].timestamp, TimestampFormat.ClickHouse)).toEqual( + '2024-06-07 12:00:00.001' + ) }) it('queues up an async function call', () => { const results = executor.executeMatchingFunctions(createHogExecutionGlobals()) - expect(results[0].asyncFunction).toMatchObject({ + expect(results[0]).toMatchObject({ id: results[0].id, globals: { project: { id: 1, name: 'test', url: 'http://localhost:8000/projects/1' }, @@ -94,50 +128,57 @@ describe('Hog Executor', () => { }, teamId: 1, hogFunctionId: hogFunction.id, - asyncFunctionName: 'fetch', - asyncFunctionArgs: [ - 'https://example.com/posthog-webhook', - { - headers: { version: 'v=1.2.3' }, - body: { - event: { - uuid: 'uuid', - name: 'test', - distinct_id: 'distinct_id', - url: 'http://localhost:8000/events/1', - properties: { $lib_version: '1.2.3' }, - timestamp: '2024-06-07T12:00:00.000Z', + asyncFunctionRequest: { + name: 'fetch', + args: [ + 'https://example.com/posthog-webhook', + { + headers: { version: 'v=1.2.3' }, + body: { + event: { + uuid: 'uuid', + name: 'test', + distinct_id: 'distinct_id', + url: 'http://localhost:8000/events/1', + properties: { $lib_version: '1.2.3' }, + timestamp: '2024-06-07T12:00:00.000Z', + }, + groups: null, + nested: { foo: 'http://localhost:8000/events/1' }, + person: null, + event_url: 'http://localhost:8000/events/1-test', }, - groups: null, - nested: { foo: 'http://localhost:8000/events/1' }, - person: null, - event_url: 'http://localhost:8000/events/1-test', + method: 'POST', }, - method: 'POST', + ], + vmState: expect.any(Object), + }, + timings: [ + { + kind: 'hog', + duration_ms: 0, }, ], - vmState: expect.any(Object), }) }) it('executes the full function in a loop', () => { const logs: HogFunctionLogEntry[] = [] const results = executor.executeMatchingFunctions(createHogExecutionGlobals()) - logs.push(...results[0].logs) - const asyncExecResult = executor.executeAsyncResponse({ - ...results[0].asyncFunction!, - vmResponse: { status: 200, body: 'success' }, - }) + const splicedLogs = results[0].logs.splice(0, 100) + logs.push(...splicedLogs) + + const asyncExecResult = executor.executeAsyncResponse(simulateMockFetchAsyncResponse(results[0])) logs.push(...asyncExecResult.logs) expect(asyncExecResult.error).toBeUndefined() - expect(asyncExecResult.success).toBe(true) + expect(asyncExecResult.finished).toBe(true) expect(logs.map((log) => log.message)).toEqual([ 'Executing function', "Suspending function due to async function call 'fetch'", 'Resuming function', 'Fetch response:, {"status":200,"body":"success"}', - 'Function completed', + 'Function completed. Processing time 100ms', ]) }) }) @@ -188,20 +229,15 @@ describe('Hog Executor', () => { expect(results).toHaveLength(1) // Run the result one time simulating a successful fetch - const asyncResult1 = executor.executeAsyncResponse({ - ...results[0].asyncFunction!, - vmResponse: { status: 200, body: 'success' }, - }) - expect(asyncResult1.success).toBe(true) - expect(asyncResult1.asyncFunction).toBeDefined() + const asyncResult1 = executor.executeAsyncResponse(simulateMockFetchAsyncResponse(results[0])) + expect(asyncResult1.finished).toBe(false) + expect(asyncResult1.error).toBe(undefined) + expect(asyncResult1.asyncFunctionRequest).toBeDefined() // Run the result one more time simulating a second successful fetch - const asyncResult2 = executor.executeAsyncResponse({ - ...asyncResult1.asyncFunction!, - vmResponse: { status: 200, body: 'success' }, - }) + const asyncResult2 = executor.executeAsyncResponse(simulateMockFetchAsyncResponse(asyncResult1)) // This time we should see an error for hitting the loop limit - expect(asyncResult2.success).toBe(false) + expect(asyncResult2.finished).toBe(false) expect(asyncResult2.error).toEqual('Function exceeded maximum async steps') expect(asyncResult2.logs.map((log) => log.message)).toEqual(['Function exceeded maximum async steps']) }) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 45bdf07dd9d4a..9974d8b091188 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -260,8 +260,8 @@ dependencies: specifier: ^9.3.0 version: 9.3.0(postcss@8.4.31) posthog-js: - specifier: 1.139.3 - version: 1.139.3 + specifier: 1.139.4 + version: 1.139.4 posthog-js-lite: specifier: 3.0.0 version: 3.0.0 @@ -17706,8 +17706,8 @@ packages: resolution: {integrity: sha512-dyajjnfzZD1tht4N7p7iwf7nBnR1MjVaVu+MKr+7gBgA39bn28wizCIJZztZPtHy4PY0YwtSGgwfBCuG/hnHgA==} dev: false - /posthog-js@1.139.3: - resolution: {integrity: sha512-NmPtOAXogxT/QSmeVCQJeIemBn8rEGfFPIXOynYKgXfntrw0KhzGXXvRXGLVjl5Zx6Nslf5NlmMnzmq1wjLZIA==} + /posthog-js@1.139.4: + resolution: {integrity: sha512-K0bV3xI7PCgJYN+qPQ26BglOtGzgXHM+BU3pBo1ukbX33O2/CktzFfnKvYdarXuIEBWsRdiiloqb+ok4pI8/Hw==} dependencies: fflate: 0.4.8 preact: 10.22.0 diff --git a/posthog/api/hog_function.py b/posthog/api/hog_function.py index 9ddc46cc3b45f..fc1789579bfb6 100644 --- a/posthog/api/hog_function.py +++ b/posthog/api/hog_function.py @@ -18,6 +18,7 @@ from posthog.cdp.validation import compile_hog, validate_inputs, validate_inputs_schema from posthog.models.hog_functions.hog_function import HogFunction from posthog.permissions import PostHogFeatureFlagPermission +from posthog.plugins.plugin_server_api import create_hog_invocation_test logger = structlog.get_logger(__name__) @@ -105,6 +106,14 @@ def create(self, validated_data: dict, *args, **kwargs) -> HogFunction: return super().create(validated_data=validated_data) +class HogFunctionInvocationSerializer(serializers.Serializer): + configuration = HogFunctionSerializer(write_only=True) + event = serializers.DictField(write_only=True) + mock_async_functions = serializers.BooleanField(default=True, write_only=True) + status = serializers.CharField(read_only=True) + logs = serializers.ListField(read_only=True) + + class HogFunctionViewSet(TeamAndOrgViewSetMixin, LogEntryMixin, ForbidDestroyModel, viewsets.ModelViewSet): scope_object = "INTERNAL" # Keep internal until we are happy to release this GA queryset = HogFunction.objects.all() @@ -143,3 +152,30 @@ def icon(self, request: Request, *args, **kwargs): icon_service = CDPIconsService() return icon_service.get_icon_http_response(id) + + @action(detail=True, methods=["POST"]) + def invocations(self, request: Request, *args, **kwargs): + hog_function = self.get_object() + serializer = HogFunctionInvocationSerializer(data=request.data, context=self.get_serializer_context()) + if not serializer.is_valid(): + return Response(serializer.errors, status=400) + + configuration = serializer.validated_data["configuration"] + # Remove the team from the config + configuration.pop("team") + + event = serializer.validated_data["event"] + mock_async_functions = serializer.validated_data["mock_async_functions"] + + res = create_hog_invocation_test( + team_id=hog_function.team_id, + hog_function_id=hog_function.id, + event=event, + configuration=configuration, + mock_async_functions=mock_async_functions, + ) + + if res.status_code != 200: + return Response({"status": "error"}, status=res.status_code) + + return Response(res.json()) diff --git a/posthog/api/plugin.py b/posthog/api/plugin.py index eb147a70d6d51..47a5ab5b3bb80 100644 --- a/posthog/api/plugin.py +++ b/posthog/api/plugin.py @@ -44,7 +44,7 @@ from posthog.permissions import APIScopePermission from posthog.plugins import can_configure_plugins, can_install_plugins, parse_url from posthog.plugins.access import can_globally_manage_plugins, has_plugin_access_level -from posthog.plugins.reload import populate_plugin_capabilities_on_workers +from posthog.plugins.plugin_server_api import populate_plugin_capabilities_on_workers from posthog.queries.app_metrics.app_metrics import TeamPluginsDeliveryRateQuery from posthog.utils import format_query_params_absolute_url diff --git a/posthog/models/hog_functions/utils.py b/posthog/cdp/filters.py similarity index 60% rename from posthog/models/hog_functions/utils.py rename to posthog/cdp/filters.py index 5ec265487d2e3..e24dd6bf120d8 100644 --- a/posthog/models/hog_functions/utils.py +++ b/posthog/cdp/filters.py @@ -1,7 +1,7 @@ -from typing import Any +from typing import Optional from posthog.models.action.action import Action from posthog.hogql.bytecode import create_bytecode -from posthog.hogql.parser import parse_expr, parse_string_template +from posthog.hogql.parser import parse_expr from posthog.hogql.property import action_to_expr, property_to_expr, ast from posthog.models.team.team import Team @@ -51,16 +51,35 @@ def hog_function_filters_to_expr(filters: dict, team: Team, actions: dict[int, A return ast.Constant(value=True) -def generate_template_bytecode(obj: Any) -> Any: - """ - Clones an object, compiling any string values to bytecode templates - """ +def filter_action_ids(filters: Optional[dict]) -> list[int]: + if not filters: + return [] + try: + return [int(action["id"]) for action in filters.get("actions", [])] + except KeyError: + return [] - if isinstance(obj, dict): - return {key: generate_template_bytecode(value) for key, value in obj.items()} - elif isinstance(obj, list): - return [generate_template_bytecode(item) for item in obj] - elif isinstance(obj, str): - return create_bytecode(parse_string_template(obj)) - else: - return obj + +def compile_filters_expr(filters: Optional[dict], team: Team, actions: Optional[dict[int, Action]] = None) -> ast.Expr: + filters = filters or {} + + if actions is None: + # If not provided as an optimization we fetch all actions + actions_list = ( + Action.objects.select_related("team").filter(team_id=team.id).filter(id__in=filter_action_ids(filters)) + ) + actions = {action.id: action for action in actions_list} + + return hog_function_filters_to_expr(filters, team, actions) + + +def compile_filters_bytecode(filters: Optional[dict], team: Team, actions: Optional[dict[int, Action]] = None) -> dict: + filters = filters or {} + try: + filters["bytecode"] = create_bytecode(compile_filters_expr(filters, team, actions)) + except Exception as e: + # TODO: Better reporting of this issue + filters["bytecode"] = None + filters["bytecode_error"] = str(e) + + return filters diff --git a/posthog/cdp/validation.py b/posthog/cdp/validation.py index 93dad3f8e6501..4a38523947de7 100644 --- a/posthog/cdp/validation.py +++ b/posthog/cdp/validation.py @@ -3,12 +3,26 @@ from rest_framework import serializers from posthog.hogql.bytecode import create_bytecode -from posthog.hogql.parser import parse_program -from posthog.models.hog_functions.utils import generate_template_bytecode +from posthog.hogql.parser import parse_program, parse_string_template logger = logging.getLogger(__name__) +def generate_template_bytecode(obj: Any) -> Any: + """ + Clones an object, compiling any string values to bytecode templates + """ + + if isinstance(obj, dict): + return {key: generate_template_bytecode(value) for key, value in obj.items()} + elif isinstance(obj, list): + return [generate_template_bytecode(item) for item in obj] + elif isinstance(obj, str): + return create_bytecode(parse_string_template(obj)) + else: + return obj + + class InputsSchemaItemSerializer(serializers.Serializer): type = serializers.ChoiceField(choices=["string", "boolean", "dictionary", "choice", "json"]) key = serializers.CharField() @@ -38,26 +52,25 @@ def validate(self, attrs): schema = self.context["schema"] value = attrs.get("value") + name: str = schema["key"] + item_type = schema["type"] + if schema.get("required") and not value: - raise serializers.ValidationError("This field is required.") + raise serializers.ValidationError({"inputs": {name: f"This field is required."}}) if not value: return attrs - name: str = schema["key"] - item_type = schema["type"] - value = attrs["value"] - # Validate each type if item_type == "string": if not isinstance(value, str): - raise serializers.ValidationError("Value must be a string.") + raise serializers.ValidationError({"inputs": {name: f"Value must be a string."}}) elif item_type == "boolean": if not isinstance(value, bool): - raise serializers.ValidationError("Value must be a boolean.") + raise serializers.ValidationError({"inputs": {name: f"Value must be a boolean."}}) elif item_type == "dictionary": if not isinstance(value, dict): - raise serializers.ValidationError("Value must be a dictionary.") + raise serializers.ValidationError({"inputs": {name: f"Value must be a dictionary."}}) try: if value: @@ -89,8 +102,7 @@ def validate_inputs(inputs_schema: list, inputs: dict) -> dict: serializer = InputsItemSerializer(data=value, context={"schema": schema}) if not serializer.is_valid(): - first_error = next(iter(serializer.errors.values()))[0] - raise serializers.ValidationError({"inputs": {schema["key"]: first_error}}) + raise serializers.ValidationError(serializer.errors) validated_inputs[schema["key"]] = serializer.validated_data @@ -102,5 +114,6 @@ def compile_hog(hog: str, supported_functions: Optional[set[str]] = None) -> lis try: program = parse_program(hog) return create_bytecode(program, supported_functions=supported_functions or {"fetch"}) - except Exception: + except Exception as e: + logger.error(f"Failed to compile hog {e}", exc_info=True) raise serializers.ValidationError({"hog": "Hog code has errors."}) diff --git a/posthog/hogql/bytecode.py b/posthog/hogql/bytecode.py index 1a5933a88bc92..48ddcd2f7e283 100644 --- a/posthog/hogql/bytecode.py +++ b/posthog/hogql/bytecode.py @@ -1,4 +1,5 @@ import dataclasses +from datetime import timedelta from typing import Any, Optional, cast, TYPE_CHECKING from collections.abc import Callable @@ -388,7 +389,7 @@ def execute_hog( team: Optional["Team"] = None, globals: Optional[dict[str, Any]] = None, functions: Optional[dict[str, Callable[..., Any]]] = None, - timeout=10, + timeout=timedelta(seconds=10), ) -> BytecodeResult: source_code = source_code.strip() if source_code.count("\n") == 0: diff --git a/posthog/models/action/action.py b/posthog/models/action/action.py index 1d89c3012578c..929f2753bc02c 100644 --- a/posthog/models/action/action.py +++ b/posthog/models/action/action.py @@ -8,7 +8,7 @@ from posthog.hogql.errors import BaseHogQLError from posthog.models.signals import mutable_receiver -from posthog.plugins.reload import drop_action_on_workers, reload_action_on_workers +from posthog.plugins.plugin_server_api import drop_action_on_workers, reload_action_on_workers ActionStepMatching = Literal["contains", "regex", "exact"] diff --git a/posthog/models/hog_functions/hog_function.py b/posthog/models/hog_functions/hog_function.py index 0307178e2bd33..5842f646dd12f 100644 --- a/posthog/models/hog_functions/hog_function.py +++ b/posthog/models/hog_functions/hog_function.py @@ -8,7 +8,7 @@ from posthog.models.action.action import Action from posthog.models.team.team import Team from posthog.models.utils import UUIDModel -from posthog.plugins.reload import reload_hog_functions_on_workers +from posthog.plugins.plugin_server_api import reload_hog_functions_on_workers class HogFunction(UUIDModel): @@ -44,28 +44,10 @@ def filter_action_ids(self) -> list[int]: except KeyError: return [] - def compile_filters_bytecode(self, actions: Optional[dict[int, Action]] = None): - from .utils import hog_function_filters_to_expr - from posthog.hogql.bytecode import create_bytecode - - self.filters = self.filters or {} - - if actions is None: - # If not provided as an optimization we fetch all actions - actions_list = ( - Action.objects.select_related("team").filter(team_id=self.team_id).filter(id__in=self.filter_action_ids) - ) - actions = {action.id: action for action in actions_list} - - try: - self.filters["bytecode"] = create_bytecode(hog_function_filters_to_expr(self.filters, self.team, actions)) - except Exception as e: - # TODO: Better reporting of this issue - self.filters["bytecode"] = None - self.filters["bytecode_error"] = str(e) - def save(self, *args, **kwargs): - self.compile_filters_bytecode() + from posthog.cdp.filters import compile_filters_bytecode + + self.filters = compile_filters_bytecode(self.filters, self.team) return super().save(*args, **kwargs) def __str__(self): diff --git a/posthog/models/organization.py b/posthog/models/organization.py index cc1c5ce669457..cc9656eb7d757 100644 --- a/posthog/models/organization.py +++ b/posthog/models/organization.py @@ -20,7 +20,7 @@ create_with_slug, sane_repr, ) -from posthog.plugins.reload import reset_available_product_features_cache_on_workers +from posthog.plugins.plugin_server_api import reset_available_product_features_cache_on_workers from posthog.utils import absolute_uri if TYPE_CHECKING: diff --git a/posthog/models/plugin.py b/posthog/models/plugin.py index d2ecd0d799c92..46ddfb9177f4c 100644 --- a/posthog/models/plugin.py +++ b/posthog/models/plugin.py @@ -20,7 +20,7 @@ from posthog.models.signals import mutable_receiver from posthog.models.team import Team from posthog.plugins.access import can_configure_plugins, can_install_plugins -from posthog.plugins.reload import populate_plugin_capabilities_on_workers, reload_plugins_on_workers +from posthog.plugins.plugin_server_api import populate_plugin_capabilities_on_workers, reload_plugins_on_workers from posthog.plugins.site import get_decide_site_apps from posthog.plugins.utils import ( download_plugin_archive, diff --git a/posthog/plugins/__init__.py b/posthog/plugins/__init__.py index 6ada31b398734..6b692c334f644 100644 --- a/posthog/plugins/__init__.py +++ b/posthog/plugins/__init__.py @@ -1,4 +1,3 @@ # flake8: noqa from .access import can_configure_plugins, can_install_plugins -from .reload import reload_plugins_on_workers from .utils import download_plugin_archive, get_file_from_archive, parse_url diff --git a/posthog/plugins/reload.py b/posthog/plugins/plugin_server_api.py similarity index 66% rename from posthog/plugins/reload.py rename to posthog/plugins/plugin_server_api.py index 7eda90c5ba3f4..1508c2a4c00c6 100644 --- a/posthog/plugins/reload.py +++ b/posthog/plugins/plugin_server_api.py @@ -1,9 +1,9 @@ import json from typing import Union +import requests import structlog -from django.conf import settings - from posthog.redis import get_client +from posthog.settings import CDP_FUNCTION_EXECUTOR_API_URL, PLUGINS_RELOAD_PUBSUB_CHANNEL, PLUGINS_RELOAD_REDIS_URL logger = structlog.get_logger(__name__) @@ -13,12 +13,13 @@ def publish_message(channel: str, payload: Union[dict, str]): message = json.dumps(payload) if not isinstance(payload, str) else payload - get_client(settings.PLUGINS_RELOAD_REDIS_URL).publish(channel, message) + get_client(PLUGINS_RELOAD_REDIS_URL).publish(channel, message) def reload_plugins_on_workers(): logger.info("Reloading plugins on workers") - publish_message(settings.PLUGINS_RELOAD_PUBSUB_CHANNEL, "reload!") + + publish_message(PLUGINS_RELOAD_PUBSUB_CHANNEL, "reload!") def reload_action_on_workers(team_id: int, action_id: int): @@ -47,3 +48,21 @@ def reset_available_product_features_cache_on_workers(organization_id: str): def populate_plugin_capabilities_on_workers(plugin_id: str): logger.info(f"Populating plugin capabilities for plugin {plugin_id} on workers") publish_message("populate-plugin-capabilities", {"plugin_id": plugin_id}) + + +def create_hog_invocation_test( + team_id: int, + hog_function_id: str, + event: dict, + configuration: dict, + mock_async_functions: bool, +) -> requests.Response: + logger.info(f"Creating hog invocation test for hog function {hog_function_id} on workers") + return requests.post( + CDP_FUNCTION_EXECUTOR_API_URL + f"/api/projects/{team_id}/hog_functions/{hog_function_id}/invocations", + json={ + "event": event, + "configuration": configuration, + "mock_async_functions": mock_async_functions, + }, + ) diff --git a/posthog/settings/data_stores.py b/posthog/settings/data_stores.py index 5606ad4f423fc..45b0e93dd9822 100644 --- a/posthog/settings/data_stores.py +++ b/posthog/settings/data_stores.py @@ -305,6 +305,14 @@ def _parse_kafka_hosts(hosts_string: str) -> list[str]: # We should move away to a different communication channel and remove this. PLUGINS_RELOAD_REDIS_URL = os.getenv("PLUGINS_RELOAD_REDIS_URL", REDIS_URL) + +CDP_FUNCTION_EXECUTOR_API_URL = get_from_env("CDP_FUNCTION_EXECUTOR_API_URL", "") + +if not CDP_FUNCTION_EXECUTOR_API_URL: + CDP_FUNCTION_EXECUTOR_API_URL = ( + "http://localhost:6738" if DEBUG else "http://ingestion-cdp-function-callbacks.posthog.svc.cluster.local" + ) + CACHES = { "default": { "BACKEND": "django_redis.cache.RedisCache", diff --git a/posthog/tasks/hog_functions.py b/posthog/tasks/hog_functions.py index 9304aff26a243..54b23761a71f0 100644 --- a/posthog/tasks/hog_functions.py +++ b/posthog/tasks/hog_functions.py @@ -2,8 +2,9 @@ from celery import shared_task +from posthog.cdp.filters import compile_filters_bytecode from posthog.models.action.action import Action -from posthog.plugins.reload import reload_hog_functions_on_workers +from posthog.plugins.plugin_server_api import reload_hog_functions_on_workers from posthog.tasks.utils import CeleryQueue @@ -47,7 +48,7 @@ def refresh_affected_hog_functions(team_id: Optional[int] = None, action_id: Opt actions_by_id = {action.id: action for action in all_related_actions} for hog_function in affected_hog_functions: - hog_function.compile_filters_bytecode(actions=actions_by_id) + hog_function.filters = compile_filters_bytecode(hog_function.filters, hog_function.team, actions_by_id) updates = HogFunction.objects.bulk_update(affected_hog_functions, ["filters"])