From 34a4f6baf1e22a724393153e937266684cc7b030 Mon Sep 17 00:00:00 2001 From: Ben White Date: Wed, 25 Dec 2024 10:24:06 +0100 Subject: [PATCH] chore: Remove kafka inspector (#27125) --- frontend/src/lib/constants.tsx | 1 - .../SystemStatus/KafkaInspectorTab.tsx | 50 ----------- .../scenes/instance/SystemStatus/index.tsx | 21 +---- .../SystemStatus/kafkaInspectorLogic.ts | 39 -------- .../SystemStatus/systemStatusLogic.ts | 5 +- frontend/src/scenes/scenes.ts | 1 - frontend/src/scenes/urls.ts | 1 - posthog/api/__init__.py | 2 - posthog/api/kafka_inspector.py | 89 ------------------- posthog/api/test/test_kafka_inspector.py | 62 ------------- 10 files changed, 3 insertions(+), 268 deletions(-) delete mode 100644 frontend/src/scenes/instance/SystemStatus/KafkaInspectorTab.tsx delete mode 100644 frontend/src/scenes/instance/SystemStatus/kafkaInspectorLogic.ts delete mode 100644 posthog/api/kafka_inspector.py delete mode 100644 posthog/api/test/test_kafka_inspector.py diff --git a/frontend/src/lib/constants.tsx b/frontend/src/lib/constants.tsx index f7168483b3e98..148862540ea8a 100644 --- a/frontend/src/lib/constants.tsx +++ b/frontend/src/lib/constants.tsx @@ -138,7 +138,6 @@ export const WEBHOOK_SERVICES: Record = { export const FEATURE_FLAGS = { // Experiments / beta features FUNNELS_CUE_OPT_OUT: 'funnels-cue-opt-out-7301', // owner: @neilkakkar - KAFKA_INSPECTOR: 'kafka-inspector', // owner: @yakkomajuri HISTORICAL_EXPORTS_V2: 'historical-exports-v2', // owner @macobo INGESTION_WARNINGS_ENABLED: 'ingestion-warnings-enabled', // owner: @tiina303 SESSION_RESET_ON_LOAD: 'session-reset-on-load', // owner: @benjackwhite diff --git a/frontend/src/scenes/instance/SystemStatus/KafkaInspectorTab.tsx b/frontend/src/scenes/instance/SystemStatus/KafkaInspectorTab.tsx deleted file mode 100644 index 4c13d01457827..0000000000000 --- a/frontend/src/scenes/instance/SystemStatus/KafkaInspectorTab.tsx +++ /dev/null @@ -1,50 +0,0 @@ -import { LemonButton, LemonDivider } from '@posthog/lemon-ui' -import { useValues } from 'kea' -import { Field, Form } from 'kea-forms' -import { CodeSnippet, Language } from 'lib/components/CodeSnippet' -import { LemonInput } from 'lib/lemon-ui/LemonInput/LemonInput' - -import { kafkaInspectorLogic } from './kafkaInspectorLogic' - -export function KafkaInspectorTab(): JSX.Element { - const { kafkaMessage } = useValues(kafkaInspectorLogic) - - return ( -
-

Kafka Inspector

-
Debug Kafka messages using the inspector tool.
- -
-
-
-
-
- - - -
-
- - - -
-
- - - -
-
- - Fetch Message - -
-
-
-
-
- - {kafkaMessage ? JSON.stringify(kafkaMessage, null, 4) : '\n'} - -
- ) -} diff --git a/frontend/src/scenes/instance/SystemStatus/index.tsx b/frontend/src/scenes/instance/SystemStatus/index.tsx index a9bba3c522dbd..56e8302485fa8 100644 --- a/frontend/src/scenes/instance/SystemStatus/index.tsx +++ b/frontend/src/scenes/instance/SystemStatus/index.tsx @@ -4,11 +4,9 @@ import { IconInfo } from '@posthog/icons' import { LemonBanner, Link } from '@posthog/lemon-ui' import { useActions, useValues } from 'kea' import { PageHeader } from 'lib/components/PageHeader' -import { FEATURE_FLAGS } from 'lib/constants' import { LemonTab, LemonTabs } from 'lib/lemon-ui/LemonTabs' import { LemonTag } from 'lib/lemon-ui/LemonTag/LemonTag' import { Tooltip } from 'lib/lemon-ui/Tooltip' -import { featureFlagLogic } from 'lib/logic/featureFlagLogic' import { InternalMetricsTab } from 'scenes/instance/SystemStatus/InternalMetricsTab' import { OverviewTab } from 'scenes/instance/SystemStatus/OverviewTab' import { preflightLogic } from 'scenes/PreflightCheck/preflightLogic' @@ -16,7 +14,6 @@ import { SceneExport } from 'scenes/sceneTypes' import { userLogic } from 'scenes/userLogic' import { InstanceConfigTab } from './InstanceConfigTab' -import { KafkaInspectorTab } from './KafkaInspectorTab' import { StaffUsersTab } from './StaffUsersTab' import { InstanceStatusTabName, systemStatusLogic } from './systemStatusLogic' @@ -30,7 +27,6 @@ export function SystemStatus(): JSX.Element { const { setTab } = useActions(systemStatusLogic) const { preflight, siteUrlMisconfigured } = useValues(preflightLogic) const { user } = useValues(userLogic) - const { featureFlags } = useValues(featureFlagLogic) let tabs = [ { @@ -58,7 +54,7 @@ export function SystemStatus(): JSX.Element { label: ( <> Settings{' '} - + Beta @@ -71,21 +67,6 @@ export function SystemStatus(): JSX.Element { content: , }, ]) - - if (featureFlags[FEATURE_FLAGS.KAFKA_INSPECTOR]) { - tabs.push({ - key: 'kafka_inspector', - label: ( - <> - Kafka Inspector{' '} - - Beta - - - ), - content: , - }) - } } return ( diff --git a/frontend/src/scenes/instance/SystemStatus/kafkaInspectorLogic.ts b/frontend/src/scenes/instance/SystemStatus/kafkaInspectorLogic.ts deleted file mode 100644 index 3fcb9422d906c..0000000000000 --- a/frontend/src/scenes/instance/SystemStatus/kafkaInspectorLogic.ts +++ /dev/null @@ -1,39 +0,0 @@ -import { actions, kea, path } from 'kea' -import { forms } from 'kea-forms' -import { loaders } from 'kea-loaders' -import api from 'lib/api' - -import type { kafkaInspectorLogicType } from './kafkaInspectorLogicType' -export interface KafkaMessage { - topic: string - partition: number - offset: number - timestamp: number - key: string - value: Record | string -} - -export const kafkaInspectorLogic = kea([ - path(['scenes', 'instance', 'SystemStatus', 'kafkaInspectorLogic']), - actions({ - fetchKafkaMessage: (topic: string, partition: number, offset: number) => ({ topic, partition, offset }), - }), - loaders({ - kafkaMessage: [ - null as KafkaMessage | null, - { - fetchKafkaMessage: async ({ topic, partition, offset }) => { - return await api.create('api/kafka_inspector/fetch_message', { topic, partition, offset }) - }, - }, - ], - }), - forms(({ actions }) => ({ - fetchKafkaMessage: { - defaults: { topic: 'clickhouse_events_json', partition: 0, offset: 0 }, - submit: ({ topic, partition, offset }: { topic: string; partition: number; offset: number }) => { - actions.fetchKafkaMessage(topic, Number(partition), Number(offset)) - }, - }, - })), -]) diff --git a/frontend/src/scenes/instance/SystemStatus/systemStatusLogic.ts b/frontend/src/scenes/instance/SystemStatus/systemStatusLogic.ts index 5ee7e0b6e3c97..7bfaa40093c80 100644 --- a/frontend/src/scenes/instance/SystemStatus/systemStatusLogic.ts +++ b/frontend/src/scenes/instance/SystemStatus/systemStatusLogic.ts @@ -18,7 +18,7 @@ export enum ConfigMode { Saving = 'saving', } -export type InstanceStatusTabName = 'overview' | 'metrics' | 'settings' | 'staff_users' | 'kafka_inspector' +export type InstanceStatusTabName = 'overview' | 'metrics' | 'settings' | 'staff_users' /** * We allow the specific instance settings that can be edited via the /instance/status page. @@ -196,8 +196,7 @@ export const systemStatusLogic = kea([ })), urlToAction(({ actions, values }) => ({ '/instance(/:tab)': ({ tab }: { tab?: InstanceStatusTabName }) => { - const currentTab = - tab && ['metrics', 'settings', 'staff_users', 'kafka_inspector'].includes(tab) ? tab : 'overview' + const currentTab = tab && ['metrics', 'settings', 'staff_users'].includes(tab) ? tab : 'overview' if (currentTab !== values.tab) { actions.setTab(currentTab) } diff --git a/frontend/src/scenes/scenes.ts b/frontend/src/scenes/scenes.ts index 98990142b8d32..6a3dadf7d5c1e 100644 --- a/frontend/src/scenes/scenes.ts +++ b/frontend/src/scenes/scenes.ts @@ -591,7 +591,6 @@ export const routes: Record = { [urls.instanceStatus()]: Scene.SystemStatus, [urls.instanceSettings()]: Scene.SystemStatus, [urls.instanceStaffUsers()]: Scene.SystemStatus, - [urls.instanceKafkaInspector()]: Scene.SystemStatus, [urls.instanceMetrics()]: Scene.SystemStatus, [urls.asyncMigrations()]: Scene.AsyncMigrations, [urls.asyncMigrationsFuture()]: Scene.AsyncMigrations, diff --git a/frontend/src/scenes/urls.ts b/frontend/src/scenes/urls.ts index 477064de6497e..183766b86db2b 100644 --- a/frontend/src/scenes/urls.ts +++ b/frontend/src/scenes/urls.ts @@ -213,7 +213,6 @@ export const urls = { // Self-hosted only instanceStatus: (): string => '/instance/status', instanceStaffUsers: (): string => '/instance/staff_users', - instanceKafkaInspector: (): string => '/instance/kafka_inspector', instanceSettings: (): string => '/instance/settings', instanceMetrics: (): string => `/instance/metrics`, asyncMigrations: (): string => '/instance/async_migrations', diff --git a/posthog/api/__init__.py b/posthog/api/__init__.py index 72342189f2764..5ebd759df4e1f 100644 --- a/posthog/api/__init__.py +++ b/posthog/api/__init__.py @@ -40,7 +40,6 @@ instance_settings, instance_status, integration, - kafka_inspector, notebook, organization, organization_domain, @@ -378,7 +377,6 @@ def register_grandfathered_environment_nested_viewset( router.register(r"dead_letter_queue", dead_letter_queue.DeadLetterQueueViewSet, "dead_letter_queue") router.register(r"async_migrations", async_migration.AsyncMigrationsViewset, "async_migrations") router.register(r"instance_settings", instance_settings.InstanceSettingsViewset, "instance_settings") -router.register(r"kafka_inspector", kafka_inspector.KafkaInspectorViewSet, "kafka_inspector") router.register("debug_ch_queries/", debug_ch_queries.DebugCHQueries, "debug_ch_queries") from posthog.api.action import ActionViewSet # noqa: E402 diff --git a/posthog/api/kafka_inspector.py b/posthog/api/kafka_inspector.py deleted file mode 100644 index e966c3e374394..0000000000000 --- a/posthog/api/kafka_inspector.py +++ /dev/null @@ -1,89 +0,0 @@ -from typing import Union - -from kafka import TopicPartition -from rest_framework import serializers, viewsets -from posthog.api.utils import action -from rest_framework.response import Response - -from posthog.kafka_client.client import build_kafka_consumer -from posthog.permissions import IsStaffUser - -KAFKA_CONSUMER_TIMEOUT = 1000 - - -# the kafka package doesn't expose ConsumerRecord -class KafkaConsumerRecord: - topic: str - partition: int - offset: int - timestamp: int - key: str - value: Union[dict, str] - - def __init__(self, topic, partition, offset, timestamp, key, value): - self.topic = topic - self.partition = partition - self.offset = offset - self.value = value - self.timestamp = timestamp - self.key = key - - -class KafkaMessageSerializer(serializers.Serializer): - topic = serializers.CharField(read_only=True) - partition = serializers.IntegerField(read_only=True) - offset = serializers.IntegerField(read_only=True) - timestamp = serializers.IntegerField(read_only=True) - key = serializers.CharField(read_only=True) - value = serializers.JSONField(read_only=True) - - -class KafkaInspectorViewSet(viewsets.ViewSet): - permission_classes = [IsStaffUser] - - @action(methods=["POST"], detail=False) - def fetch_message(self, request): - topic = request.data.get("topic", None) - partition = request.data.get("partition", None) - offset = request.data.get("offset", None) - - if not isinstance(topic, str): - return Response({"error": "Invalid topic."}, status=400) - - if not isinstance(partition, int): - return Response({"error": "Invalid partition."}, status=400) - - if not isinstance(offset, int): - return Response({"error": "Invalid offset."}, status=400) - - try: - message = get_kafka_message(topic, partition, offset) - serializer = KafkaMessageSerializer(message, context={"request": request}) - return Response(serializer.data) - except AssertionError: - return Response({"error": "Invalid partition/offset pair."}, status=400) - except StopIteration: - return Response( - { - "error": f"Error reading message, most likely the consumer timed out after {KAFKA_CONSUMER_TIMEOUT}ms." - }, - status=400, - ) - except Exception as e: - return Response({"error": e.__str__()}, status=500) - - -def get_kafka_message(topic: str, partition: int, offset: int) -> KafkaConsumerRecord: - consumer = build_kafka_consumer( - topic=None, - auto_offset_reset="earliest", - group_id="kafka-inspector", - consumer_timeout_ms=KAFKA_CONSUMER_TIMEOUT, - ) - - consumer.assign([TopicPartition(topic, partition)]) - consumer.seek(partition=TopicPartition(topic, partition), offset=offset) - - message = next(consumer) - - return message diff --git a/posthog/api/test/test_kafka_inspector.py b/posthog/api/test/test_kafka_inspector.py deleted file mode 100644 index b9a02d0464e14..0000000000000 --- a/posthog/api/test/test_kafka_inspector.py +++ /dev/null @@ -1,62 +0,0 @@ -import json -from typing import Union -from unittest.mock import patch - -from rest_framework import status - -from posthog.api.kafka_inspector import KafkaConsumerRecord -from posthog.test.base import APIBaseTest - - -class TestKafkaInspector(APIBaseTest): - def setUp(self): - super().setUp() - self.user.is_staff = True - self.user.save() - - def _to_json(self, data: Union[dict, list]) -> str: - return json.dumps(data) - - @patch( - "posthog.api.kafka_inspector.get_kafka_message", - side_effect=lambda _, __, ___: KafkaConsumerRecord("foo", 0, 0, 1650375470233, "k", "v"), - ) - def test_fetch_message(self, _): - response = self.client.post( - "/api/kafka_inspector/fetch_message", - data={"topic": "foo", "partition": 1, "offset": 0}, - ) - self.assertEqual(response.status_code, status.HTTP_200_OK) - self.assertEqual( - response.json(), - { - "key": "k", - "offset": 0, - "partition": 0, - "timestamp": 1650375470233, - "topic": "foo", - "value": "v", - }, - ) - - def test_fetch_message_invalid_params(self): - response = self.client.post( - "/api/kafka_inspector/fetch_message", - data={"topic": "foo", "partition": "1", "offset": 0}, - ) - self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - self.assertEqual(response.json(), {"error": "Invalid partition."}) - - response = self.client.post( - "/api/kafka_inspector/fetch_message", - data={"topic": 42, "partition": 1, "offset": 0}, - ) - self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - self.assertEqual(response.json(), {"error": "Invalid topic."}) - - response = self.client.post( - "/api/kafka_inspector/fetch_message", - data={"topic": "foo", "partition": 1, "offset": "0"}, - ) - self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - self.assertEqual(response.json(), {"error": "Invalid offset."})