
Protobuf (probably avro?) not compatible with Confluent schema registry #347

Closed
xav-ie opened this issue Feb 16, 2024 · 4 comments

Comments

@xav-ie

xav-ie commented Feb 16, 2024

When serializing data with protobuf, you can send data encoded by this library to a topic just fine. However, when trying to create a stream from that data, the schema-registry-aware deserializer fails, because the message-indexes part is missing.

Here is an error log I got, which only appeared after trying to read the topic into a stream. The main error is "Invalid message indexes":
```
ksqldb-server    | [2024-02-15 19:04:11,802] ERROR {"type":0,"deserializationError":{"target":"value","errorMessage":"Error deserializing message from topic: orders-topic","recordB64":null,"cause":["Failed to deserialize data for topic orders-topic to Protobuf: ","Error deserializing Protobuf message for id 1","Invalid message indexes: io.confluent.kafka.schemaregistry.protobuf.MessageIndexes@3fb03c91"],"topic":"orders-topic"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.transient_ORDERS_PROTO_SIMPLE_4698783426306005545.KsqlTopic.Source.deserializer)
ksqldb-server    | [2024-02-15 19:04:11,802] WARN stream-thread [_confluent-ksql-default_transient_transient_ORDERS_PROTO_SIMPLE_4698783426306005545_1708023851703-32aade83-3678-44dc-b176-912cd78ee7d7-StreamThread-1] task [0_0] Skipping record due to deserialization error. topic=[orders-topic] partition=[0] offset=[50] (org.apache.kafka.streams.processor.internals.RecordDeserializer)
ksqldb-server    | org.apache.kafka.common.errors.SerializationException: Error deserializing message from topic: orders-topic
ksqldb-server    |      at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:55)
ksqldb-server    |      at io.confluent.ksql.serde.tls.ThreadLocalDeserializer.deserialize(ThreadLocalDeserializer.java:37)
ksqldb-server    |      at io.confluent.ksql.serde.connect.ConnectFormat$StructToListDeserializer.deserialize(ConnectFormat.java:239)
ksqldb-server    |      at io.confluent.ksql.serde.connect.ConnectFormat$StructToListDeserializer.deserialize(ConnectFormat.java:218)
ksqldb-server    |      at io.confluent.ksql.serde.GenericDeserializer.deserialize(GenericDeserializer.java:59)
ksqldb-server    |      at io.confluent.ksql.logging.processing.LoggingDeserializer.tryDeserialize(LoggingDeserializer.java:61)
ksqldb-server    |      at io.confluent.ksql.logging.processing.LoggingDeserializer.deserialize(LoggingDeserializer.java:48)
ksqldb-server    |      at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
ksqldb-server    |      at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
ksqldb-server    |      at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
ksqldb-server    |      at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:204)
ksqldb-server    |      at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:128)
ksqldb-server    |      at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:304)
ksqldb-server    |      at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:1002)
ksqldb-server    |      at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1630)
ksqldb-server    |      at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:992)
ksqldb-server    |      at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
ksqldb-server    |      at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
ksqldb-server    |      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
ksqldb-server    | Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic orders-topic to Protobuf:
ksqldb-server    |      at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:154)
ksqldb-server    |      at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:126)
ksqldb-server    |      at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:49)
ksqldb-server    |      ... 18 more
ksqldb-server    | Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id 1
ksqldb-server    |      at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:228)
ksqldb-server    |      at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaProtobufDeserializer.java:292)
ksqldb-server    |      at io.confluent.connect.protobuf.ProtobufConverter$Deserializer.deserialize(ProtobufConverter.java:200)
ksqldb-server    |      at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:132)
ksqldb-server    |      ... 20 more
ksqldb-server    | Caused by: java.lang.IllegalArgumentException: Invalid message indexes: io.confluent.kafka.schemaregistry.protobuf.MessageIndexes@3fb03c91
ksqldb-server    |      at io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema.toMessageName(ProtobufSchema.java:2202)
ksqldb-server    |      at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:140)
ksqldb-server    |      ... 23 more
```

The message-indexes part of the binary is simply not included when serializing. I was hoping that this is an actual issue and that this library would address it.

In kafkajs/confluent-schema-registry there is a PR trying to do just that, kafkajs/confluent-schema-registry#258, but I don't know much about the mysterious message-indexes part yet. I am trying to learn more, and wanted to create this issue in the meantime. You can find out more about message indexes here:
https://docs.confluent.io/cloud/current/sr/fundamentals/serdes-develop/index.html#wire-format
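
For reference, my reading of the wire format on that page: a magic byte 0, a 4-byte big-endian schema id, then (for protobuf only) the message-indexes, then the serialized message. Sketched in TypeScript (names are mine, not this library's API):

```typescript
// Rough sketch of the framing described on that page; all names are mine.
// How `messageIndexes` itself gets encoded is exactly what this issue is about.
function frame(schemaId: number, messageIndexes: Buffer, payload: Buffer): Buffer {
  const header = Buffer.alloc(5);
  header.writeUInt8(0, 0);          // byte 0: magic byte, always 0
  header.writeInt32BE(schemaId, 1); // bytes 1-4: schema id, big-endian
  // protobuf only: message-indexes, then the serialized protobuf message
  return Buffer.concat([header, messageIndexes, payload]);
}
```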

I think it is funny that their table is very misleading and omits the message-indexes part.

I also think it is funny that I am allowed to write messages into a topic using a schema id without getting any errors. It is not until I try to deserialize the messages that the errors start...

@konqi
Owner

konqi commented Feb 27, 2024

Hi,
schemas are not validated by Kafka; that's why you will never get errors while sending messages. It is your job as a developer to make sure that the actual payload is in the form promised by the schema id you provide. It's more of a contractual thing.
Regarding the message indexes: I don't find the documentation misleading. The basic concept is as described in the readme of this lib. The library treats the message-indexes as part of the protobuf payload, so you can write your encode function in a way that prepends the message-indexes to the actual payload. The documentation also says that if your schema contains just a single message type, you can instead put a single 0 byte before the actual payload.
This library is more or less oblivious to the actual message and its type, therefore I couldn't provide the message-indexes even if I wanted to. It would also break the agnostic nature of this lib.
If your schema contains just a single message type, try prefixing the buffer with a 0 byte before sending and it should work. If it contains more than one type, it's a bit more work.
I might be able to help with the zigzag encoding of the message-indexes, if you provide a test case.
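
For what it's worth, my reading of Confluent's docs is that the indexes are written as a zigzag varint for the array length followed by one zigzag varint per index, with [0] collapsing to a single 0 byte (the shortcut mentioned above). An untested sketch, names mine:

```typescript
// Untested sketch of the message-indexes encoding, going by Confluent's docs.
function zigZagVarint(value: number): number[] {
  let v = (value << 1) ^ (value >> 31); // zigzag-map signed to unsigned
  const out: number[] = [];
  while ((v & ~0x7f) !== 0) {
    out.push((v & 0x7f) | 0x80); // low 7 bits plus continuation bit
    v >>>= 7;
  }
  out.push(v);
  return out;
}

function encodeMessageIndexes(indexes: number[]): Buffer {
  // Shortcut: the common case [0] is encoded as just one 0 byte
  if (indexes.length === 1 && indexes[0] === 0) return Buffer.from([0]);
  return Buffer.from([
    ...zigZagVarint(indexes.length), // array length first
    ...indexes.flatMap((i) => zigZagVarint(i)),
  ]);
}
```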

@xav-ie
Author

xav-ie commented Feb 27, 2024

Thank you. I have now come back with more knowledge, and you are exactly correct. My mistake. I will try to provide good test cases/examples of how one should do this in the next two weeks.

@konqi
Owner

konqi commented Feb 27, 2024

I played around with it for a bit and my understanding now is:

  • Each Kafka message can only contain a single protobuf message
  • However, a schema uploaded to the registry may contain multiple message types, including nested ones
  • The message-indexes are an array, but they identify a single message type within the schema
  • It would be difficult to work out the correct message type in the schema from the encoded payload alone
  • If this is still something you want to pursue, I could add a set of additional encode functions where you can pass along the message-indexes (rough sketch below). I have my doubts that this is practical though - I would have to see a real-life example to make sense of it
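
Purely as a sketch of the idea (nothing like this exists in the lib today), reusing encodeMessageIndexes from my previous comment:

```typescript
// Hypothetical addition, just to illustrate - not part of this lib.
declare function encodeMessageIndexes(indexes: number[]): Buffer;

function encodeWithIndexes(
  schemaId: number,
  payload: Buffer,                // already-serialized protobuf message
  messageIndexes: number[] = [0], // default: first top-level message type
): Buffer {
  const header = Buffer.alloc(5);
  header.writeUInt8(0, 0);          // magic byte
  header.writeInt32BE(schemaId, 1); // schema id, big-endian
  return Buffer.concat([header, encodeMessageIndexes(messageIndexes), payload]);
}
```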

@xav-ie
Author

xav-ie commented Feb 27, 2024

You are spot on. The schema registry also seems to choose the first message it sees as the main message for encoding, so sub-message types should come later. This could greatly simplify encoding and decoding, because the main message would then always have index 0.

I am not sure if the schema registry even allows you to specify a message type other than the first one in the schema to encode/decode with.

I am also not sure if there is a valid use case where you cannot put the main message type first in the schema. I am new to protobuf and need to learn more.
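
To illustrate how I understand the indexes to be assigned (a made-up schema, not from a real project):

```proto
syntax = "proto3";

message Order {       // first top-level message -> message-indexes [0]
  string id = 1;

  message Meta {      // first message nested inside Order -> [0, 0]
    string source = 1;
  }
}

message LineItem {    // second top-level message -> [1]
  string sku = 1;
}
```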

In any case, I will try to show some samples of how I am using it soon.

Thank you so much for responding and helping me.

@konqi konqi closed this as completed May 4, 2024