Skip to content

Commit

Permalink
chore(plugins-server): remove eachBatchLegacyIngestion
Browse files Browse the repository at this point in the history
  • Loading branch information
xvello committed Oct 10, 2023
1 parent 2ea5e84 commit ce6db75
Show file tree
Hide file tree
Showing 10 changed files with 6 additions and 928 deletions.
1 change: 0 additions & 1 deletion plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ export function getDefaultConfig(): PluginsServerConfig {
KAFKA_SASL_USER: undefined,
KAFKA_SASL_PASSWORD: undefined,
KAFKA_CLIENT_RACK: undefined,
KAFKA_CONSUMPTION_USE_RDKAFKA: true, // Transitional setting, ignored for consumers that only support one library
KAFKA_CONSUMPTION_MAX_BYTES: 10_485_760, // Default value for kafkajs
KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION: 1_048_576, // Default value for kafkajs, must be bigger than message size
KAFKA_CONSUMPTION_MAX_WAIT_MS: 50, // Maximum time the broker may wait to fill the Fetch response with fetch.min.bytes of messages.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import { EachBatchPayload } from 'kafkajs'
import { Message } from 'node-rdkafka'
import * as schedule from 'node-schedule'
import { Counter } from 'prom-client'

import { KAFKA_EVENTS_PLUGIN_INGESTION, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
Expand All @@ -9,9 +7,7 @@ import { isIngestionOverflowEnabled } from '../../utils/env-utils'
import { status } from '../../utils/status'
import Piscina from '../../worker/piscina'
import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion'
import { eachBatchLegacyIngestion } from './batch-processing/each-batch-ingestion-kafkajs'
import { IngestionConsumer, KafkaJSIngestionConsumer } from './kafka-queue'
import { makeHealthCheck } from './on-event-handler-consumer'
import { IngestionConsumer } from './kafka-queue'

export const ingestionPartitionKeyOverflowed = new Counter({
name: 'ingestion_partition_key_overflowed',
Expand All @@ -26,9 +22,6 @@ export const startAnalyticsEventsIngestionConsumer = async ({
hub: Hub
piscina: Piscina
}) => {
if (!hub.KAFKA_CONSUMPTION_USE_RDKAFKA) {
return startLegacyAnalyticsEventsIngestionConsumer({ hub, piscina })
}
/*
Consumes analytics events from the Kafka topic `events_plugin_ingestion`
and processes them for ingestion into ClickHouse.
Expand Down Expand Up @@ -73,64 +66,3 @@ export const startAnalyticsEventsIngestionConsumer = async ({

return { queue, isHealthy }
}

const startLegacyAnalyticsEventsIngestionConsumer = async ({
hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection.
piscina,
}: {
hub: Hub
piscina: Piscina
}) => {
/*
Consumes analytics events from the Kafka topic `events_plugin_ingestion`
and processes them for ingestion into ClickHouse.
Before processing, if isIngestionOverflowEnabled and an event has
overflowed the capacity for its (team_id, distinct_id) pair, it will not
be processed here but instead re-produced into the
`events_plugin_ingestion_overflow` topic for later processing.
At the moment this is just a wrapper around `IngestionConsumer`. We may
want to further remove that abstraction in the future.
*/
status.info('🔁', 'Starting analytics events consumer with kafkajs')

// NOTE: we are explicitly not maintaining backwards compatibility with
// previous functionality regards to consumer group id usage prior to the
// introduction of this file. Previouslty, when ingestion and export
// workloads ran on the same process they would share the same consumer
// group id. In these cases, updating to this version will result in the
// re-exporting of events still in Kafka `clickhouse_events_json` topic.

// We need a way to determine if ingestionOverflow is enabled when using
// separate deployments for ingestion consumers in order to scale them
// independently. Since ingestionOverflow may be enabled in a separate
// deployment, we require an env variable to be set to confirm this before
// enabling re-production of events to the OVERFLOW topic.

const overflowMode = isIngestionOverflowEnabled() ? IngestionOverflowMode.Reroute : IngestionOverflowMode.Disabled
const batchHandler = async (payload: EachBatchPayload, queue: KafkaJSIngestionConsumer): Promise<void> => {
await eachBatchLegacyIngestion(payload, queue, overflowMode)
}

const queue = new KafkaJSIngestionConsumer(
hub,
piscina,
KAFKA_EVENTS_PLUGIN_INGESTION,
`${KAFKA_PREFIX}clickhouse-ingestion`,
batchHandler
)

await queue.start()

schedule.scheduleJob('0 * * * * *', async () => {
await queue.emitConsumerGroupMetrics()
})

// Subscribe to the heatbeat event to track when the consumer has last
// successfully consumed a message. This is used to determine if the
// consumer is healthy.
const isHealthy = makeHealthCheck(queue.consumer, queue.sessionTimeout)

return { queue, isHealthy }
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import { EachBatchPayload } from 'kafkajs'
import { Message } from 'node-rdkafka'
import * as schedule from 'node-schedule'

import { KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
import { Hub } from '../../types'
import { status } from '../../utils/status'
import Piscina from '../../worker/piscina'
import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion'
import { eachBatchLegacyIngestion } from './batch-processing/each-batch-ingestion-kafkajs'
import { IngestionConsumer, KafkaJSIngestionConsumer } from './kafka-queue'
import { makeHealthCheck } from './on-event-handler-consumer'
import { IngestionConsumer } from './kafka-queue'

export const startAnalyticsEventsIngestionHistoricalConsumer = async ({
hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection.
Expand All @@ -18,9 +14,6 @@ export const startAnalyticsEventsIngestionHistoricalConsumer = async ({
hub: Hub
piscina: Piscina
}) => {
if (!hub.KAFKA_CONSUMPTION_USE_RDKAFKA) {
return startLegacyAnalyticsEventsIngestionHistoricalConsumer({ hub, piscina })
}
/*
Consumes analytics events from the Kafka topic `events_plugin_ingestion_historical`
and processes them for ingestion into ClickHouse.
Expand Down Expand Up @@ -50,49 +43,3 @@ export const startAnalyticsEventsIngestionHistoricalConsumer = async ({

return { queue, isHealthy }
}

export const startLegacyAnalyticsEventsIngestionHistoricalConsumer = async ({
hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection.
piscina,
}: {
hub: Hub
piscina: Piscina
}) => {
/*
Consumes analytics events from the Kafka topic `events_plugin_ingestion_historical`
and processes them for ingestion into ClickHouse.
This is the historical events or "slow-lane" processing queue as it contains only
events that have timestamps in the past.
*/
status.info('🔁', 'Starting analytics events historical consumer with kafkajs')

/*
We don't want to move events to overflow from here, it's fine for the processing to
take longer, but we want the locality constraints to be respected like normal ingestion.
*/
const batchHandler = async (payload: EachBatchPayload, queue: KafkaJSIngestionConsumer): Promise<void> => {
await eachBatchLegacyIngestion(payload, queue, IngestionOverflowMode.Disabled)
}

const queue = new KafkaJSIngestionConsumer(
hub,
piscina,
KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL,
`${KAFKA_PREFIX}clickhouse-ingestion-historical`,
batchHandler
)

await queue.start()

schedule.scheduleJob('0 * * * * *', async () => {
await queue.emitConsumerGroupMetrics()
})

// Subscribe to the heatbeat event to track when the consumer has last
// successfully consumed a message. This is used to determine if the
// consumer is healthy.
const isHealthy = makeHealthCheck(queue.consumer, queue.sessionTimeout)

return { queue, isHealthy }
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import { EachBatchPayload } from 'kafkajs'
import { Message } from 'node-rdkafka'
import * as schedule from 'node-schedule'

import { KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
import { Hub } from '../../types'
import { status } from '../../utils/status'
import Piscina from '../../worker/piscina'
import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion'
import { eachBatchLegacyIngestion } from './batch-processing/each-batch-ingestion-kafkajs'
import { IngestionConsumer, KafkaJSIngestionConsumer } from './kafka-queue'
import { IngestionConsumer } from './kafka-queue'

export const startAnalyticsEventsIngestionOverflowConsumer = async ({
hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection.
Expand All @@ -17,9 +14,6 @@ export const startAnalyticsEventsIngestionOverflowConsumer = async ({
hub: Hub
piscina: Piscina
}) => {
if (!hub.KAFKA_CONSUMPTION_USE_RDKAFKA) {
return startLegacyAnalyticsEventsIngestionOverflowConsumer({ hub, piscina })
}
/*
Consumes analytics events from the Kafka topic `events_plugin_ingestion_overflow`
and processes them for ingestion into ClickHouse.
Expand Down Expand Up @@ -55,50 +49,3 @@ export const startAnalyticsEventsIngestionOverflowConsumer = async ({

return queue
}

export const startLegacyAnalyticsEventsIngestionOverflowConsumer = async ({
hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection.
piscina,
}: {
hub: Hub
piscina: Piscina
}) => {
/*
Consumes analytics events from the Kafka topic `events_plugin_ingestion_overflow`
and processes them for ingestion into ClickHouse.
This is the overflow or "slow-lane" processing queue as it contains only events that
have exceed capacity.
At the moment this is just a wrapper around `IngestionConsumer`. We may
want to further remove that abstraction in the future.
*/
status.info('🔁', 'Starting analytics events overflow consumer with kafkajs')

// NOTE: we are explicitly not maintaining backwards compatibility with
// previous functionality regards to consumer group id usage prior to the
// introduction of this file. Previouslty, when ingestion and export
// workloads ran on the same process they would share the same consumer
// group id. In these cases, updating to this version will result in the
// re-exporting of events still in Kafka `clickhouse_events_json` topic.

const batchHandler = async (payload: EachBatchPayload, queue: KafkaJSIngestionConsumer): Promise<void> => {
await eachBatchLegacyIngestion(payload, queue, IngestionOverflowMode.Consume)
}

const queue = new KafkaJSIngestionConsumer(
hub,
piscina,
KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
`${KAFKA_PREFIX}clickhouse-ingestion-overflow`,
batchHandler
)

await queue.start()

schedule.scheduleJob('0 * * * * *', async () => {
await queue.emitConsumerGroupMetrics()
})

return queue
}
Loading

0 comments on commit ce6db75

Please sign in to comment.