[FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching #70

Open · wants to merge 1 commit into main
@@ -37,6 +37,7 @@
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.flink.util.Preconditions.checkState;

@@ -55,6 +56,7 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
private volatile boolean inTransaction;
private volatile boolean hasRecordsInTransaction;
private volatile boolean closed;
private final AtomicLong pendingRecords = new AtomicLong(0);

public FlinkKafkaInternalProducer(Properties properties, @Nullable String transactionalId) {
super(withTransactionalId(properties, transactionalId));
@@ -72,12 +74,17 @@ private static Properties withTransactionalId(
return props;
}

public long getPendingRecordsCount() {
return pendingRecords.get();
}

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
if (inTransaction) {
hasRecordsInTransaction = true;
}
-    return super.send(record, callback);
+    pendingRecords.incrementAndGet();
+    return super.send(record, new TrackingCallback(callback));
Contributor:

This change creates a new callback with every send. Since the callback being passed in our codebase is mostly constant, we should add a simple cache like new LRUMap(3). The number is kind of arbitrary and 1 should work already. The most important part is that it shouldn't grow boundless, or we get the next memory leak if I overlooked a dynamic usage ;).

Author:

Just want to check: are we proposing to put instances of TrackingCallback into the rotating cache? Wouldn't that cause a previous callback that might not have been invoked yet to be ignored?

Contributor:

I can't quite follow. I was proposing to use

return super.send(record, callbackCache.computeIfAbsent(callback, TrackingCallback::new));

So we have 3 cases:

  • New callback: wrap it in a TrackingCallback and cache it.
  • Existing callback (common case): retrieve the existing wrapper and use it.
  • Cache full: evict the least recently used TrackingCallback.

In all cases, both the TrackingCallback and the original callback will be invoked. The only difference to the code without the cache is that we avoid creating extra TrackingCallback instances around the same original callback. A sketch follows below.
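
For reference, a minimal sketch of that proposal, assuming LRUMap from Apache Commons Collections 4 (the field name callbackCache is illustrative, not taken from the PR):

    import java.util.Map;
    import org.apache.commons.collections4.map.LRUMap;

    // Bounded wrapper cache: once more than 3 distinct callbacks have been
    // seen, the least-recently-used entry is evicted, so the map cannot
    // grow without bound.
    private final Map<Callback, TrackingCallback> callbackCache = new LRUMap<>(3);

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        if (inTransaction) {
            hasRecordsInTransaction = true;
        }
        pendingRecords.incrementAndGet();
        // Reuse the wrapper for an already-seen callback instead of
        // allocating a new TrackingCallback per send(); a null callback
        // would need special handling before calling computeIfAbsent.
        return super.send(record, callbackCache.computeIfAbsent(callback, TrackingCallback::new));
    }

Note that eviction only drops the cached wrapper; any in-flight send still holds its own reference to it, so its callback fires regardless.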

}

@Override
@@ -86,6 +93,11 @@ public void flush() {
if (inTransaction) {
flushNewPartitions();
}
final long pendingRecordsCount = pendingRecords.get();
if (pendingRecordsCount != 0) {
throw new IllegalStateException(
"Pending record count must be zero at this point: " + pendingRecordsCount);
Contributor:

nit: how about a message like this:

n pending records after flush. There must be no pending records left.

Contributor:

I'd improve the error message as follows:

Some records have not been fully persisted in Kafka. As a precaution, Flink will restart to resume from previous checkpoint. Please report this issue with logs on https://issues.apache.org/jira/browse/FLINK-33545.

Author:

I think having a reference to the reported issue might allow us to better track the potential problem.
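
Putting the two suggestions together, the gate could read roughly like this (a sketch; the exact wording is still being discussed above):

    final long pendingRecordsCount = pendingRecords.get();
    if (pendingRecordsCount != 0) {
        // Combines the record count from the first suggestion with the
        // JIRA pointer from the second.
        throw new IllegalStateException(
                pendingRecordsCount
                        + " records have not been fully persisted in Kafka. As a precaution,"
                        + " Flink will restart to resume from the previous checkpoint. Please"
                        + " report this issue with logs on"
                        + " https://issues.apache.org/jira/browse/FLINK-33545.");
    }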

}
}

@Override
@@ -396,8 +408,27 @@ public String toString() {
+ transactionalId
+ "', inTransaction="
+ inTransaction
+ ", pendingRecords="
+ pendingRecords.get()
+ ", closed="
+ closed
+ '}';
}

public class TrackingCallback implements Callback {
Contributor:

nit: private


private final Callback actualCallback;

public TrackingCallback(final Callback actualCallback) {
this.actualCallback = actualCallback;
}

@Override
public void onCompletion(final RecordMetadata recordMetadata, final Exception e) {
pendingRecords.decrementAndGet();
Contributor:

Do we want to decrement after the callback is completed? What's the best approach semantically?

Contributor:

Since it already happened, we should probably decrement as soon as possible?
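
For illustration, the alternative ordering discussed here would look like this (a sketch; the PR as written decrements first):

    @Override
    public void onCompletion(final RecordMetadata recordMetadata, final Exception e) {
        try {
            if (actualCallback != null) {
                actualCallback.onCompletion(recordMetadata, e);
            }
        } finally {
            // Decrement after the user callback; finally keeps the counter
            // consistent even if that callback throws.
            pendingRecords.decrementAndGet();
        }
    }

Either way the counter is decremented exactly once per record; the difference is only whether a concurrent flush() still sees the record as pending while its user callback is running.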

if (actualCallback != null) {
actualCallback.onCompletion(recordMetadata, e);
}
}
}
}
@@ -487,6 +487,21 @@ void testAbortOnClose() throws Exception {
}
}

@Test
public void testAtLeastOnceFlushGate() throws Exception {
Properties properties = getKafkaClientConfiguration();
try (final KafkaWriter<Integer> writer =
createWriterWithConfiguration(properties, DeliveryGuarantee.AT_LEAST_ONCE)) {
writer.write(1, SINK_WRITER_CONTEXT);
assertThat(writer.getCurrentProducer().getPendingRecordsCount())
.as("should have one pending record")
.isEqualTo(1);
assertThatCode(() -> writer.flush(false))
.as("should not throw exception")
.doesNotThrowAnyException();
}
}
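
A possible follow-on assertion (not part of the PR) would check that the counter drops back to zero once flush completes:

    // After a successful flush every callback has fired, so the
    // pending-record counter should be back at zero.
    writer.flush(false);
    assertThat(writer.getCurrentProducer().getPendingRecordsCount())
            .as("no records should be pending after flush")
            .isEqualTo(0);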

private void assertKafkaMetricNotPresent(
DeliveryGuarantee guarantee, String configKey, String configValue) throws Exception {
final Properties config = getKafkaClientConfiguration();