Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite committed Dec 25, 2024
1 parent dc27719 commit b57919c
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 92 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import { HighLevelProducer } from 'node-rdkafka'

import { KafkaProducerWrapper, produce } from '../../../../../src/kafka/producer'
import { KafkaProducerWrapper } from '../../../../../src/kafka/producer'
import { ConsoleLogsIngester } from '../../../../../src/main/ingestion-queues/session-recording/services/console-logs-ingester'
import { OffsetHighWaterMarker } from '../../../../../src/main/ingestion-queues/session-recording/services/offset-high-water-marker'
import { IncomingRecordingMessage } from '../../../../../src/main/ingestion-queues/session-recording/types'
Expand Down Expand Up @@ -34,18 +32,17 @@ const makeIncomingMessage = (

describe('console log ingester', () => {
let consoleLogIngester: ConsoleLogsIngester
const mockProducer: jest.Mock = jest.fn()
const mockProducer: KafkaProducerWrapper = {
queueMessages: jest.fn(),
connect: jest.fn(),
isConnected: jest.fn(),
} as unknown as KafkaProducerWrapper

beforeEach(() => {
mockProducer.mockClear()
mockProducer['connect'] = jest.fn()
mockProducer['isConnected'] = () => true
jest.mocked(mockProducer.queueMessages).mockClear()

const mockedHighWaterMarker = { isBelowHighWaterMark: jest.fn() } as unknown as OffsetHighWaterMarker
consoleLogIngester = new ConsoleLogsIngester(
mockProducer as unknown as KafkaProducerWrapper,
mockedHighWaterMarker
)
consoleLogIngester = new ConsoleLogsIngester(mockProducer, mockedHighWaterMarker)
})
describe('when enabled on team', () => {
test('it truncates large console logs', async () => {
Expand All @@ -61,23 +58,24 @@ describe('console log ingester', () => {
)
)
expect(jest.mocked(status.debug).mock.calls).toEqual([])
expect(jest.mocked(produce).mock.calls).toEqual([
expect(jest.mocked(mockProducer.queueMessages).mock.calls).toEqual([
[
{
key: '',
producer: mockProducer,
topic: 'log_entries_test',
value: Buffer.from(
JSON.stringify({
team_id: 0,
message: 'a'.repeat(2999),
level: 'info',
log_source: 'session_replay',
log_source_id: '',
instance_id: null,
timestamp: '1970-01-01 00:00:00.000',
})
),
messages: [
{
key: '',
value: JSON.stringify({
team_id: 0,
message: 'a'.repeat(2999),
level: 'info',
log_source: 'session_replay',
log_source_id: '',
instance_id: null,
timestamp: '1970-01-01 00:00:00.000',
}),
},
],
},
],
])
Expand All @@ -104,42 +102,37 @@ describe('console log ingester', () => {
)
)
expect(jest.mocked(status.debug).mock.calls).toEqual([])
expect(jest.mocked(produce)).toHaveBeenCalledTimes(2)
expect(jest.mocked(produce).mock.calls).toEqual([
[
{
key: '',
producer: mockProducer,
topic: 'log_entries_test',
value: Buffer.from(
JSON.stringify({
team_id: 0,
message: 'aaaaa',
level: 'info',
log_source: 'session_replay',
log_source_id: '',
instance_id: null,
timestamp: '1970-01-01 00:00:00.000',
})
),
},
],
expect(jest.mocked(mockProducer.queueMessages)).toHaveBeenCalledTimes(1)
expect(jest.mocked(mockProducer.queueMessages).mock.calls).toEqual([
[
{
key: '',
producer: mockProducer,
topic: 'log_entries_test',
value: Buffer.from(
JSON.stringify({
team_id: 0,
message: 'ccccc',
level: 'info',
log_source: 'session_replay',
log_source_id: '',
instance_id: null,
timestamp: '1970-01-01 00:00:00.000',
})
),
messages: [
{
key: '',
value: JSON.stringify({
team_id: 0,
message: 'aaaaa',
level: 'info',
log_source: 'session_replay',
log_source_id: '',
instance_id: null,
timestamp: '1970-01-01 00:00:00.000',
}),
},
{
key: '',
value: JSON.stringify({
team_id: 0,
message: 'ccccc',
level: 'info',
log_source: 'session_replay',
log_source_id: '',
instance_id: null,
timestamp: '1970-01-01 00:00:00.000',
}),
},
],
},
],
])
Expand All @@ -162,23 +155,24 @@ describe('console log ingester', () => {
)
)
expect(jest.mocked(status.debug).mock.calls).toEqual([])
expect(jest.mocked(produce).mock.calls).toEqual([
expect(jest.mocked(mockProducer.queueMessages).mock.calls).toEqual([
[
{
key: '',
producer: mockProducer,
topic: 'log_entries_test',
value: Buffer.from(
JSON.stringify({
team_id: 0,
message: 'aaaaa',
level: 'info',
log_source: 'session_replay',
log_source_id: '',
instance_id: null,
timestamp: '1970-01-01 00:00:00.000',
})
),
messages: [
{
key: '',
value: JSON.stringify({
team_id: 0,
message: 'aaaaa',
level: 'info',
log_source: 'session_replay',
log_source_id: '',
instance_id: null,
timestamp: '1970-01-01 00:00:00.000',
}),
},
],
},
],
])
Expand All @@ -188,12 +182,12 @@ describe('console log ingester', () => {
describe('when disabled on team', () => {
test('it drops console logs', async () => {
await consoleLogIngester.consume(makeIncomingMessage([{ plugin: 'rrweb/console@1' }], false))
expect(jest.mocked(produce)).not.toHaveBeenCalled()
expect(jest.mocked(mockProducer.queueMessages)).not.toHaveBeenCalled()
})
test('it does not drop events with no console logs', async () => {
await consoleLogIngester.consume(makeIncomingMessage([{ plugin: 'some-other-plugin' }], false))
expect(jest.mocked(status.debug).mock.calls).toEqual([])
expect(jest.mocked(produce)).not.toHaveBeenCalled()
expect(jest.mocked(mockProducer.queueMessages)).not.toHaveBeenCalled()
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,13 @@ describe('session-recording utils', () => {
[
[
{
kafkaMessage: {
messages: [
expectedIngestionWarningMessage({
libVersion: '1.74.0',
parsedVersion: { major: 1, minor: 74 },
}),
],
topic: 'clickhouse_ingestion_warnings_test',
},
messages: [
expectedIngestionWarningMessage({
libVersion: '1.74.0',
parsedVersion: { major: 1, minor: 74 },
}),
],
topic: 'clickhouse_ingestion_warnings_test',
},
],
],
Expand All @@ -231,15 +229,13 @@ describe('session-recording utils', () => {
[
[
{
kafkaMessage: {
messages: [
expectedIngestionWarningMessage({
libVersion: '1.32.0',
parsedVersion: { major: 1, minor: 32 },
}),
],
topic: 'clickhouse_ingestion_warnings_test',
},
messages: [
expectedIngestionWarningMessage({
libVersion: '1.32.0',
parsedVersion: { major: 1, minor: 32 },
}),
],
topic: 'clickhouse_ingestion_warnings_test',
},
],
],
Expand Down

0 comments on commit b57919c

Please sign in to comment.