diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
index e514054d7..bdf9fa138 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
@@ -37,6 +37,7 @@
 import java.time.Duration;
 import java.util.Properties;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -55,6 +56,7 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
     private volatile boolean inTransaction;
     private volatile boolean hasRecordsInTransaction;
     private volatile boolean closed;
+    private final AtomicLong pendingRecords = new AtomicLong(0);
 
     public FlinkKafkaInternalProducer(Properties properties, @Nullable String transactionalId) {
         super(withTransactionalId(properties, transactionalId));
@@ -72,12 +74,17 @@ private static Properties withTransactionalId(
         return props;
     }
 
+    public long getPendingRecordsCount() {
+        return pendingRecords.get();
+    }
+
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
         if (inTransaction) {
             hasRecordsInTransaction = true;
         }
-        return super.send(record, callback);
+        pendingRecords.incrementAndGet();
+        return super.send(record, new TrackingCallback(callback));
     }
 
     @Override
@@ -86,6 +93,11 @@ public void flush() {
         if (inTransaction) {
             flushNewPartitions();
         }
+        final long pendingRecordsCount = pendingRecords.get();
+        if (pendingRecordsCount != 0) {
+            throw new IllegalStateException(
+                    "Pending record count must be zero at this point: " + pendingRecordsCount);
+        }
     }
 
     @Override
@@ -396,8 +408,27 @@ public String toString() {
                 + transactionalId
                 + "', inTransaction="
                 + inTransaction
+                + ", pendingRecords="
+                + pendingRecords.get()
                 + ", closed="
                 + closed
                 + '}';
     }
+
+    public class TrackingCallback implements Callback {
+
+        private final Callback actualCallback;
+
+        public TrackingCallback(final Callback actualCallback) {
+            this.actualCallback = actualCallback;
+        }
+
+        @Override
+        public void onCompletion(final RecordMetadata recordMetadata, final Exception e) {
+            pendingRecords.decrementAndGet();
+            if (actualCallback != null) {
+                actualCallback.onCompletion(recordMetadata, e);
+            }
+        }
+    }
 }
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index c9eceb982..d6246ed8a 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -487,6 +487,21 @@ void testAbortOnClose() throws Exception {
         }
     }
 
+    @Test
+    public void testAtLeastOnceFlushGate() throws Exception {
+        Properties properties = getKafkaClientConfiguration();
+        try (final KafkaWriter<Integer> writer =
+                createWriterWithConfiguration(properties, DeliveryGuarantee.AT_LEAST_ONCE)) {
+            writer.write(1, SINK_WRITER_CONTEXT);
+            assertThat(writer.getCurrentProducer().getPendingRecordsCount())
+                    .as("should have one pending record")
+                    .isEqualTo(1);
+            assertThatCode(() -> writer.flush(false))
+                    .as("should not throw exception")
+                    .doesNotThrowAnyException();
+        }
+    }
+
     private void assertKafkaMetricNotPresent(
             DeliveryGuarantee guarantee, String configKey, String configValue) throws Exception {
         final Properties config = getKafkaClientConfiguration();
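
For orientation, the change boils down to one pattern: send() increments an atomic counter, the wrapped completion callback decrements it, and flush() asserts the counter is back at zero before returning. Below is a minimal, self-contained sketch of that gate; the PendingRecordsGate class and its method names are hypothetical illustrations, not Flink or Kafka API.

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

class PendingRecordsGate {
    private final AtomicLong pendingRecords = new AtomicLong(0);

    // Wrap a completion callback so every in-flight record is counted:
    // increment on send, decrement when the broker acknowledges and the
    // callback fires (the role TrackingCallback plays in the patch).
    BiConsumer<Object, Exception> track(BiConsumer<Object, Exception> actualCallback) {
        pendingRecords.incrementAndGet();
        return (metadata, error) -> {
            pendingRecords.decrementAndGet();
            if (actualCallback != null) {
                actualCallback.accept(metadata, error);
            }
        };
    }

    // Mirrors the check the patch adds at the end of flush(): once the
    // producer has flushed, every callback must have fired, so any
    // nonzero count means records were silently left in flight.
    void checkFullyFlushed() {
        long pending = pendingRecords.get();
        if (pending != 0) {
            throw new IllegalStateException(
                    "Pending record count must be zero at this point: " + pending);
        }
    }
}

The AtomicLong matters because the two sides of the counter run on different threads: the Kafka client's I/O thread fires the completion callbacks, while the Flink task thread calls send() and flush().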