Skip to content

Commit

Permalink
tests passing
Browse files Browse the repository at this point in the history
  • Loading branch information
ableegoldman committed Nov 21, 2023
1 parent 4795923 commit ecd310e
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,26 @@ public class CassandraWindowedTable implements

// TODO: move this into the LWTWriter/Factory to keep this class stateless
private static class PendingFlushInfo {
private long lastFlushStreamTime = UNINITIALIZED_STREAM_TIME;
private long pendingFlushStreamTime = UNINITIALIZED_STREAM_TIME;
private SegmentRoll pendingSegmentRoll;

private long lastFlushStreamTime;
private long pendingFlushStreamTime;
private SegmentRoll segmentRoll;

public PendingFlushInfo(final long persistedStreamTime) {
this.lastFlushStreamTime = persistedStreamTime;
this.pendingFlushStreamTime = persistedStreamTime;
}

void maybeUpdatePendingStreamTime(final long recordTimestamp) {
this.pendingFlushStreamTime = Math.max(pendingFlushStreamTime, recordTimestamp);
}

void initSegmentRoll(final SegmentRoll pendingSegmentRoll) {
this.pendingSegmentRoll = pendingSegmentRoll;
this.segmentRoll = pendingSegmentRoll;
}

void finalizeFlush() {
pendingSegmentRoll = null;
segmentRoll = null;
lastFlushStreamTime = pendingFlushStreamTime;
}
}
Expand Down Expand Up @@ -482,10 +488,9 @@ public String name() {
public WriterFactory<Stamped<Bytes>, SegmentPartition> init(
final int kafkaPartition
) {
kafkaPartitionToPendingFlushInfo.put(kafkaPartition, new PendingFlushInfo());
final SegmentPartition metadataPartition = partitioner.metadataTablePartition(kafkaPartition);

client.execute(
final var initMetadata = client.execute(
QueryBuilder.insertInto(name)
.value(PARTITION_KEY.column(), PARTITION_KEY.literal(metadataPartition.tablePartition))
.value(SEGMENT_ID.column(), SEGMENT_ID.literal(metadataPartition.segmentId))
Expand All @@ -499,13 +504,20 @@ public WriterFactory<Stamped<Bytes>, SegmentPartition> init(
.build()
);

if (initMetadata.wasApplied()) {
LOG.info("Created new metadata segment for kafka partition {}", kafkaPartition);
}

final long epoch = fetchEpoch(metadataPartition) + 1;
final var reserveMetadataEpoch = client.execute(reserveEpoch(metadataPartition, epoch));
if (!reserveMetadataEpoch.wasApplied()) {
handleEpochFencing(kafkaPartition, metadataPartition, epoch);
}

final long streamTime = fetchStreamTime(kafkaPartition);
kafkaPartitionToPendingFlushInfo.put(kafkaPartition, new PendingFlushInfo(streamTime));
LOG.info("Initialized stream-time to {} with epoch {} for kafka partition {}",
streamTime, epoch, kafkaPartition);

// since the active data segments depend on the current stream-time for the windowed table,
// which we won't know until we initialize it from the remote, the metadata like epoch and
Expand All @@ -518,6 +530,9 @@ public WriterFactory<Stamped<Bytes>, SegmentPartition> init(

if (!reserveSegmentEpoch.wasApplied()) {
handleEpochFencing(kafkaPartition, tablePartition, epoch);
} else {
LOG.info("SOPHIE: reserved epoch {} for kafka partition {} and segment {}",
epoch, kafkaPartition, tablePartition);
}
}

Expand Down Expand Up @@ -562,6 +577,7 @@ public void preCommit(
final SegmentRoll pendingRoll = partitioner.rolledSegments(
name, pendingFlush.lastFlushStreamTime, pendingFlush.pendingFlushStreamTime
);

pendingFlush.initSegmentRoll(pendingRoll);
for (final long segmentId : pendingRoll.segmentsToCreate) {
final SegmentPartition segment = new SegmentPartition(kafkaPartition, segmentId);
Expand All @@ -587,7 +603,7 @@ public void postCommit(
final long epoch
) {
final PendingFlushInfo pendingFlush = kafkaPartitionToPendingFlushInfo.get(kafkaPartition);
for (final long segmentId : pendingFlush.pendingSegmentRoll.segmentsToExpire) {
for (final long segmentId : pendingFlush.segmentRoll.segmentsToExpire) {
expireSegment(new SegmentPartition(kafkaPartition, segmentId));
}
pendingFlush.finalizeFlush();
Expand Down Expand Up @@ -668,6 +684,9 @@ public BoundStatement setStreamTime(
) {
final PendingFlushInfo pendingFlush = kafkaPartitionToPendingFlushInfo.get(kafkaPartition);

LOG.debug("Updating stream-time to {} with epoch {} for kafkaPartition {}",
pendingFlush.pendingFlushStreamTime, epoch, kafkaPartition);

final SegmentPartition metadataPartition = partitioner.metadataTablePartition(kafkaPartition);
return setStreamTime
.bind()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package dev.responsive.kafka.internal.db.partitioning;

import static java.util.Collections.emptyList;

import dev.responsive.kafka.api.stores.ResponsiveWindowParams;
import dev.responsive.kafka.internal.db.partitioning.SegmentPartitioner.SegmentPartition;
import dev.responsive.kafka.internal.utils.Stamped;
Expand Down Expand Up @@ -105,6 +107,7 @@ public SegmentPartition(final int tablePartition, final long segmentId) {
public String toString() {
return "SegmentPartition{"
+ "partitionKey=" + tablePartition
+ "tablePartition=" + tablePartition
+ ", segmentId=" + segmentId
+ '}';
}
Expand Down Expand Up @@ -151,7 +154,11 @@ public Iterable<SegmentPartition> activeSegments(
final int kafkaPartition,
final long streamTime
) {
return range(kafkaPartition, minValidTs(streamTime), streamTime);
if (streamTime == UNINITIALIZED_STREAM_TIME) {
return emptyList();
} else {
return range(kafkaPartition, minValidTs(streamTime), streamTime);
}
}

/**
Expand All @@ -168,7 +175,7 @@ public Iterable<SegmentPartition> range(
final long timeFrom,
final long timeTo
) {
return LongStream.range(segmentId(timeFrom), segmentId(timeTo))
return LongStream.range(segmentId(timeFrom), segmentId(timeTo) + 1)
.mapToObj(segmentId -> new SegmentPartition(kafkaPartition, segmentId))
.collect(Collectors.toList());
}
Expand All @@ -188,7 +195,7 @@ public Iterable<SegmentPartition> reverseRange(
final long timeFrom,
final long timeTo
) {
return LongStream.range(segmentId(timeFrom), segmentId(timeTo))
return LongStream.range(segmentId(timeFrom), segmentId(timeTo) + 1)
.boxed()
.sorted(Collections.reverseOrder())
.map(segmentId -> new SegmentPartition(kafkaPartition, segmentId))
Expand Down Expand Up @@ -265,10 +272,18 @@ public SegmentRoll(final long[] segmentsToExpire, final long[] segmentsToCreate)

@Override
public String toString() {
return String.format("SegmentRoll: expired segment(s)=[%d-%d], new segments(s)=[%d-%d]",
segmentsToExpire[0], segmentsToExpire[segmentsToExpire.length - 1],
segmentsToCreate[0], segmentsToCreate[segmentsToCreate.length - 1]
);
final int numExpired = segmentsToExpire.length;
final String expired = numExpired == 0
? "[]"
: String.format("[%d-%d]", segmentsToExpire[0], segmentsToExpire[numExpired - 1]);

final int numCreated = segmentsToCreate.length;
final String created = numCreated == 0
? "[]"
: String.format("[%d-%d]", segmentsToCreate[0], segmentsToCreate[numCreated - 1]);

return String.format("SegmentRoll: expired segment(s)=%s, new segments(s)=%s",
expired, created);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ public boolean await() {
}
}
}


//@Test
public void shouldComputeWindowedJoinUsingRanges() throws InterruptedException {
// Given:
final Map<String, Object> properties = getMutableProperties();
Expand Down

0 comments on commit ecd310e

Please sign in to comment.