feat: spike console logs #17589

Closed · wants to merge 23 commits

Commits
1057c4e  feat: Record S3 BatchExport errors  (tomasfarias, Sep 19, 2023)
07c5cf2  feat: Display batch export run latest error  (tomasfarias, Sep 19, 2023)
438cc1c  refactor: Abstract some S3 batch export test setup  (tomasfarias, Sep 19, 2023)
8bfd7db  feat: Handle BigQuery BatchExport errors  (tomasfarias, Sep 19, 2023)
b9e9567  feat: Handle Postgres export errors  (tomasfarias, Sep 19, 2023)
d1f709f  feat: Handle Snowflake export errors  (tomasfarias, Sep 19, 2023)
bd0561d  fix: Revert frontend changes to show single error  (tomasfarias, Sep 20, 2023)
5964c28  feat: Implement kafka logging adapter for batch exports  (tomasfarias, Sep 21, 2023)
fa7467e  feat: ClickHouse migration for new Kafka table  (tomasfarias, Sep 21, 2023)
8685421  feat: S3 produce logs to Kafka  (tomasfarias, Sep 21, 2023)
32ee89b  feat: Postgres produce logs to Kafka  (tomasfarias, Sep 21, 2023)
f460789  feat: BigQuery produce logs to Kafka  (tomasfarias, Sep 21, 2023)
4974746  feat: Snowflake produce logs to Kafka  (tomasfarias, Sep 21, 2023)
3512789  fix: Import paths in test files  (tomasfarias, Sep 21, 2023)
a0c19af  fix: Appease the typing gods  (tomasfarias, Sep 21, 2023)
4e618df  fix: Import paths and typing  (tomasfarias, Sep 21, 2023)
7482ce1  refactor: Use non-blocking logger for BatchExports  (tomasfarias, Sep 21, 2023)
17e3b59  refactor: Make log entries table generic  (tomasfarias, Sep 22, 2023)
03bb0bd  feat: Add BatchExport logs API  (tomasfarias, Sep 22, 2023)
1c1e22e  fix: Use new generic fields when logging from batch exports  (tomasfarias, Sep 22, 2023)
e953b19  Update query snapshots  (github-actions[bot], Sep 22, 2023)
5c11ba6  the spikiest of spikes  (pauldambra, Sep 22, 2023)
f6d5495  and search  (pauldambra, Sep 22, 2023)

Changes from all commits
@@ -14,7 +14,7 @@ import {
} from '~/types'
import { DateFilter } from 'lib/components/DateFilter/DateFilter'
import { DurationFilter } from './DurationFilter'
-import { LemonButtonWithDropdown, LemonCheckbox } from '@posthog/lemon-ui'
+import { LemonButtonWithDropdown, LemonCheckbox, LemonInput } from '@posthog/lemon-ui'
import { TestAccountFilter } from 'scenes/insights/filters/TestAccountFilter'
import { teamLogic } from 'scenes/teamLogic'
import { useValues } from 'kea'
@@ -131,6 +131,17 @@ export const AdvancedSessionRecordingsFilters = ({
<LemonLabel info="Show recordings that have captured console log messages">
Filter by console logs
</LemonLabel>
+<LemonInput
+    size={'small'}
+    fullWidth={true}
+    placeholder={'containing text'}
+    value={filters.console_search_query}
+    onChange={(s) => {
+        setFilters({
+            console_search_query: s,
+        })
+    }}
+/>
<ConsoleFilters
filters={filters}
setConsoleFilters={(x) =>
1 change: 1 addition & 0 deletions frontend/src/types.ts
@@ -749,6 +749,7 @@ export interface RecordingFilters {
properties?: AnyPropertyFilter[]
session_recording_duration?: RecordingDurationFilter
duration_type_filter?: DurationType
+console_search_query?: string
console_logs?: FilterableLogLevel[]
filter_test_accounts?: boolean
}
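For reference, the new field slots into the existing recording filters like this. A minimal sketch, assuming `'warn'` and `'error'` are valid `FilterableLogLevel` values; only `console_search_query` is new in this PR:

```ts
import { RecordingFilters } from '~/types'

// Illustrative filter payload: find recordings whose console output
// contains the given text at the selected log levels.
const filters: RecordingFilters = {
    console_logs: ['warn', 'error'],
    console_search_query: 'Failed to fetch',
}
```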
3 changes: 3 additions & 0 deletions plugin-server/src/config/kafka-topics.ts
@@ -37,3 +37,6 @@ export const KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = `${prefix}clickhouse_session_recording_events${suffix}`
export const KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = `${prefix}clickhouse_session_replay_events${suffix}`
// write performance events to ClickHouse
export const KAFKA_PERFORMANCE_EVENTS = `${prefix}clickhouse_performance_events${suffix}`
+
+// log entries for ingestion into clickhouse
+export const KAFKA_LOG_ENTRIES = `${prefix}log_entries${suffix}`
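With an empty prefix and suffix this resolves to a `log_entries` topic. As a rough sketch, a message produced to it would carry the `ConsoleLogEntry` shape introduced in `process-event.ts` below; every concrete value here is invented for illustration:

```ts
// Hypothetical message body on KAFKA_LOG_ENTRIES (all values made up).
const exampleLogEntry = {
    team_id: 1,
    message: 'Failed to fetch /api/feature_flags',
    log_level: 'error',
    log_source: 'session_replay',
    log_source_id: 'some-session-id', // the replay session_id
    instance_id: 'some-instance-id',
    timestamp: '2023-09-22 10:01:02.000', // ClickHouse-formatted timestamp
}
```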
@@ -4,7 +4,7 @@ import { DateTime } from 'luxon'
import { HighLevelProducer as RdKafkaProducer, NumberNullUndefined } from 'node-rdkafka-acosom'
import { Counter } from 'prom-client'

-import { KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS } from '../../../../config/kafka-topics'
+import { KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS, KAFKA_LOG_ENTRIES } from '../../../../config/kafka-topics'
import { createRdConnectionConfigFromEnvVars } from '../../../../kafka/config'
import { findOffsetsToCommit } from '../../../../kafka/consumer'
import { retryOnDependencyUnavailableError } from '../../../../kafka/error-handling'
@@ -116,7 +116,7 @@ export class ReplayEventsIngester {
}

try {
-const replayRecord = createSessionReplayEvent(
+const [replayRecord, consoleLogEntries] = createSessionReplayEvent(
randomUUID(),
event.team_id,
event.distinct_id,
@@ -160,13 +160,25 @@

replayEventsCounter.inc()

+const s1 = JSON.stringify(replayRecord)
+const consoleLogProduces = consoleLogEntries.map((consoleLogEntry) => {
+    const s = JSON.stringify(consoleLogEntry)
+    status.info('🔁', '[replay-events] console_log', { s, s1 })
+    return produce({
+        producer: this.producer,
+        topic: KAFKA_LOG_ENTRIES,
+        value: Buffer.from(s),
+        key: event.session_id,
+    })
+})
return [
produce({
producer: this.producer,
topic: KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS,
-value: Buffer.from(JSON.stringify(replayRecord)),
+value: Buffer.from(s1),
key: event.session_id,
}),
+...consoleLogProduces,
]
} catch (error) {
status.error('⚠️', '[replay-events] processing_error', {
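The ingester now returns the console-log produces alongside the replay-event produce. How the caller drains them is outside this diff; a minimal sketch of the assumed pattern, where every acknowledgement is awaited before consumer offsets are committed:

```ts
// Sketch only: flush all pending produces for a batch before committing,
// so a failed console-log write is retried together with the batch.
async function flushPendingProduces(pending: Promise<unknown>[]): Promise<void> {
    await Promise.all(pending)
}
```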
43 changes: 41 additions & 2 deletions plugin-server/src/worker/ingestion/process-event.ts
@@ -5,6 +5,7 @@ import { DateTime } from 'luxon'

import { activeMilliseconds } from '../../main/ingestion-queues/session-recording/snapshot-segmenter'
import {
+ClickHouseTimestamp,
Element,
GroupTypeIndex,
Hub,
@@ -286,13 +287,39 @@ export interface SummarizedSessionRecordingEvent {
message_count: number
}

+type ConsoleLogEntry = {
+    team_id: number
+    message: string
+    log_level: 'info' | 'warn' | 'error'
+    log_source: 'session_replay'
+    // the session_id
+    log_source_id: string
+    // maybe we omit this?
+    instance_id: string
+    timestamp: ClickHouseTimestamp
+}
+
+function safeString(payload: string[]) {
+    let candidate = payload.join(' ')
+
+    if (candidate.startsWith('"') || candidate.startsWith("'")) {
+        candidate = candidate.substring(1)
+    }
+
+    if (candidate.endsWith('"') || candidate.endsWith("'")) {
+        candidate = candidate.substring(0, candidate.length - 1)
+    }
+
+    return candidate
+}
+
export const createSessionReplayEvent = (
uuid: string,
team_id: number,
distinct_id: string,
session_id: string,
events: RRWebEvent[]
-) => {
+): [SummarizedSessionRecordingEvent, ConsoleLogEntry[]] => {
const timestamps = events
.filter((e) => !!e?.timestamp)
.map((e) => castTimestampOrNow(DateTime.fromMillis(e.timestamp), TimestampFormat.ClickHouse))
@@ -315,6 +342,8 @@ export const createSessionReplayEvent = (
let consoleWarnCount = 0
let consoleErrorCount = 0
let url: string | null = null
+const consoleLogEntries: ConsoleLogEntry[] = []
+
events.forEach((event) => {
if (event.type === 3) {
mouseActivity += 1
@@ -337,6 +366,16 @@
} else if (level === 'error') {
consoleErrorCount += 1
}
+consoleLogEntries.push({
+    team_id,
+    // when is it not a single item array?
+    message: safeString(event.data.payload?.payload),
+    log_level: level,
+    log_source: 'session_replay',
+    log_source_id: session_id,
+    instance_id: event.data.instanceId,
+    timestamp: castTimestampOrNow(DateTime.fromMillis(event.timestamp), TimestampFormat.ClickHouse),
+})
}
})

@@ -364,7 +403,7 @@ export const createSessionReplayEvent = (
message_count: 1,
}

-return data
+return [data, consoleLogEntries]
}

export function createPerformanceEvent(uuid: string, team_id: number, distinct_id: string, properties: Properties) {
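To make `safeString`'s trimming concrete, a couple of worked examples against the definition above:

```ts
// The payload items are joined with a space, then one leading and one
// trailing quote character (if present) are stripped.
safeString(['"Failed to load resource"'])
// => 'Failed to load resource'

safeString(['error', 'at line 42'])
// => 'error at line 42' (nothing to strip)
```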
20 changes: 18 additions & 2 deletions posthog/api/__init__.py
@@ -4,6 +4,8 @@
from posthog.batch_exports import http as batch_exports
from posthog.settings import EE_AVAILABLE
from posthog.warehouse.api import saved_query, table, view_link
+
+from ..session_recordings.session_recording_api import SessionRecordingViewSet
from . import (
activity_log,
annotation,
@@ -40,7 +42,6 @@
)
from .dashboards import dashboard, dashboard_templates
from .data_management import DataManagementViewSet
-from ..session_recordings.session_recording_api import SessionRecordingViewSet


@decorators.api_view(["GET", "HEAD", "POST", "PUT", "PATCH", "DELETE"])
@@ -132,7 +133,22 @@ def api_not_found(request):
batch_exports_router = projects_router.register(
r"batch_exports", batch_exports.BatchExportViewSet, "batch_exports", ["team_id"]
)
batch_exports_router.register(r"runs", batch_exports.BatchExportRunViewSet, "runs", ["team_id", "batch_export_id"])
batch_export_runs_router = batch_exports_router.register(
r"runs", batch_exports.BatchExportRunViewSet, "runs", ["team_id", "batch_export_id"]
)
batch_exports_router.register(
r"logs",
batch_exports.BatchExportLogViewSet,
"batch_export_run_logs",
["team_id", "batch_export_id"],
)

batch_export_runs_router.register(
r"logs",
batch_exports.BatchExportLogViewSet,
"batch_export_logs",
["team_id", "batch_export_id", "run_id"],
)

projects_router.register(r"warehouse_tables", table.TableViewSet, "project_warehouse_tables", ["team_id"])
projects_router.register(
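These registrations yield a batch-export-level and a run-level logs endpoint. A sketch of how a client might call them; the URL shapes follow from the routers above, while the fetch wrapper and response handling are illustrative:

```ts
// Fetch log entries for a batch export, optionally scoped to a single run.
async function fetchBatchExportLogs(teamId: number, batchExportId: string, runId?: string) {
    const base = `/api/projects/${teamId}/batch_exports/${batchExportId}`
    const url = runId ? `${base}/runs/${runId}/logs` : `${base}/logs`
    const response = await fetch(url)
    return response.json()
}
```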
8 changes: 8 additions & 0 deletions posthog/api/test/batch_exports/operations.py
@@ -104,3 +104,11 @@ def patch_batch_export(client, team_id, batch_export_id, new_batch_export_data):
new_batch_export_data,
content_type="application/json",
)
+
+
+def get_batch_export_log_entries(client: TestClient, team_id: int, batch_export_id: str):
+    return client.get(f"/api/projects/{team_id}/batch_exports/{batch_export_id}/logs")
+
+
+def get_batch_export_run_log_entries(client: TestClient, team_id: int, batch_export_id: str, run_id):
+    return client.get(f"/api/projects/{team_id}/batch_exports/{batch_export_id}/runs/{run_id}/logs")