Skip to content

Commit

Permalink
WIP finish PscSourceITCase, need to look into BaseTopicUri.equals() a…
Browse files Browse the repository at this point in the history
…nd whether we can introduce logic in validate() to build the correct subclass of TopicUri
  • Loading branch information
jeffxiang committed Oct 10, 2024
1 parent 19c20c4 commit cd5a930
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,6 @@ PartitionChange getPartitionChange(Set<TopicUriPartition> fetchedPartitions) {
removedPartitions.add(tp);
}
};

assignedPartitions.forEach(dedupOrMarkAsRemoved);
pendingPartitionSplitAssignment.forEach(
(reader, splits) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,17 +311,17 @@ private void seekToStartingOffsets(
Map<TopicUriPartition, Long> partitionsStartingFromSpecifiedOffsets) throws ConsumerException {

if (!partitionsStartingFromEarliest.isEmpty()) {
LOG.trace("Seeking starting offsets to beginning: {}", partitionsStartingFromEarliest);
LOG.info("Seeking starting offsets to beginning: {}", partitionsStartingFromEarliest);
consumer.seekToBeginning(partitionsStartingFromEarliest);
}

if (!partitionsStartingFromLatest.isEmpty()) {
LOG.trace("Seeking starting offsets to end: {}", partitionsStartingFromLatest);
LOG.info("Seeking starting offsets to end: {}", partitionsStartingFromLatest);
consumer.seekToEnd(partitionsStartingFromLatest);
}

if (!partitionsStartingFromSpecifiedOffsets.isEmpty()) {
LOG.trace(
LOG.info(
"Seeking starting offsets to specified offsets: {}",
partitionsStartingFromSpecifiedOffsets);
partitionsStartingFromSpecifiedOffsets.forEach((tup, offset) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.pinterest.flink.connector.psc.source.PscSourceBuilder;
import com.pinterest.flink.connector.psc.source.enumerator.initializer.OffsetsInitializer;
import com.pinterest.flink.connector.psc.source.reader.deserializer.PscRecordDeserializationSchema;
import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub;
import com.pinterest.psc.common.TopicUriPartition;
import com.pinterest.psc.config.PscConfiguration;
import com.pinterest.psc.serde.ByteArraySerializer;
Expand Down Expand Up @@ -52,9 +53,10 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Pattern;

import static com.pinterest.flink.connector.psc.testutils.PscTestUtils.injectDiscoveryConfigs;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;

/** External context for testing {@link KafkaSource}. */
/** External context for testing {@link PscSource}. */
public class PscSourceExternalContext implements DataStreamSourceExternalContext<String> {

private static final Logger LOG = LoggerFactory.getLogger(PscSourceExternalContext.class);
Expand All @@ -67,6 +69,7 @@ public class PscSourceExternalContext implements DataStreamSourceExternalContext
private final List<URL> connectorJarPaths;
private final String bootstrapServers;
private final String topicName;
private final String topicUriStr;
private final SplitMappingMode splitMappingMode;
private final AdminClient adminClient;
private final List<PscTopicUriPartitionDataWriter> writers = new ArrayList<>();
Expand All @@ -78,6 +81,7 @@ protected PscSourceExternalContext(
this.connectorJarPaths = connectorJarPaths;
this.bootstrapServers = bootstrapServers;
this.topicName = randomize(TOPIC_NAME_PREFIX);
this.topicUriStr = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + this.topicName;
this.splitMappingMode = splitMappingMode;
this.adminClient = createAdminClient();
}
Expand All @@ -91,8 +95,14 @@ public List<URL> getConnectorJarPaths() {
public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings) {
final PscSourceBuilder<String> builder = PscSource.builder();

Properties props = new Properties();
props.setProperty(PscConfiguration.PSC_AUTO_RESOLUTION_ENABLED, "false");
injectDiscoveryConfigs(props, bootstrapServers, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);

builder
// .setBootstrapServers(bootstrapServers)
.setClusterUri(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)
.setProperties(props)
.setTopicUriPattern(TOPIC_NAME_PATTERN)
.setGroupId(randomize(GROUP_ID_PREFIX))
.setDeserializer(
Expand Down Expand Up @@ -182,16 +192,18 @@ private AdminClient createAdminClient() {

private PscTopicUriPartitionDataWriter createSinglePartitionTopic(int topicIndex) throws Exception {
String newTopicName = topicName + "-" + topicIndex;
String newTopicUriStr = topicUriStr + "-" + topicIndex;
LOG.info("Creating topic '{}'", newTopicName);
adminClient
.createTopics(Collections.singletonList(new NewTopic(newTopicName, 1, (short) 1)))
.all()
.get();
return new PscTopicUriPartitionDataWriter(
getPscProducerProperties(topicIndex), new TopicUriPartition(newTopicName, 0));
getPscProducerProperties(topicIndex), new TopicUriPartition(newTopicUriStr, 0));
}

private PscTopicUriPartitionDataWriter scaleOutTopic(String topicName) throws Exception {
final String topicUriStr = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topicName;
final Set<String> topics = adminClient.listTopics().names().get();
if (topics.contains(topicName)) {
final Map<String, TopicDescription> topicDescriptions =
Expand All @@ -206,15 +218,15 @@ private PscTopicUriPartitionDataWriter scaleOutTopic(String topicName) throws Ex
.get();
return new PscTopicUriPartitionDataWriter(
getPscProducerProperties(numPartitions),
new TopicUriPartition(topicName, numPartitions));
new TopicUriPartition(topicUriStr, numPartitions));
} else {
LOG.info("Creating topic '{}'", topicName);
adminClient
.createTopics(Collections.singletonList(new NewTopic(topicName, 1, (short) 1)))
.all()
.get();
return new PscTopicUriPartitionDataWriter(
getPscProducerProperties(0), new TopicUriPartition(topicName, 0));
getPscProducerProperties(0), new TopicUriPartition(topicUriStr, 0));
}
}

Expand All @@ -226,13 +238,14 @@ private Properties getPscProducerProperties(int producerId) {
PscConfiguration.PSC_PRODUCER_CLIENT_ID,
String.join(
"-",
"flink-kafka-split-writer",
"flink-psc-split-writer",
Integer.toString(producerId),
Long.toString(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE))));
pscProducerProperties.setProperty(
PscConfiguration.PSC_PRODUCER_KEY_SERIALIZER, ByteArraySerializer.class.getName());
pscProducerProperties.setProperty(
PscConfiguration.PSC_PRODUCER_VALUE_SERIALIZER, ByteArraySerializer.class.getName());
injectDiscoveryConfigs(pscProducerProperties, bootstrapServers, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
return pscProducerProperties;
}

Expand Down
6 changes: 5 additions & 1 deletion psc/src/main/java/com/pinterest/psc/common/BaseTopicUri.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,13 @@ public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
if (other == null) {
return false;
}
if (getClass() != other.getClass()) {
if (!(other instanceof TopicUri)) // this allows for comparison with other implementations of TopicUri
return false;
}
BaseTopicUri otherBaseTopicUri = (BaseTopicUri) other;
return PscCommon.equals(topicUriAsString, otherBaseTopicUri.topicUriAsString) &&
PscCommon.equals(protocol, otherBaseTopicUri.protocol) &&
Expand Down

0 comments on commit cd5a930

Please sign in to comment.