diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowedTable.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowedTable.java
index 933bbe13b..654e83ae2 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowedTable.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/CassandraWindowedTable.java
@@ -107,20 +107,26 @@ public class CassandraWindowedTable implements
   // TODO: move this into the LWTWriter/Factory to keep this class stateless
   private static class PendingFlushInfo {
-    private long lastFlushStreamTime = UNINITIALIZED_STREAM_TIME;
-    private long pendingFlushStreamTime = UNINITIALIZED_STREAM_TIME;
-    private SegmentRoll pendingSegmentRoll;
+
+    private long lastFlushStreamTime;
+    private long pendingFlushStreamTime;
+    private SegmentRoll segmentRoll;
+
+    public PendingFlushInfo(final long persistedStreamTime) {
+      this.lastFlushStreamTime = persistedStreamTime;
+      this.pendingFlushStreamTime = persistedStreamTime;
+    }
 
     void maybeUpdatePendingStreamTime(final long recordTimestamp) {
       this.pendingFlushStreamTime = Math.max(pendingFlushStreamTime, recordTimestamp);
    }
 
     void initSegmentRoll(final SegmentRoll pendingSegmentRoll) {
-      this.pendingSegmentRoll = pendingSegmentRoll;
+      this.segmentRoll = pendingSegmentRoll;
     }
 
     void finalizeFlush() {
-      pendingSegmentRoll = null;
+      segmentRoll = null;
       lastFlushStreamTime = pendingFlushStreamTime;
     }
   }
 
@@ -482,10 +488,9 @@ public String name() {
   public WriterFactory<Stamped<Bytes>, SegmentPartition> init(
       final int kafkaPartition
   ) {
-    kafkaPartitionToPendingFlushInfo.put(kafkaPartition, new PendingFlushInfo());
     final SegmentPartition metadataPartition = partitioner.metadataTablePartition(kafkaPartition);
-    client.execute(
+    final var initMetadata = client.execute(
         QueryBuilder.insertInto(name)
             .value(PARTITION_KEY.column(), PARTITION_KEY.literal(metadataPartition.tablePartition))
             .value(SEGMENT_ID.column(), SEGMENT_ID.literal(metadataPartition.segmentId))
@@ -499,6 +504,10 @@ public WriterFactory<Stamped<Bytes>, SegmentPartition> init(
             .build()
     );
 
+    if (initMetadata.wasApplied()) {
+      LOG.info("Created new metadata segment for kafka partition {}", kafkaPartition);
+    }
+
     final long epoch = fetchEpoch(metadataPartition) + 1;
     final var reserveMetadataEpoch = client.execute(reserveEpoch(metadataPartition, epoch));
     if (!reserveMetadataEpoch.wasApplied()) {
@@ -506,6 +515,9 @@
     }
 
     final long streamTime = fetchStreamTime(kafkaPartition);
+    kafkaPartitionToPendingFlushInfo.put(kafkaPartition, new PendingFlushInfo(streamTime));
+    LOG.info("Initialized stream-time to {} with epoch {} for kafka partition {}",
+        streamTime, epoch, kafkaPartition);
 
     // since the active data segments depend on the current stream-time for the windowed table,
     // which we won't know until we initialize it from the remote, the metadata like epoch and
@@ -518,6 +530,9 @@
       if (!reserveSegmentEpoch.wasApplied()) {
         handleEpochFencing(kafkaPartition, tablePartition, epoch);
+      } else {
+        LOG.info("Reserved epoch {} for kafka partition {} and segment {}",
+            epoch, kafkaPartition, tablePartition);
       }
     }
@@ -562,6 +577,7 @@ public void preCommit(
     final SegmentRoll pendingRoll = partitioner.rolledSegments(
         name, pendingFlush.lastFlushStreamTime, pendingFlush.pendingFlushStreamTime
     );
+    pendingFlush.initSegmentRoll(pendingRoll);
 
     for (final long segmentId : pendingRoll.segmentsToCreate) {
       final SegmentPartition segment = new SegmentPartition(kafkaPartition, segmentId);
@@ -587,7 +603,7 @@ public void postCommit(
       final long epoch
   ) {
     final PendingFlushInfo pendingFlush = kafkaPartitionToPendingFlushInfo.get(kafkaPartition);
-    for (final long segmentId : pendingFlush.pendingSegmentRoll.segmentsToExpire) {
+    for (final long segmentId : pendingFlush.segmentRoll.segmentsToExpire) {
       expireSegment(new SegmentPartition(kafkaPartition, segmentId));
     }
     pendingFlush.finalizeFlush();
@@ -668,6 +684,9 @@ public BoundStatement setStreamTime(
   ) {
     final PendingFlushInfo pendingFlush = kafkaPartitionToPendingFlushInfo.get(kafkaPartition);
 
+    LOG.debug("Updating stream-time to {} with epoch {} for kafka partition {}",
+        pendingFlush.pendingFlushStreamTime, epoch, kafkaPartition);
+
     final SegmentPartition metadataPartition = partitioner.metadataTablePartition(kafkaPartition);
     return setStreamTime
         .bind()
diff --git a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/partitioning/SegmentPartitioner.java b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/partitioning/SegmentPartitioner.java
index 621b4a646..cfeab9f40 100644
--- a/kafka-client/src/main/java/dev/responsive/kafka/internal/db/partitioning/SegmentPartitioner.java
+++ b/kafka-client/src/main/java/dev/responsive/kafka/internal/db/partitioning/SegmentPartitioner.java
@@ -16,6 +16,8 @@
 
 package dev.responsive.kafka.internal.db.partitioning;
 
+import static java.util.Collections.emptyList;
+
 import dev.responsive.kafka.api.stores.ResponsiveWindowParams;
 import dev.responsive.kafka.internal.db.partitioning.SegmentPartitioner.SegmentPartition;
 import dev.responsive.kafka.internal.utils.Stamped;
@@ -105,6 +107,7 @@ public SegmentPartition(final int tablePartition, final long segmentId) {
     public String toString() {
       return "SegmentPartition{"
           + "partitionKey=" + tablePartition
+          + ", tablePartition=" + tablePartition
           + ", segmentId=" + segmentId
           + '}';
     }
@@ -151,7 +154,11 @@ public Iterable<SegmentPartition> activeSegments(
       final int kafkaPartition,
       final long streamTime
   ) {
-    return range(kafkaPartition, minValidTs(streamTime), streamTime);
+    if (streamTime == UNINITIALIZED_STREAM_TIME) {
+      return emptyList();
+    } else {
+      return range(kafkaPartition, minValidTs(streamTime), streamTime);
+    }
   }
 
   /**
@@ -168,7 +175,7 @@ public Iterable<SegmentPartition> range(
       final int kafkaPartition,
       final long timeFrom,
       final long timeTo
   ) {
-    return LongStream.range(segmentId(timeFrom), segmentId(timeTo))
+    return LongStream.range(segmentId(timeFrom), segmentId(timeTo) + 1)
         .mapToObj(segmentId -> new SegmentPartition(kafkaPartition, segmentId))
         .collect(Collectors.toList());
   }
@@ -188,7 +195,7 @@ public Iterable<SegmentPartition> reverseRange(
       final int kafkaPartition,
       final long timeFrom,
       final long timeTo
   ) {
-    return LongStream.range(segmentId(timeFrom), segmentId(timeTo))
+    return LongStream.range(segmentId(timeFrom), segmentId(timeTo) + 1)
         .boxed()
         .sorted(Collections.reverseOrder())
         .map(segmentId -> new SegmentPartition(kafkaPartition, segmentId))
         .collect(Collectors.toList());
   }
@@ -265,10 +272,18 @@ public SegmentRoll(final long[] segmentsToExpire, final long[] segmentsToCreate)
 
     @Override
     public String toString() {
-      return String.format("SegmentRoll: expired segment(s)=[%d-%d], new segments(s)=[%d-%d]",
-          segmentsToExpire[0], segmentsToExpire[segmentsToExpire.length - 1],
-          segmentsToCreate[0], segmentsToCreate[segmentsToCreate.length - 1]
-      );
+      final int numExpired = segmentsToExpire.length;
+      final String expired = numExpired == 0
+          ? "[]"
+          : String.format("[%d-%d]", segmentsToExpire[0], segmentsToExpire[numExpired - 1]);
"[]" + : String.format("[%d-%d]", segmentsToExpire[0], segmentsToExpire[numExpired - 1]); + + final int numCreated = segmentsToCreate.length; + final String created = numCreated == 0 + ? "[]" + : String.format("[%d-%d]", segmentsToCreate[0], segmentsToCreate[numCreated - 1]); + + return String.format("SegmentRoll: expired segment(s)=%s, new segments(s)=%s", + expired, created); } } } diff --git a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveWindowStoreIntegrationTest.java b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveWindowStoreIntegrationTest.java index ec3295287..479bc7b55 100644 --- a/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveWindowStoreIntegrationTest.java +++ b/kafka-client/src/test/java/dev/responsive/kafka/integration/ResponsiveWindowStoreIntegrationTest.java @@ -252,7 +252,8 @@ public boolean await() { } } } - + + //@Test public void shouldComputeWindowedJoinUsingRanges() throws InterruptedException { // Given: final Map properties = getMutableProperties();