Skip to content

Commit

Permalink
fix stamped record deserializer (#190)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Nov 28, 2023
1 parent 7cf1973 commit 285b525
Showing 1 changed file with 5 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@

package dev.responsive.kafka.internal.db;

import static org.apache.kafka.streams.state.StateSerdes.TIMESTAMP_SIZE;

import dev.responsive.kafka.internal.utils.Stamped;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.function.Predicate;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
Expand All @@ -41,13 +38,13 @@ public int sizeInBytes(final Stamped key) {
@Override
public Stamped keyFromRecord(final ConsumerRecord<byte[], byte[]> record) {
final byte[] key = record.key();
final int size = key.length - TIMESTAMP_SIZE;
final byte[] val = record.value();

final ByteBuffer buffer = ByteBuffer.wrap(key);
final long startTs = buffer.getLong(size);
final Bytes kBytes = Bytes.wrap(Arrays.copyOfRange(key, 0, size));
// the timestamp is encoded in the value as the first 8 bytes
final ByteBuffer buffer = ByteBuffer.wrap(val);
final long startTs = buffer.getLong(0);

return new Stamped(kBytes, startTs);
return new Stamped(Bytes.wrap(key), startTs);
}

@Override
Expand Down

0 comments on commit 285b525

Please sign in to comment.