Skip to content

Commit

Permalink
remove start and stop
Browse files Browse the repository at this point in the history
  • Loading branch information
pauldambra committed Mar 25, 2024
1 parent 75ae215 commit bcbeb8c
Show file tree
Hide file tree
Showing 6 changed files with 2 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,4 @@ export class ConsoleLogsIngester {
})
}
}

public start(): void {
if (!this.producer.isConnected()) {
status.error('🔁', '[console-log-events-ingester] kakfa producer should have been connected by parent')
}
}

public stop(): void {
status.info('🔁', '[console-log-events-ingester] stopping')
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,4 @@ export class ReplayEventsIngester {
})
}
}
public start(): void {
if (!this.producer.isConnected()) {
status.error('🔁', '[replay-events] kakfa producer should have been connected by parent')
}
}

public stop(): void {
status.info('🔁', '[replay-events] stopping')
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -291,12 +291,10 @@ export class SessionRecordingIngesterV3 {
// NOTE: This is the only place where we need to use the shared server config
if (this.config.SESSION_RECORDING_CONSOLE_LOGS_INGESTION_ENABLED) {
this.consoleLogsIngester = new ConsoleLogsIngester(producer)
this.consoleLogsIngester.start()
}

if (this.config.SESSION_RECORDING_REPLAY_EVENTS_INGESTION_ENABLED) {
this.replayEventsIngester = new ReplayEventsIngester(producer)
this.replayEventsIngester.start()
}

// Create a node-rdkafka consumer that fetches batches of messages, runs
Expand Down Expand Up @@ -354,15 +352,6 @@ export class SessionRecordingIngesterV3 {
)
)

// stop is effectively a no-op on both of these but is kept here
// in case we want to add any cleanup logic in the future
if (this.replayEventsIngester) {
this.replayEventsIngester.stop()
}
if (this.consoleLogsIngester) {
this.consoleLogsIngester.stop()
}

const promiseResults = await Promise.allSettled(this.promises)

await this.mainKafkaClusterProducer?.disconnect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,12 +455,10 @@ export class SessionRecordingIngester {
// NOTE: This is the only place where we need to use the shared server config
if (this.config.SESSION_RECORDING_CONSOLE_LOGS_INGESTION_ENABLED) {
this.consoleLogsIngester = new ConsoleLogsIngester(producer, this.persistentHighWaterMarker)
this.consoleLogsIngester.start()
}

if (this.config.SESSION_RECORDING_REPLAY_EVENTS_INGESTION_ENABLED) {
this.replayEventsIngester = new ReplayEventsIngester(producer, this.persistentHighWaterMarker)
this.replayEventsIngester.start()
}

// Create a node-rdkafka consumer that fetches batches of messages, runs
Expand Down Expand Up @@ -543,15 +541,6 @@ export class SessionRecordingIngester {
void this.scheduleWork(this.onRevokePartitions(assignedPartitions))
void this.scheduleWork(this.realtimeManager.unsubscribe())

// stop is effectively a no-op on both of these but is kept here
// in case we want to add any cleanup logic in the future
if (this.replayEventsIngester) {
this.replayEventsIngester.stop()
}
if (this.consoleLogsIngester) {
this.consoleLogsIngester.stop()
}

const promiseResults = await Promise.allSettled(this.promises)

// Finally we clear up redis once we are sure everything else has been handled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const makeIncomingMessage = (
topic: 'topic',
timestamp: 0,
consoleLogIngestionEnabled,
rawSize: 0,
},
session_id: '',
team_id: 0,
Expand All @@ -45,7 +46,6 @@ describe('console log ingester', () => {
mockProducer as unknown as HighLevelProducer,
mockedHighWaterMarker
)
consoleLogIngester.start()
})
describe('when enabled on team', () => {
test('it truncates large console logs', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const makeIncomingMessage = (source: string | null, timestamp: number): Incoming
topic: 'topic',
timestamp: timestamp,
consoleLogIngestionEnabled: true,
rawSize: 0,
},
session_id: '',
team_id: 0,
Expand All @@ -42,7 +43,6 @@ describe('replay events ingester', () => {

const mockedHighWaterMarker = { isBelowHighWaterMark: jest.fn() } as unknown as OffsetHighWaterMarker
ingester = new ReplayEventsIngester(mockProducer as unknown as HighLevelProducer, mockedHighWaterMarker)
ingester.start()
})

test('does not ingest messages from a month in the future', async () => {
Expand Down

0 comments on commit bcbeb8c

Please sign in to comment.