chore(plugins-server): remove eachBatchLegacyIngestion #17894

Merged: 2 commits, Oct 12, 2023
2 changes: 0 additions & 2 deletions .github/workflows/ci-plugin-server.yml
@@ -190,7 +190,6 @@ jobs:
fail-fast: false
matrix:
POE_EMBRACE_JOIN_FOR_TEAMS: ['', '*']
KAFKA_CONSUMPTION_USE_RDKAFKA: ['true', 'false']

env:
REDIS_URL: 'redis://localhost'
@@ -199,7 +198,6 @@ jobs:
KAFKA_HOSTS: 'kafka:9092'
DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/posthog'
POE_EMBRACE_JOIN_FOR_TEAMS: ${{matrix.POE_EMBRACE_JOIN_FOR_TEAMS}}
KAFKA_CONSUMPTION_USE_RDKAFKA: ${{matrix.KAFKA_CONSUMPTION_USE_RDKAFKA}}

steps:
- name: Code check out
1 change: 0 additions & 1 deletion plugin-server/src/config/config.ts
@@ -45,7 +45,6 @@ export function getDefaultConfig(): PluginsServerConfig {
KAFKA_SASL_USER: undefined,
KAFKA_SASL_PASSWORD: undefined,
KAFKA_CLIENT_RACK: undefined,
KAFKA_CONSUMPTION_USE_RDKAFKA: true, // Transitional setting, ignored for consumers that only support one library
KAFKA_CONSUMPTION_MAX_BYTES: 10_485_760, // Default value for kafkajs
KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION: 1_048_576, // Default value for kafkajs, must be bigger than message size
KAFKA_CONSUMPTION_MAX_WAIT_MS: 50, // Maximum time the broker may wait to fill the Fetch response with fetch.min.bytes of messages.
@@ -1,6 +1,4 @@
import { EachBatchPayload } from 'kafkajs'
import { Message } from 'node-rdkafka'
import * as schedule from 'node-schedule'
import { Counter } from 'prom-client'

import { KAFKA_EVENTS_PLUGIN_INGESTION, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
@@ -9,9 +7,7 @@ import { isIngestionOverflowEnabled } from '../../utils/env-utils'
import { status } from '../../utils/status'
import Piscina from '../../worker/piscina'
import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion'
import { eachBatchLegacyIngestion } from './batch-processing/each-batch-ingestion-kafkajs'
import { IngestionConsumer, KafkaJSIngestionConsumer } from './kafka-queue'
import { makeHealthCheck } from './on-event-handler-consumer'
import { IngestionConsumer } from './kafka-queue'

export const ingestionPartitionKeyOverflowed = new Counter({
name: 'ingestion_partition_key_overflowed',
@@ -26,9 +22,6 @@ export const startAnalyticsEventsIngestionConsumer = async ({
hub: Hub
piscina: Piscina
}) => {
if (!hub.KAFKA_CONSUMPTION_USE_RDKAFKA) {
return startLegacyAnalyticsEventsIngestionConsumer({ hub, piscina })
}
/*
Consumes analytics events from the Kafka topic `events_plugin_ingestion`
and processes them for ingestion into ClickHouse.
@@ -73,64 +66,3 @@ export const startAnalyticsEventsIngestionConsumer = async ({

return { queue, isHealthy }
}

const startLegacyAnalyticsEventsIngestionConsumer = async ({
hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection.
piscina,
}: {
hub: Hub
piscina: Piscina
}) => {
/*
Consumes analytics events from the Kafka topic `events_plugin_ingestion`
and processes them for ingestion into ClickHouse.

Before processing, if isIngestionOverflowEnabled and an event has
overflowed the capacity for its (team_id, distinct_id) pair, it will not
be processed here but instead re-produced into the
`events_plugin_ingestion_overflow` topic for later processing.

At the moment this is just a wrapper around `IngestionConsumer`. We may
want to further remove that abstraction in the future.
*/
status.info('🔁', 'Starting analytics events consumer with kafkajs')

// NOTE: we are explicitly not maintaining backwards compatibility with
// previous functionality regards to consumer group id usage prior to the
// introduction of this file. Previouslty, when ingestion and export
// workloads ran on the same process they would share the same consumer
// group id. In these cases, updating to this version will result in the
// re-exporting of events still in Kafka `clickhouse_events_json` topic.

// We need a way to determine if ingestionOverflow is enabled when using
// separate deployments for ingestion consumers in order to scale them
// independently. Since ingestionOverflow may be enabled in a separate
// deployment, we require an env variable to be set to confirm this before
// enabling re-production of events to the OVERFLOW topic.

const overflowMode = isIngestionOverflowEnabled() ? IngestionOverflowMode.Reroute : IngestionOverflowMode.Disabled
const batchHandler = async (payload: EachBatchPayload, queue: KafkaJSIngestionConsumer): Promise<void> => {
await eachBatchLegacyIngestion(payload, queue, overflowMode)
}

const queue = new KafkaJSIngestionConsumer(
hub,
piscina,
KAFKA_EVENTS_PLUGIN_INGESTION,
`${KAFKA_PREFIX}clickhouse-ingestion`,
batchHandler
)

await queue.start()

schedule.scheduleJob('0 * * * * *', async () => {
await queue.emitConsumerGroupMetrics()
})

// Subscribe to the heatbeat event to track when the consumer has last
// successfully consumed a message. This is used to determine if the
// consumer is healthy.
const isHealthy = makeHealthCheck(queue.consumer, queue.sessionTimeout)

return { queue, isHealthy }
}
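
The heartbeat-based health check described in the removed kafkajs consumer can be sketched roughly as follows. This is a minimal illustration, not the actual `makeHealthCheck` from `./on-event-handler-consumer`: the function name `makeHeartbeatHealthCheck`, the `'heartbeat'` event name, the plain `EventEmitter` standing in for the consumer, and the injectable `now` clock are all assumptions made for the example.

```typescript
import { EventEmitter } from 'node:events'

// Hypothetical sketch: reports healthy while the last heartbeat is more recent
// than the session timeout. This is not the real makeHealthCheck implementation.
function makeHeartbeatHealthCheck(
    consumer: EventEmitter, // stand-in for a Kafka consumer that emits heartbeats
    sessionTimeoutMs: number,
    now: () => number = Date.now // injectable clock so the check can be tested
): () => boolean {
    // Treat startup as the first heartbeat so a fresh consumer is not flagged immediately.
    let lastHeartbeat = now()

    // Assumed event name; a real client library may use a different one.
    consumer.on('heartbeat', () => {
        lastHeartbeat = now()
    })

    // Healthy only if a heartbeat arrived within the session timeout window.
    return () => now() - lastHeartbeat < sessionTimeoutMs
}
```

Tying the threshold to the session timeout means the check would start failing at about the same point the broker would consider the consumer gone, which is presumably why the removed code passed `queue.sessionTimeout` to its health check.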
@@ -1,15 +1,11 @@
import { EachBatchPayload } from 'kafkajs'
import { Message } from 'node-rdkafka'
import * as schedule from 'node-schedule'

import { KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
import { Hub } from '../../types'
import { status } from '../../utils/status'
import Piscina from '../../worker/piscina'
import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion'
import { eachBatchLegacyIngestion } from './batch-processing/each-batch-ingestion-kafkajs'
import { IngestionConsumer, KafkaJSIngestionConsumer } from './kafka-queue'
import { makeHealthCheck } from './on-event-handler-consumer'
import { IngestionConsumer } from './kafka-queue'

export const startAnalyticsEventsIngestionHistoricalConsumer = async ({
hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection.
@@ -18,9 +14,6 @@ export const startAnalyticsEventsIngestionHistoricalConsumer = async ({
hub: Hub
piscina: Piscina
}) => {
if (!hub.KAFKA_CONSUMPTION_USE_RDKAFKA) {
return startLegacyAnalyticsEventsIngestionHistoricalConsumer({ hub, piscina })
}
/*
Consumes analytics events from the Kafka topic `events_plugin_ingestion_historical`
and processes them for ingestion into ClickHouse.
@@ -50,49 +43,3 @@ export const startAnalyticsEventsIngestionHistoricalConsumer = async ({

return { queue, isHealthy }
}

export const startLegacyAnalyticsEventsIngestionHistoricalConsumer = async ({
hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection.
piscina,
}: {
hub: Hub
piscina: Piscina
}) => {
/*
Consumes analytics events from the Kafka topic `events_plugin_ingestion_historical`
and processes them for ingestion into ClickHouse.

This is the historical events or "slow-lane" processing queue as it contains only
events that have timestamps in the past.
*/
status.info('🔁', 'Starting analytics events historical consumer with kafkajs')

/*
We don't want to move events to overflow from here, it's fine for the processing to
take longer, but we want the locality constraints to be respected like normal ingestion.
*/
const batchHandler = async (payload: EachBatchPayload, queue: KafkaJSIngestionConsumer): Promise<void> => {
await eachBatchLegacyIngestion(payload, queue, IngestionOverflowMode.Disabled)
}

const queue = new KafkaJSIngestionConsumer(
hub,
piscina,
KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL,
`${KAFKA_PREFIX}clickhouse-ingestion-historical`,
batchHandler
)

await queue.start()

schedule.scheduleJob('0 * * * * *', async () => {
await queue.emitConsumerGroupMetrics()
})

// Subscribe to the heatbeat event to track when the consumer has last
// successfully consumed a message. This is used to determine if the
// consumer is healthy.
const isHealthy = makeHealthCheck(queue.consumer, queue.sessionTimeout)

return { queue, isHealthy }
}
@@ -1,14 +1,11 @@
import { EachBatchPayload } from 'kafkajs'
import { Message } from 'node-rdkafka'
import * as schedule from 'node-schedule'

import { KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW, prefix as KAFKA_PREFIX } from '../../config/kafka-topics'
import { Hub } from '../../types'
import { status } from '../../utils/status'
import Piscina from '../../worker/piscina'
import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion'
import { eachBatchLegacyIngestion } from './batch-processing/each-batch-ingestion-kafkajs'
import { IngestionConsumer, KafkaJSIngestionConsumer } from './kafka-queue'
import { IngestionConsumer } from './kafka-queue'

export const startAnalyticsEventsIngestionOverflowConsumer = async ({
hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection.
@@ -17,9 +14,6 @@ export const startAnalyticsEventsIngestionOverflowConsumer = async ({
hub: Hub
piscina: Piscina
}) => {
if (!hub.KAFKA_CONSUMPTION_USE_RDKAFKA) {
return startLegacyAnalyticsEventsIngestionOverflowConsumer({ hub, piscina })
}
/*
Consumes analytics events from the Kafka topic `events_plugin_ingestion_overflow`
and processes them for ingestion into ClickHouse.
@@ -55,50 +49,3 @@ export const startAnalyticsEventsIngestionOverflowConsumer = async ({

return queue
}

export const startLegacyAnalyticsEventsIngestionOverflowConsumer = async ({
hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection.
piscina,
}: {
hub: Hub
piscina: Piscina
}) => {
/*
Consumes analytics events from the Kafka topic `events_plugin_ingestion_overflow`
and processes them for ingestion into ClickHouse.

This is the overflow or "slow-lane" processing queue as it contains only events that
have exceed capacity.

At the moment this is just a wrapper around `IngestionConsumer`. We may
want to further remove that abstraction in the future.
*/
status.info('🔁', 'Starting analytics events overflow consumer with kafkajs')

// NOTE: we are explicitly not maintaining backwards compatibility with
// previous functionality regards to consumer group id usage prior to the
// introduction of this file. Previouslty, when ingestion and export
// workloads ran on the same process they would share the same consumer
// group id. In these cases, updating to this version will result in the
// re-exporting of events still in Kafka `clickhouse_events_json` topic.

const batchHandler = async (payload: EachBatchPayload, queue: KafkaJSIngestionConsumer): Promise<void> => {
await eachBatchLegacyIngestion(payload, queue, IngestionOverflowMode.Consume)
}

const queue = new KafkaJSIngestionConsumer(
hub,
piscina,
KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
`${KAFKA_PREFIX}clickhouse-ingestion-overflow`,
batchHandler
)

await queue.start()

schedule.scheduleJob('0 * * * * *', async () => {
await queue.emitConsumerGroupMetrics()
})

return queue
}
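
For reviewers comparing the three removed legacy consumers, the overflow mode each one passed to `eachBatchLegacyIngestion` can be summarized in one small sketch. The mapping itself is taken from the diff above (main: `Reroute` when `isIngestionOverflowEnabled()` is true, otherwise `Disabled`; historical: `Disabled`; overflow: `Consume`). The local enum (including its string values), the `ConsumerKind` type, and the `overflowModeFor` helper are hypothetical names introduced only for illustration and do not exist in the plugin server.

```typescript
// Local stand-in for the enum imported from './batch-processing/each-batch-ingestion'.
// The string values are assumptions; only the member names appear in the diff.
enum IngestionOverflowMode {
    Disabled = 'disabled',
    Reroute = 'reroute',
    Consume = 'consume',
}

// Hypothetical label for the three consumers touched by this change.
type ConsumerKind = 'main' | 'historical' | 'overflow'

// overflowEnabled stands in for the result of isIngestionOverflowEnabled().
function overflowModeFor(kind: ConsumerKind, overflowEnabled: boolean): IngestionOverflowMode {
    switch (kind) {
        case 'main':
            // Re-produce overflowing events to the overflow topic only when explicitly enabled.
            return overflowEnabled ? IngestionOverflowMode.Reroute : IngestionOverflowMode.Disabled
        case 'historical':
            // Slow lane: never reroute, so locality constraints are respected.
            return IngestionOverflowMode.Disabled
        case 'overflow':
            // The overflow topic is simply consumed.
            return IngestionOverflowMode.Consume
    }
}
```

For example, `overflowModeFor('historical', true)` yields `IngestionOverflowMode.Disabled`, since the historical consumer ignores the overflow setting.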