
refactor: scaffold session-recording-v2 #27498

Merged · 29 commits merged into master on Jan 20, 2025

Conversation

Contributor

@pl pl commented Jan 14, 2025

Problem

We are working on a major refactoring of the session recording pipeline to improve its reliability and performance. Scaffolding a new session recording service is the first step in this process.

Changes

Creates a new session-recording-v2 module with a blackhole ingestion service based on the current session-recording implementation.

Does this work well for both Cloud and self-hosted?

Works for both cloud and self-hosted.

How did you test this code?

  • Implemented a bunch of unit tests for the new components
  • Ran it locally
  • Will test on dev and production later, as it needs a new deployment

@pl pl requested a review from a team January 14, 2025 12:56
Contributor

@meikelmosby meikelmosby left a comment

pretty nice PR. found nothing blocking & added a couple of opinions :)

import { KafkaMetrics } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/metrics'
import { KafkaParser } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/parser'

const do_gzip = promisify(gzip)
Contributor

nit: idk if there's a convention here already, but usually it's camelCase for function names.
e.g.

Suggested change
const do_gzip = promisify(gzip)
const compressWithGzip = promisify(gzip)
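For context, a minimal standalone sketch of how the promisified helper behaves (assuming Node's built-in zlib and util; the sample payload is made up):

```typescript
import { promisify } from 'util'
import { gzip } from 'zlib'

// Promisified gzip, camelCased per the suggestion above
const compressWithGzip = promisify(gzip)

async function main(): Promise<void> {
    const compressed = await compressWithGzip(Buffer.from('session recording payload'))
    // gzip output always starts with the magic bytes 0x1f 0x8b
    console.log(compressed[0].toString(16), compressed[1].toString(16)) // 1f 8b
}

main()
```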

public async handleEachBatch(messages: Message[], context: { heartbeat: () => void }): Promise<void> {
context.heartbeat()

if (messages.length !== 0) {
Contributor

Suggested change
if (messages.length !== 0) {
if (messages.length > 0) {

sendTimeoutGuardToSentry: false,
func: async () => {
// Increment message received counter for each message
for (const message of messages) {
Contributor

nit: could be a bit more readable with (pure preference)

    messages.forEach((message) => this.metrics.incrementMessageReceived(message.partition));


this.metrics.observeKafkaBatchSize(messages.length)
this.metrics.observeKafkaBatchSizeKb(
messages.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024
Contributor

you could pull this out as

    const batchSizeKb = messages.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024;
    const batchSize = messages.length;

and do (also preference)

    this.metrics.observeKafkaBatchSize(batchSize);
    this.metrics.observeKafkaBatchSizeKb(batchSizeKb);
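As a sanity check on the arithmetic, here is the same reduce over a hypothetical batch of 2048 bytes, including one tombstone message with a null value (the `KafkaMessageLike` shape is just for illustration):

```typescript
type KafkaMessageLike = { value: Buffer | null }

// Hypothetical batch: 512 + 0 + 1536 = 2048 bytes
const batch: KafkaMessageLike[] = [
    { value: Buffer.alloc(512) },
    { value: null }, // tombstone: `m.value?.length ?? 0` counts it as 0 bytes
    { value: Buffer.alloc(1536) },
]

const batchSizeKb = batch.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024
console.log(batchSizeKb) // 2
```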

})
}

await runInstrumentedFunction({
Contributor

(preference): to make things more readable and instead of inlining everything you could pull things out a bit more e.g.

private async processBatchMessages(messages: Message[], context: { heartbeat: () => void }): Promise<void> {
    const batchSizeKb = messages.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024;
    const batchSize = messages.length;

    // Increment message received counter for each message
    messages.forEach((message) => this.metrics.incrementMessageReceived(message.partition));

    // Record Kafka batch size and size in KB
    this.metrics.observeKafkaBatchSize(batchSize);
    this.metrics.observeKafkaBatchSizeKb(batchSizeKb);

    // Parse messages
    const parsedMessages = await runInstrumentedFunction({
        statsKey: 'recordingingesterv2.handleEachBatch.parseKafkaMessages',
        func: async () => this.messageProcessor.parseBatch(messages),
    });

    context.heartbeat();

    // Consume messages in parallel or sequentially
    if (this.config.SESSION_RECORDING_PARALLEL_CONSUMPTION) {
        await this.consumeMessagesInParallel(parsedMessages);
    } else {
        await this.consumeMessagesSequentially(parsedMessages);
    }
}

private async consumeMessagesInParallel(parsedMessages: MessageWithTeam[]): Promise<void> {
    await Promise.all(parsedMessages.map((m) => this.consume(m)));
}

private async consumeMessagesSequentially(parsedMessages: MessageWithTeam[]): Promise<void> {
    for (const message of parsedMessages) {
        await this.consume(message);
    }
}

so in the end we would only have

await runInstrumentedFunction({
    statsKey: 'recordingingesterv2.handleEachBatch',
    sendTimeoutGuardToSentry: false,
    func: async () => {
        await this.processBatchMessages(messages, context);
    },
});

but then again, it's a preference, as i like things broken out rather than in one function. no need to change your implementation if you don't see it the same way :)

import { ParsedMessageData } from './types'

const GZIP_HEADER = Buffer.from([0x1f, 0x8b, 0x08, 0x00])
const do_unzip = promisify(unzip)
Contributor

same with camelCasing it

Suggested change
const do_unzip = promisify(unzip)
const decompressWithGzip = promisify(unzip)
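A round-trip sketch using both camelCased helpers (assuming Node's built-in zlib; the JSON payload is made up):

```typescript
import { promisify } from 'util'
import { gzip, unzip } from 'zlib'

const compressWithGzip = promisify(gzip)
const decompressWithGzip = promisify(unzip)

// Compress, then decompress, and return the recovered string
async function roundTrip(input: string): Promise<string> {
    const compressed = await compressWithGzip(Buffer.from(input))
    const decompressed = await decompressWithGzip(compressed)
    return decompressed.toString()
}

roundTrip('{"event":"$snapshot"}').then((out) => console.log(out)) // prints the original JSON string
```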

}
}

private isGzipped(buffer: Buffer): boolean {
Contributor
The reason will be displayed to describe this comment to others. Learn more.

if we really want to squeeze for performance, we could technically get rid of the loop here, i think, with

Suggested change
private isGzipped(buffer: Buffer): boolean {
private isGzipped(buffer: Buffer): boolean {
    return buffer.length >= GZIP_HEADER.length && buffer.slice(0, GZIP_HEADER.length).equals(GZIP_HEADER);
}
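A quick standalone check that the loop-free version behaves as expected (gzipSync stands in for a real message payload; note the 4-byte comparison also assumes the gzip FLG byte is zero, which holds for zlib's default output):

```typescript
import { gzipSync } from 'zlib'

const GZIP_HEADER = Buffer.from([0x1f, 0x8b, 0x08, 0x00])

// Loop-free header comparison, as suggested above
function isGzipped(buffer: Buffer): boolean {
    return buffer.length >= GZIP_HEADER.length && buffer.subarray(0, GZIP_HEADER.length).equals(GZIP_HEADER)
}

console.log(isGzipped(gzipSync(Buffer.from('payload')))) // true
console.log(isGzipped(Buffer.from('plain text'))) // false
```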

private readonly metrics: VersionMetrics
) {}

public async parseBatch(messages: TInput[]): Promise<MessageWithTeam[]> {
Contributor
The reason will be displayed to describe this comment to others. Learn more.

same as before: if ordering does not matter, we could do something like

public async parseBatch(messages: TInput[]): Promise<MessageWithTeam[]> {
    // Parse the messages batch using the source processor
    const processedMessages = await this.sourceProcessor.parseBatch(messages)

    // Use Promise.all for parallel processing of the checkLibVersion function
    await Promise.all(processedMessages.map((message) => this.checkLibVersion(message)))

    return processedMessages
}

to enable concurrency
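For illustration: Promise.all returns its results in input order even when the tasks finish out of order, so ordering only becomes a concern if the side effects of checkLibVersion (e.g. emitted warnings) need to happen sequentially. A self-contained sketch with made-up delays:

```typescript
// Resolve `value` after `ms` milliseconds
function delayed<T>(value: T, ms: number): Promise<T> {
    return new Promise((resolve) => setTimeout(() => resolve(value), ms))
}

async function main(): Promise<void> {
    // The second task finishes first, but results come back in input order
    const results = await Promise.all([delayed('a', 30), delayed('b', 5), delayed('c', 15)])
    console.log(results) // [ 'a', 'b', 'c' ]
}

main()
```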

Contributor Author

Yes, I don't think we care about the ordering of the warnings.

Contributor

@benjackwhite benjackwhite left a comment

Looks good to me. In theory I might structure things differently, but it's hard to say till we get to real code, so happy to approve 👍

@pl pl merged commit 5069350 into master Jan 20, 2025
92 checks passed
@pl pl deleted the pl/mrbv2_scaffolding branch January 20, 2025 13:21
3 participants