Skip to content

Commit

Permalink
Add shutdown method to buffer API
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Oct 9, 2023
1 parent a8eb76b commit ecf4281
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,7 @@ public interface Buffer<T extends Record<?>> {
default Duration getDrainTimeout() {
return Duration.ZERO;
}

default void shutdown() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,9 @@ public boolean isEmpty() {
public Duration getDrainTimeout() {
return buffer.getDrainTimeout();
}

@Override
public void shutdown() {
buffer.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ public synchronized void shutdown() {
shutdownExecutorService(processorExecutorService, buffer.getDrainTimeout().toMillis() + processorShutdownTimeout.toMillis(), "processor");

processorSets.forEach(processorSet -> processorSet.forEach(Processor::shutdown));
buffer.shutdown();

sinks.stream()
.map(DataFlowComponent::getComponent)
.forEach(Sink::shutdown);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,10 @@ public Duration getDrainTimeout() {
.map(Buffer::getDrainTimeout)
.reduce(Duration.ZERO, Duration::plus);
}

@Override
public void shutdown() {
primaryBuffer.shutdown();
secondaryBuffers.forEach(Buffer::shutdown);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,23 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

@DataPrepperPlugin(name = "kafka", pluginType = Buffer.class, pluginConfigurationType = KafkaBufferConfig.class)
public class KafkaBuffer<T extends Record<?>> extends AbstractBuffer<T> {

private static final Logger LOG = LoggerFactory.getLogger(KafkaBuffer.class);
static final long EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT = 30L;
public static final int INNER_BUFFER_CAPACITY = 1000000;
public static final int INNER_BUFFER_BATCH_SIZE = 250000;
private final KafkaCustomProducer producer;
private final List<KafkaCustomConsumer> emptyCheckingConsumers;
private final AbstractBuffer innerBuffer;
private final ExecutorService executorService;
private final Duration drainTimeout;
private AtomicBoolean shutdownInProgress;

@DataPrepperPluginConstructor
public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig kafkaBufferConfig, final PluginFactory pluginFactory,
Expand All @@ -47,8 +51,11 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
producer = kafkaCustomProducerFactory.createProducer(kafkaBufferConfig, pluginFactory, pluginSetting, null, null);
final KafkaCustomConsumerFactory kafkaCustomConsumerFactory = new KafkaCustomConsumerFactory();
innerBuffer = new BlockingBuffer<>(INNER_BUFFER_CAPACITY, INNER_BUFFER_BATCH_SIZE, pluginSetting.getPipelineName());
this.shutdownInProgress = new AtomicBoolean(false);
final List<KafkaCustomConsumer> consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
innerBuffer, pluginMetrics, acknowledgementSetManager, new AtomicBoolean(false));
innerBuffer, pluginMetrics, acknowledgementSetManager, shutdownInProgress);
emptyCheckingConsumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
innerBuffer, pluginMetrics, acknowledgementSetManager, shutdownInProgress);
this.executorService = Executors.newFixedThreadPool(consumers.size());
consumers.forEach(this.executorService::submit);

Expand Down Expand Up @@ -97,4 +104,24 @@ public boolean isEmpty() {
public Duration getDrainTimeout() {
return drainTimeout;
}

@Override
public void shutdown() {
shutdownInProgress.set(true);
executorService.shutdown();

try {
if (executorService.awaitTermination(EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
LOG.info("Successfully waited for consumer task to terminate");
} else {
LOG.warn("Consumer task did not terminate in time, forcing termination");
executorService.shutdownNow();
}
} catch (final InterruptedException e) {
LOG.error("Interrupted while waiting for consumer task to terminate", e);
executorService.shutdownNow();
}

innerBuffer.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public String getClientDnsLookup() {

@Override
public boolean getAcknowledgementsEnabled() {
return false;
return true;
}

public Duration getDrainTimeout() {
Expand Down

0 comments on commit ecf4281

Please sign in to comment.