diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java index 50c1085bd7..e1ecb3311c 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java @@ -58,4 +58,7 @@ public interface Buffer> { default Duration getDrainTimeout() { return Duration.ZERO; } + + default void shutdown() { + } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/CircuitBreakingBuffer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/CircuitBreakingBuffer.java index 403149f8d9..0e63c682c4 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/CircuitBreakingBuffer.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/CircuitBreakingBuffer.java @@ -77,4 +77,9 @@ public boolean isEmpty() { public Duration getDrainTimeout() { return buffer.getDrainTimeout(); } + + @Override + public void shutdown() { + buffer.shutdown(); + } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java index b60257d860..11553c89fa 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java @@ -280,6 +280,8 @@ public synchronized void shutdown() { shutdownExecutorService(processorExecutorService, buffer.getDrainTimeout().toMillis() + processorShutdownTimeout.toMillis(), "processor"); processorSets.forEach(processorSet -> processorSet.forEach(Processor::shutdown)); + buffer.shutdown(); + sinks.stream() .map(DataFlowComponent::getComponent) .forEach(Sink::shutdown); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugins/MultiBufferDecorator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugins/MultiBufferDecorator.java index f7da240e5d..b8be2e0256 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugins/MultiBufferDecorator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugins/MultiBufferDecorator.java @@ -64,4 +64,10 @@ public Duration getDrainTimeout() { .map(Buffer::getDrainTimeout) .reduce(Duration.ZERO, Duration::plus); } + + @Override + public void shutdown() { + primaryBuffer.shutdown(); + secondaryBuffers.forEach(Buffer::shutdown); + } } \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java index 158cc8bee8..dce5e72e00 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -32,12 +33,15 @@ public class KafkaBuffer> extends AbstractBuffer { private static final Logger LOG = LoggerFactory.getLogger(KafkaBuffer.class); + static final long EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT = 30L; public static final int INNER_BUFFER_CAPACITY = 1000000; public static final int INNER_BUFFER_BATCH_SIZE = 250000; private final KafkaCustomProducer producer; + private final List emptyCheckingConsumers; private final AbstractBuffer innerBuffer; private final ExecutorService executorService; private final Duration drainTimeout; + private AtomicBoolean shutdownInProgress; @DataPrepperPluginConstructor public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig kafkaBufferConfig, final PluginFactory pluginFactory, @@ -47,8 +51,11 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka producer = kafkaCustomProducerFactory.createProducer(kafkaBufferConfig, pluginFactory, pluginSetting, null, null); final KafkaCustomConsumerFactory kafkaCustomConsumerFactory = new KafkaCustomConsumerFactory(); innerBuffer = new BlockingBuffer<>(INNER_BUFFER_CAPACITY, INNER_BUFFER_BATCH_SIZE, pluginSetting.getPipelineName()); + this.shutdownInProgress = new AtomicBoolean(false); final List consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(), - innerBuffer, pluginMetrics, acknowledgementSetManager, new AtomicBoolean(false)); + innerBuffer, pluginMetrics, acknowledgementSetManager, shutdownInProgress); + emptyCheckingConsumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(), + innerBuffer, pluginMetrics, acknowledgementSetManager, shutdownInProgress); this.executorService = Executors.newFixedThreadPool(consumers.size()); consumers.forEach(this.executorService::submit); @@ -97,4 +104,24 @@ public boolean isEmpty() { public Duration getDrainTimeout() { return drainTimeout; } + + @Override + public void shutdown() { + shutdownInProgress.set(true); + executorService.shutdown(); + + try { + if (executorService.awaitTermination(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) { + LOG.info("Successfully waited for consumer task to terminate"); + } else { + LOG.warn("Consumer task did not terminate in time, forcing termination"); + executorService.shutdownNow(); + } + } catch (final InterruptedException e) { + LOG.error("Interrupted while waiting for consumer task to terminate", e); + executorService.shutdownNow(); + } + + innerBuffer.shutdown(); + } } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaBufferConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaBufferConfig.java index 1cab8d7133..8634ffc082 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaBufferConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaBufferConfig.java @@ -116,7 +116,7 @@ public String getClientDnsLookup() { @Override public boolean getAcknowledgementsEnabled() { - return false; + return true; } public Duration getDrainTimeout() {