feat: searchable console logs (#17739)
* feat: searchable console logs

* with feature flag wrapping search box

* fix

* fix

* fix the query

* Update frontend/src/lib/constants.tsx

Co-authored-by: David Newell <[email protected]>

* fix

* fix

* fix

* add an explanation of why we don't add an instance id

---------

Co-authored-by: David Newell <[email protected]>
pauldambra and daibhin authored Oct 4, 2023
1 parent 14d0d78 commit 7ed8939
Showing 18 changed files with 1,214 additions and 1,378 deletions.
1 change: 1 addition & 0 deletions frontend/src/lib/constants.tsx
@@ -169,6 +169,7 @@ export const FEATURE_FLAGS = {
WEBHOOKS_DENYLIST: 'webhooks-denylist', // owner: #team-pipeline
SURVEYS_SITE_APP_DEPRECATION: 'surveys-site-app-deprecation', // owner: @neilkakkar
SURVEYS_MULTIPLE_QUESTIONS: 'surveys-multiple-questions', // owner: @liyiy
CONSOLE_RECORDING_SEARCH: 'console-recording-search', // owner: #team-monitoring
} as const
export type FeatureFlagKey = (typeof FEATURE_FLAGS)[keyof typeof FEATURE_FLAGS]
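A minimal sketch of what the `as const` pattern above buys: each flag string becomes a literal type, and the indexed access type collects them into a union, so a misspelled flag value fails at compile time. The `isEnabled` helper is hypothetical, added here only for illustration:

const FEATURE_FLAGS = {
    SURVEYS_MULTIPLE_QUESTIONS: 'surveys-multiple-questions',
    CONSOLE_RECORDING_SEARCH: 'console-recording-search',
} as const

// Union of the values: 'surveys-multiple-questions' | 'console-recording-search'
type FeatureFlagKey = (typeof FEATURE_FLAGS)[keyof typeof FEATURE_FLAGS]

// Hypothetical consumer: only known flag values type-check.
function isEnabled(flag: FeatureFlagKey, active: Record<string, boolean>): boolean {
    return Boolean(active[flag])
}

isEnabled(FEATURE_FLAGS.CONSOLE_RECORDING_SEARCH, { 'console-recording-search': true }) // true
// isEnabled('console-search', {}) // compile error: not a FeatureFlagKey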

frontend/src/scenes/session-recordings/filters/AdvancedSessionRecordingsFilters.tsx
@@ -14,10 +14,12 @@ import {
} from '~/types'
import { DateFilter } from 'lib/components/DateFilter/DateFilter'
import { DurationFilter } from './DurationFilter'
import { LemonButtonWithDropdown, LemonCheckbox } from '@posthog/lemon-ui'
import { LemonButtonWithDropdown, LemonCheckbox, LemonInput } from '@posthog/lemon-ui'
import { TestAccountFilter } from 'scenes/insights/filters/TestAccountFilter'
import { teamLogic } from 'scenes/teamLogic'
import { useValues } from 'kea'
import { FEATURE_FLAGS } from 'lib/constants'
import { FlaggedFeature } from 'lib/components/FlaggedFeature'

export const AdvancedSessionRecordingsFilters = ({
filters,
@@ -33,6 +35,7 @@ export const AdvancedSessionRecordingsFilters = ({
showPropertyFilters?: boolean
}): JSX.Element => {
const { currentTeam } = useValues(teamLogic)

const hasGroupFilters = (currentTeam?.test_account_filters || [])
.map((x) => x.type)
.includes(PropertyFilterType.Group)
@@ -131,6 +134,19 @@ export const AdvancedSessionRecordingsFilters = ({
<LemonLabel info="Show recordings that have captured console log messages">
Filter by console logs
</LemonLabel>
<FlaggedFeature flag={FEATURE_FLAGS.CONSOLE_RECORDING_SEARCH}>
<LemonInput
size={'small'}
fullWidth={true}
placeholder={'containing text'}
value={filters.console_search_query}
onChange={(s) => {
setFilters({
console_search_query: s,
})
}}
/>
</FlaggedFeature>
<ConsoleFilters
filters={filters}
setConsoleFilters={(x) =>
1 change: 1 addition & 0 deletions frontend/src/types.ts
@@ -750,6 +750,7 @@ export interface RecordingFilters {
properties?: AnyPropertyFilter[]
session_recording_duration?: RecordingDurationFilter
duration_type_filter?: DurationType
console_search_query?: string
console_logs?: FilterableLogLevel[]
filter_test_accounts?: boolean
}
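For illustration, a sketch of the widened filter type in use, pairing the new free-text field with the existing level filter; the trimmed-down local types and the 'warn'/'error' literals are assumptions made so the example is self-contained:

// Local stand-ins for the real types above.
type FilterableLogLevel = 'log' | 'warn' | 'error' // assumed union; the real one may differ
interface RecordingFiltersSketch {
    console_search_query?: string
    console_logs?: FilterableLogLevel[]
    filter_test_accounts?: boolean
}

// Recordings whose console output contains the text, at warn or error level:
const filters: RecordingFiltersSketch = {
    console_search_query: 'Failed to fetch',
    console_logs: ['warn', 'error'],
    filter_test_accounts: true,
}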
2 changes: 2 additions & 0 deletions plugin-server/src/config/config.ts
@@ -155,6 +155,8 @@ export function getDefaultConfig(): PluginsServerConfig {
SESSION_RECORDING_PARALLEL_CONSUMPTION: false,
POSTHOG_SESSION_RECORDING_REDIS_HOST: undefined,
POSTHOG_SESSION_RECORDING_REDIS_PORT: undefined,
// by default a very restricted list of teams, so we can slowly roll out
MAX_TEAM_TO_ALLOW_SESSION_RECORDING_CONSOLE_LOGS_INGESTION: 2,
}
}
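Teams with an id above this ceiling are skipped by the console logs ingester (see the `team_above_max_allowed` drop below), so raising the value widens the rollout. A hypothetical sketch of the usual default-plus-environment override, not code from this commit:

// Fall back to the restrictive default of 2 unless the environment raises
// the ceiling, e.g. MAX_TEAM_TO_ALLOW_SESSION_RECORDING_CONSOLE_LOGS_INGESTION=512.
function consoleLogsIngestionTeamCeiling(): number {
    const raw = process.env.MAX_TEAM_TO_ALLOW_SESSION_RECORDING_CONSOLE_LOGS_INGESTION
    const parsed = raw ? parseInt(raw, 10) : NaN
    return Number.isFinite(parsed) ? parsed : 2
}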

3 changes: 3 additions & 0 deletions plugin-server/src/config/kafka-topics.ts
@@ -37,3 +37,6 @@ export const KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = `${prefix}clickhouse_se
export const KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = `${prefix}clickhouse_session_replay_events${suffix}`
// write performance events to ClickHouse
export const KAFKA_PERFORMANCE_EVENTS = `${prefix}clickhouse_performance_events${suffix}`

// log entries for ingestion into clickhouse
export const KAFKA_LOG_ENTRIES = `${prefix}log_entries${suffix}`
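The `prefix` and `suffix` here come from earlier in this file; a sketch of the naming convention, under the assumption that the prefix is deployment-specific and the suffix marks test runs:

// Assumed shape of the topic-name helpers (not part of this diff):
const prefix = process.env.KAFKA_PREFIX || ''
const suffix = process.env.NODE_ENV === 'test' ? '_test' : ''

export const KAFKA_LOG_ENTRIES = `${prefix}log_entries${suffix}`
// e.g. 'log_entries' in production, 'log_entries_test' under test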
162 changes: 162 additions & 0 deletions plugin-server/src/main/ingestion-queues/session-recording/services/console-logs-ingester.ts
@@ -0,0 +1,162 @@
import { captureException } from '@sentry/node'
import { HighLevelProducer as RdKafkaProducer, NumberNullUndefined } from 'node-rdkafka-acosom'
import { Counter } from 'prom-client'

import { KAFKA_LOG_ENTRIES } from '../../../../config/kafka-topics'
import { createRdConnectionConfigFromEnvVars } from '../../../../kafka/config'
import { findOffsetsToCommit } from '../../../../kafka/consumer'
import { retryOnDependencyUnavailableError } from '../../../../kafka/error-handling'
import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../../kafka/producer'
import { PluginsServerConfig } from '../../../../types'
import { status } from '../../../../utils/status'
import { gatherConsoleLogEvents } from '../../../../worker/ingestion/process-event'
import { eventDroppedCounter } from '../../metrics'
import { IncomingRecordingMessage } from '../types'
import { OffsetHighWaterMarker } from './offset-high-water-marker'

const HIGH_WATERMARK_KEY = 'session_replay_console_logs_events_ingester'

const consoleLogEventsCounter = new Counter({
name: 'console_log_events_ingested',
help: 'Number of console log events successfully ingested',
})

// TODO this is an almost exact duplicate of the replay events ingester
// am going to leave this duplication and then collapse it when/if we add a performance events ingester
export class ConsoleLogsIngester {
producer?: RdKafkaProducer
max_allowed_team: number
constructor(
private readonly serverConfig: PluginsServerConfig,
private readonly persistentHighWaterMarker: OffsetHighWaterMarker
) {
this.max_allowed_team = serverConfig.MAX_TEAM_TO_ALLOW_SESSION_RECORDING_CONSOLE_LOGS_INGESTION
}

public async consumeBatch(messages: IncomingRecordingMessage[]) {
const pendingProduceRequests: Promise<NumberNullUndefined>[] = []

for (const message of messages) {
const results = await retryOnDependencyUnavailableError(() => this.consume(message))
if (results) {
pendingProduceRequests.push(...results)
}
}

// On each loop, we flush the producer to ensure that all messages
// are sent to Kafka.
try {
await flushProducer(this.producer!)
} catch (error) {
// Rather than handling errors from flush, we instead handle
// errors per produce request, which gives us a little more
// flexibility in terms of deciding if it is a terminal
// error or not.
}

// We wait on all the produce requests to complete. After the
// flush they should all have been resolved/rejected already. If
// we get an intermittent error, such as a Kafka broker being
// unavailable, we will throw. We are relying on the Producer
// already having handled retries internally.
for (const produceRequest of pendingProduceRequests) {
try {
await produceRequest
} catch (error) {
status.error('🔁', '[console-log-events-ingester] main_loop_error', { error })

if (error?.isRetriable) {
// We assume the if the error is retriable, then we
// are probably in a state where e.g. Kafka is down
// temporarily and we would rather simply throw and
// have the process restarted.
throw error
}
}
}

const topicPartitionOffsets = findOffsetsToCommit(messages.map((message) => message.metadata))
await Promise.all(
topicPartitionOffsets.map((tpo) => this.persistentHighWaterMarker.add(tpo, HIGH_WATERMARK_KEY, tpo.offset))
)
}

public async consume(event: IncomingRecordingMessage): Promise<Promise<number | null | undefined>[] | void> {
const warn = (text: string, labels: Record<string, any> = {}) =>
status.warn('⚠️', `[console-log-events-ingester] ${text}`, {
offset: event.metadata.offset,
partition: event.metadata.partition,
...labels,
})

const drop = (reason: string, labels: Record<string, any> = {}) => {
eventDroppedCounter
.labels({
event_type: 'session_recordings_console_log_events',
drop_cause: reason,
})
.inc()

warn(reason, {
reason,
...labels,
})
}

// capture the producer so that TypeScript knows it's not null below this check
const producer = this.producer
if (!producer) {
return drop('producer_not_ready')
}

if (
await this.persistentHighWaterMarker.isBelowHighWaterMark(
event.metadata,
HIGH_WATERMARK_KEY,
event.metadata.offset
)
) {
return drop('high_water_mark')
}

if (event.team_id > this.max_allowed_team) {
return drop('team_above_max_allowed')
}

try {
const consoleLogEvents = gatherConsoleLogEvents(event.team_id, event.session_id, event.events)

consoleLogEventsCounter.inc(consoleLogEvents.length)

return consoleLogEvents.map((cle) =>
produce({
producer,
topic: KAFKA_LOG_ENTRIES,
value: Buffer.from(JSON.stringify(cle)),
key: event.session_id,
})
)
} catch (error) {
status.error('⚠️', '[console-log-events-ingester] processing_error', {
error: error,
})
captureException(error, {
tags: { source: 'console-log-events-ingester', team_id: event.team_id, session_id: event.session_id },
})
}
}
public async start(): Promise<void> {
const connectionConfig = createRdConnectionConfigFromEnvVars(this.serverConfig)
this.producer = await createKafkaProducer(connectionConfig)
this.producer.connect()
}

public async stop(): Promise<void> {
status.info('🔁', '[console-log-events-ingester] stopping')

if (this.producer && this.producer.isConnected()) {
status.info('🔁', '[console-log-events-ingester] disconnecting kafka producer in batchConsumer stop')
await disconnectProducer(this.producer)
}
}
}
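Taken together, the ingester's lifecycle as the session recording consumer below drives it; the wrapper function and its parameters are stand-ins for values the real consumer already owns:

import { PluginsServerConfig } from '../../../../types'
import { IncomingRecordingMessage } from '../types'
import { ConsoleLogsIngester } from './console-logs-ingester'
import { OffsetHighWaterMarker } from './offset-high-water-marker'

async function runConsoleLogIngestion(
    config: PluginsServerConfig,
    marker: OffsetHighWaterMarker,
    batch: IncomingRecordingMessage[]
): Promise<void> {
    const ingester = new ConsoleLogsIngester(config, marker)
    await ingester.start() // connect the Kafka producer
    await ingester.consumeBatch(batch) // gather console log events, produce keyed by session_id
    await ingester.stop() // disconnect cleanly on shutdown
}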
12 changes: 12 additions & 0 deletions plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts
@@ -18,6 +18,7 @@ import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-ma
import { ObjectStorage } from '../../services/object_storage'
import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics'
import { eventDroppedCounter } from '../metrics'
import { ConsoleLogsIngester } from './services/console-logs-ingester'
import { OffsetHighWaterMarker } from './services/offset-high-water-marker'
import { PartitionLocker } from './services/partition-locker'
import { RealtimeManager } from './services/realtime-manager'
@@ -101,6 +102,7 @@ export class SessionRecordingIngesterV2 {
persistentHighWaterMarker: OffsetHighWaterMarker
realtimeManager: RealtimeManager
replayEventsIngester: ReplayEventsIngester
consoleLogsIngester: ConsoleLogsIngester
partitionLocker: PartitionLocker
batchConsumer?: BatchConsumer
partitionAssignments: Record<number, PartitionMetrics> = {}
@@ -137,6 +139,7 @@

// NOTE: This is the only place where we need to use the shared server config
this.replayEventsIngester = new ReplayEventsIngester(globalServerConfig, this.persistentHighWaterMarker)
this.consoleLogsIngester = new ConsoleLogsIngester(globalServerConfig, this.persistentHighWaterMarker)

this.teamsRefresher = new BackgroundRefresher(async () => {
try {
@@ -407,6 +410,13 @@
await this.replayEventsIngester.consumeBatch(recordingMessages)
},
})

await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.consumeConsoleLogEvents`,
func: async () => {
await this.consoleLogsIngester.consumeBatch(recordingMessages)
},
})
},
})
}
@@ -435,6 +445,7 @@
// Load teams into memory
await this.teamsRefresher.refresh()
await this.replayEventsIngester.start()
await this.consoleLogsIngester.start()

if (this.config.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
this.partitionLockInterval = setInterval(async () => {
@@ -520,6 +531,7 @@
void this.scheduleWork(this.onRevokePartitions(this.assignedTopicPartitions))
void this.scheduleWork(this.realtimeManager.unsubscribe())
void this.scheduleWork(this.replayEventsIngester.stop())
void this.scheduleWork(this.consoleLogsIngester.stop())

const promiseResults = await Promise.allSettled(this.promises)

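The batch steps above run through `runInstrumentedFunction`; a hypothetical minimal version of that wrapper, to show the shape of the instrumentation (the real helper lives elsewhere in the plugin server and records more than a timing):

async function runInstrumentedFunction<T>(options: {
    statsKey: string
    func: () => Promise<T>
}): Promise<T> {
    const start = Date.now()
    try {
        return await options.func()
    } finally {
        // Stand-in for a real metrics/stats client.
        console.info(`${options.statsKey} took ${Date.now() - start}ms`)
    }
}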
2 changes: 2 additions & 0 deletions plugin-server/src/types.ts
@@ -222,6 +222,8 @@ export interface PluginsServerConfig {
SESSION_RECORDING_REDIS_PREFIX: string
SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION: boolean
SESSION_RECORDING_PARALLEL_CONSUMPTION: boolean
// team ids greater than this won't ingest console logs separately, allows controlled rollout by team
MAX_TEAM_TO_ALLOW_SESSION_RECORDING_CONSOLE_LOGS_INGESTION: number

// Dedicated infra values
SESSION_RECORDING_KAFKA_HOSTS: string | undefined