diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
index e514054d7..dc00aa52b 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
@@ -54,6 +54,7 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
     @Nullable private String transactionalId;
     private volatile boolean inTransaction;
    private volatile boolean hasRecordsInTransaction;
+    private volatile boolean hasRecordsInBuffer;
    private volatile boolean closed;
 
    public FlinkKafkaInternalProducer(Properties properties, @Nullable String transactionalId) {
@@ -77,6 +78,7 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        if (inTransaction) {
            hasRecordsInTransaction = true;
        }
+        hasRecordsInBuffer = true;
        return super.send(record, callback);
    }
 
@@ -86,6 +88,7 @@ public void flush() {
        if (inTransaction) {
            flushNewPartitions();
        }
+        hasRecordsInBuffer = false;
    }
 
    @Override
@@ -120,6 +123,10 @@ public boolean hasRecordsInTransaction() {
        return hasRecordsInTransaction;
    }
 
+    public boolean hasRecordsInBuffer() {
+        return hasRecordsInBuffer;
+    }
+
    @Override
    public void close() {
        closed = true;
@@ -133,12 +140,14 @@ public void close() {
            // If this is outside of a transaction, we should be able to cleanly shutdown.
            super.close(Duration.ofHours(1));
        }
+        hasRecordsInBuffer = false;
    }
 
    @Override
    public void close(Duration timeout) {
        closed = true;
        super.close(timeout);
+        hasRecordsInBuffer = false;
    }
 
    public boolean isClosed() {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java
index 72a3281e6..891189791 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommittable.java
@@ -32,26 +32,45 @@ class KafkaCommittable {
    private final long producerId;
    private final short epoch;
    private final String transactionalId;
+    private final boolean transactional;
    @Nullable private Recyclable<? extends FlinkKafkaInternalProducer<?, ?>> producer;
 
    public KafkaCommittable(
            long producerId,
            short epoch,
            String transactionalId,
+            boolean transactional,
            @Nullable Recyclable<? extends FlinkKafkaInternalProducer<?, ?>> producer) {
        this.producerId = producerId;
        this.epoch = epoch;
        this.transactionalId = transactionalId;
+        this.transactional = transactional;
        this.producer = producer;
    }
 
+    public KafkaCommittable(
+            long producerId,
+            short epoch,
+            String transactionalId,
+            @Nullable Recyclable<? extends FlinkKafkaInternalProducer<?, ?>> producer) {
+        this(producerId, epoch, transactionalId, true, producer);
+    }
+
    public static <K, V> KafkaCommittable of(
            FlinkKafkaInternalProducer<K, V> producer,
            Consumer<FlinkKafkaInternalProducer<K, V>> recycler) {
+        return KafkaCommittable.of(producer, true, recycler);
+    }
+
+    public static <K, V> KafkaCommittable of(
+            FlinkKafkaInternalProducer<K, V> producer,
+            boolean transactional,
+            Consumer<FlinkKafkaInternalProducer<K, V>> recycler) {
        return new KafkaCommittable(
                producer.getProducerId(),
                producer.getEpoch(),
                producer.getTransactionalId(),
+                transactional,
                new Recyclable<>(producer, recycler));
    }
 
@@ -67,6 +86,10 @@ public String getTransactionalId() {
        return transactionalId;
    }
 
+    public boolean isTransactional() {
+        return transactional;
+    }
+
    public Optional<Recyclable<? extends FlinkKafkaInternalProducer<?, ?>>> getProducer() {
        return Optional.ofNullable(producer);
    }
 
@@ -76,6 +99,8 @@ public String toString() {
        return "KafkaCommittable{"
                + "producerId="
                + producerId
+                + ", transactional="
+                + transactional
                + ", epoch="
                + epoch
                + ", transactionalId="
@@ -94,11 +119,12 @@ public boolean equals(Object o) {
        KafkaCommittable that = (KafkaCommittable) o;
        return producerId == that.producerId
                && epoch == that.epoch
+                && transactional == that.transactional
                && transactionalId.equals(that.transactionalId);
    }
 
    @Override
    public int hashCode() {
-        return Objects.hash(producerId, epoch, transactionalId);
+        return Objects.hash(producerId, epoch, transactional, transactionalId);
    }
}
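Taken together, these first two files let a committable record whether a real Kafka transaction backs it: FlinkKafkaInternalProducer#send raises hasRecordsInBuffer, flush() and close() lower it, and the new KafkaCommittable.of(producer, transactional, recycler) overload snapshots the delivery mode. Below is a minimal standalone sketch of that flag lifecycle, assuming a stub producer; apart from the flag and method names taken from the patch, everything here is hypothetical illustration, not connector code.

// Stub illustrating the hasRecordsInBuffer lifecycle introduced above.
class BufferFlagSketch {
    private volatile boolean hasRecordsInBuffer;

    void send(Object record) {
        // mirrors FlinkKafkaInternalProducer#send: any send leaves data in the buffer
        hasRecordsInBuffer = true;
    }

    void flush() {
        // the real producer blocks in super.flush() until the buffer is drained,
        // so clearing the flag afterwards is safe
        hasRecordsInBuffer = false;
    }

    boolean hasRecordsInBuffer() {
        return hasRecordsInBuffer;
    }

    public static void main(String[] args) {
        BufferFlagSketch p = new BufferFlagSketch();
        p.send("a");
        assert p.hasRecordsInBuffer();   // pending record: a committable must be emitted
        p.flush();
        assert !p.hasRecordsInBuffer();  // drained: nothing left for the committer to do
    }
}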
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
index 4dbeaf9e7..ec03d0858 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
@@ -70,7 +70,9 @@ public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
                        recyclable
                                .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
                                .orElseGet(() -> getRecoveryProducer(committable));
-                producer.commitTransaction();
+                if (committable.isTransactional()) {
+                    producer.commitTransaction();
+                }
                producer.flush();
                recyclable.ifPresent(Recyclable::close);
            } catch (RetriableException e) {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 0cc16b219..c9c53c7a1 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -209,22 +209,27 @@ public void flush(boolean endOfInput) throws IOException, InterruptedException {
 
    @Override
    public Collection<KafkaCommittable> prepareCommit() {
-        if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE) {
+        if (deliveryGuarantee == DeliveryGuarantee.NONE) {
            return Collections.emptyList();
        }
 
        // only return a KafkaCommittable if the current transaction has been written some data
-        if (currentProducer.hasRecordsInTransaction()) {
+        // or if there's any record in buffer after our first flush
+        if (currentProducer.hasRecordsInTransaction() || currentProducer.hasRecordsInBuffer()) {
            final List<KafkaCommittable> committables =
                    Collections.singletonList(
-                            KafkaCommittable.of(currentProducer, producerPool::add));
+                            KafkaCommittable.of(
+                                    currentProducer,
+                                    deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE,
+                                    producerPool::add));
            LOG.debug("Committing {} committables.", committables);
            return committables;
+        } else if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+            // otherwise, we commit the empty transaction as is (no-op) and just recycle the
+            // producer
+            currentProducer.commitTransaction();
+            producerPool.add(currentProducer);
        }
-
-        // otherwise, we commit the empty transaction as is (no-op) and just recycle the producer
-        currentProducer.commitTransaction();
-        producerPool.add(currentProducer);
        return Collections.emptyList();
    }
 
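The rewritten prepareCommit is easiest to read as a three-way decision over the delivery guarantee and the two producer flags. Below is a hedged, standalone sketch of just that branch structure; the DeliveryGuarantee values mirror Flink's enum, but the Producer interface and the String committable are stand-ins for illustration only.

import java.util.Collection;
import java.util.Collections;

// Sketch of the branch structure KafkaWriter#prepareCommit follows after this patch.
class PrepareCommitSketch {
    enum DeliveryGuarantee { NONE, AT_LEAST_ONCE, EXACTLY_ONCE }

    interface Producer {
        boolean hasRecordsInTransaction();
        boolean hasRecordsInBuffer();
        void commitTransaction();
    }

    static Collection<String> prepareCommit(Producer producer, DeliveryGuarantee guarantee) {
        if (guarantee == DeliveryGuarantee.NONE) {
            // NONE never hands anything to the committer
            return Collections.emptyList();
        }
        if (producer.hasRecordsInTransaction() || producer.hasRecordsInBuffer()) {
            // the committable is transactional only under EXACTLY_ONCE; for
            // AT_LEAST_ONCE the committer skips commitTransaction() and only flushes
            boolean transactional = guarantee == DeliveryGuarantee.EXACTLY_ONCE;
            return Collections.singletonList("committable(transactional=" + transactional + ")");
        }
        if (guarantee == DeliveryGuarantee.EXACTLY_ONCE) {
            // empty transaction: commit it as a no-op; the writer also recycles the producer
            producer.commitTransaction();
        }
        return Collections.emptyList();
    }
}

The notable behavioral change is that AT_LEAST_ONCE now reaches the committer at all: previously any non-EXACTLY_ONCE guarantee returned early, so records still sitting in the client buffer after the writer's flush were never re-flushed on commit.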
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index 811ffa207..6041c0308 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -507,6 +507,26 @@ void testAbortOnClose() throws Exception {
        }
    }
 
+    @Test
+    public void testAtLeastOnceSecondFlush() throws Exception {
+        Properties properties = getKafkaClientConfiguration();
+        try (final KafkaWriter<Integer> writer =
+                createWriterWithConfiguration(properties, DeliveryGuarantee.AT_LEAST_ONCE)) {
+            writer.write(1, SINK_WRITER_CONTEXT);
+            writer.flush(false);
+            Collection<KafkaCommittable> committables = writer.prepareCommit();
+            assertThat(committables.size()).isEqualTo(0);
+
+            writer.write(2, SINK_WRITER_CONTEXT);
+            committables = writer.prepareCommit();
+            assertThat(committables.size()).isGreaterThan(0);
+
+            writer.flush(false);
+            committables = writer.prepareCommit();
+            assertThat(committables.size()).isEqualTo(0);
+        }
+    }
+
    private void assertKafkaMetricNotPresent(
            DeliveryGuarantee guarantee, String configKey, String configValue) throws Exception {
        final Properties config = getKafkaClientConfiguration();
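Reading the new test against the writer changes above: the first prepareCommit follows a flush, so hasRecordsInBuffer has been cleared and no committable is emitted; the second follows a bare write, so the buffer flag is still set and a non-transactional committable is produced, whose only effect in KafkaCommitter is a producer.flush(); the third again follows a flush and yields nothing.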