[FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause data loss during broker issue when not using EXACTLY_ONCE if there's any batching
kevin_tseng committed Jan 23, 2024
1 parent 6f06f15 commit 0ea39ea
Showing 5 changed files with 71 additions and 9 deletions.
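
Why the change is needed: with DeliveryGuarantee.NONE or AT_LEAST_ONCE, the old KafkaWriter#prepareCommit() (last production file below) returned an empty list, so the commit phase of a checkpoint never touched the producer again. Records that send() had already accepted into the Kafka client's batch buffer, but that had not yet reached the brokers, could then be dropped during a broker issue without failing the checkpoint. The sketch below shows the kind of setup that is affected; it is illustrative only — the broker address, topic name, and batching values are invented, while the builder calls are the connector's public API:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// Client-side batching keeps records in the producer's buffer between sends.
Properties producerConfig = new Properties();
producerConfig.setProperty("linger.ms", "100");    // wait up to 100 ms to fill a batch
producerConfig.setProperty("batch.size", "65536"); // larger batches, longer buffering

KafkaSink<String> sink =
        KafkaSink.<String>builder()
                .setBootstrapServers("broker-1:9092")          // hypothetical address
                .setKafkaProducerConfig(producerConfig)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("events")            // hypothetical topic
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();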
FlinkKafkaInternalProducer.java

@@ -54,6 +54,7 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
     @Nullable private String transactionalId;
     private volatile boolean inTransaction;
     private volatile boolean hasRecordsInTransaction;
+    private volatile boolean hasRecordsInBuffer;
     private volatile boolean closed;
 
     public FlinkKafkaInternalProducer(Properties properties, @Nullable String transactionalId) {
@@ -77,6 +78,7 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
         if (inTransaction) {
             hasRecordsInTransaction = true;
         }
+        hasRecordsInBuffer = true;
         return super.send(record, callback);
     }
 
@@ -86,6 +88,7 @@ public void flush() {
         if (inTransaction) {
             flushNewPartitions();
         }
+        hasRecordsInBuffer = false;
     }
 
     @Override
@@ -120,6 +123,10 @@ public boolean hasRecordsInTransaction() {
         return hasRecordsInTransaction;
     }
 
+    public boolean hasRecordsInBuffer() {
+        return hasRecordsInBuffer;
+    }
+
     @Override
     public void close() {
         closed = true;
@@ -133,12 +140,14 @@ public void close() {
             // If this is outside of a transaction, we should be able to cleanly shutdown.
             super.close(Duration.ofHours(1));
         }
+        hasRecordsInBuffer = false;
     }
 
     @Override
     public void close(Duration timeout) {
         closed = true;
         super.close(timeout);
+        hasRecordsInBuffer = false;
     }
 
     public boolean isClosed() {
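
The new flag gives the producer a conservative notion of "there may be unsent records client-side": send() always raises it, and only flush() or close() lowers it. A minimal sketch of the intended lifecycle (the class is internal to the connector, and the properties, record, and callback here are assumed; this only illustrates the flag's semantics):

FlinkKafkaInternalProducer<byte[], byte[]> producer =
        new FlinkKafkaInternalProducer<>(kafkaProperties, null); // no transactional id

producer.send(record, callback);          // record accepted, possibly only buffered
assert producer.hasRecordsInBuffer();     // true until an explicit flush

producer.flush();                         // blocks until batches reach the brokers
assert !producer.hasRecordsInBuffer();    // buffer is (logically) drained

The flag can stay true even after the Kafka client has transmitted a batch on its own; that is intentional, since an extra flush() is harmless while a missed one is not.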
KafkaCommittable.java

@@ -32,26 +32,45 @@ class KafkaCommittable {
     private final long producerId;
     private final short epoch;
     private final String transactionalId;
+    private final boolean transactional;
     @Nullable private Recyclable<? extends FlinkKafkaInternalProducer<?, ?>> producer;
 
     public KafkaCommittable(
             long producerId,
             short epoch,
             String transactionalId,
+            boolean transactional,
             @Nullable Recyclable<? extends FlinkKafkaInternalProducer<?, ?>> producer) {
         this.producerId = producerId;
         this.epoch = epoch;
         this.transactionalId = transactionalId;
+        this.transactional = transactional;
         this.producer = producer;
     }
 
+    public KafkaCommittable(
+            long producerId,
+            short epoch,
+            String transactionalId,
+            @Nullable Recyclable<? extends FlinkKafkaInternalProducer<?, ?>> producer) {
+        this(producerId, epoch, transactionalId, true, producer);
+    }
+
     public static <K, V> KafkaCommittable of(
             FlinkKafkaInternalProducer<K, V> producer,
             Consumer<FlinkKafkaInternalProducer<K, V>> recycler) {
+        return KafkaCommittable.of(producer, true, recycler);
+    }
+
+    public static <K, V> KafkaCommittable of(
+            FlinkKafkaInternalProducer<K, V> producer,
+            boolean transactional,
+            Consumer<FlinkKafkaInternalProducer<K, V>> recycler) {
         return new KafkaCommittable(
                 producer.getProducerId(),
                 producer.getEpoch(),
                 producer.getTransactionalId(),
+                transactional,
                 new Recyclable<>(producer, recycler));
     }
 
@@ -67,6 +86,10 @@ public String getTransactionalId() {
         return transactionalId;
     }
 
+    public boolean isTransactional() {
+        return transactional;
+    }
+
     public Optional<Recyclable<? extends FlinkKafkaInternalProducer<?, ?>>> getProducer() {
         return Optional.ofNullable(producer);
     }
@@ -76,6 +99,8 @@ public String toString() {
         return "KafkaCommittable{"
                 + "producerId="
                 + producerId
+                + ", transactional="
+                + transactional
                 + ", epoch="
                 + epoch
                 + ", transactionalId="
@@ -94,11 +119,12 @@ public boolean equals(Object o) {
         KafkaCommittable that = (KafkaCommittable) o;
         return producerId == that.producerId
                 && epoch == that.epoch
+                && transactional == that.transactional
                 && transactionalId.equals(that.transactionalId);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(producerId, epoch, transactionalId);
+        return Objects.hash(producerId, epoch, transactional, transactionalId);
     }
 }
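
A KafkaCommittable can now describe two kinds of work for the committer, selected by the new flag. A short sketch of the two factory paths (producer and producerPool are assumed to exist, as in KafkaWriter below):

// EXACTLY_ONCE: the committable carries an open transaction to commit.
KafkaCommittable exactlyOnce = KafkaCommittable.of(producer, true, producerPool::add);

// AT_LEAST_ONCE (new): transactional is false, so the committer will skip
// commitTransaction() and only flush() the producer.
KafkaCommittable flushOnly = KafkaCommittable.of(producer, false, producerPool::add);

The two-argument constructor and factory default to transactional = true, so existing callers keep their exactly-once semantics unchanged.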
KafkaCommitter.java

@@ -70,7 +70,9 @@ public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
                         recyclable
                                 .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
                                 .orElseGet(() -> getRecoveryProducer(committable));
-                producer.commitTransaction();
+                if (committable.isTransactional()) {
+                    producer.commitTransaction();
+                }
                 producer.flush();
                 recyclable.ifPresent(Recyclable::close);
             } catch (RetriableException e) {
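
This is where the fix takes effect: the committer now also receives non-transactional committables, for which it skips commitTransaction() and relies on the pre-existing producer.flush() to drain buffered records before the checkpoint completes. Condensed, the per-committable commit path reads (a paraphrase of the diff, not additional code):

if (committable.isTransactional()) {
    producer.commitTransaction();  // EXACTLY_ONCE: complete the Kafka transaction
}
producer.flush();                  // always drain client-side batches to the brokers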
KafkaWriter.java

@@ -209,22 +209,27 @@ public void flush(boolean endOfInput) throws IOException, InterruptedException {
 
     @Override
     public Collection<KafkaCommittable> prepareCommit() {
-        if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE) {
+        if (deliveryGuarantee == DeliveryGuarantee.NONE) {
             return Collections.emptyList();
         }
 
         // only return a KafkaCommittable if the current transaction has been written some data
-        if (currentProducer.hasRecordsInTransaction()) {
+        // or if there's any record in buffer after our first flush
+        if (currentProducer.hasRecordsInTransaction() || currentProducer.hasRecordsInBuffer()) {
             final List<KafkaCommittable> committables =
                     Collections.singletonList(
-                            KafkaCommittable.of(currentProducer, producerPool::add));
+                            KafkaCommittable.of(
+                                    currentProducer,
+                                    deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE,
+                                    producerPool::add));
             LOG.debug("Committing {} committables.", committables);
             return committables;
+        } else if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+            // otherwise, we commit the empty transaction as is (no-op) and just recycle the
+            // producer
+            currentProducer.commitTransaction();
+            producerPool.add(currentProducer);
         }
 
-        // otherwise, we commit the empty transaction as is (no-op) and just recycle the producer
-        currentProducer.commitTransaction();
-        producerPool.add(currentProducer);
         return Collections.emptyList();
     }
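
For AT_LEAST_ONCE, the net effect is a safety net in the commit phase: a committable is produced exactly when records entered the producer after its last flush. A hedged sketch of the call sequence the framework drives at a checkpoint (writer is a KafkaWriter; the test below exercises the same flow directly):

writer.flush(false);               // checkpoint barrier: flush, clearing hasRecordsInBuffer
Collection<KafkaCommittable> committables = writer.prepareCommit();
// Non-empty only if records reached the producer after that flush; for each
// such committable the committer calls producer.flush() again.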
KafkaWriterITCase.java

@@ -507,6 +507,26 @@ void testAbortOnClose() throws Exception {
         }
     }
 
+    @Test
+    public void testAtLeastOnceSecondFlush() throws Exception {
+        Properties properties = getKafkaClientConfiguration();
+        try (final KafkaWriter<Integer> writer =
+                createWriterWithConfiguration(properties, DeliveryGuarantee.AT_LEAST_ONCE)) {
+            writer.write(1, SINK_WRITER_CONTEXT);
+            writer.flush(false);
+            Collection<KafkaCommittable> committables = writer.prepareCommit();
+            assertThat(committables.size()).isEqualTo(0);
+
+            writer.write(2, SINK_WRITER_CONTEXT);
+            committables = writer.prepareCommit();
+            assertThat(committables.size()).isGreaterThan(0);
+
+            writer.flush(false);
+            committables = writer.prepareCommit();
+            assertThat(committables.size()).isEqualTo(0);
+        }
+    }
+
     private void assertKafkaMetricNotPresent(
             DeliveryGuarantee guarantee, String configKey, String configValue) throws Exception {
         final Properties config = getKafkaClientConfiguration();
