[FLINK-25509][connector/kafka] Add RecordEvaluator to dynamically stop source based on de-serialized records #76

Open · wants to merge 1 commit into main

Conversation

ruanhang1993 (Contributor)

Add RecordEvaluator to dynamically stop source based on de-serialized records.
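For context, here is a minimal sketch of how the proposed API could be used once this PR lands (the broker address, topic, group id, and end-of-stream marker are all hypothetical; setEofRecordEvaluator is the builder method added by this PR):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")   // hypothetical broker
                .setTopics("input-topic")                // hypothetical topic
                .setGroupId("my-group")                  // hypothetical group id
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Stop reading a split once a deserialized record matches this predicate.
                .setEofRecordEvaluator(record -> "END-OF-STREAM".equals(record))
                .build();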

@MartijnVisser (Contributor)

@ruanhang1993 Can you rebase?

@MartijnVisser (Contributor)

@ruanhang1993 If you want to add support for this feature, I think you need to propose dropping support for Flink 1.17 in this version.

@ruanhang1993 (Contributor, Author)

Thanks, @MartijnVisser.

This PR relies on the API changes in FLIP-208, which were released in Flink 1.18. So we need to wait until Flink 1.17 support is dropped in the Kafka connector, and then verify this PR against Flink 1.18.
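For reference, the FLIP-208 interface this PR builds on looks roughly like the sketch below (paraphrased from the FLIP; the exact package and annotations are those shipped in flink-connector-base for 1.18):

public interface RecordEvaluator<T> extends Serializable {

    /** Returns true if the given deserialized record marks the end of the stream for its split. */
    boolean isEndOfStream(T record);
}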

@MartijnVisser (Contributor)

So we need to wait until Flink 1.17 support is dropped in the Kafka connector, and then verify this PR against Flink 1.18.

@ruanhang1993 I think that means that in this PR, you'll need to remove support for 1.17. Since this feature requires 1.18, it would mean that v3.0 supports Flink 1.16 and 1.17, while main will only support 1.18 (meaning this feature would also require at least a minor version bump from 3.0 to 3.1)

@ruanhang1993 (Contributor, Author)

Hi @MartijnVisser, you are right.
As noted in the comments on FLINK-25509, this feature needs to land in a version that only supports Flink 1.18 and newer. Do we need v3.1 to support Flink 1.17 and 1.18? If so, we should put this in the version after v3.1.

@mas-chen (Contributor)

I don't think we need 3.1 to support Flink 1.17 and Flink 1.18. We can use 3.0 for supporting 1.17, but we need to continue providing patch fixes for it.

 * @param eofRecordEvaluator a {@link RecordEvaluator} that decides whether a deserialized record marks the end of its split
 * @return this KafkaSourceBuilder.
 */
public KafkaSourceBuilder<OUT> setEofRecordEvaluator(RecordEvaluator<OUT> eofRecordEvaluator) {
    // Presumably stores the evaluator for the source readers (standard builder pattern).
    this.eofRecordEvaluator = eofRecordEvaluator;
    return this;
}
@mas-chen (Contributor)

When this condition is met, do we expect the Flink job to go into FINISHED state? If so, I think we need to only support this in bounded mode, as unbounded mode can't reach FINISHED state. This is because the KafkaSource and DynamicKafkaSource only send "notifyNoMoreSplits" for the bounded case.
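For comparison, bounded mode is what lets the job reach FINISHED today; a minimal sketch of the existing bounded setup (connection values are illustrative):

import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> bounded =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")   // illustrative
                .setTopics("events")                     // illustrative
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Stop at the latest offsets observed at startup; once all splits finish,
                // the enumerator signals "notifyNoMoreSplits" and the job can go FINISHED.
                .setBounded(OffsetsInitializer.latest())
                .build();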

@ruanhang1993 (Contributor, Author)

Thanks for this comment, @mas-chen.
Yes, we should make the source go FINISHED; I will fix this part later.

@ruanhang1993 (Contributor, Author)

Hi @mas-chen,

I read through the code around stoppingOffsetsInitializer. The eofRecordEvaluator effectively does the same job as stoppingOffsetsInitializer: both decide when to stop reading, and we want to support using them together in the same source.

However, the builder already provides KafkaSourceBuilder<OUT> setUnbounded(OffsetsInitializer stoppingOffsetsInitializer), which allows setting a stoppingOffsetsInitializer on an unbounded source. I would like to keep the same behavior for eofRecordEvaluator and not add the bounded-only limitation. What do you think?

Looking forward to your opinion.
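To make the intended semantics concrete, here is a hedged sketch of combining both stopping mechanisms on one unbounded source (values are illustrative, and it assumes the builder accepts both together, as argued above):

import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")   // illustrative
                .setTopics("events")                     // illustrative
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Unbounded: boundedness stays CONTINUOUS_UNBOUNDED, so the job keeps
                // running in streaming mode even after the source stops reading.
                .setUnbounded(OffsetsInitializer.latest())
                // Each split also stops early if a record matches the evaluator first.
                .setEofRecordEvaluator(record -> record.startsWith("EOF"))
                .build();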

@mas-chen (Contributor)

I see, then we should be fine. The job won't terminate (and I guess this is desired in some situations, which I just learned), but the source will terminate eventually.

@MartijnVisser (Contributor)

I don't think we need 3.1 to support Flink 1.17 and Flink 1.18. We can use 3.0 for supporting 1.17, but we need to continue providing patch fixes for it.

We don't need it indeed. We just need to decide which branches/versions we'll need to submit backports to.

I'm inclined to do the following:

  1. Review/merge [FLINK-32417] Add DynamicKafkaSource documentation for setter methods… #80, then cut a v3.1 release branch and make a release. That means v3.1 would support 1.17 and 1.18.
  2. Afterwards, merge this PR and drop support for both 1.17 and 1.18 in main, supporting only 1.19. Since we're breaking the SinkV2 implementation with Flink 1.19, that would be a new Flink Kafka major version anyway, so when 1.19 is released we would cut main and make that v4.0. We would then have to backport to both v3.1 and v4.0, since we'll need to support the last two minor Flink versions: for now that is 1.17 and 1.18 (which are in v3.1), and once 1.19 is released we would backport to both v3.1 and v4.0.

@MartijnVisser (Contributor)

@mas-chen @ruanhang1993 I've brought this discussion (and my proposal) back to the Dev mailing list, see https://lists.apache.org/thread/0hp782dv6kjcv3ypthhfhyp7kglg83d0

@MartijnVisser (Contributor)

@ruanhang1993 Do you want to update your PR, and drop support for testing against 1.17 and 1.18 as a separate commit? That should unblock this PR.
