
[FLINK-34466] Lineage interfaces for kafka connector #130

Merged: 3 commits merged into apache:main on Nov 14, 2024

Conversation

@pawel-big-lebowski (Contributor) commented Oct 14, 2024

FLINK-31275 aims to provide native lineage support in Flink's codebase via custom job listeners that get notified about job state changes as well as the extracted lineage graph. As part of that, the lineage interfaces introduced in FLINK-33210 need to be implemented on the connectors' side so that sources and sinks expose lineage metadata about the input and output datasets of job runs.

https://issues.apache.org/jira/browse/FLINK-34466


boring-cyborg bot commented Oct 14, 2024

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

@pawel-big-lebowski force-pushed the lineage-impl branch 2 times, most recently from 6bf144b to 5834583 on October 14, 2024 12:55
* Contains method which can be used for lineage schema facet extraction. Useful for classes like
* topic selectors or serialization schemas to extract dataset information from.
*/
public interface LineageFacetProvider {
Contributor:

I feel this interface can be moved to flink core repo.

Contributor Author:

I think so too. To me, it makes more sense to add it to flink-core first and remove it here later when flink-connector-kafka upgrades its flink-core dependency. This shouldn't block the lineage interface implementation.

@HuangZhenQiu (Contributor) commented Oct 15, 2024

@pawel-big-lebowski
Would you please change the PR title to include the JIRA ticket prefix? Please also enhance the PR summary with the original template.

@pawel-big-lebowski changed the title from "Lineage interfaces for kafka connector" to "[FLINK-34466] Lineage interfaces for kafka connector" on Oct 15, 2024
@AHeise (Contributor) left a comment:

Overall approach looks good. I challenged a central piece of transferring the information via facets to the sink/source, so I haven't checked the tests yet. PTAL.

* Contains method which can be used for lineage schema facet extraction. Useful for classes like
* topic selectors or serialization schemas to extract dataset information from.
*/
public interface LineageFacetProvider {
Contributor:

Should this be part of flink-core in the future?


*
* @return
*/
List<LineageDatasetFacet> getDatasetFacets();
Contributor:

nit: is Collection sufficient?

Contributor Author:

flink-core lineage interfaces like LineageVertex and LineageGraph also have lists, but Collection should be enough.

facets.stream().filter(f -> !f.equals(topicList)).collect(Collectors.toList());

topicList.get().topics.stream()
.forEach(t -> datasets.add(datasetOf(namespace, t, facetsWithoutTopicList)));
Contributor:

nit: If you use functional style, forEach + add is rather an anti-pattern. You'd instead chain Streams and materialize them at the very end with a Collector.
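For illustration, the chained-stream form the reviewer describes, reusing the names from the excerpt above (the exact surrounding types are assumed):

List<LineageDataset> datasets =
        topicList.get().topics.stream()
                .map(topic -> datasetOf(namespace, topic, facetsWithoutTopicList))
                .collect(Collectors.toList());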

* @param facets
* @return
*/
public static List<LineageDataset> datasetsFrom(
Contributor:

This whole information flow around the facets looks a bit unclean to me.
Both Source/Sink throw a bunch of information into a list of LineageDatasetFacet, then this method is applied to take that list apart and construct the actually intended LineageDataset. So we first deliberately lose the information of what the facets are about and then we need to use a lot of (hidden) if-else to extract that information again.

WDYT of replacing the List<LineageDatasetFacet> instead with a value class that contains all relevant information:

class KafkaFacet { 
  @Nullable
  String topicPattern;
  @Nullable
  List<String> topicList;
  Properties properties;
  @Nullable
  TypeInformation typeInformation;
}

Then you can access all the different pieces of information without the isInstance/cast pattern that you use.
You can then in this method still turn all the pieces of information into separate facets.

Contributor Author:

The difficulty of this approach is that the KafkaFacet properties are collected in different classes, and this is currently done via LineageFacetProvider, which has a method Collection<LineageDatasetFacet> getDatasetFacets().

A solution to this would be to create a KafkaFacetProvider interface (instead of LineageFacetProvider) with a method:

void buildKafkaFacet(KafkaFacetBuilder builder)

This would pass the Kafka facet builder as an argument and let the facet be enriched within the method calls.

@AHeise Is this something you had on your mind?
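For illustration, a minimal sketch of that provider/builder idea, assuming the KafkaFacet value class from the earlier comment (KafkaFacetBuilder and its methods are hypothetical, not part of the PR):

// Hypothetical mix-in interface: topic selectors, serialization schemas, etc.
// contribute their piece of the Kafka facet instead of returning loose facets.
public interface KafkaFacetProvider {
    void buildKafkaFacet(KafkaFacetBuilder builder);
}

// Hypothetical builder that accumulates the contributions into one KafkaFacet.
public class KafkaFacetBuilder {
    private final KafkaFacet facet = new KafkaFacet();

    public KafkaFacetBuilder topicList(List<String> topics) {
        facet.topicList = topics;
        return this;
    }

    public KafkaFacetBuilder properties(Properties properties) {
        facet.properties = properties;
        return this;
    }

    public KafkaFacet build() {
        return facet;
    }
}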

Contributor:

Hm I have not fully understood from which classes we actually need to extract the various data points. Could we recap here?

  • Source/Sink gives us the properties directly
  • Source gives us the type information directly but we also try to extract it from the deserialization schema (why?).
  • KafkaSubscriber of the source either gives us a topicPattern or a topicList.
  • SerializationSchema of the sink gives us the topicList

In the end, we emit a lineageVertex that has facets per topic (pattern) in some cross-product fashion. I have not fully understood how a given input looks fully expanded after datasetsFrom. Maybe you could summarize that.

Anyway, it feels like the KafkaFacet contains a list of topics that is filled through polymorphism and some parts that are filled statically. Can we maybe separate that? Would we be able to say that the topic selector/subscriber just returns a list of facet names, and we use them to create the facets with the statically set properties and type information?

public class KafkaPropertiesFacet implements LineageDatasetFacet {

public static final String KAFKA_PROPERTIES_FACET_NAME = "kafkaProperties";
public Properties properties;
Contributor:

What assumptions do we make about the mutability and thread-safety of the facets? Do we need to make defensive copies of the mutable information such as the Properties?

Contributor Author:

I don't know the answers to those questions. I think it's safer to create a new Properties object.

@Override
public List<LineageDatasetFacet> getDatasetFacets() {
List<LineageDatasetFacet> facets = new ArrayList<>();
facets.add(new KafkaTopicListFacet(Arrays.asList(topicSelector.apply(null))));
Contributor:

Is topicSelector.apply(null) guaranteed to work?
Is this even the right thing to do? TopicSelector could return different topics coming from different inputs.
I think we should instead check if TopicSelector is also a LineageProvider and ask it directly.
Our TopicSelectors should then implement it and we should add to the javadoc that LineageProvider is encouraged.
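Roughly the shape of the check being suggested, borrowing the KafkaDatasetIdentifierProvider interface that the PR introduces later (how the facet is then built from the identifier is left out):

if (topicSelector instanceof KafkaDatasetIdentifierProvider) {
    ((KafkaDatasetIdentifierProvider) topicSelector)
            .getDatasetIdentifier()
            .ifPresent(
                    identifier -> {
                        // derive the topic facet from 'identifier' here,
                        // instead of probing with topicSelector.apply(null)
                    });
}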

Contributor Author:

Thanks for pointing this out. It works only for the KafkaRecordSerializationSchema.builder().setTopic(DEFAULT_TOPIC) scenario. I've changed the implementation to be clearer about this case.

TopicSelector deserves more abstraction than being just Function<? super IN, String>, but I don't think this should be part of the scope of this PR.

facets.add(new KafkaTopicListFacet(Arrays.asList(topicSelector.apply(null))));

// gets type information from serialize method signature
Arrays.stream(this.valueSerializationSchema.getClass().getMethods())
Contributor:

Again we should probably check for the serializer to return the TypeInformation directly (by implementing ResultTypeQueryable).
If not, we could fall back to extracting it as you do, but I'd use things like org.apache.flink.shaded.guava31.com.google.common.reflect.TypeToken to be more robust. Your implementation fails if you have some intermediate interface that forwards the type parameter.

Contributor Author:

Checking for ResultTypeQueryable first is fair. I've switched to the guava reflection helpers as suggested, but I am not sure if this helps with "implementation fails if you have some intermediate interface that forwards the type parameter". Could you provide an example of this issue? Not sure if this is now covered or not.
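To make that failure mode concrete, here is a hypothetical example (all class names invented): an intermediate interface supplies a default serialize() and only forwards the type parameter, so inspecting the serialize method's parameter yields nothing better than its erased Object bound, while resolving the type parameter against the class hierarchy (as TypeToken.resolveType does) still recovers the concrete type.

// Intermediate interface: forwards the type parameter and already implements serialize().
interface JsonSerializationSchema<T> extends SerializationSchema<T> {
    @Override
    default byte[] serialize(T element) {
        return element.toString().getBytes(StandardCharsets.UTF_8);
    }
}

// Order is a hypothetical POJO. OrderSchema never declares serialize(Order) itself, so a
// per-method scan only sees a parameter whose raw type is Object, whereas
// TypeToken.of(OrderSchema.class).resolveType(SerializationSchema.class.getTypeParameters()[0])
// resolves to Order.
class OrderSchema implements JsonSerializationSchema<Order> {}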

Comment on lines 149 to 150
facets.addAll(LineageUtil.facetsFrom(recordSerializer));
facets.add(new KafkaPropertiesFacet(this.kafkaProducerConfig));
Contributor:

See feedback on LineageUtil

@pawel-big-lebowski force-pushed the lineage-impl branch 2 times, most recently from 6d7b4a0 to baafb00 on October 16, 2024 11:47
this.topicPattern = topicPattern;
}

public static KafkaDatasetIdentifier of(Pattern pattern) {
Contributor:

Suggested change
public static KafkaDatasetIdentifier of(Pattern pattern) {
public static KafkaDatasetIdentifier ofPattern(Pattern pattern) {

@AHeise (Contributor) left a comment:

This is definitely heading in the right direction. The structure of the production code is good as-is, but the handling of the properties doesn't look fully right.

Tests look complete at first glance, but we need to get rid of mockito :).

return new KafkaDatasetIdentifier(Collections.emptyList(), pattern);
}

public static KafkaDatasetIdentifier of(List<String> fixedTopics) {
Contributor:

Suggested change
public static KafkaDatasetIdentifier of(List<String> fixedTopics) {
public static KafkaDatasetIdentifier ofTopics(List<String> fixedTopics) {

* Record class to contain topics' identifier information which can be either a list of topics
* or a topic pattern.
*/
public static class KafkaDatasetIdentifier {
Contributor:

Any reason to not make it top-level?

@@ -0,0 +1,97 @@
package org.apache.flink.connector.kafka.lineage.facets;
Contributor:

Do we really need a separate package for facets? If not, I'd use a single package for all lineage classes. There are not that many.


public static final String KAFKA_FACET_NAME = "kafka";

public final Properties properties;
Contributor:

We usually avoid public fields and rather use the full getter/setter jazz. It just makes it easier to later add more validation or defensive copies when needed.

this.typeInformation = typeInformation;
}

public void addProperties(Properties properties) {
Contributor:

Since this method modifies the properties, the ctor should make a copy.
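A minimal sketch of that defensive copy, assuming a constructor shaped like the surrounding excerpt (the exact signature is an assumption):

public KafkaDatasetFacet(Properties properties, TypeInformation typeInformation) {
    // Copy the Properties so later addProperties() calls cannot mutate the caller's instance.
    this.properties = new Properties();
    this.properties.putAll(properties);
    this.typeInformation = typeInformation;
}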

Comment on lines 442 to 450
Arrays.stream(this.valueSerializationSchema.getClass().getMethods())
.map(m -> Invokable.from(m))
.filter(m -> "serialize".equalsIgnoreCase(m.getName()))
.map(m -> m.getParameters().get(0))
.filter(p -> !p.getType().equals(TypeToken.of(Object.class)))
.findFirst()
.map(p -> p.getType())
.map(t -> TypeInformation.of(t.getRawType()))
.orElse(null);
@AHeise (Contributor) commented Oct 24, 2024:

This looks way more complicated than it should be. Here is what I had in mind.

TypeToken<? extends SerializationSchema> serializationSchemaType = TypeToken.of(valueSerializationSchema.getClass());
Class<?> parameterType = serializationSchemaType.resolveType(SerializationSchema.class.getTypeParameters()[0]).getRawType();
if (parameterType != Object.class) {
    typeInformation = TypeInformation.of(parameterType);
}


if (!kafkaDatasetFacet.isPresent()) {
LOG.warn("Provided did not return kafka dataset facet");
return null;
Contributor:

I don't think we are allowed to return null. The interface doesn't specify @nullable.

Contributor Author:

Nope, the interface doesn't specify @nullable. We can return a LineageVertex with an empty dataset list instead.
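For illustration, a minimal empty-vertex return along those lines (anonymous-class style, matching how the vertex is built elsewhere in the PR):

return new LineageVertex() {
    @Override
    public List<LineageDataset> datasets() {
        // No facet could be resolved, so expose an empty dataset list instead of null.
        return Collections.emptyList();
    }
};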

return null;
}

kafkaDatasetFacet.get().addProperties(this.kafkaProducerConfig);
Contributor:

Do we ever actually get the properties from the recordSerializer? So are we actually just setting here?

Contributor Author:

OK, we can convert it to a setter.

((ResultTypeQueryable<?>) this.valueSerializationSchema).getProducedType();
} else {
// gets type information from serialize method signature
typeInformation =
Contributor:

How do we use this type information later? This is the input type, right?

Contributor Author:

This is returned within the facet, and then a listener (like OpenLineageJobListener) converts it to a dataset schema format description. For OpenLineage, it's called SchemaDatasetFacet. I think this is not Kafka connector specific and there should be a general schema-like facet within flink-core. However, I don't feel I would be able to achieve this now. Schema information is valuable for both input and output datasets.

I hope the typeInformation approach will work well for Avro and Protobuf. Hopefully, in some time, I will create separate tests within the OpenLineage job listener to verify this.

Contributor:

Yes TypeInformationFacet sounds like a general concept. I'm convinced you want to pull it out of the KafkaFacet now. You probably want to name it "inputType" and "outputType" depending on the type of the connector (source/sink). I'd design it generally and pull it up into flink-core for Flink 2.0 later (so make it work in Kafka first and then propose to port it upwards).

Contributor Author:

LineageGraph in the flink-core contains separate lists of sources and sinks. Given that, I am not sure if we want to distinguish "inputType" from "outputType". From the facet perspective, this should be all type and the same facet can be used for both scenarios.
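For illustration, a sketch of such a connector-agnostic type facet (the class name TypeDatasetFacet and its exact shape are assumptions here, not the PR's final API):

public class TypeDatasetFacet implements LineageDatasetFacet {

    public static final String TYPE_FACET_NAME = "type";

    private final TypeInformation<?> typeInformation;

    public TypeDatasetFacet(TypeInformation<?> typeInformation) {
        this.typeInformation = typeInformation;
    }

    public TypeInformation<?> getTypeInformation() {
        return typeInformation;
    }

    @Override
    public String name() {
        return TYPE_FACET_NAME;
    }
}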

Comment on lines 289 to 290
when(((KafkaDatasetIdentifierProvider) topicSelector).getDatasetIdentifier())
.thenReturn(Optional.empty());
Contributor:

I haven't looked too closely at the tests. But a high-level comment: In Flink, we don't use mockito (anymore). The main idea is that we use interfaces (as you did) and then just explicitly create our MockImplementation.

class MockTopicSelector implements TopicSelector<Object>, KafkaDatasetIdentifierProvider {
  KafkaDatasetIdentifier id; // init with ctor or factory method

  public KafkaDatasetIdentifier getDatasetIdentifier() { return id; }

  // TopicSelector is a functional interface, so the mock also needs an apply() implementation.
  public String apply(Object element) { return "mock-topic"; }
}

@pawel-big-lebowski force-pushed the lineage-impl branch 2 times, most recently from b25b5b7 to 27d8903 on October 29, 2024 13:52
public class LineageUtilTest {
@Test
public void testSourceLineageVertexOf() {
LineageDataset dataset = Mockito.mock(LineageDataset.class);
Contributor:

As called out by @AHeise, we need to move away from Mockito for testing classes. I am thinking I should probably add these helper test classes to flink-core rather than implement them in each connector.

Contributor Author:

Thanks @HuangZhenQiu for noticing that place.

@HuangZhenQiu (Contributor) left a comment:

Thanks for the contribution. This diff gives a great example for connectors to support Flink native lineage.

@AHeise (Contributor) left a comment:

LAGTM. A few more nits. The most important part is around documentation. Make sure all Public elements are properly annotated and that you link from existing interfaces to the new optional mixins.

@Nullable private final List<String> topics;
@Nullable private final Pattern topicPattern;

public DefaultKafkaDatasetIdentifier(List<String> fixedTopics, Pattern topicPattern) {
Contributor:

Suggested change
public DefaultKafkaDatasetIdentifier(List<String> fixedTopics, Pattern topicPattern) {
public DefaultKafkaDatasetIdentifier(@Nullable List<String> fixedTopics, @Nullable Pattern topicPattern) {

Just try to be as consistent as possible.

Comment on lines +87 to +91
if (bootstrapServers.contains(COMMA)) {
bootstrapServers = bootstrapServers.split(COMMA)[0];
} else if (bootstrapServers.contains(SEMICOLON)) {
bootstrapServers = bootstrapServers.split(SEMICOLON)[0];
}
Contributor:

Can you check if there is already some util in kafka that does that? If not, leave as is.

Contributor Author:

Seems like a piece of code that should be available somewhere, but I wasn't able to find it.


@Override
public List<LineageDataset> datasets() {
return datasets.stream().collect(Collectors.toList());
Contributor:

Suggested change
return datasets.stream().collect(Collectors.toList());
return List.copyOf(datasets);

* Returns a type dataset facet or `Optional.empty` in case an implementing class is not able to
* resolve type.
*
* @return
Contributor:

Please remove all empty javadoc tags or let Copilot help you ;)

import java.util.Optional;

/** Contains method which allows extracting topic identifier. */
public interface KafkaDatasetIdentifierProvider {
Contributor:

Make sure to tag all public API with @PublicEvolving. It needs to be clearly visible if a user is supposed to touch the class or not (the easiest way is to not use public unless needed).

import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata;

/** A subscriber for a partition set. */
class PartitionSetSubscriber implements KafkaSubscriber {
class PartitionSetSubscriber implements KafkaDatasetIdentifierProvider, KafkaSubscriber {
Contributor:

Suggested change
class PartitionSetSubscriber implements KafkaDatasetIdentifierProvider, KafkaSubscriber {
class PartitionSetSubscriber implements KafkaSubscriber, KafkaDatasetIdentifierProvider {

keep it consistent

.setKeySerializationSchema(serializationSchema)
.build();

assertThat(((KafkaDatasetFacetProvider) schema).getKafkaDatasetFacet()).isEmpty();
Contributor:

A bit more assertj-ish would be

        assertThat(schema)
                .asInstanceOf(InstanceOfAssertFactories.type(KafkaDatasetFacetProvider.class))
                .returns(List.of(), KafkaDatasetFacetProvider::getKafkaDatasetFacet);

That would result in an assertion error instead of a runtime error if the schema does not implement the interface.

@@ -79,6 +95,7 @@
*/
@PublicEvolving
public class KafkaRecordSerializationSchemaBuilder<IN> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
Contributor:

Not the correct place, but please update the docs of KafkaRecordSerializationSchema to point to the FacetProvider interface. Same for all other APIs where you hope that optional interfaces are implemented.

Comment on lines 60 to 75
KafkaSource source =
new KafkaSource(
new KafkaSubscriber() {
@Override
public Set<TopicPartition> getSubscribedTopicPartitions(
AdminClient adminClient) {
return null;
}
},
null,
null,
Boundedness.CONTINUOUS_UNBOUNDED,
null,
kafkaProperties,
null);
assertThat(source.getLineageVertex().datasets()).isEmpty();
Contributor:

Can you use the builder instead? That should also be less verbose.

@AHeise (Contributor) left a comment:

LGTM. I triggered the hopefully final CI run.

Signed-off-by: Pawel Leszczynski <[email protected]>
@AHeise merged commit 727327d into apache:main on Nov 14, 2024
6 checks passed

boring-cyborg bot commented Nov 14, 2024

Awesome work, congrats on your first merged pull request!

@AHeise (Contributor) commented Nov 14, 2024

Thank you very much for your contribution (and patience).

@pawel-big-lebowski (Contributor Author) commented:
Awesome. @AHeise Thank you for your feedback and cooperation on that.

return new LineageVertex() {
@Override
public List<LineageDataset> datasets() {
return datasets.stream().collect(Collectors.toList());


Can we not use List.copyOf(datasets) here as well, as per Arvid's suggested change?

Contributor Author:

The merged version is not using List.copyOf. We agreed on that in an offline discussion.

TypeToken serializationSchemaType =
TypeToken.of(valueSerializationSchema.getClass());
Class parameterType =
serializationSchemaType


Is there a way to avoid using this reflection (instanceof and Class)? Maybe use a config-driven approach and Java SPI. The connectors/formats bring in serialization implementations in this way, which avoids the overhead of reflection.

Contributor Author:

You can avoid this by implementing ResultTypeQueryable in the value serialization schema.
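For illustration, a value serialization schema that exposes its produced type directly (Order is a hypothetical POJO):

public class OrderSerializationSchema
        implements SerializationSchema<Order>, ResultTypeQueryable<Order> {

    @Override
    public byte[] serialize(Order element) {
        return element.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public TypeInformation<Order> getProducedType() {
        // Exposed directly, so lineage extraction does not need to reflect on serialize().
        return TypeInformation.of(Order.class);
    }
}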
