[FLINK-33545][Connectors/Kafka] KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching #70
base: main
Conversation
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
@hhktseng Can you rebase your PR?
@MartijnVisser can you point me to which commit to rebase onto? thanks
@hhktseng On the latest changes from main
Created a patch and applied it after syncing to the latest commit, then replaced the forked branch with the latest sync + patch.
Force-pushed from 0ea39ea to 029b011.
Force-pushed from 029b011 to 0d81bc8.
+ ", closed=" | ||
+ closed | ||
+ '}'; | ||
} | ||
|
||
public class TrackingCallback implements Callback { |
nit: private
@Override
public void onCompletion(final RecordMetadata recordMetadata, final Exception e) {
    pendingRecords.decrementAndGet();
Do we want to decrement after the callback is completed? What's the best approach semantically?
Since it already happened, we should probably decrement as soon as possible?
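For reference, a minimal sketch of how the wrapper could look with the decrement happening before the user callback runs, as discussed above. The field name actualCallback and the null handling are assumptions for illustration, not the literal diff:

// Sketch only; mirrors the TrackingCallback from this PR, with the decrement done first.
private class TrackingCallback implements Callback {

    private final Callback actualCallback; // callback passed to send(); may be null

    TrackingCallback(Callback actualCallback) {
        this.actualCallback = actualCallback;
    }

    @Override
    public void onCompletion(final RecordMetadata recordMetadata, final Exception e) {
        // The record is already acknowledged or failed at this point, so the
        // pending count can drop before the user callback is invoked.
        pendingRecords.decrementAndGet();
        if (actualCallback != null) {
            actualCallback.onCompletion(recordMetadata, e);
        }
    }
}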
final long pendingRecordsCount = pendingRecords.get();
if (pendingRecordsCount != 0) {
    throw new IllegalStateException(
            "Pending record count must be zero at this point: " + pendingRecordsCount);
nit: how about a message like this: "n pending records after flush. There must be no pending records left."
I'd improve the error message as follows: "Some records have not been fully persisted in Kafka. As a precaution, Flink will restart to resume from previous checkpoint. Please report this issue with logs on https://issues.apache.org/jira/browse/FLINK-33545."
I think having a reference to the reported issue might allow us to better track the potential problem.
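Putting the two suggestions together, the guard could read roughly like this; a sketch assuming the check lives in a flush()-style override on the internal producer, not the literal diff:

@Override
public void flush() {
    super.flush();
    final long pendingRecordsCount = pendingRecords.get();
    if (pendingRecordsCount != 0) {
        // Suggested wording: surface the pending count and point to the tracking issue.
        throw new IllegalStateException(
                pendingRecordsCount
                        + " pending records after flush. Some records have not been fully"
                        + " persisted in Kafka. As a precaution, Flink will restart to resume"
                        + " from the previous checkpoint. Please report this issue with logs on"
                        + " https://issues.apache.org/jira/browse/FLINK-33545");
    }
}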
@hhktseng were you able to test that this change mitigates your original issue? Is there a way to repro in the tests?
@hhktseng thanks for working on this. Can you please address the review comments?
Please check my comment here. https://issues.apache.org/jira/browse/FLINK-33545?focusedCommentId=17863737&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17863737
Thank you very much for your contribution. The changes look mostly good to me. I have added two small suggestions.
I'm also wondering if we should add a property around this new behavior, so folks can turn off the check if it has unintended side effects (performance degradation, or interference with DeliveryGuarantee.NONE).
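For instance, such an escape hatch could be exposed as a boolean option. The class name, key name, and default below are purely hypothetical and not part of this PR:

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class KafkaSinkCheckOptions {

    // Hypothetical option; name and default are illustrative only.
    public static final ConfigOption<Boolean> PENDING_RECORDS_CHECK_ENABLED =
            ConfigOptions.key("sink.pending-records-check.enabled")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription(
                            "Whether to fail if records are still pending after the"
                                    + " pre-commit flush (FLINK-33545 mitigation).");
}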
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    if (inTransaction) {
        hasRecordsInTransaction = true;
    }
-   return super.send(record, callback);
+   pendingRecords.incrementAndGet();
+   return super.send(record, new TrackingCallback(callback));
This change creates a new callback with every send(). Since the callback being passed in our codebase is mostly constant, we should add a simple cache like new LRUMap(3). The number is kind of arbitrary and 1 should work already. The most important part is that it shouldn't grow boundlessly, or we get the next memory leak if I overlooked a dynamic usage ;).
Just want to check: are we proposing putting instances of TrackingCallback into the rotating cache? Wouldn't that cause a previous callback that might not have been invoked yet to be ignored?
I can't quite follow. I was proposing to use

return super.send(record, callbackCache.computeIfAbsent(callback, TrackingCallback::new));

So we have 3 cases:

- New callback: wrap it in a TrackingCallback and cache it.
- Existing callback (common case): retrieve the existing wrapper and use it.
- Cache full: remove the eldest existing TrackingCallback.

In all cases, both the TrackingCallback and the original callback will be invoked. The only difference to the code without the cache is that we avoid creating extra TrackingCallback instances around the same original callback.
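A sketch of that proposal, assuming a small access-ordered LinkedHashMap as the bounded cache in place of LRUMap; the bound of 3 mirrors the suggestion above and the field name is an assumption:

// Small LRU cache keyed by the user callback, so repeated sends with the same
// callback reuse one TrackingCallback wrapper instead of allocating a new one.
private final Map<Callback, TrackingCallback> callbackCache =
        new LinkedHashMap<Callback, TrackingCallback>(4, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Callback, TrackingCallback> eldest) {
                return size() > 3; // arbitrary bound, mirrors LRUMap(3)
            }
        };

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    if (inTransaction) {
        hasRecordsInTransaction = true;
    }
    pendingRecords.incrementAndGet();
    // Evicting a cached wrapper only drops the wrapper object; callbacks already
    // handed to earlier send() calls are still invoked by the Kafka producer.
    return super.send(record, callbackCache.computeIfAbsent(callback, TrackingCallback::new));
}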
What is the purpose of the change

To address the current flow of KafkaProducer having exactly the same behavior for DeliveryGuarantee.NONE and DeliveryGuarantee.AT_LEAST_ONCE.

It is based on the understanding that there is a very rare race condition between the existing flush performed on the producer via prepareSnapshotPreBarrier and the actual checkpoint completion on commit: records can still arrive via processElement after the pre-barrier flush, and if the KafkaProducer is retrying a batch that has not yet thrown any error, a job failure (caused by the broker) will cause that batch to never be delivered. Since the checkpoint was successful, these records are lost.

This PR addresses the issue by giving AT_LEAST_ONCE an opportunity to flush again at commit time, when needed, so that at the end of the checkpoint cycle the producer definitely has no data left in its buffer (sketched below).

Please comment on or verify the above understanding.
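A rough sketch of the commit-time decision described above; the accessor name isTransactional() follows the change log below, and the helper method itself is illustrative, not the literal diff:

// Sketch: commit transactionally only for EXACTLY_ONCE committables; otherwise
// flush once more so no batched records survive past the end of the checkpoint.
private void commitOrFlush(
        FlinkKafkaInternalProducer<?, ?> producer, KafkaCommittable committable) {
    if (committable.isTransactional()) {
        producer.commitTransaction(); // original EXACTLY_ONCE pathway preserved
    } else {
        producer.flush(); // AT_LEAST_ONCE: final chance to drain the producer buffer
    }
}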
Brief change log

- Added hasRecordsInBuffer to FlinkKafkaInternalProducer; it is updated when send/flush/close are called (sketched below)
- Added transactional to KafkaCommittable to track whether a committable is transactional or not
- Kept KafkaCommittable backward compatible for existing unit tests
- prepareCommit() now also returns a list of committables for DeliveryGuarantee.AT_LEAST_ONCE
- KafkaCommitter checks the new transactional value on KafkaCommittable before performing commitTransaction(), preserving the original EXACTLY_ONCE pathway
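A minimal sketch of the buffer-tracking flag from the first bullet, assuming it is set on send and cleared after a successful flush or close; the exact reset points are an implementation detail of the PR, not shown here:

// Sketch only: tracks whether the producer may still hold unflushed records.
private volatile boolean hasRecordsInBuffer;

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    hasRecordsInBuffer = true;
    return super.send(record, callback);
}

@Override
public void flush() {
    super.flush();
    hasRecordsInBuffer = false; // flush() returns only after all buffered records complete
}

@Override
public void close() {
    super.close();
    hasRecordsInBuffer = false;
}

public boolean hasRecordsInBuffer() {
    return hasRecordsInBuffer;
}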
Verifying this change

This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:

- @Public(Evolving): no

Documentation