Skip to content

Commit

Permalink
Support for logging MDC in the Kafka buffer (opensearch-project#4131)
Browse files Browse the repository at this point in the history
Uses logging MDC within the KafkaBuffer entry points. Create the Kafka Buffer consumer threads with MDC. Name the consumer threads to help when tracking down thread dumps. First part of opensearch-project#4126

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored Feb 15, 2024
1 parent 89e4be1 commit faddf01
Show file tree
Hide file tree
Showing 5 changed files with 321 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.opensearch.dataprepper.plugins.kafka.admin.KafkaAdminAccessor;
import org.opensearch.dataprepper.plugins.kafka.buffer.serialization.BufferSerializationFactory;
import org.opensearch.dataprepper.plugins.kafka.common.KafkaMdc;
import org.opensearch.dataprepper.plugins.kafka.common.serialization.CommonSerializationFactory;
import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory;
import org.opensearch.dataprepper.plugins.kafka.common.thread.KafkaPluginThreadFactory;
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer;
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory;
import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer;
import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducerFactory;
import org.opensearch.dataprepper.plugins.kafka.service.TopicServiceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import java.time.Duration;
import java.util.Collection;
Expand All @@ -53,6 +56,7 @@ public class KafkaBuffer extends AbstractBuffer<Record<Event>> {
public static final int INNER_BUFFER_BATCH_SIZE = 250000;
static final String WRITE = "Write";
static final String READ = "Read";
static final String MDC_KAFKA_PLUGIN_VALUE = "buffer";
private final KafkaCustomProducer producer;
private final KafkaAdminAccessor kafkaAdminAccessor;
private final AbstractBuffer<Record<Event>> innerBuffer;
Expand Down Expand Up @@ -80,7 +84,7 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
final List<KafkaCustomConsumer> consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
innerBuffer, consumerMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, circuitBreaker);
this.kafkaAdminAccessor = new KafkaAdminAccessor(kafkaBufferConfig, List.of(kafkaBufferConfig.getTopic().getGroupId()));
this.executorService = Executors.newFixedThreadPool(consumers.size());
this.executorService = Executors.newFixedThreadPool(consumers.size(), KafkaPluginThreadFactory.defaultExecutorThreadFactory(MDC_KAFKA_PLUGIN_VALUE));
consumers.forEach(this.executorService::submit);

this.drainTimeout = kafkaBufferConfig.getDrainTimeout();
Expand All @@ -89,6 +93,7 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
@Override
public void writeBytes(final byte[] bytes, final String key, int timeoutInMillis) throws Exception {
try {
setMdc();
producer.produceRawData(bytes, key);
} catch (final Exception e) {
LOG.error(e.getMessage(), e);
Expand All @@ -102,15 +107,21 @@ public void writeBytes(final byte[] bytes, final String key, int timeoutInMillis
throw new RuntimeException(e);
}
}
finally {
resetMdc();
}
}

@Override
public void doWrite(Record<Event> record, int timeoutInMillis) throws TimeoutException {
try {
setMdc();
producer.produceRecords(record);
} catch (final Exception e) {
LOG.error(e.getMessage(), e);
throw new RuntimeException(e);
} finally {
resetMdc();
}
}

Expand All @@ -121,29 +132,50 @@ public boolean isByteBuffer() {

@Override
public void doWriteAll(Collection<Record<Event>> records, int timeoutInMillis) throws Exception {
for ( Record<Event> record: records ) {
for (Record<Event> record : records) {
doWrite(record, timeoutInMillis);
}
}

@Override
public Map.Entry<Collection<Record<Event>>, CheckpointState> doRead(int timeoutInMillis) {
return innerBuffer.read(timeoutInMillis);
try {
setMdc();
return innerBuffer.read(timeoutInMillis);
} finally {
resetMdc();
}
}

@Override
public void postProcess(final Long recordsInBuffer) {
innerBuffer.postProcess(recordsInBuffer);
try {
setMdc();

innerBuffer.postProcess(recordsInBuffer);
} finally {
resetMdc();
}
}

@Override
public void doCheckpoint(CheckpointState checkpointState) {
innerBuffer.doCheckpoint(checkpointState);
try {
setMdc();
innerBuffer.doCheckpoint(checkpointState);
} finally {
resetMdc();
}
}

@Override
public boolean isEmpty() {
return kafkaAdminAccessor.areTopicsEmpty() && innerBuffer.isEmpty();
try {
setMdc();
return kafkaAdminAccessor.areTopicsEmpty() && innerBuffer.isEmpty();
} finally {
resetMdc();
}
}

@Override
Expand All @@ -158,21 +190,35 @@ public boolean isWrittenOffHeapOnly() {

@Override
public void shutdown() {
shutdownInProgress.set(true);
executorService.shutdown();

try {
if (executorService.awaitTermination(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
LOG.info("Successfully waited for consumer task to terminate");
} else {
LOG.warn("Consumer task did not terminate in time, forcing termination");
setMdc();

shutdownInProgress.set(true);
executorService.shutdown();

try {
if (executorService.awaitTermination(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
LOG.info("Successfully waited for consumer task to terminate");
} else {
LOG.warn("Consumer task did not terminate in time, forcing termination");
executorService.shutdownNow();
}
} catch (final InterruptedException e) {
LOG.error("Interrupted while waiting for consumer task to terminate", e);
executorService.shutdownNow();
}
} catch (final InterruptedException e) {
LOG.error("Interrupted while waiting for consumer task to terminate", e);
executorService.shutdownNow();

innerBuffer.shutdown();
} finally {
resetMdc();
}
}

private static void setMdc() {
MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, MDC_KAFKA_PLUGIN_VALUE);
}

innerBuffer.shutdown();
private static void resetMdc() {
MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.common;public class KafkaMdc {
public static final String MDC_KAFKA_PLUGIN_KEY = "kafkaPluginType";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.common.thread;

import org.opensearch.dataprepper.plugins.kafka.common.KafkaMdc;
import org.slf4j.MDC;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
* An implementation of {@link ThreadFactory} for Kafka plugin threads.
*/
public class KafkaPluginThreadFactory implements ThreadFactory {
private final ThreadFactory delegateThreadFactory;
private final String threadPrefix;
private final String kafkaPluginType;
private final AtomicInteger threadNumber = new AtomicInteger(1);

KafkaPluginThreadFactory(
final ThreadFactory delegateThreadFactory,
final String kafkaPluginType) {
this.delegateThreadFactory = delegateThreadFactory;
this.threadPrefix = "kafka-" + kafkaPluginType + "-";
this.kafkaPluginType = kafkaPluginType;
}

/**
* Creates an instance specifically for use with {@link Executors}.
*
* @param kafkaPluginType The name of the plugin type. e.g. sink, source, buffer
* @return An instance of the {@link KafkaPluginThreadFactory}.
*/
public static KafkaPluginThreadFactory defaultExecutorThreadFactory(final String kafkaPluginType) {
return new KafkaPluginThreadFactory(Executors.defaultThreadFactory(), kafkaPluginType);
}

@Override
public Thread newThread(final Runnable runnable) {
final Thread thread = delegateThreadFactory.newThread(() -> {
MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, kafkaPluginType);
try {
runnable.run();
} finally {
MDC.clear();
}
});

thread.setName(threadPrefix + threadNumber.getAndIncrement());

return thread;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

package org.opensearch.dataprepper.plugins.kafka.buffer;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
Expand All @@ -25,6 +27,8 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
import org.opensearch.dataprepper.plugins.kafka.admin.KafkaAdminAccessor;
import org.opensearch.dataprepper.plugins.kafka.common.KafkaMdc;
import org.opensearch.dataprepper.plugins.kafka.common.thread.KafkaPluginThreadFactory;
import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
Expand All @@ -35,6 +39,7 @@
import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducerFactory;
import org.opensearch.dataprepper.plugins.kafka.producer.ProducerWorker;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
import org.slf4j.MDC;

import java.time.Duration;
import java.util.Arrays;
Expand Down Expand Up @@ -151,7 +156,7 @@ public KafkaBuffer createObjectUnderTest(final List<KafkaCustomConsumer> consume
blockingBuffer = mock;
})) {

executorsMockedStatic.when(() -> Executors.newFixedThreadPool(anyInt())).thenReturn(executorService);
executorsMockedStatic.when(() -> Executors.newFixedThreadPool(anyInt(), any(KafkaPluginThreadFactory.class))).thenReturn(executorService);
return new KafkaBuffer(pluginSetting, bufferConfig, acknowledgementSetManager, null, awsCredentialsSupplier, circuitBreaker);
}
}
Expand Down Expand Up @@ -353,4 +358,84 @@ public void testShutdown_InterruptedException() throws InterruptedException {
verify(executorService).awaitTermination(eq(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT), eq(TimeUnit.SECONDS));
verify(executorService).shutdownNow();
}

@Nested
class MdcTests {
private MockedStatic<MDC> mdcMockedStatic;

@BeforeEach
void setUp() {
mdcMockedStatic = mockStatic(MDC.class);
}

@AfterEach
void tearDown() {
mdcMockedStatic.close();
}

@Test
void writeBytes_sets_and_clears_MDC() throws Exception {
createObjectUnderTest().writeBytes(new byte[] {}, UUID.randomUUID().toString(), 100);

mdcMockedStatic.verify(() -> MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, "buffer"));
mdcMockedStatic.verify(() -> MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY));
}

@Test
void doWrite_sets_and_clears_MDC() throws Exception {
createObjectUnderTest().doWrite(mock(Record.class), 100);

mdcMockedStatic.verify(() -> MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, "buffer"));
mdcMockedStatic.verify(() -> MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY));
}

@Test
void doWriteAll_sets_and_clears_MDC() throws Exception {
final List<Record<Event>> records = Collections.singletonList(mock(Record.class));
createObjectUnderTest().doWriteAll(records, 100);

mdcMockedStatic.verify(() -> MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, "buffer"));
mdcMockedStatic.verify(() -> MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY));
}

@Test
void doRead_sets_and_clears_MDC() throws Exception {
createObjectUnderTest().doRead(100);

mdcMockedStatic.verify(() -> MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, "buffer"));
mdcMockedStatic.verify(() -> MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY));
}

@Test
void doCheckpoint_sets_and_clears_MDC() throws Exception {
createObjectUnderTest().doCheckpoint(mock(CheckpointState.class));

mdcMockedStatic.verify(() -> MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, "buffer"));
mdcMockedStatic.verify(() -> MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY));
}

@Test
void postProcess_sets_and_clears_MDC() throws Exception {
createObjectUnderTest().postProcess(100L);

mdcMockedStatic.verify(() -> MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, "buffer"));
mdcMockedStatic.verify(() -> MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY));
}

@Test
void isEmpty_sets_and_clears_MDC() throws Exception {
createObjectUnderTest().isEmpty();

mdcMockedStatic.verify(() -> MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, "buffer"));
mdcMockedStatic.verify(() -> MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY));
}

@Test
void shutdown_sets_and_clears_MDC() throws Exception {
createObjectUnderTest().shutdown();

mdcMockedStatic.verify(() -> MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, "buffer"));
mdcMockedStatic.verify(() -> MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY));
}
}
}
Loading

0 comments on commit faddf01

Please sign in to comment.