diff --git a/frontend/__snapshots__/scenes-app-dashboards--insight-legend--dark.png b/frontend/__snapshots__/scenes-app-dashboards--insight-legend--dark.png index d1217fba75dfb..d7d9398cd00cc 100644 Binary files a/frontend/__snapshots__/scenes-app-dashboards--insight-legend--dark.png and b/frontend/__snapshots__/scenes-app-dashboards--insight-legend--dark.png differ diff --git a/frontend/__snapshots__/scenes-app-dashboards--insight-legend-legacy--dark.png b/frontend/__snapshots__/scenes-app-dashboards--insight-legend-legacy--dark.png index 9d811913930ca..60a8efe898a62 100644 Binary files a/frontend/__snapshots__/scenes-app-dashboards--insight-legend-legacy--dark.png and b/frontend/__snapshots__/scenes-app-dashboards--insight-legend-legacy--dark.png differ diff --git a/frontend/__snapshots__/scenes-app-dashboards--insight-legend-legacy--light.png b/frontend/__snapshots__/scenes-app-dashboards--insight-legend-legacy--light.png index c067fa7a5cd00..242a806185f1f 100644 Binary files a/frontend/__snapshots__/scenes-app-dashboards--insight-legend-legacy--light.png and b/frontend/__snapshots__/scenes-app-dashboards--insight-legend-legacy--light.png differ diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index f1cebb00ab68d..648ea4aba4478 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -13,6 +13,9 @@ import { ActionType, ActivityScope, AlertType, + AppMetricsTotalsV2Response, + AppMetricsV2RequestParams, + AppMetricsV2Response, BatchExportConfiguration, BatchExportRun, CohortType, @@ -1626,6 +1629,18 @@ const api = { ): Promise> { return await new ApiRequest().hogFunction(id).withAction('logs').withQueryString(params).get() }, + async metrics( + id: HogFunctionType['id'], + params: AppMetricsV2RequestParams = {} + ): Promise { + return await new ApiRequest().hogFunction(id).withAction('metrics').withQueryString(params).get() + }, + async metricsTotals( + id: HogFunctionType['id'], + params: Partial = {} + ): Promise { + return await new ApiRequest().hogFunction(id).withAction('metrics/totals').withQueryString(params).get() + }, async listTemplates(): Promise> { return await new ApiRequest().hogFunctionTemplates().get() diff --git a/frontend/src/scenes/pipeline/AppMetricSparkLine.tsx b/frontend/src/scenes/pipeline/AppMetricSparkLine.tsx index e4ef8770e7922..ed14e30ec0fdd 100644 --- a/frontend/src/scenes/pipeline/AppMetricSparkLine.tsx +++ b/frontend/src/scenes/pipeline/AppMetricSparkLine.tsx @@ -1,7 +1,9 @@ -import { useValues } from 'kea' +import { useActions, useValues } from 'kea' import { Sparkline, SparklineTimeSeries } from 'lib/components/Sparkline' +import { useEffect } from 'react' import { pipelineNodeMetricsLogic } from './pipelineNodeMetricsLogic' +import { pipelineNodeMetricsV2Logic } from './pipelineNodeMetricsV2Logic' import { PipelineBackend, PipelineNode } from './types' export function AppMetricSparkLine({ pipelineNode }: { pipelineNode: PipelineNode }): JSX.Element { @@ -34,3 +36,28 @@ export function AppMetricSparkLine({ pipelineNode }: { pipelineNode: PipelineNod } return } + +export function AppMetricSparkLineV2({ pipelineNode }: { pipelineNode: PipelineNode }): JSX.Element { + const logic = pipelineNodeMetricsV2Logic({ id: `${pipelineNode.id}`.replace('hog-', '') }) + const { appMetrics, appMetricsLoading } = useValues(logic) + const { loadMetrics } = useActions(logic) + + useEffect(() => { + loadMetrics() + }, []) + + const displayData: SparklineTimeSeries[] = [ + { + color: 'success', + name: 'Success', + values: appMetrics?.series.find((s) => s.name === 'succeeded')?.values || [], + }, + { + color: 'danger', + name: 'Failures', + values: appMetrics?.series.find((s) => s.name === 'failed')?.values || [], + }, + ] + + return +} diff --git a/frontend/src/scenes/pipeline/PipelineNode.tsx b/frontend/src/scenes/pipeline/PipelineNode.tsx index 1dcfb5ca63fd5..ea982b3826889 100644 --- a/frontend/src/scenes/pipeline/PipelineNode.tsx +++ b/frontend/src/scenes/pipeline/PipelineNode.tsx @@ -14,6 +14,7 @@ import { BatchExportRuns } from './BatchExportRuns' import { PipelineNodeConfiguration } from './PipelineNodeConfiguration' import { pipelineNodeLogic, PipelineNodeLogicProps } from './pipelineNodeLogic' import { PipelineNodeMetrics } from './PipelineNodeMetrics' +import { PipelineNodeMetricsV2 } from './PipelineNodeMetricsV2' import { PipelineBackend } from './types' export const PIPELINE_TAB_TO_NODE_STAGE: Partial> = { @@ -58,10 +59,8 @@ export function PipelineNode(params: { stage?: string; id?: string } = {}): JSX. [PipelineNodeTab.Configuration]: , } - if ([PipelineBackend.Plugin, PipelineBackend.BatchExport].includes(node.backend)) { - tabToContent[PipelineNodeTab.Metrics] = - } - + tabToContent[PipelineNodeTab.Metrics] = + node.backend === PipelineBackend.HogFunction ? : tabToContent[PipelineNodeTab.Logs] = if (node.backend === PipelineBackend.BatchExport) { diff --git a/frontend/src/scenes/pipeline/PipelineNodeMetricsV2.tsx b/frontend/src/scenes/pipeline/PipelineNodeMetricsV2.tsx new file mode 100644 index 0000000000000..7d16f4ea74329 --- /dev/null +++ b/frontend/src/scenes/pipeline/PipelineNodeMetricsV2.tsx @@ -0,0 +1,251 @@ +import { IconCalendar } from '@posthog/icons' +import { LemonSelect, LemonSkeleton, Popover, SpinnerOverlay, Tooltip } from '@posthog/lemon-ui' +import { BindLogic, useActions, useValues } from 'kea' +import { Chart, ChartDataset, ChartItem } from 'lib/Chart' +import { getColorVar } from 'lib/colors' +import { DateFilter } from 'lib/components/DateFilter/DateFilter' +import { humanFriendlyNumber, inStorybookTestRunner } from 'lib/utils' +import { useEffect, useRef, useState } from 'react' +import { InsightTooltip } from 'scenes/insights/InsightTooltip/InsightTooltip' + +import { pipelineNodeLogic } from './pipelineNodeLogic' +import { pipelineNodeMetricsV2Logic } from './pipelineNodeMetricsV2Logic' +import { PipelineBackend } from './types' + +const METRICS_INFO = { + succeeded: 'Total number of events processed successfully', + failed: 'Total number of events that had errors during processing', + filtered: 'Total number of events that were filtered out', + disabled_temporarily: + 'Total number of events that were skipped due to the destination being temporarily disabled (due to issues such as the destination being down or rate-limited)', + disabled_permanently: + 'Total number of events that were skipped due to the destination being permanently disabled (due to prolonged issues with the destination)', +} + +export function PipelineNodeMetricsV2(): JSX.Element { + const { node } = useValues(pipelineNodeLogic) + + const logic = pipelineNodeMetricsV2Logic({ id: `${node.id}` }) + + const { filters } = useValues(logic) + const { setFilters, loadMetrics, loadMetricsTotals } = useActions(logic) + + useEffect(() => { + loadMetrics() + loadMetricsTotals() + }, []) + + if (node.backend !== PipelineBackend.HogFunction) { + return
Metrics not available for this node
+ } + + return ( + +
+ + +
+

Delivery trends

+
+ setFilters({ interval: value })} + /> + setFilters({ after: from || undefined, before: to || undefined })} + allowedRollingDateOptions={['days', 'weeks', 'months', 'years']} + makeLabel={(key) => ( + <> + {key} + + )} + /> +
+ + +
+ + ) +} + +function AppMetricBigNumber({ + label, + value, + tooltip, +}: { + label: string + value: number | undefined + tooltip: JSX.Element | string +}): JSX.Element { + return ( + +
+
{label.replace(/_/g, ' ')}
+
{humanFriendlyNumber(value ?? 0)}
+
+
+ ) +} + +function AppMetricsTotals(): JSX.Element { + const { appMetricsTotals, appMetricsTotalsLoading } = useValues(pipelineNodeMetricsV2Logic) + + return ( +
+
+ {Object.entries(METRICS_INFO).map(([key, value]) => ( +
+ {appMetricsTotalsLoading ? ( + + ) : ( + + )} +
+ ))} +
+
+ ) +} + +function AppMetricsGraph(): JSX.Element { + const { appMetrics, appMetricsLoading } = useValues(pipelineNodeMetricsV2Logic) + const canvasRef = useRef(null) + const [popoverContent, setPopoverContent] = useState(null) + const [tooltipState, setTooltipState] = useState({ x: 0, y: 0, visible: false }) + + useEffect(() => { + let chart: Chart + if (canvasRef.current && appMetrics && !inStorybookTestRunner()) { + chart = new Chart(canvasRef.current?.getContext('2d') as ChartItem, { + type: 'line', + data: { + labels: appMetrics.labels, + datasets: [ + ...appMetrics.series.map((series) => ({ + label: series.name, + data: series.values, + borderColor: '', + ...colorConfig(series.name), + })), + ], + }, + options: { + scales: { + x: { + ticks: { + maxRotation: 0, + }, + grid: { + display: false, + }, + }, + y: { + beginAtZero: true, + }, + }, + plugins: { + // @ts-expect-error Types of library are out of date + crosshair: false, + legend: { + display: false, + }, + tooltip: { + enabled: false, // Using external tooltip + external({ tooltip, chart }) { + setPopoverContent( + ({ + id: i, + dataIndex: 0, + datasetIndex: 0, + label: dp.dataset.label, + color: dp.dataset.borderColor as string, + count: (dp.dataset.data?.[dp.dataIndex] as number) || 0, + }))} + renderSeries={(value) => value} + renderCount={(count) => humanFriendlyNumber(count)} + /> + ) + + const position = chart.canvas.getBoundingClientRect() + setTooltipState({ + x: position.left + tooltip.caretX, + y: position.top + tooltip.caretY, + visible: tooltip.opacity > 0, + }) + }, + }, + }, + maintainAspectRatio: false, + interaction: { + mode: 'index', + axis: 'x', + intersect: false, + }, + }, + }) + + return () => { + chart?.destroy() + } + } + }, [appMetrics]) + + return ( +
+ {appMetricsLoading && } + {!!appMetrics && } + +
+ +
+ ) +} + +function colorConfig(name: string): Partial> { + let color = '' + + switch (name) { + case 'succeeded': + color = getColorVar('success') + break + case 'failed': + color = getColorVar('danger') + break + default: + color = getColorVar('data-color-1') + break + } + + return { + borderColor: color, + hoverBorderColor: color, + hoverBackgroundColor: color, + backgroundColor: color, + fill: false, + borderWidth: 2, + pointRadius: 0, + } +} diff --git a/frontend/src/scenes/pipeline/destinations/Destinations.tsx b/frontend/src/scenes/pipeline/destinations/Destinations.tsx index 8b2404b168ae6..0dc46d76e3d2c 100644 --- a/frontend/src/scenes/pipeline/destinations/Destinations.tsx +++ b/frontend/src/scenes/pipeline/destinations/Destinations.tsx @@ -21,7 +21,7 @@ import { urls } from 'scenes/urls' import { AvailableFeature, PipelineNodeTab, PipelineStage, ProductKey } from '~/types' -import { AppMetricSparkLine } from '../AppMetricSparkLine' +import { AppMetricSparkLine, AppMetricSparkLineV2 } from '../AppMetricSparkLine' import { HogFunctionIcon } from '../hogfunctions/HogFunctionIcon' import { NewButton } from '../NewButton' import { pipelineAccessLogic } from '../pipelineAccessLogic' @@ -161,7 +161,11 @@ export function DestinationsTable(props: PipelineDestinationsLogicProps): JSX.El PipelineNodeTab.Metrics )} > - + {destination.backend === PipelineBackend.HogFunction ? ( + + ) : ( + + )} ) }, diff --git a/frontend/src/scenes/pipeline/pipelineNodeLogic.tsx b/frontend/src/scenes/pipeline/pipelineNodeLogic.tsx index 67a753a428f59..a96681205f778 100644 --- a/frontend/src/scenes/pipeline/pipelineNodeLogic.tsx +++ b/frontend/src/scenes/pipeline/pipelineNodeLogic.tsx @@ -107,8 +107,8 @@ export const pipelineNodeLogic = kea([ setCurrentTab: () => [urls.pipelineNode(props.stage as PipelineStage, props.id, values.currentTab)], } }), - urlToAction(({ actions, values }) => ({ - '/pipeline/:stage/:id/:nodeTab': ({ nodeTab }) => { + urlToAction(({ props, actions, values }) => ({ + [urls.pipelineNode(props.stage as PipelineStage, props.id, ':nodeTab')]: ({ nodeTab }) => { if (nodeTab !== values.currentTab && Object.values(PipelineNodeTab).includes(nodeTab as PipelineNodeTab)) { actions.setCurrentTab(nodeTab as PipelineNodeTab) } diff --git a/frontend/src/scenes/pipeline/pipelineNodeMetricsV2Logic.tsx b/frontend/src/scenes/pipeline/pipelineNodeMetricsV2Logic.tsx new file mode 100644 index 0000000000000..2564a6562ed12 --- /dev/null +++ b/frontend/src/scenes/pipeline/pipelineNodeMetricsV2Logic.tsx @@ -0,0 +1,68 @@ +import { actions, kea, key, listeners, path, props, reducers } from 'kea' +import { loaders } from 'kea-loaders' +import api from 'lib/api' + +import { AppMetricsTotalsV2Response, AppMetricsV2RequestParams, AppMetricsV2Response } from '~/types' + +import type { pipelineNodeMetricsV2LogicType } from './pipelineNodeMetricsV2LogicType' + +export type PipelineNodeMetricsProps = { + id: string +} + +export type MetricsFilters = Pick + +const DEFAULT_FILTERS: MetricsFilters = { + before: undefined, + after: '-7d', + interval: 'day', +} + +export const pipelineNodeMetricsV2Logic = kea([ + props({} as PipelineNodeMetricsProps), + key(({ id }: PipelineNodeMetricsProps) => id), + path((id) => ['scenes', 'pipeline', 'appMetricsLogic', id]), + actions({ + setFilters: (filters: Partial) => ({ filters }), + }), + loaders(({ values, props }) => ({ + appMetrics: [ + null as AppMetricsV2Response | null, + { + loadMetrics: async () => { + const params: AppMetricsV2RequestParams = { + ...values.filters, + breakdown_by: 'name', + } + return await api.hogFunctions.metrics(props.id, params) + }, + }, + ], + + appMetricsTotals: [ + null as AppMetricsTotalsV2Response | null, + { + loadMetricsTotals: async () => { + const params: AppMetricsV2RequestParams = { + breakdown_by: 'name', + } + return await api.hogFunctions.metricsTotals(props.id, params) + }, + }, + ], + })), + reducers({ + filters: [ + DEFAULT_FILTERS, + { + setFilters: (state, { filters }) => ({ ...state, ...filters }), + }, + ], + }), + listeners(({ actions }) => ({ + setFilters: async (_, breakpoint) => { + await breakpoint(100) + actions.loadMetrics() + }, + })), +]) diff --git a/frontend/src/types.ts b/frontend/src/types.ts index 2190458af61ed..ceede7bd84355 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -4374,6 +4374,28 @@ export interface AlertType { anomaly_condition: AnomalyCondition } +export type AppMetricsV2Response = { + labels: string[] + series: { + name: string + values: number[] + }[] +} + +export type AppMetricsTotalsV2Response = { + totals: Record +} + +export type AppMetricsV2RequestParams = { + after?: string + before?: string + // Comma separated list of log levels + name?: string + kind?: string + interval?: 'hour' | 'day' | 'week' + breakdown_by?: 'name' | 'kind' +} + export enum DataWarehouseTab { Explore = 'explore', ManagedSources = 'managed-sources', diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index 1d58275228eee..8463946bbf672 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -2,6 +2,7 @@ import { features, librdkafkaVersion, Message } from 'node-rdkafka' import { Counter, Histogram } from 'prom-client' import { + KAFKA_APP_METRICS_2, KAFKA_CDP_FUNCTION_CALLBACKS, KAFKA_CDP_FUNCTION_OVERFLOW, KAFKA_EVENTS_JSON, @@ -13,7 +14,7 @@ import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars import { createKafkaProducer } from '../kafka/producer' import { addSentryBreadcrumbsEventListeners } from '../main/ingestion-queues/kafka-metrics' import { runInstrumentedFunction } from '../main/utils' -import { GroupTypeToColumnIndex, Hub, RawClickHouseEvent, TeamId, TimestampFormat } from '../types' +import { AppMetric2Type, GroupTypeToColumnIndex, Hub, RawClickHouseEvent, TeamId, TimestampFormat } from '../types' import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper' import { status } from '../utils/status' import { castTimestampOrNow } from '../utils/utils' @@ -151,6 +152,24 @@ abstract class CdpConsumerBase { ) } + protected logAppMetrics( + metric: Pick + ) { + const appMetric: AppMetric2Type = { + app_source: 'hog_function', + ...metric, + timestamp: castTimestampOrNow(null, TimestampFormat.ClickHouse), + } + + this.messagesToProduce.push({ + topic: KAFKA_APP_METRICS_2, + value: appMetric, + key: appMetric.app_source_id, + }) + + counterFunctionInvocation.inc({ outcome: appMetric.metric_name }, appMetric.count) + } + protected async processInvocationResults(results: HogFunctionInvocationResult[]): Promise { await runInstrumentedFunction({ statsKey: `cdpConsumer.handleEachBatch.produceResults`, @@ -161,8 +180,12 @@ abstract class CdpConsumerBase { const logs = result.logs result.logs = [] - counterFunctionInvocation.inc({ - outcome: result.error ? 'failed' : 'succeeded', + this.logAppMetrics({ + team_id: result.teamId, + app_source_id: result.hogFunctionId, + metric_kind: result.error ? 'failure' : 'success', + metric_name: result.error ? 'failed' : 'succeeded', + count: 1, }) logs.forEach((x) => { @@ -242,10 +265,19 @@ abstract class CdpConsumerBase { }, key: item.id, }) + // We don't report overflowed metric to appmetrics as it is sort of a meta-metric counterFunctionInvocation.inc({ outcome: 'overflowed' }) } else if (functionState > HogWatcherState.disabledForPeriod) { - // TODO: Report to AppMetrics 2 when it is ready - counterFunctionInvocation.inc({ outcome: 'disabled' }) + this.logAppMetrics({ + team_id: item.teamId, + app_source_id: item.hogFunctionId, + metric_kind: 'failure', + metric_name: + functionState === HogWatcherState.disabledForPeriod + ? 'disabled_temporarily' + : 'disabled_permanently', + count: 1, + }) continue } else { asyncResponsesToRun.push(item) @@ -271,34 +303,32 @@ abstract class CdpConsumerBase { const invocations: { globals: HogFunctionInvocationGlobals; hogFunction: HogFunctionType }[] = [] invocationGlobals.forEach((globals) => { - const { functions, total, matching } = this.hogExecutor.findMatchingFunctions(globals) - - counterFunctionInvocation.inc({ outcome: 'filtered' }, total - matching) + const { matchingFunctions, nonMatchingFunctions } = this.hogExecutor.findMatchingFunctions(globals) + + nonMatchingFunctions.forEach((item) => + this.logAppMetrics({ + team_id: item.team_id, + app_source_id: item.id, + metric_kind: 'other', + metric_name: 'filtered', + count: 1, + }) + ) // Filter for overflowed and disabled functions - const [healthy, overflowed, disabled] = functions.reduce( - (acc, item) => { - const state = this.hogWatcher.getFunctionState(item.id) - if (state >= HogWatcherState.disabledForPeriod) { - acc[2].push(item) - } else if (state >= HogWatcherState.overflowed) { - acc[1].push(item) - } else { - acc[0].push(item) - } - - return acc - }, - [[], [], []] as [HogFunctionType[], HogFunctionType[], HogFunctionType[]] - ) + const hogFunctionsByState = matchingFunctions.reduce((acc, item) => { + const state = this.hogWatcher.getFunctionState(item.id) + return { + ...acc, + [state]: [...(acc[state] ?? []), item], + } + return acc + }, {} as Record) - if (overflowed.length) { + if (hogFunctionsByState[HogWatcherState.overflowed]?.length) { + const overflowed = hogFunctionsByState[HogWatcherState.overflowed]! // Group all overflowed functions into one event counterFunctionInvocation.inc({ outcome: 'overflowed' }, overflowed.length) - // TODO: Report to AppMetrics 2 when it is ready - status.debug('🔁', `Oveflowing functions`, { - count: overflowed.length, - }) this.messagesToProduce.push({ topic: KAFKA_CDP_FUNCTION_OVERFLOW, @@ -313,18 +343,30 @@ abstract class CdpConsumerBase { }) } - if (disabled.length) { - counterFunctionInvocation.inc({ outcome: 'disabled' }, disabled.length) - // TODO: Report to AppMetrics 2 when it is ready - status.debug('🔁', `Disabled functions skipped`, { - count: disabled.length, + hogFunctionsByState[HogWatcherState.disabledForPeriod]?.forEach((item) => { + this.logAppMetrics({ + team_id: item.team_id, + app_source_id: item.id, + metric_kind: 'failure', + metric_name: 'disabled_temporarily', + count: 1, }) - } + }) - healthy.forEach((x) => { + hogFunctionsByState[HogWatcherState.disabledIndefinitely]?.forEach((item) => { + this.logAppMetrics({ + team_id: item.team_id, + app_source_id: item.id, + metric_kind: 'failure', + metric_name: 'disabled_permanently', + count: 1, + }) + }) + + hogFunctionsByState[HogWatcherState.healthy]?.forEach((item) => { invocations.push({ globals, - hogFunction: x, + hogFunction: item, }) }) }) @@ -583,7 +625,16 @@ export class CdpOverflowConsumer extends CdpConsumerBase { await this.runManyWithHeartbeat(invocations, (item) => { const state = this.hogWatcher.getFunctionState(item.hogFunctionId) if (state >= HogWatcherState.disabledForPeriod) { - counterFunctionInvocation.inc({ outcome: 'disabled' }) + this.logAppMetrics({ + team_id: item.globals.project.id, + app_source_id: item.hogFunctionId, + metric_kind: 'failure', + metric_name: + state === HogWatcherState.disabledForPeriod + ? 'disabled_temporarily' + : 'disabled_permanently', + count: 1, + }) return } return this.hogExecutor.executeFunction(item.globals, item.hogFunctionId) diff --git a/plugin-server/src/cdp/hog-executor.ts b/plugin-server/src/cdp/hog-executor.ts index 9bee644c3d80d..7eee502b6698e 100644 --- a/plugin-server/src/cdp/hog-executor.ts +++ b/plugin-server/src/cdp/hog-executor.ts @@ -96,36 +96,30 @@ export class HogExecutor { constructor(private hogFunctionManager: HogFunctionManager) {} findMatchingFunctions(event: HogFunctionInvocationGlobals): { - total: number - matching: number - functions: HogFunctionType[] + matchingFunctions: HogFunctionType[] + nonMatchingFunctions: HogFunctionType[] } { const allFunctionsForTeam = this.hogFunctionManager.getTeamHogFunctions(event.project.id) const filtersGlobals = convertToHogFunctionFilterGlobal(event) + const nonMatchingFunctions: HogFunctionType[] = [] + const matchingFunctions: HogFunctionType[] = [] + // Filter all functions based on the invocation - const functions = allFunctionsForTeam.filter((hogFunction) => { + allFunctionsForTeam.forEach((hogFunction) => { try { - const filters = hogFunction.filters - - if (!filters?.bytecode) { - // NOTE: If we don't have bytecode this indicates something went wrong. - // The model will always save a bytecode if it was compiled correctly - return false - } - - const filterResult = exec(filters.bytecode, { - globals: filtersGlobals, - timeout: DEFAULT_TIMEOUT_MS, - maxAsyncSteps: 0, - }) + if (hogFunction.filters?.bytecode) { + const filterResult = exec(hogFunction.filters.bytecode, { + globals: filtersGlobals, + timeout: DEFAULT_TIMEOUT_MS, + maxAsyncSteps: 0, + }) - if (typeof filterResult.result !== 'boolean') { - // NOTE: If the result is not a boolean we should not execute the function - return false + if (typeof filterResult.result === 'boolean' && filterResult.result) { + matchingFunctions.push(hogFunction) + return + } } - - return filterResult.result } catch (error) { status.error('🦔', `[HogExecutor] Error filtering function`, { hogFunctionId: hogFunction.id, @@ -134,20 +128,19 @@ export class HogExecutor { }) } - return false + nonMatchingFunctions.push(hogFunction) }) status.debug( '🦔', - `[HogExecutor] Found ${Object.keys(functions).length} matching functions out of ${ + `[HogExecutor] Found ${Object.keys(matchingFunctions).length} matching functions out of ${ Object.keys(allFunctionsForTeam).length } for team` ) return { - total: allFunctionsForTeam.length, - matching: functions.length, - functions, + nonMatchingFunctions, + matchingFunctions, } } diff --git a/plugin-server/src/cdp/types.ts b/plugin-server/src/cdp/types.ts index 5b3e75fc028a1..ef33a540df134 100644 --- a/plugin-server/src/cdp/types.ts +++ b/plugin-server/src/cdp/types.ts @@ -1,7 +1,13 @@ import { VMState } from '@posthog/hogvm' import { DateTime } from 'luxon' -import { ClickHouseTimestamp, ElementPropertyFilter, EventPropertyFilter, PersonPropertyFilter } from '../types' +import { + AppMetric2Type, + ClickHouseTimestamp, + ElementPropertyFilter, + EventPropertyFilter, + PersonPropertyFilter, +} from '../types' export type HogBytecode = any[] @@ -247,7 +253,7 @@ export type CdpOverflowMessage = CdpOverflowMessageInvocations | CdpOverflowMess export type HogFunctionMessageToProduce = { topic: string - value: CdpOverflowMessage | HogFunctionLogEntrySerialized | HogFunctionInvocationAsyncResponse + value: CdpOverflowMessage | HogFunctionLogEntrySerialized | HogFunctionInvocationAsyncResponse | AppMetric2Type key: string } diff --git a/plugin-server/src/config/kafka-topics.ts b/plugin-server/src/config/kafka-topics.ts index 2f095e117f7ba..5ff55c487524a 100644 --- a/plugin-server/src/config/kafka-topics.ts +++ b/plugin-server/src/config/kafka-topics.ts @@ -20,6 +20,7 @@ export const KAFKA_GROUPS = `${prefix}clickhouse_groups${suffix}` export const KAFKA_BUFFER = `${prefix}conversion_events_buffer${suffix}` export const KAFKA_INGESTION_WARNINGS = `${prefix}clickhouse_ingestion_warnings${suffix}` export const KAFKA_APP_METRICS = `${prefix}clickhouse_app_metrics${suffix}` +export const KAFKA_APP_METRICS_2 = `${prefix}clickhouse_app_metrics2${suffix}` export const KAFKA_JOBS = `${prefix}jobs${suffix}` export const KAFKA_JOBS_DLQ = `${prefix}jobs_dlq${suffix}` export const KAFKA_SCHEDULED_TASKS = `${prefix}scheduled_tasks${suffix}` diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index e73945c6f1f23..c4df28fa9e798 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -1224,3 +1224,14 @@ export interface HookPayload { } } } + +export type AppMetric2Type = { + team_id: number + timestamp: ClickHouseTimestamp + app_source: string + app_source_id: string + instance_id?: string + metric_kind: 'failure' | 'success' | 'other' + metric_name: 'succeeded' | 'failed' | 'filtered' | 'disabled_temporarily' | 'disabled_permanently' + count: number +} diff --git a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts index 0050446c026a9..a5710a620cd9d 100644 --- a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts +++ b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts @@ -148,8 +148,8 @@ describe('CDP Processed Events Consuner', () => { `) }) - it('generates logs and produces them to kafka', async () => { - await insertHogFunction({ + it('generates logs and metrics and produces them to kafka', async () => { + const hogFunction = await insertHogFunction({ ...HOG_EXAMPLES.simple_fetch, ...HOG_INPUTS_EXAMPLES.simple_fetch, ...HOG_FILTERS_EXAMPLES.no_filters, @@ -173,10 +173,25 @@ describe('CDP Processed Events Consuner', () => { ) expect(mockFetch).toHaveBeenCalledTimes(1) - // Once for the async callback, twice for the logs - expect(mockProducer.produce).toHaveBeenCalledTimes(3) + // Once for the async callback, twice for the logs, once for metrics + expect(mockProducer.produce).toHaveBeenCalledTimes(4) + + expect(decodeKafkaMessage(mockProducer.produce.mock.calls[0][0])).toEqual({ + key: expect.any(String), + topic: 'clickhouse_app_metrics2_test', + value: { + app_source: 'hog_function', + team_id: 2, + app_source_id: hogFunction.id, + metric_kind: 'success', + metric_name: 'succeeded', + count: 1, + timestamp: expect.any(String), + }, + waitForAck: true, + }) - expect(decodeKafkaMessage(mockProducer.produce.mock.calls[0][0])).toMatchObject({ + expect(decodeKafkaMessage(mockProducer.produce.mock.calls[1][0])).toEqual({ key: expect.any(String), topic: 'log_entries_test', value: { @@ -188,10 +203,11 @@ describe('CDP Processed Events Consuner', () => { team_id: 2, timestamp: expect.any(String), }, + waitForAck: true, }) - expect(decodeKafkaMessage(mockProducer.produce.mock.calls[1][0])).toMatchObject({ + expect(decodeKafkaMessage(mockProducer.produce.mock.calls[2][0])).toMatchObject({ topic: 'log_entries_test', value: { log_source: 'hog_function', @@ -200,7 +216,7 @@ describe('CDP Processed Events Consuner', () => { }, }) - const msg = decodeKafkaMessage(mockProducer.produce.mock.calls[2][0]) + const msg = decodeKafkaMessage(mockProducer.produce.mock.calls[3][0]) // Parse body so it can match by object equality rather than exact string equality msg.value.asyncFunctionRequest.args[1].body = JSON.parse(msg.value.asyncFunctionRequest.args[1].body) expect(msg).toEqual({ diff --git a/plugin-server/tests/cdp/hog-executor.test.ts b/plugin-server/tests/cdp/hog-executor.test.ts index 4b8074a5ccc3c..d1cd558e92521 100644 --- a/plugin-server/tests/cdp/hog-executor.test.ts +++ b/plugin-server/tests/cdp/hog-executor.test.ts @@ -65,7 +65,7 @@ describe('Hog Executor', () => { const globals = createHogExecutionGlobals() const results = executor .findMatchingFunctions(createHogExecutionGlobals()) - .functions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult) + .matchingFunctions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult) expect(results).toHaveLength(1) expect(results[0]).toMatchObject({ id: expect.any(String), @@ -77,7 +77,7 @@ describe('Hog Executor', () => { const globals = createHogExecutionGlobals() const results = executor .findMatchingFunctions(createHogExecutionGlobals()) - .functions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult) + .matchingFunctions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult) expect(results[0].logs).toMatchObject([ { team_id: 1, @@ -134,7 +134,7 @@ describe('Hog Executor', () => { const globals = createHogExecutionGlobals() const results = executor .findMatchingFunctions(createHogExecutionGlobals()) - .functions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult) + .matchingFunctions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult) expect(results[0]).toMatchObject({ id: results[0].id, globals: { @@ -193,7 +193,7 @@ describe('Hog Executor', () => { const globals = createHogExecutionGlobals() const results = executor .findMatchingFunctions(createHogExecutionGlobals()) - .functions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult) + .matchingFunctions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult) const splicedLogs = results[0].logs.splice(0, 100) logs.push(...splicedLogs) @@ -223,9 +223,8 @@ describe('Hog Executor', () => { mockFunctionManager.getTeamHogFunctions.mockReturnValue([fn]) const resultsShouldntMatch = executor.findMatchingFunctions(createHogExecutionGlobals()) - expect(resultsShouldntMatch.functions).toHaveLength(0) - expect(resultsShouldntMatch.total).toBe(1) - expect(resultsShouldntMatch.matching).toBe(0) + expect(resultsShouldntMatch.matchingFunctions).toHaveLength(0) + expect(resultsShouldntMatch.nonMatchingFunctions).toHaveLength(1) const resultsShouldMatch = executor.findMatchingFunctions( createHogExecutionGlobals({ @@ -237,9 +236,8 @@ describe('Hog Executor', () => { } as any, }) ) - expect(resultsShouldMatch.functions).toHaveLength(1) - expect(resultsShouldMatch.total).toBe(1) - expect(resultsShouldMatch.matching).toBe(1) + expect(resultsShouldMatch.matchingFunctions).toHaveLength(1) + expect(resultsShouldMatch.nonMatchingFunctions).toHaveLength(0) }) }) @@ -257,7 +255,7 @@ describe('Hog Executor', () => { const globals = createHogExecutionGlobals() const results = executor .findMatchingFunctions(createHogExecutionGlobals()) - .functions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult) + .matchingFunctions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult) expect(results).toHaveLength(1) // Run the result one time simulating a successful fetch @@ -295,7 +293,7 @@ describe('Hog Executor', () => { const globals = createHogExecutionGlobals() const results = executor .findMatchingFunctions(createHogExecutionGlobals()) - .functions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult) + .matchingFunctions.map((x) => executor.executeFunction(globals, x) as HogFunctionInvocationResult) expect(results).toHaveLength(1) expect(results[0].error).toContain('Execution timed out after 0.1 seconds. Performed ') diff --git a/posthog/api/app_metrics2.py b/posthog/api/app_metrics2.py new file mode 100644 index 0000000000000..a6fc2c6daef54 --- /dev/null +++ b/posthog/api/app_metrics2.py @@ -0,0 +1,290 @@ +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Any, Optional, cast +from rest_framework import serializers, viewsets +from rest_framework.request import Request +from rest_framework.response import Response +from rest_framework.decorators import action +from rest_framework.exceptions import ValidationError +from rest_framework_dataclasses.serializers import DataclassSerializer + +from posthog.clickhouse.client.execute import sync_execute +from posthog.models.team.team import Team +from posthog.utils import relative_date_parse_with_delta_mapping + + +@dataclass +class AppMetricSeries: + name: str + values: list[int] + + +@dataclass +class AppMetricsResponse: + labels: list[str] + series: list[AppMetricSeries] + + +class AppMetricResponseSerializer(DataclassSerializer): + class Meta: + dataclass = AppMetricsResponse + + +@dataclass +class AppMetricsTotalsResponse: + totals: dict[str, int] + + +class AppMetricsTotalsResponseSerializer(DataclassSerializer): + class Meta: + dataclass = AppMetricsTotalsResponse + + +class AppMetricsRequestSerializer(serializers.Serializer): + after = serializers.CharField(required=False, default="-7d") + before = serializers.CharField(required=False) + instance_id = serializers.CharField(required=False) + interval = serializers.ChoiceField(choices=["hour", "day", "week"], required=False, default="day") + name = serializers.CharField(required=False) + kind = serializers.CharField(required=False) + breakdown_by = serializers.ChoiceField(choices=["name", "kind"], required=False, default="kind") + + +def fetch_app_metrics_trends( + team_id: int, + app_source: str, + app_source_id: str, + after: datetime, + before: datetime, + breakdown_by: str = "kind", + interval: str = "day", + instance_id: Optional[str] = None, + name: Optional[list[str]] = None, + kind: Optional[list[str]] = None, +) -> AppMetricsResponse: + """Fetch a list of batch export log entries from ClickHouse.""" + + name = name or [] + kind = kind or [] + + clickhouse_kwargs: dict[str, Any] = {} + + clickhouse_query = f""" + SELECT + toStartOfInterval(timestamp, INTERVAL 1 {interval}) as timestamp, + metric_{breakdown_by} as breakdown, + count(breakdown) as count + FROM app_metrics2 + WHERE team_id = %(team_id)s + AND app_source = %(app_source)s + AND app_source_id = %(app_source_id)s + AND timestamp >= toDateTime64(%(after)s, 6) + AND timestamp <= toDateTime64(%(before)s, 6) + {'AND instance_id = %(instance_id)s' if instance_id else ''} + {'AND metric_name IN %(name)s' if name else ''} + {'AND metric_kind IN %(kind)s' if kind else ''} + GROUP BY timestamp, breakdown + ORDER BY timestamp ASC + """ + + clickhouse_kwargs["team_id"] = team_id + clickhouse_kwargs["app_source"] = app_source + clickhouse_kwargs["app_source_id"] = app_source_id + clickhouse_kwargs["after"] = after.strftime("%Y-%m-%dT%H:%M:%S") + clickhouse_kwargs["before"] = before.strftime("%Y-%m-%dT%H:%M:%S") + clickhouse_kwargs["instance_id"] = instance_id + clickhouse_kwargs["name"] = name + clickhouse_kwargs["kind"] = kind + clickhouse_kwargs["interval"] = interval.upper() + + results = sync_execute(clickhouse_query, clickhouse_kwargs) + + if not isinstance(results, list): + raise ValueError("Unexpected results from ClickHouse") + + # We create the x values based on the date range and interval + labels: list[str] = [] + label_format = "%Y-%m-%dT%H:%M" if interval == "hour" else "%Y-%m-%d" + + range_date = after + # Normalize the start of the range to the start of the interval + if interval == "hour": + range_date = range_date.replace(minute=0, second=0, microsecond=0) + elif interval == "day": + range_date = range_date.replace(hour=0, minute=0, second=0, microsecond=0) + elif interval == "week": + range_date = range_date.replace(hour=0, minute=0, second=0, microsecond=0) + range_date -= timedelta(days=range_date.weekday()) + + while range_date <= before: + labels.append(range_date.strftime(label_format)) + if interval == "hour": + range_date += timedelta(hours=1) + elif interval == "day": + range_date += timedelta(days=1) + elif interval == "week": + range_date += timedelta(weeks=1) + + response = AppMetricsResponse(labels=[], series=[]) + data_by_breakdown: dict[str, dict[str, int]] = {} + + breakdown_names = {row[1] for row in results} + + for result in results: + timestamp, breakdown, count = result + if breakdown not in data_by_breakdown: + data_by_breakdown[breakdown] = {} + + data_by_breakdown[breakdown][timestamp.strftime(label_format)] = count + + # Now we can construct the response object + + response.labels = labels + + for breakdown in breakdown_names: + series = AppMetricSeries(name=breakdown, values=[]) + for x in labels: + series.values.append(data_by_breakdown.get(breakdown, {}).get(x, 0)) + response.series.append(series) + + return response + + +def fetch_app_metric_totals( + team_id: int, + app_source: str, + app_source_id: str, + breakdown_by: str = "kind", + after: Optional[datetime] = None, + before: Optional[datetime] = None, + instance_id: Optional[str] = None, + name: Optional[list[str]] = None, + kind: Optional[list[str]] = None, +) -> AppMetricsTotalsResponse: + """ + Calculate the totals for the app metrics over the given period. + """ + + name = name or [] + kind = kind or [] + + clickhouse_kwargs: dict[str, Any] = { + "team_id": team_id, + "app_source": app_source, + "app_source_id": app_source_id, + "after": after.strftime("%Y-%m-%dT%H:%M:%S") if after else None, + "before": before.strftime("%Y-%m-%dT%H:%M:%S") if before else None, + } + + clickhouse_query = f""" + SELECT + metric_{breakdown_by} as breakdown, + count(breakdown) as count + FROM app_metrics2 + WHERE team_id = %(team_id)s + AND app_source = %(app_source)s + AND app_source_id = %(app_source_id)s + {'AND timestamp >= toDateTime64(%(after)s, 6)' if after else ''} + {'AND timestamp <= toDateTime64(%(before)s, 6)' if before else ''} + {'AND instance_id = %(instance_id)s' if instance_id else ''} + {'AND metric_name IN %(name)s' if name else ''} + {'AND metric_kind IN %(kind)s' if kind else ''} + GROUP BY breakdown + """ + + results = sync_execute(clickhouse_query, clickhouse_kwargs) + + if not isinstance(results, list): + raise ValueError("Unexpected results from ClickHouse") + + totals = {row[0]: row[1] for row in results} + return AppMetricsTotalsResponse(totals=totals) + + +class AppMetricsMixin(viewsets.GenericViewSet): + app_source: str # Should be set by the inheriting class + + def get_app_metrics_instance_id(self) -> Optional[str]: + """ + Can be used overridden to help with getting the instance_id for the app metrics. + Otherwise it defaults to null or the query param if given + """ + raise NotImplementedError() + + @action(detail=True, methods=["GET"]) + def metrics(self, request: Request, *args, **kwargs): + obj = self.get_object() + param_serializer = AppMetricsRequestSerializer(data=request.query_params) + + if not self.app_source: + raise ValidationError("app_source not set on the viewset") + + if not param_serializer.is_valid(): + raise ValidationError(param_serializer.errors) + + params = param_serializer.validated_data + + try: + instance_id = self.get_app_metrics_instance_id() + except NotImplementedError: + instance_id = params.get("instance_id") + + team = cast(Team, self.team) # type: ignore + + after_date, _, _ = relative_date_parse_with_delta_mapping(params.get("after", "-7d"), team.timezone_info) + before_date, _, _ = relative_date_parse_with_delta_mapping(params.get("before", "-0d"), team.timezone_info) + + data = fetch_app_metrics_trends( + team_id=self.team_id, # type: ignore + app_source=self.app_source, + app_source_id=str(obj.id), + # From request params + instance_id=instance_id, + interval=params.get("interval", "day"), + after=after_date, + before=before_date, + breakdown_by=params.get("breakdown_by"), + name=params["name"].split(",") if params.get("name") else None, + kind=params["kind"].split(",") if params.get("kind") else None, + ) + + serializer = AppMetricResponseSerializer(instance=data) + return Response(serializer.data) + + @action(detail=True, methods=["GET"], url_path="metrics/totals") + def metrics_totals(self, request: Request, *args, **kwargs): + obj = self.get_object() + param_serializer = AppMetricsRequestSerializer(data=request.query_params) + + if not self.app_source: + raise ValidationError("app_source not set on the viewset") + + if not param_serializer.is_valid(): + raise ValidationError(param_serializer.errors) + + params = param_serializer.validated_data + team = cast(Team, self.team) # type: ignore + + after_date = None + before_date = None + + if params.get("after"): + after_date, _, _ = relative_date_parse_with_delta_mapping(params["after"], team.timezone_info) + + if params.get("before"): + before_date, _, _ = relative_date_parse_with_delta_mapping(params["before"], team.timezone_info) + + data = fetch_app_metric_totals( + team_id=self.team_id, # type: ignore + app_source=self.app_source, + app_source_id=str(obj.id), + # From request params + after=after_date, + before=before_date, + breakdown_by=params.get("breakdown_by"), + name=params["name"].split(",") if params.get("name") else None, + kind=params["kind"].split(",") if params.get("kind") else None, + ) + + serializer = AppMetricsTotalsResponseSerializer(instance=data) + return Response(serializer.data) diff --git a/posthog/api/hog_function.py b/posthog/api/hog_function.py index cb5db432e093c..972178a875401 100644 --- a/posthog/api/hog_function.py +++ b/posthog/api/hog_function.py @@ -9,6 +9,7 @@ from rest_framework.request import Request from rest_framework.response import Response +from posthog.api.app_metrics2 import AppMetricsMixin from posthog.api.forbid_destroy_model import ForbidDestroyModel from posthog.api.hog_function_template import HogFunctionTemplateSerializer from posthog.api.log_entries import LogEntryMixin @@ -192,7 +193,9 @@ class HogFunctionInvocationSerializer(serializers.Serializer): logs = serializers.ListField(read_only=True) -class HogFunctionViewSet(TeamAndOrgViewSetMixin, LogEntryMixin, ForbidDestroyModel, viewsets.ModelViewSet): +class HogFunctionViewSet( + TeamAndOrgViewSetMixin, LogEntryMixin, AppMetricsMixin, ForbidDestroyModel, viewsets.ModelViewSet +): scope_object = "INTERNAL" # Keep internal until we are happy to release this GA queryset = HogFunction.objects.all() filter_backends = [DjangoFilterBackend] @@ -201,6 +204,7 @@ class HogFunctionViewSet(TeamAndOrgViewSetMixin, LogEntryMixin, ForbidDestroyMod permission_classes = [PostHogFeatureFlagPermission] posthog_feature_flag = {"hog-functions": ["create", "partial_update", "update"]} log_source = "hog_function" + app_source = "hog_function" def get_serializer_class(self) -> type[BaseSerializer]: return HogFunctionMinimalSerializer if self.action == "list" else HogFunctionSerializer