Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: New ingestion consumer #27668

Open
wants to merge 73 commits into
base: fix/tests
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 62 commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
e3bab2c
Fixes
benjackwhite Jan 18, 2025
5330f51
Fixes
benjackwhite Jan 18, 2025
c3e8070
More changes
benjackwhite Jan 18, 2025
38dfb3d
More checks
benjackwhite Jan 18, 2025
9f94ca5
Fix
benjackwhite Jan 18, 2025
b87a8e9
Fixes
benjackwhite Jan 18, 2025
410e36d
Fixess
benjackwhite Jan 18, 2025
c128b81
Fixes
benjackwhite Jan 18, 2025
17621a9
Refactor producer
benjackwhite Jan 19, 2025
46837c1
Fixes
benjackwhite Jan 19, 2025
a4327cb
Fix
benjackwhite Jan 19, 2025
73f623c
Fixes
benjackwhite Jan 19, 2025
3cef6f7
fix
benjackwhite Jan 19, 2025
6f0d9b6
Fixes
benjackwhite Jan 19, 2025
02753d8
Fixes
benjackwhite Jan 19, 2025
2ac0021
Fixes
benjackwhite Jan 19, 2025
8228d91
fix
benjackwhite Jan 19, 2025
d5ff970
Merge branch 'feat/producer-refactor' into feat/new-ingestion-consumer
benjackwhite Jan 19, 2025
e9a3ce7
Fix for different types
benjackwhite Jan 19, 2025
4a6575c
Fix
benjackwhite Jan 19, 2025
e8f6a41
Fixes
benjackwhite Jan 19, 2025
704831e
Fixes
benjackwhite Jan 19, 2025
a76761f
Fixes
benjackwhite Jan 19, 2025
7609504
Fixes
benjackwhite Jan 19, 2025
db07b59
Fixes
benjackwhite Jan 19, 2025
39f3277
Fixes
benjackwhite Jan 19, 2025
18bb3dd
Started adding tests
benjackwhite Jan 19, 2025
63d909d
Fix
benjackwhite Jan 19, 2025
cbc73df
Merge branch 'feat/producer-refactor' into feat/new-ingestion-consumer
benjackwhite Jan 19, 2025
352b849
Added helper for sanitizing snapshots
benjackwhite Jan 19, 2025
845b12f
Fixes
benjackwhite Jan 19, 2025
d9c6b83
fixes
benjackwhite Jan 19, 2025
aa4cdac
fix config check
benjackwhite Jan 19, 2025
cb4f6ca
Added good tests for drpping logs
benjackwhite Jan 19, 2025
dbe314f
Merge branch 'master' into feat/producer-refactor
benjackwhite Jan 20, 2025
b008281
Merge branch 'feat/producer-refactor' into feat/new-ingestion-consumer
benjackwhite Jan 20, 2025
c4be5b3
Fixes
benjackwhite Jan 20, 2025
a05c1ad
Merge branch 'feat/producer-refactor' into feat/new-ingestion-consumer
benjackwhite Jan 20, 2025
0f80ba7
Fixes imports
benjackwhite Jan 20, 2025
8759516
Fix types compiling
benjackwhite Jan 20, 2025
e0ad253
Merge branch 'fix/tests' into feat/new-ingestion-consumer
benjackwhite Jan 20, 2025
6860d0d
Fix
benjackwhite Jan 20, 2025
1542760
Fix
benjackwhite Jan 20, 2025
b8c1bf9
Update UI snapshots for `chromium` (1)
github-actions[bot] Jan 20, 2025
f62e86c
Merge branch 'master' into feat/producer-refactor
benjackwhite Jan 21, 2025
b611df9
Merge branch 'feat/producer-refactor' into feat/new-ingestion-consumer
benjackwhite Jan 21, 2025
21a5027
Merge branch 'fix/tests' into feat/new-ingestion-consumer
benjackwhite Jan 21, 2025
f41936f
fix
benjackwhite Jan 21, 2025
c3b47c9
fix
benjackwhite Jan 21, 2025
1fa3395
Update UI snapshots for `chromium` (1)
github-actions[bot] Jan 21, 2025
d46a653
Update UI snapshots for `chromium` (1)
github-actions[bot] Jan 21, 2025
a3d5857
Merge branch 'fix/tests' into feat/new-ingestion-consumer
benjackwhite Jan 21, 2025
0353b86
Fix skip logic
benjackwhite Jan 21, 2025
d97f65f
Added test
benjackwhite Jan 21, 2025
00ceb09
fix
benjackwhite Jan 21, 2025
1c7439e
fix
benjackwhite Jan 21, 2025
d9fc72f
Fix
benjackwhite Jan 21, 2025
0ce4169
Merge branch 'fix/tests' into feat/new-ingestion-consumer
benjackwhite Jan 21, 2025
d3751c1
Merge branch 'feat/improve-overflow' into feat/new-ingestion-consumer
benjackwhite Jan 21, 2025
f99f7d8
Merge branch 'fix/tests' into feat/new-ingestion-consumer
benjackwhite Jan 21, 2025
03b5078
Fix
benjackwhite Jan 21, 2025
7057835
Fixes
benjackwhite Jan 21, 2025
4e80a92
Fixes
benjackwhite Jan 21, 2025
59b0380
Fix
benjackwhite Jan 21, 2025
0f68b84
Merge branch 'fix/tests' into feat/new-ingestion-consumer
benjackwhite Jan 21, 2025
e619928
Fix tests
benjackwhite Jan 21, 2025
6b21bd5
Fixes
benjackwhite Jan 21, 2025
1618c17
Fix
benjackwhite Jan 21, 2025
7d1b544
Added happy path tests
benjackwhite Jan 21, 2025
e9a10b0
Update UI snapshots for `chromium` (1)
github-actions[bot] Jan 21, 2025
1002c84
Fixes
benjackwhite Jan 21, 2025
a0ca3ce
Merge branch 'fix/tests' into feat/new-ingestion-consumer
benjackwhite Jan 21, 2025
4ad069e
fix
benjackwhite Jan 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@
"OBJECT_STORAGE_ENABLED": "True",
"HOG_HOOK_URL": "http://localhost:3300/hoghook",
"CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS": "",
"CDP_CYCLOTRON_ENABLED_TEAMS": "*"
"CDP_CYCLOTRON_ENABLED_TEAMS": "*",
"PLUGIN_SERVER_MODE": "all-v2"
},
"presentation": {
"group": "main"
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import { ReleaseConditionsModal, ReleaseConditionsTable } from './ReleaseConditi
import { SummaryTable } from './SummaryTable'

const ResultsTab = (): JSX.Element => {
const { experiment, metricResults } = useValues(experimentLogic)
const { experiment, metricResults, primaryMetricsLengthWithSharedMetrics } = useValues(experimentLogic)
const hasSomeResults = metricResults?.some((result) => result?.insight)

const hasSinglePrimaryMetric = experiment.metrics.length === 1
const hasSinglePrimaryMetric = primaryMetricsLengthWithSharedMetrics === 1

return (
<>
Expand Down
16 changes: 4 additions & 12 deletions frontend/src/scenes/experiments/Metrics/SharedMetricModal.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { LemonBanner, LemonButton, LemonModal, Link } from '@posthog/lemon-ui'
import { useActions, useValues } from 'kea'
import { IconOpenInNew } from 'lib/lemon-ui/icons'
import { LemonTable } from 'lib/lemon-ui/LemonTable'
import { useEffect, useState } from 'react'
import { useState } from 'react'
import { urls } from 'scenes/urls'

import { Experiment } from '~/types'
Expand Down Expand Up @@ -37,15 +37,7 @@ export function SharedMetricModal({
} = useActions(experimentLogic({ experimentId }))

const [selectedMetricIds, setSelectedMetricIds] = useState<SharedMetric['id'][]>([])
const [selectedMetricId, setSelectedMetricId] = useState<SharedMetric['id'] | null>(null)
const [mode, setMode] = useState<'create' | 'edit'>('create')

useEffect(() => {
if (editingSharedMetricId) {
setSelectedMetricId(editingSharedMetricId)
setMode('edit')
}
}, [editingSharedMetricId])
const mode = editingSharedMetricId ? 'edit' : 'create'

if (!sharedMetrics) {
return <></>
Expand Down Expand Up @@ -196,10 +188,10 @@ export function SharedMetricModal({
</div>
)}

{selectedMetricId && (
{editingSharedMetricId && (
<div>
{(() => {
const metric = sharedMetrics.find((m: SharedMetric) => m.id === selectedMetricId)
const metric = sharedMetrics.find((m: SharedMetric) => m.id === editingSharedMetricId)
if (!metric) {
return <></>
}
Expand Down
2 changes: 0 additions & 2 deletions frontend/src/scenes/experiments/experimentLogic.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -534,8 +534,6 @@ export const experimentLogic = kea<experimentLogicType>([
{
openPrimarySharedMetricModal: (_, { sharedMetricId }) => sharedMetricId,
openSecondarySharedMetricModal: (_, { sharedMetricId }) => sharedMetricId,
closePrimarySharedMetricModal: () => null,
closeSecondarySharedMetricModal: () => null,
updateExperimentGoal: () => null,
},
],
Expand Down
2 changes: 1 addition & 1 deletion frontend/utils.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ export function gatherProductUrls(products) {
}

export function gatherProductManifests() {
const products = fse.readdirSync('products').filter((p) => p !== '__pycache__')
const products = fse.readdirSync('products').filter((p) => !['__pycache__', 'README.md'].includes(p))
const allScenes = {}
const allRoutes = {}
const allRedirects = {}
Expand Down
58 changes: 39 additions & 19 deletions plugin-server/src/capabilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,49 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
: null
const sharedCapabilities = !isTestEnv() ? { http: true } : {}

const singleProcessCapabilities: PluginServerCapabilities = {
mmdb: true,
ingestion: true,
ingestionOverflow: true,
ingestionHistorical: true,
eventsIngestionPipelines: true, // with null PluginServerMode we run all of them
pluginScheduledTasks: true,
processPluginJobs: true,
processAsyncOnEventHandlers: true,
processAsyncWebhooksHandlers: true,
sessionRecordingBlobIngestion: true,
sessionRecordingBlobOverflowIngestion: config.SESSION_RECORDING_OVERFLOW_ENABLED,
sessionRecordingBlobIngestionV2: true,
sessionRecordingBlobIngestionV2Overflow: config.SESSION_RECORDING_OVERFLOW_ENABLED,
appManagementSingleton: true,
preflightSchedules: true,
cdpProcessedEvents: true,
cdpInternalEvents: true,
cdpFunctionCallbacks: true,
cdpCyclotronWorker: true,
syncInlinePlugins: true,
...sharedCapabilities,
}

switch (mode) {
case null:
return {
...singleProcessCapabilities,
...sharedCapabilities,
benjackwhite marked this conversation as resolved.
Show resolved Hide resolved
}
case PluginServerMode.all_v2:
return {
...singleProcessCapabilities,
...sharedCapabilities,
ingestionV2Combined: true,
}

case PluginServerMode.ingestion_v2:
// NOTE: this mode will be removed in the future and replaced with
// `analytics-ingestion` and `recordings-ingestion` modes.
return {
mmdb: true,
ingestion: true,
ingestionOverflow: true,
ingestionHistorical: true,
eventsIngestionPipelines: true, // with null PluginServerMode we run all of them
pluginScheduledTasks: true,
processPluginJobs: true,
processAsyncOnEventHandlers: true,
processAsyncWebhooksHandlers: true,
sessionRecordingBlobIngestion: true,
sessionRecordingBlobOverflowIngestion: config.SESSION_RECORDING_OVERFLOW_ENABLED,
sessionRecordingBlobIngestionV2: true,
sessionRecordingBlobIngestionV2Overflow: config.SESSION_RECORDING_OVERFLOW_ENABLED,
appManagementSingleton: true,
preflightSchedules: true,
cdpProcessedEvents: true,
cdpInternalEvents: true,
cdpFunctionCallbacks: true,
cdpCyclotronWorker: true,
syncInlinePlugins: true,
ingestionV2: true,
...sharedCapabilities,
}
case PluginServerMode.ingestion:
Expand Down
8 changes: 8 additions & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
KAFKA_CLICKHOUSE_HEATMAP_EVENTS,
KAFKA_EVENTS_JSON,
KAFKA_EVENTS_PLUGIN_INGESTION,
KAFKA_EVENTS_PLUGIN_INGESTION_DLQ,
KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
KAFKA_EXCEPTION_SYMBOLIFICATION_EVENTS,
} from './kafka-topics'
Expand Down Expand Up @@ -127,6 +128,7 @@ export function getDefaultConfig(): PluginsServerConfig {
EXTERNAL_REQUEST_TIMEOUT_MS: 10 * 1000, // 10 seconds
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: '',
DROP_EVENTS_BY_TOKEN: '',
SKIP_PERSONS_PROCESSING_BY_TOKEN_DISTINCT_ID: '',
PIPELINE_STEP_STALLED_LOG_TIMEOUT: 30,
RELOAD_PLUGIN_JITTER_MAX_MS: 60000,
RUSTY_HOOK_FOR_TEAMS: '',
Expand Down Expand Up @@ -205,6 +207,12 @@ export function getDefaultConfig(): PluginsServerConfig {
: '',

CYCLOTRON_SHARD_DEPTH_LIMIT: 1000000,

// New IngestionConsumer config
INGESTION_CONSUMER_GROUP_ID: 'events-ingestion-consumer',
INGESTION_CONSUMER_CONSUME_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION,
INGESTION_CONSUMER_OVERFLOW_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
INGESTION_CONSUMER_DLQ_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION_DLQ,
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`IngestionConsumer general should process a standard event 1`] = `
[
{
"key": "<REPLACED-UUID-0>",
"topic": "clickhouse_events_json_test",
"value": {
"created_at": "2025-01-31 23:00:00.000",
"distinct_id": "user-1",
"elements_chain": "",
"event": "$pageview",
"person_created_at": "2025-01-31 23:00:00",
"person_id": "<REPLACED-UUID-1>",
"person_mode": "full",
"person_properties": "{"$current_url":"http://localhost:8000","$creator_event_uuid":"<REPLACED-UUID-2>","$initial_current_url":"http://localhost:8000"}",
"project_id": 2,
"properties": "{"$current_url":"http://localhost:8000","$ip":"127.0.0.1","$set":{"$current_url":"http://localhost:8000"},"$set_once":{"$initial_current_url":"http://localhost:8000"}}",
"team_id": 2,
"timestamp": "2025-01-31 23:00:00.000",
"uuid": "<REPLACED-UUID-1>",
},
},
{
"key": null,
"topic": "clickhouse_person_test",
"value": {
"created_at": "2025-01-31 23:00:00",
"id": "<REPLACED-UUID-2>",
"is_deleted": 0,
"is_identified": 0,
"properties": "{"$current_url":"http://localhost:8000","$creator_event_uuid":"<REPLACED-UUID-2>","$initial_current_url":"http://localhost:8000"}",
"team_id": 2,
"version": 0,
},
},
{
"key": null,
"topic": "clickhouse_person_distinct_id_test",
"value": {
"distinct_id": "user-1",
"is_deleted": 0,
"person_id": "<REPLACED-UUID-2>",
"team_id": 2,
"version": 0,
},
},
]
`;
Loading
Loading