Remove generic <K> from Stamped and clean up Comparator logic (#187)

The main change here is to remove the generic from the Stamped class and hard-code the key type to Bytes.

The rest of this PR is the cleanup that this one change enables: removing the generics from the window-store-related iterators, and making the actual key generic extend Comparable instead of using the KeySpec class as a Comparator.
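
For orientation, here is a minimal sketch of what the de-genericized Stamped class could look like after this change. The field names (key, timestamp), the constructor shape, and the key-then-timestamp ordering are all visible in the diff below; the rest of the class body (and the omission of equals/hashCode) is an assumption, not the verbatim source.

import org.apache.kafka.common.utils.Bytes;

// Hypothetical sketch: Stamped hard-coded to Bytes and implementing Comparable,
// as the TODO removed from StampedKeySpec below suggests.
public class Stamped implements Comparable<Stamped> {

  public final Bytes key;
  public final long timestamp;

  public Stamped(final Bytes key, final long timestamp) {
    this.key = key;
    this.timestamp = timestamp;
  }

  @Override
  public int compareTo(final Stamped other) {
    // Order by key bytes first, then by window start timestamp, mirroring the
    // compareKeys helper this PR deletes from StampedKeySpec
    final int compareKeys = key.compareTo(other.key);
    return compareKeys != 0 ? compareKeys : Long.compare(timestamp, other.timestamp);
  }
}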
ableegoldman authored Nov 27, 2023
1 parent 5e64309 commit 7cf1973
Showing 21 changed files with 173 additions and 235 deletions.
@@ -16,7 +16,6 @@

package dev.responsive.kafka.internal.db;

import java.util.Objects;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;

@@ -32,9 +31,4 @@ public int sizeInBytes(final Bytes key) {
return key.get().length;
}

@Override
public int compare(final Bytes o1, final Bytes o2) {
return Objects.compare(o1, o2, Bytes::compareTo);
}

}
@@ -66,7 +66,7 @@ public static CassandraTableSpec fromKVParams(

public static CassandraTableSpec fromWindowParams(
final ResponsiveWindowParams params,
final TablePartitioner<Stamped<Bytes>, SegmentPartition> partitioner
final TablePartitioner<Stamped, SegmentPartition> partitioner
) {
return new BaseTableSpec(params.name().remoteName(), partitioner);
}
@@ -484,7 +484,7 @@ public String name() {
}

@Override
public WriterFactory<Stamped<Bytes>, SegmentPartition> init(
public WriterFactory<Stamped, SegmentPartition> init(
final int kafkaPartition
) {
final SegmentPartition metadataPartition = partitioner.metadataTablePartition(kafkaPartition);
@@ -755,19 +755,19 @@ public BoundStatement ensureEpoch(final SegmentPartition segmentPartition, final
@CheckReturnValue
public BoundStatement insert(
final int kafkaPartition,
final Stamped<Bytes> key,
final Stamped key,
final byte[] value,
final long epochMillis
) {
kafkaPartitionToPendingFlushInfo.get(kafkaPartition).maybeUpdatePendingStreamTime(key.stamp);
maybeUpdateStreamTime(kafkaPartition, key.timestamp);

final SegmentPartition remotePartition = partitioner.tablePartition(kafkaPartition, key);
return insert
.bind()
.setInt(PARTITION_KEY.bind(), remotePartition.tablePartition)
.setLong(SEGMENT_ID.bind(), remotePartition.segmentId)
.setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.key.get()))
.setInstant(WINDOW_START.bind(), Instant.ofEpochMilli(key.stamp))
.setInstant(WINDOW_START.bind(), Instant.ofEpochMilli(key.timestamp))
.setByteBuffer(DATA_VALUE.bind(), ByteBuffer.wrap(value));
}

@@ -785,17 +785,17 @@ public BoundStatement insert(
@CheckReturnValue
public BoundStatement delete(
final int kafkaPartition,
final Stamped<Bytes> key
final Stamped key
) {
kafkaPartitionToPendingFlushInfo.get(kafkaPartition).maybeUpdatePendingStreamTime(key.stamp);
maybeUpdateStreamTime(kafkaPartition, key.timestamp);

final SegmentPartition segmentPartition = partitioner.tablePartition(kafkaPartition, key);
return delete
.bind()
.setInt(PARTITION_KEY.bind(), segmentPartition.tablePartition)
.setLong(SEGMENT_ID.bind(), segmentPartition.segmentId)
.setByteBuffer(DATA_KEY.bind(), ByteBuffer.wrap(key.key.get()))
.setInstant(WINDOW_START.bind(), Instant.ofEpochMilli(key.stamp));
.setInstant(WINDOW_START.bind(), Instant.ofEpochMilli(key.timestamp));
}

/**
@@ -812,7 +812,7 @@ public byte[] fetch(
final Bytes key,
final long windowStart
) {
final Stamped<Bytes> windowedKey = new Stamped<>(key, windowStart);
final Stamped windowedKey = new Stamped(key, windowStart);
final SegmentPartition segmentPartition =
partitioner.tablePartition(kafkaPartition, windowedKey);

@@ -845,13 +845,13 @@ public byte[] fetch(
* @return the windows previously stored
*/
@Override
public KeyValueIterator<Stamped<Bytes>, byte[]> fetch(
public KeyValueIterator<Stamped, byte[]> fetch(
final int kafkaPartition,
final Bytes key,
final long timeFrom,
final long timeTo
) {
final List<KeyValueIterator<Stamped<Bytes>, byte[]>> segmentIterators = new LinkedList<>();
final List<KeyValueIterator<Stamped, byte[]>> segmentIterators = new LinkedList<>();
for (final SegmentPartition partition : partitioner.range(kafkaPartition, timeFrom, timeTo)) {
final BoundStatement get = fetch
.bind()
@@ -880,13 +880,13 @@ public KeyValueIterator<Stamped<Bytes>, byte[]> fetch(
* @return the value previously set
*/
@Override
public KeyValueIterator<Stamped<Bytes>, byte[]> backFetch(
public KeyValueIterator<Stamped, byte[]> backFetch(
final int kafkaPartition,
final Bytes key,
final long timeFrom,
final long timeTo
) {
final List<KeyValueIterator<Stamped<Bytes>, byte[]>> segmentIterators = new LinkedList<>();
final List<KeyValueIterator<Stamped, byte[]>> segmentIterators = new LinkedList<>();
for (final var partition : partitioner.reverseRange(kafkaPartition, timeFrom, timeTo)) {
final BoundStatement get = backFetch
.bind()
@@ -917,14 +917,14 @@ public KeyValueIterator<Stamped<Bytes>, byte[]> backFetch(
* @return the value previously set
*/
@Override
public KeyValueIterator<Stamped<Bytes>, byte[]> fetchRange(
public KeyValueIterator<Stamped, byte[]> fetchRange(
final int kafkaPartition,
final Bytes fromKey,
final Bytes toKey,
final long timeFrom,
final long timeTo
) {
final List<KeyValueIterator<Stamped<Bytes>, byte[]>> segmentIterators = new LinkedList<>();
final List<KeyValueIterator<Stamped, byte[]>> segmentIterators = new LinkedList<>();
for (final SegmentPartition partition : partitioner.range(kafkaPartition, timeFrom, timeTo)) {
final BoundStatement get = fetchRange
.bind()
@@ -939,7 +939,7 @@ public KeyValueIterator<Stamped<Bytes>, byte[]> fetchRange(

return Iterators.filterKv(
Iterators.wrapped(segmentIterators),
k -> k.stamp >= timeFrom && k.stamp < timeTo
k -> k.timestamp >= timeFrom && k.timestamp < timeTo
);
}

@@ -957,14 +957,14 @@ public KeyValueIterator<Stamped<Bytes>, byte[]> fetchRange(
* @return the value previously set
*/
@Override
public KeyValueIterator<Stamped<Bytes>, byte[]> backFetchRange(
public KeyValueIterator<Stamped, byte[]> backFetchRange(
final int kafkaPartition,
final Bytes fromKey,
final Bytes toKey,
final long timeFrom,
final long timeTo
) {
final List<KeyValueIterator<Stamped<Bytes>, byte[]>> segmentIterators = new LinkedList<>();
final List<KeyValueIterator<Stamped, byte[]>> segmentIterators = new LinkedList<>();
for (final var partition : partitioner.reverseRange(kafkaPartition, timeFrom, timeTo)) {
final BoundStatement get = backFetchRange
.bind()
@@ -979,7 +979,7 @@ public KeyValueIterator<Stamped<Bytes>, byte[]> backFetchRange(

return Iterators.filterKv(
Iterators.wrapped(segmentIterators),
k -> k.stamp >= timeFrom && k.stamp < timeTo
k -> k.timestamp >= timeFrom && k.timestamp < timeTo
);
}

@@ -994,12 +994,12 @@ public KeyValueIterator<Stamped<Bytes>, byte[]> backFetchRange(
* @return the value previously set
*/
@Override
public KeyValueIterator<Stamped<Bytes>, byte[]> fetchAll(
public KeyValueIterator<Stamped, byte[]> fetchAll(
final int kafkaPartition,
final long timeFrom,
final long timeTo
) {
final List<KeyValueIterator<Stamped<Bytes>, byte[]>> segmentIterators = new LinkedList<>();
final List<KeyValueIterator<Stamped, byte[]>> segmentIterators = new LinkedList<>();
for (final SegmentPartition partition : partitioner.range(kafkaPartition, timeFrom, timeTo)) {
final BoundStatement get = fetchAll
.bind()
@@ -1014,7 +1014,7 @@ public KeyValueIterator<Stamped<Bytes>, byte[]> fetchAll(

return Iterators.filterKv(
Iterators.wrapped(segmentIterators),
k -> k.stamp >= timeFrom && k.stamp < timeTo
k -> k.timestamp >= timeFrom && k.timestamp < timeTo
);
}

@@ -1029,12 +1029,12 @@ public KeyValueIterator<Stamped<Bytes>, byte[]> fetchAll(
* @return the value previously set
*/
@Override
public KeyValueIterator<Stamped<Bytes>, byte[]> backFetchAll(
public KeyValueIterator<Stamped, byte[]> backFetchAll(
final int kafkaPartition,
final long timeFrom,
final long timeTo
) {
final List<KeyValueIterator<Stamped<Bytes>, byte[]>> segmentIterators = new LinkedList<>();
final List<KeyValueIterator<Stamped, byte[]>> segmentIterators = new LinkedList<>();
for (final var partition : partitioner.reverseRange(kafkaPartition, timeFrom, timeTo)) {
final BoundStatement get = backFetchAll
.bind()
@@ -1049,16 +1049,20 @@ public KeyValueIterator<Stamped<Bytes>, byte[]> backFetchAll(

return Iterators.filterKv(
Iterators.wrapped(segmentIterators),
k -> k.stamp >= timeFrom && k.stamp < timeTo
k -> k.timestamp >= timeFrom && k.timestamp < timeTo
);
}

private static KeyValue<Stamped<Bytes>, byte[]> windowRows(final Row row) {
private void maybeUpdateStreamTime(final int kafkaPartition, final long timestamp) {
kafkaPartitionToPendingFlushInfo.get(kafkaPartition).maybeUpdatePendingStreamTime(timestamp);
}

private static KeyValue<Stamped, byte[]> windowRows(final Row row) {
final long startTs = row.getInstant(WINDOW_START.column()).toEpochMilli();
final Bytes key = Bytes.wrap(row.getByteBuffer(DATA_KEY.column()).array());

return new KeyValue<>(
new Stamped<>(key, startTs),
new Stamped(key, startTs),
row.getByteBuffer(DATA_VALUE.column()).array()
);
}
@@ -16,10 +16,9 @@

package dev.responsive.kafka.internal.db;

import java.util.Comparator;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public interface KeySpec<K> extends Comparator<K> {
public interface KeySpec<K extends Comparable<K>> {

K keyFromRecord(final ConsumerRecord<byte[], byte[]> record);

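
With the Comparator supertype dropped, ordering now lives on the key types themselves and KeySpec shrinks to a plain interface over a Comparable key. As a hedged reconstruction of the full revised interface: keyFromRecord is visible in the hunk above, while sizeInBytes and retain are inferred from the BytesKeySpec and StampedKeySpec implementations elsewhere in this diff.

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical reconstruction, not the verbatim source; the method set is
// inferred from the implementations touched by this PR.
public interface KeySpec<K extends Comparable<K>> {

  // Extract the key (for windowed stores, the key bytes plus a timestamp)
  // from a raw changelog record
  K keyFromRecord(final ConsumerRecord<byte[], byte[]> record);

  // Size of the key in bytes, e.g. for buffer accounting
  int sizeInBytes(final K key);

  // Whether a key should be kept, e.g. because it is still within retention
  boolean retain(final K key);
}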
@@ -20,53 +20,52 @@
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueIterator;

public interface RemoteWindowedTable<S> extends RemoteTable<Stamped<Bytes>, S> {
public interface RemoteWindowedTable<S> extends RemoteTable<Stamped, S> {

byte[] fetch(
int kafkaPartition,
int partition,
Bytes key,
long windowStart
);

KeyValueIterator<Stamped<Bytes>, byte[]> fetch(
int kafkaPartition,
KeyValueIterator<Stamped, byte[]> fetch(
int partition,
Bytes key,
long timeFrom,
long timeTo
);

KeyValueIterator<Stamped<Bytes>, byte[]> fetchRange(
int kafkaPartition,
Bytes fromKey,
Bytes toKey,
KeyValueIterator<Stamped, byte[]> backFetch(
int partition,
Bytes key,
long timeFrom,
long timeTo
);

KeyValueIterator<Stamped<Bytes>, byte[]> fetchAll(
int kafkaPartition,
KeyValueIterator<Stamped, byte[]> fetchRange(
int partition,
Bytes fromKey,
Bytes toKey,
long timeFrom,
long timeTo
);


KeyValueIterator<Stamped<Bytes>, byte[]> backFetch(
int kafkaPartition,
Bytes key,
KeyValueIterator<Stamped, byte[]> backFetchRange(
int partition,
Bytes fromKey,
Bytes toKey,
long timeFrom,
long timeTo
);

KeyValueIterator<Stamped<Bytes>, byte[]> backFetchRange(
int kafkaPartition,
Bytes fromKey,
Bytes toKey,
KeyValueIterator<Stamped, byte[]> fetchAll(
int partition,
long timeFrom,
long timeTo
);

KeyValueIterator<Stamped<Bytes>, byte[]> backFetchAll(
int kafkaPartition,
KeyValueIterator<Stamped, byte[]> backFetchAll(
int partition,
long timeFrom,
long timeTo
);
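
A short usage sketch to show the flattened signatures in action. Everything here is illustrative rather than taken from the PR: the wrapper class, the partition number, the key bytes, and the time bounds are invented for the example.

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueIterator;

// Illustrative caller only; how the RemoteWindowedTable is constructed is elided.
public class WindowScanExample {

  public static void printWindows(final RemoteWindowedTable<?> table) {
    final Bytes key = Bytes.wrap(new byte[] {0x01});
    // Fetch the windows for this key with start timestamps between 0 and 100
    try (KeyValueIterator<Stamped, byte[]> iter = table.fetch(0, key, 0L, 100L)) {
      while (iter.hasNext()) {
        final var entry = iter.next();
        System.out.println(entry.key.timestamp + " -> " + entry.value.length + " bytes");
      }
    }
  }
}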
@@ -25,48 +25,33 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;

public class StampedKeySpec implements KeySpec<Stamped<Bytes>> {
public class StampedKeySpec implements KeySpec<Stamped> {

private final Predicate<Stamped<Bytes>> withinRetention;
private final Predicate<Stamped> withinRetention;

public StampedKeySpec(final Predicate<Stamped<Bytes>> withinRetention) {
public StampedKeySpec(final Predicate<Stamped> withinRetention) {
this.withinRetention = withinRetention;
}

@Override
public int sizeInBytes(final Stamped<Bytes> key) {
public int sizeInBytes(final Stamped key) {
return key.key.get().length + Long.BYTES;
}

@Override
public Stamped<Bytes> keyFromRecord(final ConsumerRecord<byte[], byte[]> record) {
public Stamped keyFromRecord(final ConsumerRecord<byte[], byte[]> record) {
final byte[] key = record.key();
final int size = key.length - TIMESTAMP_SIZE;

final ByteBuffer buffer = ByteBuffer.wrap(key);
final long startTs = buffer.getLong(size);
final Bytes kBytes = Bytes.wrap(Arrays.copyOfRange(key, 0, size));

return new Stamped<>(kBytes, startTs);
return new Stamped(kBytes, startTs);
}

@Override
public boolean retain(final Stamped<Bytes> key) {
public boolean retain(final Stamped key) {
return withinRetention.test(key);
}

@Override
public int compare(final Stamped<Bytes> o1, final Stamped<Bytes> o2) {
return compareKeys(o1, o2);
}

// TODO remove generic from Stamped<> and just implement Comparable
public static int compareKeys(final Stamped<Bytes> o1, final Stamped<Bytes> o2) {
final int key = o1.key.compareTo(o2.key);
if (key != 0) {
return key;
}

return Long.compare(o1.stamp, o2.stamp);
}
}
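
With compare and compareKeys gone, ordered collections can rely on Stamped's natural ordering directly instead of being handed a KeySpec as their Comparator. A toy example of the difference, assuming the Stamped sketch shown earlier; the keys and values are made up:

import java.nio.charset.StandardCharsets;
import java.util.TreeMap;
import org.apache.kafka.common.utils.Bytes;

public class StampedOrderingExample {

  public static void main(final String[] args) {
    // Before this PR a sorted buffer needed the KeySpec as its Comparator;
    // now Stamped.compareTo supplies the ordering on its own.
    final TreeMap<Stamped, byte[]> buffer = new TreeMap<>();
    final Bytes key = Bytes.wrap("a".getBytes(StandardCharsets.UTF_8));
    buffer.put(new Stamped(key, 10L), new byte[0]);
    buffer.put(new Stamped(key, 5L), new byte[0]);
    // Identical key bytes sort by ascending window start, so this prints 5
    System.out.println(buffer.firstKey().timestamp);
  }
}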