-
Notifications
You must be signed in to change notification settings - Fork 122
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching #70
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,6 +37,7 @@ | |
import java.time.Duration; | ||
import java.util.Properties; | ||
import java.util.concurrent.Future; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
||
import static org.apache.flink.util.Preconditions.checkState; | ||
|
||
|
@@ -55,6 +56,7 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> { | |
private volatile boolean inTransaction; | ||
private volatile boolean hasRecordsInTransaction; | ||
private volatile boolean closed; | ||
private final AtomicLong pendingRecords = new AtomicLong(0); | ||
|
||
public FlinkKafkaInternalProducer(Properties properties, @Nullable String transactionalId) { | ||
super(withTransactionalId(properties, transactionalId)); | ||
|
@@ -72,12 +74,17 @@ private static Properties withTransactionalId( | |
return props; | ||
} | ||
|
||
public long getPendingRecordsCount() { | ||
return pendingRecords.get(); | ||
} | ||
|
||
@Override | ||
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { | ||
if (inTransaction) { | ||
hasRecordsInTransaction = true; | ||
} | ||
return super.send(record, callback); | ||
pendingRecords.incrementAndGet(); | ||
return super.send(record, new TrackingCallback(callback)); | ||
} | ||
|
||
@Override | ||
|
@@ -86,6 +93,11 @@ public void flush() { | |
if (inTransaction) { | ||
flushNewPartitions(); | ||
} | ||
final long pendingRecordsCount = pendingRecords.get(); | ||
if (pendingRecordsCount != 0) { | ||
throw new IllegalStateException( | ||
"Pending record count must be zero at this point: " + pendingRecordsCount); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: how about this message like this:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd improve the error message as follows:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think having reference to the issue reported might allow us better tracking the potential problem |
||
} | ||
} | ||
|
||
@Override | ||
|
@@ -396,8 +408,27 @@ public String toString() { | |
+ transactionalId | ||
+ "', inTransaction=" | ||
+ inTransaction | ||
+ ", pendingRecords=" | ||
+ pendingRecords.get() | ||
+ ", closed=" | ||
+ closed | ||
+ '}'; | ||
} | ||
|
||
public class TrackingCallback implements Callback { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: private |
||
|
||
private final Callback actualCallback; | ||
|
||
public TrackingCallback(final Callback actualCallback) { | ||
this.actualCallback = actualCallback; | ||
} | ||
|
||
@Override | ||
public void onCompletion(final RecordMetadata recordMetadata, final Exception e) { | ||
pendingRecords.decrementAndGet(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we want to decrement after the callback is completed? What's the best approach semantically? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since it already happened, we should probably decrement as soon as possible? |
||
if (actualCallback != null) { | ||
actualCallback.onCompletion(recordMetadata, e); | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change creates a new callback with every
send
. Since the callback being passed in our codebase is mostly constant, we should add a simple cache likenew LRUMap(3);
. The number is kind of arbitrary and 1 should work already. The most important part is that it shouldn't grow boundless or we get the next memory leak if I overlooked a dynamic usage ;).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just want to check, are we proposing putting instance of TrackingCallback into the rotating cache? wouldn't that caused the previous callback that might not have been invoked to be ignored?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't quite follow. I was proposing to use
return super.send(record, callbackCache.computeIfAbsent(callback, TrackingCallback::new));
So we have 3 cases:
TackingCallback
and cache.TackingCallback
from cache if full.In all cases, both the TackingCallback and the original callback will be invoked. The only difference to the code without cache is that we avoiding creating extra TrackingCallback instances around the same original callback.