[FLINK-35600] Add timestamp for low and high watermark #3415
base: master
@leonardBang @ruanhang1993 PTAL.
This pull request has been automatically marked as stale because it has not had recent activity for 60 days. It will be closed in 30 days if no further activity occurs.
@JNSimba Thanks for this PR. Please add some tests to cover the changes.
Thanks, an ITCase has been added. PTAL @ruanhang1993
@@ -187,6 +192,9 @@ protected SnapshotResult<MySqlOffsetContext> doExecute(
} else {
// Get the current binlog offset as HW
highWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection);
long epochSecond = clock.currentTime().getEpochSecond();
highWatermark.getOffset().put(BinlogOffset.TIMESTAMP_KEY, String.valueOf(epochSecond));
highWatermark.getOffset().put(BinlogOffset.SERVER_ID_KEY, String.valueOf(epochSecond));
Why do we need to add the server-id here?
In the configureFilter method of BinlogSplitReader, the highWatermark of all chunks is compared to obtain the largest one.
When the table has multiple chunks, for example:
chunk-1: timestamp=1727423957, binlogposition=1001
chunk-2: timestamp=1727423958, binlogposition=1002
chunk-3: timestamp=1727423959, binlogposition=1002
chunk-4: timestamp=1727423960, binlogposition=1002
At this point the serverid is 0 for every chunk, but the binlog position may differ (because new data was added in the meantime). According to the current logic of BinlogOffset.compare, if the serverid is the same, the position/filename is compared. For chunk-2/chunk-3/chunk-4, however, everything except the timestamp is the same, so the calculated highWatermark ends up being chunk-2, which leads to duplicate data.
I changed this to compare the timestamp in the BinlogOffset.compare method, so there is no need to set the serverid. PTAL, thanks @ruanhang1993
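For readers following the thread, the tie between chunk-2, chunk-3 and chunk-4 can be reproduced with a small stand-alone sketch. This is not the actual BinlogOffset or BinlogSplitReader code: the class `HighWatermarkSketch`, its `Offset` type, both comparators and the `max` helper are hypothetical names made up for illustration. It assumes the binlog filename is identical for all chunks and that the maximum is only replaced by a strictly greater offset.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class HighWatermarkSketch {

    /** Hypothetical, simplified stand-in for a chunk's high watermark (not the real BinlogOffset). */
    public static final class Offset {
        public final String chunk;
        public final long position;
        public final long timestampSec;

        public Offset(String chunk, long position, long timestampSec) {
            this.chunk = chunk;
            this.position = position;
            this.timestampSec = timestampSec;
        }
    }

    /** Position-only ordering: offsets with the same position compare as equal. */
    public static final Comparator<Offset> BY_POSITION =
            Comparator.comparingLong((Offset o) -> o.position);

    /** Same ordering, but ties on position are broken by the timestamp. */
    public static final Comparator<Offset> BY_POSITION_THEN_TIMESTAMP =
            BY_POSITION.thenComparingLong(o -> o.timestampSec);

    /** Returns the largest offset; the current maximum is replaced only by a strictly greater one. */
    public static Offset max(List<Offset> offsets, Comparator<Offset> comparator) {
        Offset best = offsets.get(0);
        for (Offset candidate : offsets) {
            if (comparator.compare(candidate, best) > 0) {
                best = candidate;
            }
        }
        return best;
    }

    /** The four chunks from the comment above. */
    public static List<Offset> sampleChunks() {
        return Arrays.asList(
                new Offset("chunk-1", 1001L, 1727423957L),
                new Offset("chunk-2", 1002L, 1727423958L),
                new Offset("chunk-3", 1002L, 1727423959L),
                new Offset("chunk-4", 1002L, 1727423960L));
    }

    public static void main(String[] args) {
        List<Offset> chunks = sampleChunks();
        System.out.println("position only: " + max(chunks, BY_POSITION).chunk);
        System.out.println("position then timestamp: " + max(chunks, BY_POSITION_THEN_TIMESTAMP).chunk);
    }
}
```

With the position-only ordering the sketch selects chunk-2, the first of the three tied offsets; once the timestamp is used as a tie-breaker it selects chunk-4.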
@lvyanquan Do you have time to help review this PR again?
LGTM.
https://issues.apache.org/jira/browse/FLINK-35600