Skip to content

Commit

Permalink
WIP in the middle of fixing PscSourceReaderTest
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffxiang committed Oct 8, 2024
1 parent 37661bc commit c42d001
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,13 @@ public UserCodeClassLoader getUserCodeClassLoader() {
return readerContext.getUserCodeClassLoader();
}
});
final PscSourceReaderMetrics kafkaSourceReaderMetrics =
final PscSourceReaderMetrics pscSourceReaderMetrics =
new PscSourceReaderMetrics(readerContext.metricGroup());

Supplier<PscTopicUriPartitionSplitReader> splitReaderSupplier =
() -> {
try {
return new PscTopicUriPartitionSplitReader(props, readerContext, kafkaSourceReaderMetrics);
return new PscTopicUriPartitionSplitReader(props, readerContext, pscSourceReaderMetrics);
} catch (ConfigurationException | ClientException e) {
throw new RuntimeException("Failed to create new PscTopicUriParititionSplitReader", e);
}
Expand All @@ -170,7 +170,7 @@ public UserCodeClassLoader getUserCodeClassLoader() {
recordEmitter,
toConfiguration(props),
readerContext,
kafkaSourceReaderMetrics);
pscSourceReaderMetrics);
}

@Internal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class KafkaSourceReaderMetricsUtil {
// Kafka raw metric names and group names
public static final String CONSUMER_FETCH_MANAGER_GROUP = "consumer-fetch-manager-metrics";
public static final String BYTES_CONSUMED_TOTAL = "bytes-consumed-total";
public static final String RECORDS_LAG = "records-lag";
public static final String RECORDS_LAG = "records-lag-max";

protected static Predicate<Map.Entry<MetricName, ? extends Metric>> createRecordLagFilter(TopicUriPartition tp) {
final String resolvedTopic = tp.getTopicUriAsString().replace('.', '_');
Expand All @@ -22,11 +22,11 @@ class KafkaSourceReaderMetricsUtil {
final Map<String, String> tags = metricName.tags();

return metricName.group().equals(CONSUMER_FETCH_MANAGER_GROUP)
&& metricName.name().equals(RECORDS_LAG)
&& tags.containsKey("topic")
&& tags.get("topic").equals(resolvedTopic)
&& tags.containsKey("partition")
&& tags.get("partition").equals(resolvedPartition);
&& metricName.name().equals(RECORDS_LAG);
// && tags.containsKey("topic")
// && tags.get("topic").equals(resolvedTopic)
// && tags.containsKey("partition")
// && tags.get("partition").equals(resolvedPartition);
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.pinterest.psc.consumer.OffsetCommitCallback;
import com.pinterest.psc.consumer.PscConsumer;
import com.pinterest.psc.consumer.PscConsumerMessage;
import com.pinterest.psc.consumer.PscConsumerMessagesIterable;
import com.pinterest.psc.consumer.PscConsumerPollMessageIterator;
import com.pinterest.psc.exception.ClientException;
import com.pinterest.psc.exception.consumer.ConsumerException;
Expand Down Expand Up @@ -77,11 +78,13 @@ public class PscTopicUriPartitionSplitReader

// Tracking empty splits that has not been added to finished splits in fetch()
private final Set<String> emptySplits = new HashSet<>();
private final Properties props;

public PscTopicUriPartitionSplitReader(
Properties props,
SourceReaderContext context,
PscSourceReaderMetrics pscSourceReaderMetrics) throws ConfigurationException, ClientException {
this.props = props;
this.subtaskId = context.getIndexOfSubtask();
this.pscSourceReaderMetrics = pscSourceReaderMetrics;
Properties consumerProps = new Properties();
Expand All @@ -90,35 +93,31 @@ public PscTopicUriPartitionSplitReader(
this.consumer = new PscConsumer<>(PscConfigurationUtils.propertiesToPscConfiguration(consumerProps));
this.stoppingOffsets = new HashMap<>();
this.groupId = consumerProps.getProperty(PscConfiguration.PSC_CONSUMER_GROUP_ID);

// Metric registration
maybeRegisterKafkaConsumerMetrics(props, pscSourceReaderMetrics, consumer);
this.pscSourceReaderMetrics.registerNumBytesIn(consumer);
}

@Override
public RecordsWithSplitIds<PscConsumerMessage<byte[], byte[]>> fetch() throws IOException {
PscConsumerPollMessageIterator<byte[], byte[]> consumerMessageIterator;
PscConsumerMessagesIterable<byte[], byte[]> consumerMessagesIterable;
try {
consumerMessageIterator = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
consumerMessagesIterable = new PscConsumerMessagesIterable<>(consumer.poll(Duration.ofMillis(POLL_TIMEOUT)));
} catch (ConsumerException e) {
// IllegalStateException will be thrown if the consumer is not assigned any partitions.
// This happens if all assigned partitions are invalid or empty (starting offset >=
// stopping offset). We just mark empty partitions as finished and return an empty
// record container, and this consumer will be closed by SplitFetcherManager.
PscPartitionSplitRecords recordsBySplits =
new PscPartitionSplitRecords(
PscConsumerPollMessageIterator.emptyIterator(), pscSourceReaderMetrics);
PscConsumerMessagesIterable.emptyIterable(), pscSourceReaderMetrics);
markEmptySplitsAsFinished(recordsBySplits);
return recordsBySplits;
}
PscPartitionSplitRecords recordsBySplits =
new PscPartitionSplitRecords(consumerMessageIterator, pscSourceReaderMetrics);
new PscPartitionSplitRecords(consumerMessagesIterable, pscSourceReaderMetrics);
List<TopicUriPartition> finishedPartitions = new ArrayList<>();
for (TopicUriPartition tp : consumerMessageIterator.getTopicUriPartitions()) {
for (TopicUriPartition tp : consumerMessagesIterable.getTopicUriPartitions()) {
long stoppingOffset = getStoppingOffset(tp);
final List<PscConsumerMessage<byte[], byte[]>> recordsFromPartition =
consumerMessageIterator.iteratorFor(tp).asList();
consumerMessagesIterable.getMessagesForTopicUriPartition(tp);

if (recordsFromPartition.size() > 0) {
final PscConsumerMessage<byte[], byte[]> lastRecord =
Expand Down Expand Up @@ -170,6 +169,7 @@ private void markEmptySplitsAsFinished(PscPartitionSplitRecords recordsBySplits)

@Override
public void handleSplitsChanges(SplitsChange<PscTopicUriPartitionSplit> splitsChange) {
System.out.println("handling splits changes");
// Get all the partition assignments and stopping offsets.
if (!(splitsChange instanceof SplitsAddition)) {
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -213,6 +213,14 @@ public void handleSplitsChanges(SplitsChange<PscTopicUriPartitionSplit> splitsCh
throw new RuntimeException("Failed to assign PscConsumer", e);
}

// Metric registration
try {
maybeRegisterKafkaConsumerMetrics(props, pscSourceReaderMetrics, consumer);
this.pscSourceReaderMetrics.registerNumBytesIn(consumer);
} catch (ClientException e) {
throw new RuntimeException("Failed to register metrics for PscConsumer", e);
}

try {
// Seek on the newly assigned partitions to their stating offsets.
seekToStartingOffsets(
Expand Down Expand Up @@ -246,6 +254,7 @@ public void close() throws Exception {
public void notifyCheckpointComplete(
Collection<MessageId> offsetsToCommit,
OffsetCommitCallback offsetCommitCallback) throws ConfigurationException, ConsumerException {
System.out.println("commitAsync: " + offsetsToCommit);
consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
}

Expand Down Expand Up @@ -492,17 +501,17 @@ private static class PscPartitionSplitRecords

private final Set<String> finishedSplits = new HashSet<>();
private final Map<TopicUriPartition, Long> stoppingOffsets = new HashMap<>();
private final PscConsumerPollMessageIterator<byte[], byte[]> consumerMessageIterator;
private final PscConsumerMessagesIterable<byte[], byte[]> consumerMessagesIterable;
private final PscSourceReaderMetrics metrics;
private final Iterator<TopicUriPartition> splitIterator;
private Iterator<PscConsumerMessage<byte[], byte[]>> recordIterator;
private TopicUriPartition currentTopicPartition;
private Long currentSplitStoppingOffset;

private PscPartitionSplitRecords(
PscConsumerPollMessageIterator<byte[], byte[]> consumerMessageIterator, PscSourceReaderMetrics metrics) {
this.consumerMessageIterator = consumerMessageIterator;
this.splitIterator = consumerMessageIterator.getTopicUriPartitions().iterator();
PscConsumerMessagesIterable<byte[], byte[]> consumerMessagesIterable, PscSourceReaderMetrics metrics) {
this.consumerMessagesIterable = consumerMessagesIterable;
this.splitIterator = consumerMessagesIterable.getTopicUriPartitions().iterator();
this.metrics = metrics;
}

Expand All @@ -520,7 +529,7 @@ private void addFinishedSplit(String splitId) {
public String nextSplit() {
if (splitIterator.hasNext()) {
currentTopicPartition = splitIterator.next();
recordIterator = consumerMessageIterator.iteratorFor(currentTopicPartition);
recordIterator = consumerMessagesIterable.getMessagesForTopicUriPartition(currentTopicPartition).iterator();
currentSplitStoppingOffset =
stoppingOffsets.getOrDefault(currentTopicPartition, Long.MAX_VALUE);
return currentTopicPartition.toString();
Expand All @@ -539,6 +548,7 @@ public PscConsumerMessage<byte[], byte[]> nextRecordFromSplit() {
currentTopicPartition,
"Make sure nextSplit() did not return null before "
+ "iterate over the records split.");
System.out.println("recordIteratorClass: " + recordIterator.getClass().getName());
if (recordIterator.hasNext()) {
final PscConsumerMessage<byte[], byte[]> message = recordIterator.next();
// Only emit records before stopping offset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public PscSourceFetcherManager(

public void commitOffsets(
Collection<MessageId> offsetsToCommit, OffsetCommitCallback callback) {
LOG.debug("Committing offsets {}", offsetsToCommit);
LOG.info("Committing offsets {}", offsetsToCommit);
if (offsetsToCommit.isEmpty()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ void testInitTransactionId() throws IOException {
reuse.abortTransaction();
}
assertNumTransactions(i);
assertThat(readRecords(topicUriStr).asList().size()).isEqualTo(i / 2);
assertThat(readRecords(topicUriStr).asIterable().getMessages().size()).isEqualTo(i / 2);
}
} catch (ConfigurationException | ProducerException | TopicUriSyntaxException | ConsumerException e) {
throw new RuntimeException(e);
Expand Down
Loading

0 comments on commit c42d001

Please sign in to comment.