If messages are produced by an asynchronous Producer with compression enabled, offset boundaries span multiple messages. When offset = 0 represents a batch, the current implementation of KafkaInputFormat.nextKeyValue() fails because the test is "next > 0" and next is still 0. As a result, the partition is never consumed and the mapper returns prematurely.
A simple fix for the read problem is to change the test to "next >= 0", which allows full reads. However, there is still a problem with how KafkaContext derives the "prior offset" for the mapper key. It returns garbage because (1) the offset does not always advance and (2) the message data is compressed. In fact, I don't think the current logic produces an accurate prior offset for any compressed messages.
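To make the boundary issue concrete, here is a minimal standalone sketch (the class and method names are illustrative, not the project's actual code) of why "next > 0" drops a compressed batch that starts at offset 0, while "next >= 0" consumes it:

```java
// Illustrative simulation of the nextKeyValue() boundary check.
// With compressed batches, the first batch can legitimately sit at
// offset 0, so a strict "next > 0" test misreads it as end-of-partition.
public class OffsetCheckDemo {
    // Current (buggy) test: rejects a valid batch at offset 0.
    static boolean hasMoreOld(long next) {
        return next > 0;
    }

    // Proposed fix: offset 0 is a valid position, only a negative
    // value signals that the partition is exhausted.
    static boolean hasMoreFixed(long next) {
        return next >= 0;
    }

    public static void main(String[] args) {
        long firstCompressedBatchOffset = 0;
        System.out.println(hasMoreOld(firstCompressedBatchOffset));   // mapper returns early
        System.out.println(hasMoreFixed(firstCompressedBatchOffset)); // batch is consumed
    }
}
```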
Because of compression, I think it's best to simplify the mapper key logic to just use a "priorOffset" reference. At least that supports reporting an accurate "current offset" to a map function, though in practice most people probably ignore this value, since the Kafka Hadoop process is an all-or-none commit anyway.
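The simplification above could look roughly like this sketch (class and field names are hypothetical, for illustration only): instead of deriving a prior offset by subtracting per-message sizes, which breaks once offsets stop advancing one-per-message inside a compressed batch, just remember the offset observed before the current fetch and use that as the mapper key:

```java
// Illustrative sketch of the simplified mapper-key logic: the key for
// each fetch is the offset we were at *before* the fetch, tracked
// directly rather than reconstructed from message sizes.
public class PriorOffsetDemo {
    private long priorOffset = 0;   // offset before the current batch
    private long currentOffset = 0; // offset after the current batch

    // Called once per successful fetch; returns the mapper key.
    long nextKey(long fetchedOffset) {
        priorOffset = currentOffset;   // remember where we were
        currentOffset = fetchedOffset; // advance to the new position
        return priorOffset;            // accurate "current offset" at map time
    }
}
```

This stays correct for compressed batches because it never inspects individual messages; it only tracks the fetch positions the consumer actually observed.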
If the above sounds OK to you, I can make the changes and issue a pull request.