Skip to content

Commit

Permalink
Remove commented code
Browse files Browse the repository at this point in the history
  • Loading branch information
ArtemTetenkin committed Apr 25, 2024
1 parent e53ad97 commit 17eb01a
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,7 @@ public void run() {
return;
}

// the latest bulk of records. May carry across the loop if the thread is woken
// up
// The latest bulk of records. May carry across the loop if the thread is woken up
// from blocking on the handover
PscConsumerPollMessageIterator<byte[], byte[]> records = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,16 +153,6 @@ public void runFetchLoop() throws Exception {
}
}
}

// retired code; doesn't work for memq due to no support of `iteratorFor` method
// get the records for each topic partition
/* for (PscTopicUriPartitionState<T, TopicUriPartition> partition : subscribedPartitionStates()) {
Iterator<PscConsumerMessage<byte[], byte[]>> partitionRecords =
records.iteratorFor(partition.getPscTopicUriPartitionHandle());
topicUriPartitionConsumerRecordsHandler(partitionRecords, partition);
}*/
}
} finally {
// this signals the consumer thread that no more work is to be done
Expand Down Expand Up @@ -198,8 +188,6 @@ protected void topicUriPartitionConsumerRecordsHandler(
PscConsumerMessage<byte[], byte[]> record,
PscTopicUriPartitionState<T, TopicUriPartition> pscTopicUriPartitionState) throws Exception {

// while (topicUriPartitionMessages.hasNext()) {
// PscConsumerMessage<byte[], byte[]> record = topicUriPartitionMessages.next();
deserializer.deserialize(record, pscCollector);

// emit the actual records. this also updates offset state atomically and emits
Expand All @@ -213,9 +201,7 @@ protected void topicUriPartitionConsumerRecordsHandler(
if (pscCollector.isEndOfStreamSignalled()) {
// end of stream signaled
running = false;
// break;
}
// }
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,6 @@ protected void topicUriPartitionConsumerRecordsHandler(
PscConsumerMessage<byte[], byte[]> record,
PscTopicUriPartitionState<T, TopicUriPartition> pscTopicUriPartitionState) throws Exception {

// while (topicUriPartitionMessages.hasNext()) {
// PscConsumerMessage<byte[], byte[]> record = topicUriPartitionMessages.next();
final PscShuffleElement element = pscShuffleElementDeserializer.deserialize(record);

// TODO: Do we need to check the end of stream if reaching the end watermark
Expand All @@ -137,7 +135,6 @@ protected void topicUriPartitionConsumerRecordsHandler(
Optional<Watermark> newWatermark = watermarkHandler.checkAndGetNewWatermark(watermark);
newWatermark.ifPresent(sourceContext::emitWatermark);
}
// }
}

/**
Expand Down

0 comments on commit 17eb01a

Please sign in to comment.