diff --git a/frontend/src/scenes/pipeline/PipelineNodeConfiguration.tsx b/frontend/src/scenes/pipeline/PipelineNodeConfiguration.tsx index 926469a85ffd6..1202b30550df8 100644 --- a/frontend/src/scenes/pipeline/PipelineNodeConfiguration.tsx +++ b/frontend/src/scenes/pipeline/PipelineNodeConfiguration.tsx @@ -1,7 +1,7 @@ import { useValues } from 'kea' import { NotFound } from 'lib/components/NotFound' -import { PipelineHogFunctionConfiguration } from './hogfunctions/PipelineHogFunctionConfiguration' +import { HogFunctionConfiguration } from './hogfunctions/HogFunctionConfiguration' import { PipelineBatchExportConfiguration } from './PipelineBatchExportConfiguration' import { pipelineNodeLogic } from './pipelineNodeLogic' import { PipelinePluginConfiguration } from './PipelinePluginConfiguration' @@ -17,7 +17,7 @@ export function PipelineNodeConfiguration(): JSX.Element { return (
{node.backend === PipelineBackend.HogFunction ? ( - + ) : node.backend === PipelineBackend.Plugin ? ( ) : ( diff --git a/frontend/src/scenes/pipeline/PipelineNodeNew.tsx b/frontend/src/scenes/pipeline/PipelineNodeNew.tsx index e24cca9060531..b85e1c494e85d 100644 --- a/frontend/src/scenes/pipeline/PipelineNodeNew.tsx +++ b/frontend/src/scenes/pipeline/PipelineNodeNew.tsx @@ -14,8 +14,8 @@ import { AvailableFeature, BatchExportService, HogFunctionTemplateType, Pipeline import { pipelineDestinationsLogic } from './destinationsLogic' import { frontendAppsLogic } from './frontendAppsLogic' +import { HogFunctionConfiguration } from './hogfunctions/HogFunctionConfiguration' import { HogFunctionIcon } from './hogfunctions/HogFunctionIcon' -import { PipelineHogFunctionConfiguration } from './hogfunctions/PipelineHogFunctionConfiguration' import { PipelineBatchExportConfiguration } from './PipelineBatchExportConfiguration' import { PIPELINE_TAB_TO_NODE_STAGE } from './PipelineNode' import { pipelineNodeNewLogic, PipelineNodeNewLogicProps } from './pipelineNodeNewLogic' @@ -118,7 +118,7 @@ export function PipelineNodeNew(params: { stage?: string; id?: string } = {}): J } if (hogFunctionId) { - const res = + const res = if (stage === PipelineStage.Destination) { return {res} } diff --git a/frontend/src/scenes/pipeline/hogfunctions/PipelineHogFunctionConfiguration.tsx b/frontend/src/scenes/pipeline/hogfunctions/HogFunctionConfiguration.tsx similarity index 97% rename from frontend/src/scenes/pipeline/hogfunctions/PipelineHogFunctionConfiguration.tsx rename to frontend/src/scenes/pipeline/hogfunctions/HogFunctionConfiguration.tsx index aca7f1290538a..f632a589be597 100644 --- a/frontend/src/scenes/pipeline/hogfunctions/PipelineHogFunctionConfiguration.tsx +++ b/frontend/src/scenes/pipeline/hogfunctions/HogFunctionConfiguration.tsx @@ -24,21 +24,15 @@ import { MathAvailability } from 'scenes/insights/filters/ActionFilter/ActionFil import { groupsModel } from '~/models/groupsModel' import { EntityTypes } from '~/types' +import { hogFunctionConfigurationLogic } from './hogFunctionConfigurationLogic' import { HogFunctionIconEditable } from './HogFunctionIcon' import { HogFunctionInputs } from './HogFunctionInputs' import { HogFunctionStatusIndicator } from './HogFunctionStatusIndicator' import { HogFunctionTest, HogFunctionTestPlaceholder } from './HogFunctionTest' -import { pipelineHogFunctionConfigurationLogic } from './pipelineHogFunctionConfigurationLogic' -export function PipelineHogFunctionConfiguration({ - templateId, - id, -}: { - templateId?: string - id?: string -}): JSX.Element { +export function HogFunctionConfiguration({ templateId, id }: { templateId?: string; id?: string }): JSX.Element { const logicProps = { templateId, id } - const logic = pipelineHogFunctionConfigurationLogic(logicProps) + const logic = hogFunctionConfigurationLogic(logicProps) const { isConfigurationSubmitting, configurationChanged, @@ -116,7 +110,7 @@ export function PipelineHogFunctionConfiguration({ return (
- + @@ -127,7 +121,7 @@ export function PipelineHogFunctionConfiguration({ />
): JSX.Element { - const { globalVars } = useValues(pipelineHogFunctionConfigurationLogic) + const { globalVars } = useValues(hogFunctionConfigurationLogic) return } @@ -395,8 +395,8 @@ function HogFunctionInputSchemaControls({ value, onChange, onDone }: HogFunction export function HogFunctionInputWithSchema({ schema }: HogFunctionInputWithSchemaProps): JSX.Element { const { attributes, listeners, setNodeRef, transform, transition } = useSortable({ id: schema.key }) - const { showSource, configuration } = useValues(pipelineHogFunctionConfigurationLogic) - const { setConfigurationValue } = useActions(pipelineHogFunctionConfigurationLogic) + const { showSource, configuration } = useValues(hogFunctionConfigurationLogic) + const { setConfigurationValue } = useActions(hogFunctionConfigurationLogic) const [editing, setEditing] = useState(showSource) const value = configuration.inputs?.[schema.key] @@ -488,8 +488,8 @@ export function HogFunctionInputWithSchema({ schema }: HogFunctionInputWithSchem } export function HogFunctionInputs(): JSX.Element { - const { showSource, configuration } = useValues(pipelineHogFunctionConfigurationLogic) - const { setConfigurationValue } = useActions(pipelineHogFunctionConfigurationLogic) + const { showSource, configuration } = useValues(hogFunctionConfigurationLogic) + const { setConfigurationValue } = useActions(hogFunctionConfigurationLogic) if (!configuration?.inputs_schema?.length) { return This function does not require any input variables. diff --git a/frontend/src/scenes/pipeline/hogfunctions/HogFunctionStatusIndicator.tsx b/frontend/src/scenes/pipeline/hogfunctions/HogFunctionStatusIndicator.tsx index c3d2c82c54693..0d3ae278271bd 100644 --- a/frontend/src/scenes/pipeline/hogfunctions/HogFunctionStatusIndicator.tsx +++ b/frontend/src/scenes/pipeline/hogfunctions/HogFunctionStatusIndicator.tsx @@ -5,7 +5,7 @@ import { dayjs } from 'lib/dayjs' import { HogWatcherState } from '~/types' -import { pipelineHogFunctionConfigurationLogic } from './pipelineHogFunctionConfigurationLogic' +import { hogFunctionConfigurationLogic } from './hogFunctionConfigurationLogic' type DisplayOptions = { tagType: LemonTagProps['type']; display: string; description: JSX.Element } const displayMap: Record = { @@ -56,7 +56,7 @@ const DEFAULT_DISPLAY: DisplayOptions = { } export function HogFunctionStatusIndicator(): JSX.Element | null { - const { hogFunction } = useValues(pipelineHogFunctionConfigurationLogic) + const { hogFunction } = useValues(hogFunctionConfigurationLogic) if (!hogFunction || !hogFunction.enabled) { return null diff --git a/frontend/src/scenes/pipeline/hogfunctions/pipelineHogFunctionConfigurationLogic.tsx b/frontend/src/scenes/pipeline/hogfunctions/hogFunctionConfigurationLogic.tsx similarity index 93% rename from frontend/src/scenes/pipeline/hogfunctions/pipelineHogFunctionConfigurationLogic.tsx rename to frontend/src/scenes/pipeline/hogfunctions/hogFunctionConfigurationLogic.tsx index 08a93ac5140f4..1b411b9838dee 100644 --- a/frontend/src/scenes/pipeline/hogfunctions/pipelineHogFunctionConfigurationLogic.tsx +++ b/frontend/src/scenes/pipeline/hogfunctions/hogFunctionConfigurationLogic.tsx @@ -19,9 +19,9 @@ import { PluginConfigTypeNew, } from '~/types' -import type { pipelineHogFunctionConfigurationLogicType } from './pipelineHogFunctionConfigurationLogicType' +import type { hogFunctionConfigurationLogicType } from './hogFunctionConfigurationLogicType' -export interface PipelineHogFunctionConfigurationLogicProps { +export interface HogFunctionConfigurationLogicProps { templateId?: string id?: string } @@ -99,12 +99,12 @@ export function sanitizeConfiguration(data: HogFunctionConfigurationType): HogFu return payload } -export const pipelineHogFunctionConfigurationLogic = kea([ - props({} as PipelineHogFunctionConfigurationLogicProps), - key(({ id, templateId }: PipelineHogFunctionConfigurationLogicProps) => { +export const hogFunctionConfigurationLogic = kea([ + props({} as HogFunctionConfigurationLogicProps), + key(({ id, templateId }: HogFunctionConfigurationLogicProps) => { return id ?? templateId ?? 'new' }), - path((id) => ['scenes', 'pipeline', 'pipelineHogFunctionConfigurationLogic', id]), + path((id) => ['scenes', 'pipeline', 'hogFunctionConfigurationLogic', id]), actions({ setShowSource: (showSource: boolean) => ({ showSource }), resetForm: (configuration?: HogFunctionConfigurationType) => ({ configuration }), @@ -262,7 +262,7 @@ export const pipelineHogFunctionConfigurationLogic = kea [s.hogFunction], (): Record => createExampleEvent()], })), - listeners(({ actions, values, cache, props }) => ({ + listeners(({ actions, values, cache }) => ({ loadTemplateSuccess: () => actions.resetForm(), loadHogFunctionSuccess: () => actions.resetForm(), upsertHogFunctionSuccess: () => actions.resetForm(), @@ -298,18 +298,6 @@ export const pipelineHogFunctionConfigurationLogic = kea { - if (!props.id) { - router.actions.replace( - urls.pipelineNode( - PipelineStage.Destination, - `hog-${configuration.id}`, - PipelineNodeTab.Configuration - ) - ) - } - }, - duplicate: async () => { if (values.hogFunction) { const newConfig = { @@ -386,5 +374,14 @@ export const pipelineHogFunctionConfigurationLogic = kea { + if (hogFunction && props.templateId) { + // Catch all for any scenario where we need to redirect away from the template to the actual hog function + router.actions.replace( + urls.pipelineNode(PipelineStage.Destination, `hog-${hogFunction.id}`, PipelineNodeTab.Configuration) + ) + } + }, })), ]) diff --git a/frontend/src/scenes/pipeline/hogfunctions/hogFunctionTestLogic.tsx b/frontend/src/scenes/pipeline/hogfunctions/hogFunctionTestLogic.tsx index 412eb59c3ddb9..4ae6514c471ca 100644 --- a/frontend/src/scenes/pipeline/hogfunctions/hogFunctionTestLogic.tsx +++ b/frontend/src/scenes/pipeline/hogfunctions/hogFunctionTestLogic.tsx @@ -6,8 +6,8 @@ import { tryJsonParse } from 'lib/utils' import { LogEntry } from '~/types' +import { hogFunctionConfigurationLogic, sanitizeConfiguration } from './hogFunctionConfigurationLogic' import type { hogFunctionTestLogicType } from './hogFunctionTestLogicType' -import { pipelineHogFunctionConfigurationLogic, sanitizeConfiguration } from './pipelineHogFunctionConfigurationLogic' import { createExampleEvent } from './utils/event-conversion' export interface HogFunctionTestLogicProps { @@ -29,8 +29,8 @@ export const hogFunctionTestLogic = kea([ key((props) => props.id), path((id) => ['scenes', 'pipeline', 'hogfunctions', 'hogFunctionTestLogic', id]), connect((props: HogFunctionTestLogicProps) => ({ - values: [pipelineHogFunctionConfigurationLogic({ id: props.id }), ['configuration', 'configurationHasErrors']], - actions: [pipelineHogFunctionConfigurationLogic({ id: props.id }), ['touchConfigurationField']], + values: [hogFunctionConfigurationLogic({ id: props.id }), ['configuration', 'configurationHasErrors']], + actions: [hogFunctionConfigurationLogic({ id: props.id }), ['touchConfigurationField']], })), actions({ setTestResult: (result: HogFunctionTestInvocationResult | null) => ({ result }), diff --git a/frontend/src/scenes/pipeline/hogfunctions/integrations/HogFunctionInputIntegrationField.tsx b/frontend/src/scenes/pipeline/hogfunctions/integrations/HogFunctionInputIntegrationField.tsx index ca348c2f7fb2d..2a334c0e4ad31 100644 --- a/frontend/src/scenes/pipeline/hogfunctions/integrations/HogFunctionInputIntegrationField.tsx +++ b/frontend/src/scenes/pipeline/hogfunctions/integrations/HogFunctionInputIntegrationField.tsx @@ -5,7 +5,7 @@ import { SlackChannelPicker } from 'lib/integrations/SlackIntegrationHelpers' import { HogFunctionInputSchemaType } from '~/types' -import { pipelineHogFunctionConfigurationLogic } from '../pipelineHogFunctionConfigurationLogic' +import { hogFunctionConfigurationLogic } from '../hogFunctionConfigurationLogic' export type HogFunctionInputIntegrationFieldProps = { schema: HogFunctionInputSchemaType @@ -18,7 +18,7 @@ export function HogFunctionInputIntegrationField({ value, onChange, }: HogFunctionInputIntegrationFieldProps): JSX.Element { - const { configuration } = useValues(pipelineHogFunctionConfigurationLogic) + const { configuration } = useValues(hogFunctionConfigurationLogic) const { integrationsLoading, integrations } = useValues(integrationsLogic) if (integrationsLoading) { diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index c56671061a2e9..2dd1468aae480 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -251,66 +251,72 @@ abstract class CdpConsumerBase { return await runInstrumentedFunction({ statsKey: `cdpConsumer.handleEachBatch.executeMatchingFunctions`, func: async () => { - const results = ( - await Promise.all( - invocationGlobals.map((globals) => { - const { functions, total, matching } = this.hogExecutor.findMatchingFunctions(globals) - - counterFunctionInvocation.inc({ outcome: 'filtered' }, total - matching) - - // Filter for overflowed and disabled functions - const [healthy, overflowed, disabled] = functions.reduce( - (acc, item) => { - const state = this.hogWatcher.getFunctionState(item.id) - if (state >= HogWatcherState.disabledForPeriod) { - acc[2].push(item) - } else if (state >= HogWatcherState.overflowed) { - acc[1].push(item) - } else { - acc[0].push(item) - } - - return acc - }, - [[], [], []] as [HogFunctionType[], HogFunctionType[], HogFunctionType[]] - ) - - if (overflowed.length) { - counterFunctionInvocation.inc({ outcome: 'overflowed' }, overflowed.length) - // TODO: Report to AppMetrics 2 when it is ready - status.debug('🔁', `Oveflowing functions`, { - count: overflowed.length, - }) - - this.messagesToProduce.push({ - topic: KAFKA_CDP_FUNCTION_OVERFLOW, - value: { - source: 'event_invocations', - payload: { - hogFunctionIds: overflowed.map((x) => x.id), - globals, - }, - }, - key: globals.event.uuid, - }) + const invocations: { globals: HogFunctionInvocationGlobals; hogFunction: HogFunctionType }[] = [] + + invocationGlobals.forEach((globals) => { + const { functions, total, matching } = this.hogExecutor.findMatchingFunctions(globals) + + counterFunctionInvocation.inc({ outcome: 'filtered' }, total - matching) + + // Filter for overflowed and disabled functions + const [healthy, overflowed, disabled] = functions.reduce( + (acc, item) => { + const state = this.hogWatcher.getFunctionState(item.id) + if (state >= HogWatcherState.disabledForPeriod) { + acc[2].push(item) + } else if (state >= HogWatcherState.overflowed) { + acc[1].push(item) + } else { + acc[0].push(item) } - if (disabled.length) { - counterFunctionInvocation.inc({ outcome: 'disabled' }, disabled.length) - // TODO: Report to AppMetrics 2 when it is ready - status.debug('🔁', `Disabled functions skipped`, { - count: disabled.length, - }) - } + return acc + }, + [[], [], []] as [HogFunctionType[], HogFunctionType[], HogFunctionType[]] + ) + + if (overflowed.length) { + // Group all overflowed functions into one event + counterFunctionInvocation.inc({ outcome: 'overflowed' }, overflowed.length) + // TODO: Report to AppMetrics 2 when it is ready + status.debug('🔁', `Oveflowing functions`, { + count: overflowed.length, + }) + + this.messagesToProduce.push({ + topic: KAFKA_CDP_FUNCTION_OVERFLOW, + value: { + source: 'event_invocations', + payload: { + hogFunctionIds: overflowed.map((x) => x.id), + globals, + }, + }, + key: globals.event.uuid, + }) + } - return this.runManyWithHeartbeat(healthy, (x) => - this.hogExecutor.executeFunction(globals, x) - ) + if (disabled.length) { + counterFunctionInvocation.inc({ outcome: 'disabled' }, disabled.length) + // TODO: Report to AppMetrics 2 when it is ready + status.debug('🔁', `Disabled functions skipped`, { + count: disabled.length, }) + } + + healthy.forEach((x) => { + invocations.push({ + globals, + hogFunction: x, + }) + }) + }) + + const results = ( + await this.runManyWithHeartbeat(invocations, (item) => + this.hogExecutor.executeFunction(item.globals, item.hogFunction) ) - ) - .flat() - .filter((x) => !!x) as HogFunctionInvocationResult[] + ).filter((x) => !!x) as HogFunctionInvocationResult[] this.hogWatcher.currentObservations.observeResults(results) return results @@ -556,9 +562,14 @@ export class CdpOverflowConsumer extends CdpConsumerBase { .flat() const results = ( - await this.runManyWithHeartbeat(invocations, (item) => - this.hogExecutor.executeFunction(item.globals, item.hogFunctionId) - ) + await this.runManyWithHeartbeat(invocations, (item) => { + const state = this.hogWatcher.getFunctionState(item.hogFunctionId) + if (state >= HogWatcherState.disabledForPeriod) { + counterFunctionInvocation.inc({ outcome: 'disabled' }) + return + } + return this.hogExecutor.executeFunction(item.globals, item.hogFunctionId) + }) ).filter((x) => !!x) as HogFunctionInvocationResult[] this.hogWatcher.currentObservations.observeResults(results)