Skip to content

Commit

Permalink
fix: Redirect after creation (#23308)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite authored Jun 28, 2024
1 parent c6c3b12 commit 9217f54
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 105 deletions.
4 changes: 2 additions & 2 deletions frontend/src/scenes/pipeline/PipelineNodeConfiguration.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { useValues } from 'kea'
import { NotFound } from 'lib/components/NotFound'

import { PipelineHogFunctionConfiguration } from './hogfunctions/PipelineHogFunctionConfiguration'
import { HogFunctionConfiguration } from './hogfunctions/HogFunctionConfiguration'
import { PipelineBatchExportConfiguration } from './PipelineBatchExportConfiguration'
import { pipelineNodeLogic } from './pipelineNodeLogic'
import { PipelinePluginConfiguration } from './PipelinePluginConfiguration'
Expand All @@ -17,7 +17,7 @@ export function PipelineNodeConfiguration(): JSX.Element {
return (
<div className="space-y-3">
{node.backend === PipelineBackend.HogFunction ? (
<PipelineHogFunctionConfiguration id={node.id} />
<HogFunctionConfiguration id={node.id} />
) : node.backend === PipelineBackend.Plugin ? (
<PipelinePluginConfiguration stage={stage} pluginConfigId={node.id} />
) : (
Expand Down
4 changes: 2 additions & 2 deletions frontend/src/scenes/pipeline/PipelineNodeNew.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import { AvailableFeature, BatchExportService, HogFunctionTemplateType, Pipeline

import { pipelineDestinationsLogic } from './destinationsLogic'
import { frontendAppsLogic } from './frontendAppsLogic'
import { HogFunctionConfiguration } from './hogfunctions/HogFunctionConfiguration'
import { HogFunctionIcon } from './hogfunctions/HogFunctionIcon'
import { PipelineHogFunctionConfiguration } from './hogfunctions/PipelineHogFunctionConfiguration'
import { PipelineBatchExportConfiguration } from './PipelineBatchExportConfiguration'
import { PIPELINE_TAB_TO_NODE_STAGE } from './PipelineNode'
import { pipelineNodeNewLogic, PipelineNodeNewLogicProps } from './pipelineNodeNewLogic'
Expand Down Expand Up @@ -118,7 +118,7 @@ export function PipelineNodeNew(params: { stage?: string; id?: string } = {}): J
}

if (hogFunctionId) {
const res = <PipelineHogFunctionConfiguration templateId={hogFunctionId} />
const res = <HogFunctionConfiguration templateId={hogFunctionId} />
if (stage === PipelineStage.Destination) {
return <PayGateMini feature={AvailableFeature.DATA_PIPELINES}>{res}</PayGateMini>
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,15 @@ import { MathAvailability } from 'scenes/insights/filters/ActionFilter/ActionFil
import { groupsModel } from '~/models/groupsModel'
import { EntityTypes } from '~/types'

import { hogFunctionConfigurationLogic } from './hogFunctionConfigurationLogic'
import { HogFunctionIconEditable } from './HogFunctionIcon'
import { HogFunctionInputs } from './HogFunctionInputs'
import { HogFunctionStatusIndicator } from './HogFunctionStatusIndicator'
import { HogFunctionTest, HogFunctionTestPlaceholder } from './HogFunctionTest'
import { pipelineHogFunctionConfigurationLogic } from './pipelineHogFunctionConfigurationLogic'

export function PipelineHogFunctionConfiguration({
templateId,
id,
}: {
templateId?: string
id?: string
}): JSX.Element {
export function HogFunctionConfiguration({ templateId, id }: { templateId?: string; id?: string }): JSX.Element {
const logicProps = { templateId, id }
const logic = pipelineHogFunctionConfigurationLogic(logicProps)
const logic = hogFunctionConfigurationLogic(logicProps)
const {
isConfigurationSubmitting,
configurationChanged,
Expand Down Expand Up @@ -116,7 +110,7 @@ export function PipelineHogFunctionConfiguration({

return (
<div className="space-y-3">
<BindLogic logic={pipelineHogFunctionConfigurationLogic} props={logicProps}>
<BindLogic logic={hogFunctionConfigurationLogic} props={logicProps}>
<PageHeader
buttons={
<>
Expand All @@ -127,7 +121,7 @@ export function PipelineHogFunctionConfiguration({
/>

<Form
logic={pipelineHogFunctionConfigurationLogic}
logic={hogFunctionConfigurationLogic}
props={logicProps}
formKey="configuration"
className="space-y-3"
Expand Down
12 changes: 6 additions & 6 deletions frontend/src/scenes/pipeline/hogfunctions/HogFunctionInputs.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import { useEffect, useMemo, useState } from 'react'
import { groupsModel } from '~/models/groupsModel'
import { HogFunctionInputSchemaType } from '~/types'

import { hogFunctionConfigurationLogic } from './hogFunctionConfigurationLogic'
import { HogFunctionInputIntegration } from './integrations/HogFunctionInputIntegration'
import { HogFunctionInputIntegrationField } from './integrations/HogFunctionInputIntegrationField'
import { pipelineHogFunctionConfigurationLogic } from './pipelineHogFunctionConfigurationLogic'

export type HogFunctionInputProps = {
schema: HogFunctionInputSchemaType
Expand Down Expand Up @@ -179,7 +179,7 @@ function JsonConfigField(props: {
}

function HogFunctionTemplateInput(props: Omit<CodeEditorInlineProps, 'globals'>): JSX.Element {
const { globalVars } = useValues(pipelineHogFunctionConfigurationLogic)
const { globalVars } = useValues(hogFunctionConfigurationLogic)
return <CodeEditorInline {...props} globals={globalVars} />
}

Expand Down Expand Up @@ -395,8 +395,8 @@ function HogFunctionInputSchemaControls({ value, onChange, onDone }: HogFunction

export function HogFunctionInputWithSchema({ schema }: HogFunctionInputWithSchemaProps): JSX.Element {
const { attributes, listeners, setNodeRef, transform, transition } = useSortable({ id: schema.key })
const { showSource, configuration } = useValues(pipelineHogFunctionConfigurationLogic)
const { setConfigurationValue } = useActions(pipelineHogFunctionConfigurationLogic)
const { showSource, configuration } = useValues(hogFunctionConfigurationLogic)
const { setConfigurationValue } = useActions(hogFunctionConfigurationLogic)
const [editing, setEditing] = useState(showSource)

const value = configuration.inputs?.[schema.key]
Expand Down Expand Up @@ -488,8 +488,8 @@ export function HogFunctionInputWithSchema({ schema }: HogFunctionInputWithSchem
}

export function HogFunctionInputs(): JSX.Element {
const { showSource, configuration } = useValues(pipelineHogFunctionConfigurationLogic)
const { setConfigurationValue } = useActions(pipelineHogFunctionConfigurationLogic)
const { showSource, configuration } = useValues(hogFunctionConfigurationLogic)
const { setConfigurationValue } = useActions(hogFunctionConfigurationLogic)

if (!configuration?.inputs_schema?.length) {
return <span className="italic text-muted-alt">This function does not require any input variables.</span>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { dayjs } from 'lib/dayjs'

import { HogWatcherState } from '~/types'

import { pipelineHogFunctionConfigurationLogic } from './pipelineHogFunctionConfigurationLogic'
import { hogFunctionConfigurationLogic } from './hogFunctionConfigurationLogic'

type DisplayOptions = { tagType: LemonTagProps['type']; display: string; description: JSX.Element }
const displayMap: Record<HogWatcherState, DisplayOptions> = {
Expand Down Expand Up @@ -56,7 +56,7 @@ const DEFAULT_DISPLAY: DisplayOptions = {
}

export function HogFunctionStatusIndicator(): JSX.Element | null {
const { hogFunction } = useValues(pipelineHogFunctionConfigurationLogic)
const { hogFunction } = useValues(hogFunctionConfigurationLogic)

if (!hogFunction || !hogFunction.enabled) {
return null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import {
PluginConfigTypeNew,
} from '~/types'

import type { pipelineHogFunctionConfigurationLogicType } from './pipelineHogFunctionConfigurationLogicType'
import type { hogFunctionConfigurationLogicType } from './hogFunctionConfigurationLogicType'

export interface PipelineHogFunctionConfigurationLogicProps {
export interface HogFunctionConfigurationLogicProps {
templateId?: string
id?: string
}
Expand Down Expand Up @@ -99,12 +99,12 @@ export function sanitizeConfiguration(data: HogFunctionConfigurationType): HogFu
return payload
}

export const pipelineHogFunctionConfigurationLogic = kea<pipelineHogFunctionConfigurationLogicType>([
props({} as PipelineHogFunctionConfigurationLogicProps),
key(({ id, templateId }: PipelineHogFunctionConfigurationLogicProps) => {
export const hogFunctionConfigurationLogic = kea<hogFunctionConfigurationLogicType>([
props({} as HogFunctionConfigurationLogicProps),
key(({ id, templateId }: HogFunctionConfigurationLogicProps) => {
return id ?? templateId ?? 'new'
}),
path((id) => ['scenes', 'pipeline', 'pipelineHogFunctionConfigurationLogic', id]),
path((id) => ['scenes', 'pipeline', 'hogFunctionConfigurationLogic', id]),
actions({
setShowSource: (showSource: boolean) => ({ showSource }),
resetForm: (configuration?: HogFunctionConfigurationType) => ({ configuration }),
Expand Down Expand Up @@ -262,7 +262,7 @@ export const pipelineHogFunctionConfigurationLogic = kea<pipelineHogFunctionConf
globalVars: [(s) => [s.hogFunction], (): Record<string, any> => createExampleEvent()],
})),

listeners(({ actions, values, cache, props }) => ({
listeners(({ actions, values, cache }) => ({
loadTemplateSuccess: () => actions.resetForm(),
loadHogFunctionSuccess: () => actions.resetForm(),
upsertHogFunctionSuccess: () => actions.resetForm(),
Expand Down Expand Up @@ -298,18 +298,6 @@ export const pipelineHogFunctionConfigurationLogic = kea<pipelineHogFunctionConf
})
},

submitConfigurationSuccess: ({ configuration }) => {
if (!props.id) {
router.actions.replace(
urls.pipelineNode(
PipelineStage.Destination,
`hog-${configuration.id}`,
PipelineNodeTab.Configuration
)
)
}
},

duplicate: async () => {
if (values.hogFunction) {
const newConfig = {
Expand Down Expand Up @@ -386,5 +374,14 @@ export const pipelineHogFunctionConfigurationLogic = kea<pipelineHogFunctionConf
})
}
},

hogFunction: (hogFunction) => {
if (hogFunction && props.templateId) {
// Catch all for any scenario where we need to redirect away from the template to the actual hog function
router.actions.replace(
urls.pipelineNode(PipelineStage.Destination, `hog-${hogFunction.id}`, PipelineNodeTab.Configuration)
)
}
},
})),
])
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { tryJsonParse } from 'lib/utils'

import { LogEntry } from '~/types'

import { hogFunctionConfigurationLogic, sanitizeConfiguration } from './hogFunctionConfigurationLogic'
import type { hogFunctionTestLogicType } from './hogFunctionTestLogicType'
import { pipelineHogFunctionConfigurationLogic, sanitizeConfiguration } from './pipelineHogFunctionConfigurationLogic'
import { createExampleEvent } from './utils/event-conversion'

export interface HogFunctionTestLogicProps {
Expand All @@ -29,8 +29,8 @@ export const hogFunctionTestLogic = kea<hogFunctionTestLogicType>([
key((props) => props.id),
path((id) => ['scenes', 'pipeline', 'hogfunctions', 'hogFunctionTestLogic', id]),
connect((props: HogFunctionTestLogicProps) => ({
values: [pipelineHogFunctionConfigurationLogic({ id: props.id }), ['configuration', 'configurationHasErrors']],
actions: [pipelineHogFunctionConfigurationLogic({ id: props.id }), ['touchConfigurationField']],
values: [hogFunctionConfigurationLogic({ id: props.id }), ['configuration', 'configurationHasErrors']],
actions: [hogFunctionConfigurationLogic({ id: props.id }), ['touchConfigurationField']],
})),
actions({
setTestResult: (result: HogFunctionTestInvocationResult | null) => ({ result }),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { SlackChannelPicker } from 'lib/integrations/SlackIntegrationHelpers'

import { HogFunctionInputSchemaType } from '~/types'

import { pipelineHogFunctionConfigurationLogic } from '../pipelineHogFunctionConfigurationLogic'
import { hogFunctionConfigurationLogic } from '../hogFunctionConfigurationLogic'

export type HogFunctionInputIntegrationFieldProps = {
schema: HogFunctionInputSchemaType
Expand All @@ -18,7 +18,7 @@ export function HogFunctionInputIntegrationField({
value,
onChange,
}: HogFunctionInputIntegrationFieldProps): JSX.Element {
const { configuration } = useValues(pipelineHogFunctionConfigurationLogic)
const { configuration } = useValues(hogFunctionConfigurationLogic)
const { integrationsLoading, integrations } = useValues(integrationsLogic)

if (integrationsLoading) {
Expand Down
127 changes: 69 additions & 58 deletions plugin-server/src/cdp/cdp-consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,66 +251,72 @@ abstract class CdpConsumerBase {
return await runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.executeMatchingFunctions`,
func: async () => {
const results = (
await Promise.all(
invocationGlobals.map((globals) => {
const { functions, total, matching } = this.hogExecutor.findMatchingFunctions(globals)

counterFunctionInvocation.inc({ outcome: 'filtered' }, total - matching)

// Filter for overflowed and disabled functions
const [healthy, overflowed, disabled] = functions.reduce(
(acc, item) => {
const state = this.hogWatcher.getFunctionState(item.id)
if (state >= HogWatcherState.disabledForPeriod) {
acc[2].push(item)
} else if (state >= HogWatcherState.overflowed) {
acc[1].push(item)
} else {
acc[0].push(item)
}

return acc
},
[[], [], []] as [HogFunctionType[], HogFunctionType[], HogFunctionType[]]
)

if (overflowed.length) {
counterFunctionInvocation.inc({ outcome: 'overflowed' }, overflowed.length)
// TODO: Report to AppMetrics 2 when it is ready
status.debug('🔁', `Oveflowing functions`, {
count: overflowed.length,
})

this.messagesToProduce.push({
topic: KAFKA_CDP_FUNCTION_OVERFLOW,
value: {
source: 'event_invocations',
payload: {
hogFunctionIds: overflowed.map((x) => x.id),
globals,
},
},
key: globals.event.uuid,
})
const invocations: { globals: HogFunctionInvocationGlobals; hogFunction: HogFunctionType }[] = []

invocationGlobals.forEach((globals) => {
const { functions, total, matching } = this.hogExecutor.findMatchingFunctions(globals)

counterFunctionInvocation.inc({ outcome: 'filtered' }, total - matching)

// Filter for overflowed and disabled functions
const [healthy, overflowed, disabled] = functions.reduce(
(acc, item) => {
const state = this.hogWatcher.getFunctionState(item.id)
if (state >= HogWatcherState.disabledForPeriod) {
acc[2].push(item)
} else if (state >= HogWatcherState.overflowed) {
acc[1].push(item)
} else {
acc[0].push(item)
}

if (disabled.length) {
counterFunctionInvocation.inc({ outcome: 'disabled' }, disabled.length)
// TODO: Report to AppMetrics 2 when it is ready
status.debug('🔁', `Disabled functions skipped`, {
count: disabled.length,
})
}
return acc
},
[[], [], []] as [HogFunctionType[], HogFunctionType[], HogFunctionType[]]
)

if (overflowed.length) {
// Group all overflowed functions into one event
counterFunctionInvocation.inc({ outcome: 'overflowed' }, overflowed.length)
// TODO: Report to AppMetrics 2 when it is ready
status.debug('🔁', `Oveflowing functions`, {
count: overflowed.length,
})

this.messagesToProduce.push({
topic: KAFKA_CDP_FUNCTION_OVERFLOW,
value: {
source: 'event_invocations',
payload: {
hogFunctionIds: overflowed.map((x) => x.id),
globals,
},
},
key: globals.event.uuid,
})
}

return this.runManyWithHeartbeat(healthy, (x) =>
this.hogExecutor.executeFunction(globals, x)
)
if (disabled.length) {
counterFunctionInvocation.inc({ outcome: 'disabled' }, disabled.length)
// TODO: Report to AppMetrics 2 when it is ready
status.debug('🔁', `Disabled functions skipped`, {
count: disabled.length,
})
}

healthy.forEach((x) => {
invocations.push({
globals,
hogFunction: x,
})
})
})

const results = (
await this.runManyWithHeartbeat(invocations, (item) =>
this.hogExecutor.executeFunction(item.globals, item.hogFunction)
)
)
.flat()
.filter((x) => !!x) as HogFunctionInvocationResult[]
).filter((x) => !!x) as HogFunctionInvocationResult[]

this.hogWatcher.currentObservations.observeResults(results)
return results
Expand Down Expand Up @@ -556,9 +562,14 @@ export class CdpOverflowConsumer extends CdpConsumerBase {
.flat()

const results = (
await this.runManyWithHeartbeat(invocations, (item) =>
this.hogExecutor.executeFunction(item.globals, item.hogFunctionId)
)
await this.runManyWithHeartbeat(invocations, (item) => {
const state = this.hogWatcher.getFunctionState(item.hogFunctionId)
if (state >= HogWatcherState.disabledForPeriod) {
counterFunctionInvocation.inc({ outcome: 'disabled' })
return
}
return this.hogExecutor.executeFunction(item.globals, item.hogFunctionId)
})
).filter((x) => !!x) as HogFunctionInvocationResult[]

this.hogWatcher.currentObservations.observeResults(results)
Expand Down

0 comments on commit 9217f54

Please sign in to comment.