Fixes
benjackwhite committed Feb 21, 2024
1 parent 89dbff9 · commit 212d4ed
Showing 5 changed files with 36 additions and 27 deletions.
2 changes: 2 additions & 0 deletions docker-compose.hobby.yml
@@ -74,6 +74,8 @@ services:
       SENTRY_DSN: $SENTRY_DSN
       SITE_URL: https://$DOMAIN
       SECRET_KEY: $POSTHOG_SECRET
+      RECORDINGS_INGESTER_URL: http://plugins:6738
+
   plugins:
     extends:
       file: docker-compose.base.yml
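For orientation: the new RECORDINGS_INGESTER_URL points the web container at an HTTP endpoint served by the plugins (ingester) container on port 6738 — the same ingester that serves session buffers over HTTP later in this diff. A minimal TypeScript sketch of that kind of listener follows; the route shape and on-disk layout are assumptions for illustration, not taken from this commit.

import express from 'express'
import { createReadStream, existsSync } from 'fs'
import path from 'path'

const app = express()
const SESSIONS_ROOT = '/tmp/sessions' // assumed layout, not from this commit

// Hypothetical route: serve a session's gzipped realtime buffer over HTTP.
app.get('/sessions/:sessionId/buffer', (req, res) => {
    const file = path.join(SESSIONS_ROOT, req.params.sessionId, 'buffer.jsonl.gz')
    if (!existsSync(file)) {
        res.sendStatus(404)
        return
    }
    res.setHeader('Content-Type', 'application/json')
    res.setHeader('Content-Encoding', 'gzip') // bytes on disk are already gzipped
    createReadStream(file).pipe(res)
})

app.listen(6738) // matches the port in RECORDINGS_INGESTER_URL above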
plugin-server/src/main/ingestion-queues/session-recording/services/session-manager-v3.ts
@@ -6,6 +6,7 @@ import path from 'path'
 import { Counter, Histogram } from 'prom-client'
 import { PassThrough, Transform } from 'stream'
 import { pipeline } from 'stream/promises'
+import * as zlib from 'zlib'

 import { PluginsServerConfig } from '../../../../types'
 import { status } from '../../../../utils/status'
@@ -21,6 +22,9 @@ const S3_UPLOAD_WARN_TIME_SECONDS = 2 * 60 * 1000
 // NOTE: To remove once released
 const metricPrefix = 'v3_'

+export const BUFFER_FILE_NAME = 'buffer.jsonl.gz'
+export const FLUSH_FILE_EXTENSION = '.flush.jsonl.gz'
+
 const counterS3FilesWritten = new Counter({
     name: metricPrefix + 'recording_s3_files_written',
     help: 'A single file flushed to S3',
@@ -130,7 +134,7 @@ export class SessionManagerV3 {
             )
             manager.buffer = {
                 context: bufferMetadata,
-                fileStream: manager.createFileStreamFor(path.join(context.dir, 'buffer.jsonl')),
+                fileStream: manager.createFileStreamFor(path.join(context.dir, BUFFER_FILE_NAME)),
             }
         } catch (error) {
             // Indicates no buffer metadata file or it's corrupted
@@ -156,7 +160,7 @@
     }

     private async getFlushFiles(): Promise<string[]> {
-        return (await readdir(this.context.dir)).filter((file) => file.endsWith('.flush.jsonl'))
+        return (await readdir(this.context.dir)).filter((file) => file.endsWith(FLUSH_FILE_EXTENSION))
     }

     private captureException(error: Error, extra: Record<string, any> = {}): void {
@@ -281,24 +285,23 @@ export class SessionManagerV3 {
         // ADD FLUSH METRICS HERE

         const { firstTimestamp, lastTimestamp } = buffer.context.eventsRange
-        const fileName = `${firstTimestamp}-${lastTimestamp}.flush.jsonl`
+        const fileName = `${firstTimestamp}-${lastTimestamp}${FLUSH_FILE_EXTENSION}`

         counterS3FilesWritten.labels(reason).inc(1)
         histogramS3LinesWritten.observe(buffer.context.count)
         histogramS3KbWritten.observe(buffer.context.sizeEstimate / 1024)

         // NOTE: We simplify everything by keeping the files as the same name for S3
         await new Promise((resolve) => buffer.fileStream.end(resolve))
-        await rename(this.file('buffer.jsonl'), this.file(fileName))
+        await rename(this.file(BUFFER_FILE_NAME), this.file(fileName))
         this.buffer = undefined

         await this.save()
     }

     private async flushFiles(): Promise<void> {
         // We read all files marked for flushing and write them to S3
-        const filesToFlush = (await readdir(this.context.dir)).filter((file) => file.endsWith('.flush.jsonl'))
-        console.log('filesToFlush', filesToFlush)
+        const filesToFlush = await this.getFlushFiles()
         await Promise.all(filesToFlush.map((file) => this.flushFile(file)))
     }

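The renames above pin down a simple on-disk lifecycle for each session: events append to buffer.jsonl.gz; on flush the buffer is renamed to <firstTs>-<lastTs>.flush.jsonl.gz; every *.flush.jsonl.gz file is then uploaded to S3 and removed locally. A sketch of just the naming handoff — the helper below is purely illustrative, since the real SessionManagerV3 also closes streams, records metrics, and saves metadata:

import { rename } from 'fs/promises'
import path from 'path'

// Illustrative only: move the active buffer aside under its flush name so a
// new buffer can start while the old one is shipped to S3.
async function markBufferForFlush(dir: string, firstTs: number, lastTs: number): Promise<string> {
    const target = path.join(dir, `${firstTs}-${lastTs}.flush.jsonl.gz`)
    await rename(path.join(dir, 'buffer.jsonl.gz'), target)
    return target
}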
@@ -329,7 +332,7 @@
         const endFlushTimer = histogramFlushTimeSeconds.startTimer()

         try {
-            const targetFileName = filename.replace('.flush.jsonl', '.jsonl')
+            const targetFileName = filename.replace(FLUSH_FILE_EXTENSION, '.jsonl.gz')
             const baseKey = `${this.serverConfig.SESSION_RECORDING_REMOTE_FOLDER}/team_id/${this.context.teamId}/session_id/${this.context.sessionId}`
             const dataKey = `${baseKey}/data/${targetFileName}`
             readStream = createReadStream(file)
@@ -349,8 +352,7 @@
                 params: {
                     Bucket: this.serverConfig.OBJECT_STORAGE_BUCKET,
                     Key: dataKey,
-                    // TODO: Add back in gzip encoding
-                    // ContentEncoding: 'gzip',
+                    ContentEncoding: 'gzip',
                     ContentType: 'application/json',
                     Body: readStream,
                 },
@@ -403,7 +405,7 @@
         }
         const buffer: SessionBuffer = {
             context,
-            fileStream: this.createFileStreamFor(this.file('buffer.jsonl')),
+            fileStream: this.createFileStreamFor(this.file(BUFFER_FILE_NAME)),
         }

         this.buffer = buffer
@@ -422,6 +424,7 @@
         // The uncompressed file which we need for realtime playback
         pipeline(
             writeStream,
+            zlib.createGzip(),
             createWriteStream(file, {
                 // Opens in append mode in case it already exists
                 flags: 'a',
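One subtlety in the new pipeline step: the target file is opened in append mode, so each new buffer stream appends a fresh gzip member to the same file. That is still a valid gzip file — concatenated members are legal, and Node's createGunzip() decompresses them in sequence. A self-contained round-trip sketch (file path is arbitrary):

import { createReadStream, createWriteStream } from 'fs'
import { PassThrough } from 'stream'
import { pipeline } from 'stream/promises'
import * as zlib from 'zlib'

// Append one gzip member per call; the file becomes a series of members.
async function appendGzipped(file: string, data: string): Promise<void> {
    const source = new PassThrough()
    source.end(data)
    await pipeline(source, zlib.createGzip(), createWriteStream(file, { flags: 'a' }))
}

// createGunzip() handles concatenated members, so the whole file reads back.
async function readGzipped(file: string): Promise<string> {
    const chunks: Buffer[] = []
    for await (const chunk of createReadStream(file).pipe(zlib.createGunzip())) {
        chunks.push(chunk as Buffer)
    }
    return Buffer.concat(chunks).toString('utf8')
}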
plugin-server/src/main/ingestion-queues/session-recording/session-recording-ingester-v3.ts
@@ -18,7 +18,7 @@ import { expressApp } from '../../services/http-server'
 import { ObjectStorage } from '../../services/object_storage'
 import { runInstrumentedFunction } from '../../utils'
 import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics'
-import { BUCKETS_KB_WRITTEN, SessionManagerV3 } from './services/session-manager-v3'
+import { BUCKETS_KB_WRITTEN, BUFFER_FILE_NAME, SessionManagerV3 } from './services/session-manager-v3'
 import { IncomingRecordingMessage } from './types'
 import { parseKafkaMessage } from './utils'

@@ -382,22 +382,23 @@ export class SessionRecordingIngesterV3 {
                 return []
             })

-        console.log(`Found on disk: ${keys} for partition ${partition}`, Object.keys(this.sessions))
-
+        // TODO: Below regex is a little crude. We should fix it
         await Promise.all(
-            keys.map(async (key) => {
-                // TODO: Ensure sessionId can only be a uuid
-                const [teamId, sessionId] = key.split('__')
-
-                if (!this.sessions[key]) {
-                    this.sessions[key] = await SessionManagerV3.create(this.config, this.objectStorage.s3, {
-                        teamId: parseInt(teamId),
-                        sessionId,
-                        dir: this.dirForSession(partition, parseInt(teamId), sessionId),
-                        partition,
-                    })
-                }
-            })
+            keys
+                .filter((x) => /\d+__[a-zA-Z0-9\-]+/.test(x))
+                .map(async (key) => {
+                    // TODO: Ensure sessionId can only be a uuid
+                    const [teamId, sessionId] = key.split('__')
+
+                    if (!this.sessions[key]) {
+                        this.sessions[key] = await SessionManagerV3.create(this.config, this.objectStorage.s3, {
+                            teamId: parseInt(teamId),
+                            sessionId,
+                            dir: this.dirForSession(partition, parseInt(teamId), sessionId),
+                            partition,
+                        })
+                    }
+                })
         )
     })
)
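The filter regex is unanchored and accepts any alphanumeric session id, which is what the TODO above flags. One possible tightening — an illustration, not the committed fix — anchors the pattern and requires a UUID-shaped id:

// Anchored key check: <numeric team id>__<uuid>
const SESSION_KEY_RE = /^\d+__[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i

function parseSessionKey(key: string): { teamId: number; sessionId: string } | null {
    if (!SESSION_KEY_RE.test(key)) {
        return null
    }
    const [teamId, sessionId] = key.split('__')
    return { teamId: parseInt(teamId, 10), sessionId }
}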
@@ -438,7 +439,7 @@
                 continue
             }

-            const fileStream = createReadStream(`${sessionDir}/buffer.jsonl`)
+            const fileStream = createReadStream(path.join(sessionDir, BUFFER_FILE_NAME))
             fileStream.pipe(res)
             return
         }
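One caveat worth flagging: with the buffer now gzip-compressed on disk, this endpoint streams compressed bytes, and the diff doesn't show how callers cope with that. If consumers expect plain JSONL, one option — an assumption, not part of this commit — is to advertise the encoding so HTTP clients decompress transparently:

import { createReadStream } from 'fs'
import path from 'path'
import type { Response } from 'express'

// Assumed variant, mirroring the handler's `res` and `sessionDir`: declare the
// compression instead of decompressing server-side.
function serveBuffer(res: Response, sessionDir: string, bufferFileName: string): void {
    res.setHeader('Content-Encoding', 'gzip')
    createReadStream(path.join(sessionDir, bufferFileName)).pipe(res)
}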
1 change: 1 addition & 0 deletions posthog/session_recordings/session_recording_api.py
@@ -380,6 +380,7 @@ def snapshots(self, request: request.Request, **kwargs):
             response_data["sources"] = sources

         elif source == "realtime":
+            # TODO: Swap to using the new API if supported
             snapshots = get_realtime_snapshots(team_id=self.team.pk, session_id=str(recording.session_id)) or []

             event_properties["source"] = "realtime"
2 changes: 2 additions & 0 deletions posthog/settings/session_replay.py
@@ -23,3 +23,5 @@
 )

 REPLAY_EMBEDDINGS_ALLOWED_TEAMS: List[str] = get_list(get_from_env("REPLAY_EMBEDDINGS_ALLOWED_TEAM", "", type_cast=str))
+
+RECORDINGS_INGESTER_URL = get_from_env("RECORDINGS_INGESTER_URL", "http://localhost:6738")
