Skip to content

Commit

Permalink
chore: Remove v2 suffix for replay consumer (#17938)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite authored Oct 12, 2023
1 parent 00c18b6 commit e47c2aa
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type PartitionMetrics = {
lastKnownCommit?: number
}

export class SessionRecordingIngesterV2 {
export class SessionRecordingIngester {
redisPool: RedisPool
sessions: Record<string, SessionManager> = {}
sessionHighWaterMarker: OffsetHighWaterMarker
Expand Down
4 changes: 2 additions & 2 deletions plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import {
startAsyncWebhooksHandlerConsumer,
} from './ingestion-queues/on-event-handler-consumer'
import { startScheduledTasksConsumer } from './ingestion-queues/scheduled-tasks-consumer'
import { SessionRecordingIngesterV2 } from './ingestion-queues/session-recording/session-recordings-consumer-v2'
import { SessionRecordingIngester } from './ingestion-queues/session-recording/session-recordings-consumer'
import { createHttpServer } from './services/http-server'
import { getObjectStorage } from './services/object_storage'

Expand Down Expand Up @@ -411,7 +411,7 @@ export async function startPluginsServer(
throw new Error("Can't start session recording blob ingestion without object storage")
}
// NOTE: We intentionally pass in the original serverConfig as the ingester uses both kafkas
const ingester = new SessionRecordingIngesterV2(serverConfig, postgres, s3)
const ingester = new SessionRecordingIngester(serverConfig, postgres, s3)
await ingester.start()

const batchConsumer = ingester.batchConsumer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import path from 'path'

import { waitForExpect } from '../../../../functional_tests/expectations'
import { defaultConfig } from '../../../../src/config/config'
import { SessionRecordingIngesterV2 } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-consumer-v2'
import { SessionRecordingIngester } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-consumer'
import { Hub, PluginsServerConfig, Team } from '../../../../src/types'
import { createHub } from '../../../../src/utils/db/hub'
import { getFirstTeam, resetTestDatabase } from '../../../helpers/sql'
Expand Down Expand Up @@ -57,7 +57,7 @@ jest.mock('../../../../src/kafka/batch-consumer', () => {
jest.setTimeout(1000)

describe('ingester', () => {
let ingester: SessionRecordingIngesterV2
let ingester: SessionRecordingIngester

let hub: Hub
let closeHub: () => Promise<void>
Expand All @@ -76,7 +76,7 @@ describe('ingester', () => {
teamToken = team.api_token
await deleteKeysWithPrefix(hub)

ingester = new SessionRecordingIngesterV2(config, hub.postgres, hub.objectStorage)
ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage)
await ingester.start()
nextOffset = 1

Expand Down Expand Up @@ -522,11 +522,11 @@ describe('ingester', () => {
})

describe('simulated rebalancing', () => {
let otherIngester: SessionRecordingIngesterV2
let otherIngester: SessionRecordingIngester
jest.setTimeout(5000) // Increased to cover lock delay

beforeEach(async () => {
otherIngester = new SessionRecordingIngesterV2(config, hub.postgres, hub.objectStorage)
otherIngester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage)
await otherIngester.start()
})

Expand Down

0 comments on commit e47c2aa

Please sign in to comment.