diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java
new file mode 100644
index 00000000000..c761a881650
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSource.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer;
+import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchemaInitializationContext;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * The Source implementation of Pulsar. Please use a {@link PulsarSourceBuilder} to construct a
+ * {@link PulsarSource}. The following example shows how to create a PulsarSource emitting records
+ * of String
type.
+ *
+ *
{@code
+ * PulsarSource source = PulsarSource
+ * .builder()
+ * .setTopics(TOPIC1, TOPIC2)
+ * .setServiceUrl(getServiceUrl())
+ * .setAdminUrl(getAdminUrl())
+ * .setSubscriptionName("test")
+ * .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
+ * .setBounded(StopCursor::defaultStopCursor)
+ * .build();
+ * }
+ *
+ * See {@link PulsarSourceBuilder} for more details.
+ *
+ * @param The output type of the source.
+ */
+@PublicEvolving
+public final class PulsarSource
+ implements
+ Source,
+ ResultTypeQueryable {
+
+ private static final long serialVersionUID = 7773108631275567433L;
+
+ /**
+ * The configuration for pulsar source, we don't support the pulsar's configuration class
+ * directly.
+ */
+ private final SourceConfiguration sourceConfiguration;
+
+ private final PulsarSubscriber subscriber;
+
+ private final RangeGenerator rangeGenerator;
+
+ private final StartCursor startCursor;
+
+ private final StopCursor stopCursor;
+
+ private final Boundedness boundedness;
+
+ /** The pulsar deserialization schema used for deserializing message. */
+ private final PulsarDeserializationSchema deserializationSchema;
+
+ /**
+ * The constructor for PulsarSource, it's package protected for forcing using {@link
+ * PulsarSourceBuilder}.
+ */
+ PulsarSource(
+ SourceConfiguration sourceConfiguration,
+ PulsarSubscriber subscriber,
+ RangeGenerator rangeGenerator,
+ StartCursor startCursor,
+ StopCursor stopCursor,
+ Boundedness boundedness,
+ PulsarDeserializationSchema deserializationSchema) {
+ this.sourceConfiguration = sourceConfiguration;
+ this.subscriber = subscriber;
+ this.rangeGenerator = rangeGenerator;
+ this.startCursor = startCursor;
+ this.stopCursor = stopCursor;
+ this.boundedness = boundedness;
+ this.deserializationSchema = deserializationSchema;
+ }
+
+ /**
+ * Get a PulsarSourceBuilder to builder a {@link PulsarSource}.
+ *
+ * @return a Pulsar source builder.
+ */
+ public static PulsarSourceBuilder builder() {
+ return new PulsarSourceBuilder<>();
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return boundedness;
+ }
+
+ @Internal
+ @Override
+ public SourceReader createReader(SourceReaderContext readerContext)
+ throws Exception {
+ // Initialize the deserialization schema before creating the pulsar reader.
+ PulsarDeserializationSchemaInitializationContext initializationContext =
+ new PulsarDeserializationSchemaInitializationContext(readerContext);
+ deserializationSchema.open(initializationContext, sourceConfiguration);
+
+ return PulsarSourceReaderFactory.create(
+ readerContext, deserializationSchema, sourceConfiguration);
+ }
+
+ @Internal
+ @Override
+ public SplitEnumerator createEnumerator(
+ SplitEnumeratorContext enumContext) {
+ return new PulsarSourceEnumerator(
+ subscriber,
+ startCursor,
+ stopCursor,
+ rangeGenerator,
+ sourceConfiguration,
+ enumContext);
+ }
+
+ @Internal
+ @Override
+ public SplitEnumerator restoreEnumerator(
+ SplitEnumeratorContext enumContext,
+ PulsarSourceEnumState checkpoint) {
+ return new PulsarSourceEnumerator(
+ subscriber,
+ startCursor,
+ stopCursor,
+ rangeGenerator,
+ sourceConfiguration,
+ enumContext,
+ checkpoint);
+ }
+
+ @Internal
+ @Override
+ public SimpleVersionedSerializer getSplitSerializer() {
+ return PulsarPartitionSplitSerializer.INSTANCE;
+ }
+
+ @Internal
+ @Override
+ public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() {
+ return PulsarSourceEnumStateSerializer.INSTANCE;
+ }
+
+ @Override
+ public TypeInformation getProducedType() {
+ return deserializationSchema.getProducedType();
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java
new file mode 100644
index 00000000000..ad765231e6c
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceBuilder.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder;
+import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
+import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.range.SplitRangeGenerator;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import static java.lang.Boolean.FALSE;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_READ_TRANSACTION_TIMEOUT;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
+import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
+import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.SOURCE_CONFIG_VALIDATOR;
+import static org.apache.flink.util.InstantiationUtil.isSerializable;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The builder class for {@link PulsarSource} to make it easier for the users to construct a {@link
+ * PulsarSource}.
+ *
+ * The following example shows the minimum setup to create a PulsarSource that reads the String
+ * values from a Pulsar topic.
+ *
+ *
{@code
+ * PulsarSource source = PulsarSource
+ * .builder()
+ * .setServiceUrl(PULSAR_BROKER_URL)
+ * .setAdminUrl(PULSAR_BROKER_HTTP_URL)
+ * .setSubscriptionName("flink-source-1")
+ * .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ * .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
+ * .build();
+ * }
+ *
+ * The service url, admin url, subscription name, topics to consume, and the record deserializer
+ * are required fields that must be set.
+ *
+ *
To specify the starting position of PulsarSource, one can call {@link
+ * #setStartCursor(StartCursor)}.
+ *
+ *
By default, the PulsarSource runs in an {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and
+ * never stop until the Flink job is canceled or fails. To let the PulsarSource run in {@link
+ * Boundedness#CONTINUOUS_UNBOUNDED} but stops at some given offsets, one can call {@link
+ * #setUnboundedStopCursor(StopCursor)} and disable auto partition discovery as described below. For
+ * example the following PulsarSource stops after it consumes up to a event time when the Flink
+ * started.
+ *
+ *
To stop the connector user has to disable the auto partition discovery. As auto partition
+ * discovery always expected new splits to come and not exiting. To disable auto partition
+ * discovery, use builder.setConfig({@link
+ * PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
+ *
+ *
{@code
+ * PulsarSource source = PulsarSource
+ * .builder()
+ * .setServiceUrl(PULSAR_BROKER_URL)
+ * .setAdminUrl(PULSAR_BROKER_HTTP_URL)
+ * .setSubscriptionName("flink-source-1")
+ * .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ * .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
+ * .setUnboundedStopCursor(StopCursor.atEventTime(System.currentTimeMillis()))
+ * .build();
+ * }
+ *
+ * @param The output type of the source.
+ */
+@PublicEvolving
+public final class PulsarSourceBuilder {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceBuilder.class);
+
+ private final PulsarConfigBuilder configBuilder;
+
+ private PulsarSubscriber subscriber;
+ private RangeGenerator rangeGenerator;
+ private StartCursor startCursor;
+ private StopCursor stopCursor;
+ private Boundedness boundedness;
+ private PulsarDeserializationSchema deserializationSchema;
+
+ // private builder constructor.
+ PulsarSourceBuilder() {
+ this.configBuilder = new PulsarConfigBuilder();
+ this.startCursor = StartCursor.defaultStartCursor();
+ this.stopCursor = StopCursor.defaultStopCursor();
+ this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+ }
+
+ /**
+ * Sets the admin endpoint for the PulsarAdmin of the PulsarSource.
+ *
+ * @param adminUrl the url for the PulsarAdmin.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder setAdminUrl(String adminUrl) {
+ return setConfig(PULSAR_ADMIN_URL, adminUrl);
+ }
+
+ /**
+ * Sets the server's link for the PulsarConsumer of the PulsarSource.
+ *
+ * @param serviceUrl the server url of the Pulsar cluster.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder setServiceUrl(String serviceUrl) {
+ return setConfig(PULSAR_SERVICE_URL, serviceUrl);
+ }
+
+ /**
+ * Sets the name for this pulsar subscription.
+ *
+ * @param subscriptionName the server url of the Pulsar cluster.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder setSubscriptionName(String subscriptionName) {
+ return setConfig(PULSAR_SUBSCRIPTION_NAME, subscriptionName);
+ }
+
+ /**
+ * {@link SubscriptionType} is the consuming behavior for pulsar, we would generator different
+ * split by the given subscription type. Please take some time to consider which subscription
+ * type matches your application best. Default is {@link SubscriptionType#Shared}.
+ *
+ * @param subscriptionType The type of subscription.
+ * @return this PulsarSourceBuilder.
+ * @see Pulsar
+ * Subscriptions
+ */
+ public PulsarSourceBuilder setSubscriptionType(SubscriptionType subscriptionType) {
+ return setConfig(PULSAR_SUBSCRIPTION_TYPE, subscriptionType);
+ }
+
+ /**
+ * Set a pulsar topic list for flink source. Some topic may not exist currently, consuming this
+ * non-existed topic wouldn't throw any exception. But the best solution is just consuming by
+ * using a topic regex. You can set topics once either with {@link #setTopics} or {@link
+ * #setTopicPattern} in this builder.
+ *
+ * @param topics The topic list you would like to consume message.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder setTopics(String... topics) {
+ return setTopics(Arrays.asList(topics));
+ }
+
+ /**
+ * Set a pulsar topic list for flink source. Some topic may not exist currently, consuming this
+ * non-existed topic wouldn't throw any exception. But the best solution is just consuming by
+ * using a topic regex. You can set topics once either with {@link #setTopics} or {@link
+ * #setTopicPattern} in this builder.
+ *
+ * @param topics The topic list you would like to consume message.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder setTopics(List topics) {
+ ensureSubscriberIsNull("topics");
+ List distinctTopics = TopicNameUtils.distinctTopics(topics);
+ this.subscriber = PulsarSubscriber.getTopicListSubscriber(distinctTopics);
+ return this;
+ }
+
+ /**
+ * Set a topic pattern to consume from the java regex str. You can set topics once either with
+ * {@link #setTopics} or {@link #setTopicPattern} in this builder.
+ *
+ * @param topicsPattern the pattern of the topic name to consume from.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder setTopicPattern(String topicsPattern) {
+ return setTopicPattern(Pattern.compile(topicsPattern));
+ }
+
+ /**
+ * Set a topic pattern to consume from the java {@link Pattern}. You can set topics once either
+ * with {@link #setTopics} or {@link #setTopicPattern} in this builder.
+ *
+ * @param topicsPattern the pattern of the topic name to consume from.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder setTopicPattern(Pattern topicsPattern) {
+ return setTopicPattern(topicsPattern, RegexSubscriptionMode.AllTopics);
+ }
+
+ /**
+ * Set a topic pattern to consume from the java regex str. You can set topics once either with
+ * {@link #setTopics} or {@link #setTopicPattern} in this builder.
+ *
+ * @param topicsPattern the pattern of the topic name to consume from.
+ * @param regexSubscriptionMode The topic filter for regex subscription.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder setTopicPattern(
+ String topicsPattern, RegexSubscriptionMode regexSubscriptionMode) {
+ return setTopicPattern(Pattern.compile(topicsPattern), regexSubscriptionMode);
+ }
+
+ /**
+ * Set a topic pattern to consume from the java {@link Pattern}. You can set topics once either
+ * with {@link #setTopics} or {@link #setTopicPattern} in this builder.
+ *
+ * @param topicsPattern the pattern of the topic name to consume from.
+ * @param regexSubscriptionMode When subscribing to a topic using a regular expression, you can
+ * pick a certain type of topics.
+ *
+ * - PersistentOnly: only subscribe to persistent topics.
+ *
- NonPersistentOnly: only subscribe to non-persistent topics.
+ *
- AllTopics: subscribe to both persistent and non-persistent topics.
+ *
+ *
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder setTopicPattern(
+ Pattern topicsPattern, RegexSubscriptionMode regexSubscriptionMode) {
+ ensureSubscriberIsNull("topic pattern");
+ this.subscriber =
+ PulsarSubscriber.getTopicPatternSubscriber(topicsPattern, regexSubscriptionMode);
+ return this;
+ }
+
+ /**
+ * The consumer name is informative, and it can be used to identify a particular consumer
+ * instance from the topic stats.
+ */
+ public PulsarSourceBuilder setConsumerName(String consumerName) {
+ return setConfig(PULSAR_CONSUMER_NAME, consumerName);
+ }
+
+ /**
+ * Set a topic range generator for Key_Shared subscription.
+ *
+ * @param rangeGenerator A generator which would generate a set of {@link TopicRange} for given
+ * topic.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder setRangeGenerator(RangeGenerator rangeGenerator) {
+ if (configBuilder.contains(PULSAR_SUBSCRIPTION_TYPE)) {
+ SubscriptionType subscriptionType = configBuilder.get(PULSAR_SUBSCRIPTION_TYPE);
+ checkArgument(
+ subscriptionType == SubscriptionType.Key_Shared,
+ "Key_Shared subscription should be used for custom rangeGenerator instead of %s",
+ subscriptionType);
+ } else {
+ LOG.warn("No subscription type provided, set it to Key_Shared.");
+ setSubscriptionType(SubscriptionType.Key_Shared);
+ }
+ this.rangeGenerator = checkNotNull(rangeGenerator);
+ return this;
+ }
+
+ /**
+ * Specify from which offsets the PulsarSource should start consume from by providing an {@link
+ * StartCursor}.
+ *
+ * @param startCursor set the starting offsets for the Source.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder setStartCursor(StartCursor startCursor) {
+ this.startCursor = checkNotNull(startCursor);
+ return this;
+ }
+
+ /**
+ * By default, the PulsarSource runs in an {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and
+ * never stop until the Flink job is canceled or fails. To let the PulsarSource run in {@link
+ * Boundedness#CONTINUOUS_UNBOUNDED} but stops at some given offsets, one can call {@link
+ * #setUnboundedStopCursor(StopCursor)} and disable auto partition discovery as described below.
+ *
+ * This method is different from {@link #setBoundedStopCursor(StopCursor)} that after setting
+ * the stopping offsets with this method, {@link PulsarSource#getBoundedness()} will still
+ * return {@link Boundedness#CONTINUOUS_UNBOUNDED} even though it will stop at the stopping
+ * offsets specified by the stopping offsets {@link StopCursor}.
+ *
+ *
To stop the connector user has to disable the auto partition discovery. As auto partition
+ * discovery always expected new splits to come and not exiting. To disable auto partition
+ * discovery, use builder.setConfig({@link
+ * PulsarSourceOptions#PULSAR_PARTITION_DISCOVERY_INTERVAL_MS}, -1).
+ *
+ * @param stopCursor The {@link StopCursor} to specify the stopping offset.
+ * @return this PulsarSourceBuilder.
+ * @see #setBoundedStopCursor(StopCursor)
+ */
+ public PulsarSourceBuilder setUnboundedStopCursor(StopCursor stopCursor) {
+ this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+ this.stopCursor = checkNotNull(stopCursor);
+ return this;
+ }
+
+ /**
+ * By default, the PulsarSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner
+ * and thus never stops until the Flink job fails or is canceled. To let the PulsarSource run in
+ * {@link Boundedness#BOUNDED} manner and stops at some point, one can set an {@link StopCursor}
+ * to specify the stopping offsets for each partition. When all the partitions have reached
+ * their stopping offsets, the PulsarSource will then exit.
+ *
+ * This method is different from {@link #setUnboundedStopCursor(StopCursor)} that after
+ * setting the stopping offsets with this method, {@link PulsarSource#getBoundedness()} will
+ * return {@link Boundedness#BOUNDED} instead of {@link Boundedness#CONTINUOUS_UNBOUNDED}.
+ *
+ * @param stopCursor the {@link StopCursor} to specify the stopping offsets.
+ * @return this PulsarSourceBuilder.
+ * @see #setUnboundedStopCursor(StopCursor)
+ */
+ public PulsarSourceBuilder setBoundedStopCursor(StopCursor stopCursor) {
+ this.boundedness = Boundedness.BOUNDED;
+ this.stopCursor = checkNotNull(stopCursor);
+ return this;
+ }
+
+ /**
+ * DeserializationSchema is required for getting the {@link Schema} for deserialize message from
+ * pulsar and getting the {@link TypeInformation} for message serialization in flink.
+ *
+ * We have defined a set of implementations, using {@code
+ * PulsarDeserializationSchema#pulsarSchema} or {@code PulsarDeserializationSchema#flinkSchema}
+ * for creating the desired schema.
+ */
+ public PulsarSourceBuilder setDeserializationSchema(
+ PulsarDeserializationSchema deserializationSchema) {
+ PulsarSourceBuilder self = specialized();
+ self.deserializationSchema = deserializationSchema;
+ return self;
+ }
+
+ /**
+ * Set an arbitrary property for the PulsarSource and Pulsar Consumer. The valid keys can be
+ * found in {@link PulsarSourceOptions} and {@link PulsarOptions}.
+ *
+ * Make sure the option could be set only once or with same value.
+ *
+ * @param key the key of the property.
+ * @param value the value of the property.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder setConfig(ConfigOption key, T value) {
+ configBuilder.set(key, value);
+ return this;
+ }
+
+ /**
+ * Set arbitrary properties for the PulsarSource and Pulsar Consumer. The valid keys can be
+ * found in {@link PulsarSourceOptions} and {@link PulsarOptions}.
+ *
+ * @param config the config to set for the PulsarSource.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder setConfig(Configuration config) {
+ configBuilder.set(config);
+ return this;
+ }
+
+ /**
+ * Set arbitrary properties for the PulsarSource and Pulsar Consumer. The valid keys can be
+ * found in {@link PulsarSourceOptions} and {@link PulsarOptions}.
+ *
+ * This method is mainly used for future flink SQL binding.
+ *
+ * @param properties the config properties to set for the PulsarSource.
+ * @return this PulsarSourceBuilder.
+ */
+ public PulsarSourceBuilder setProperties(Properties properties) {
+ configBuilder.set(properties);
+ return this;
+ }
+
+ /**
+ * Build the {@link PulsarSource}.
+ *
+ * @return a PulsarSource with the settings made for this builder.
+ */
+ @SuppressWarnings("java:S3776")
+ public PulsarSource build() {
+
+ // Ensure the topic subscriber for pulsar.
+ checkNotNull(subscriber, "No topic names or topic pattern are provided.");
+
+ SubscriptionType subscriptionType = configBuilder.get(PULSAR_SUBSCRIPTION_TYPE);
+ if (subscriptionType == SubscriptionType.Key_Shared) {
+ if (rangeGenerator == null) {
+ LOG.warn(
+ "No range generator provided for key_shared subscription,"
+ + " we would use the SplitRangeGenerator as the default range generator.");
+ this.rangeGenerator = new SplitRangeGenerator();
+ }
+ } else {
+ // Override the range generator.
+ this.rangeGenerator = new FullRangeGenerator();
+ }
+
+ if (boundedness == null) {
+ LOG.warn("No boundedness was set, mark it as a endless stream.");
+ this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+ }
+ if (boundedness == Boundedness.BOUNDED
+ && configBuilder.get(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS) >= 0) {
+ LOG.warn(
+ "{} property is overridden to -1 because the source is bounded.",
+ PULSAR_PARTITION_DISCOVERY_INTERVAL_MS);
+ configBuilder.override(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, -1L);
+ }
+
+ checkNotNull(deserializationSchema, "deserializationSchema should be set.");
+
+ // Enable transaction if the cursor auto commit is disabled for Key_Shared & Shared.
+ if (FALSE.equals(configBuilder.get(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE))
+ && (subscriptionType == SubscriptionType.Key_Shared
+ || subscriptionType == SubscriptionType.Shared)) {
+ LOG.info(
+ "Pulsar cursor auto commit is disabled, make sure checkpoint is enabled "
+ + "and your pulsar cluster is support the transaction.");
+ configBuilder.override(PULSAR_ENABLE_TRANSACTION, true);
+
+ if (!configBuilder.contains(PULSAR_READ_TRANSACTION_TIMEOUT)) {
+ LOG.warn(
+ "The default pulsar transaction timeout is 3 hours, "
+ + "make sure it was greater than your checkpoint interval.");
+ } else {
+ Long timeout = configBuilder.get(PULSAR_READ_TRANSACTION_TIMEOUT);
+ LOG.warn(
+ "The configured transaction timeout is {} mille seconds, "
+ + "make sure it was greater than your checkpoint interval.",
+ timeout);
+ }
+ }
+
+ if (!configBuilder.contains(PULSAR_CONSUMER_NAME)) {
+ LOG.warn(
+ "We recommend set a readable consumer name through setConsumerName(String) in production mode.");
+ } else {
+ String consumerName = configBuilder.get(PULSAR_CONSUMER_NAME);
+ if (!consumerName.contains("%s")) {
+ configBuilder.override(PULSAR_CONSUMER_NAME, consumerName + " - %s");
+ }
+ }
+
+ // Since these implementations could be a lambda, make sure they are serializable.
+ checkState(isSerializable(startCursor), "StartCursor isn't serializable");
+ checkState(isSerializable(stopCursor), "StopCursor isn't serializable");
+ checkState(isSerializable(rangeGenerator), "RangeGenerator isn't serializable");
+
+ // Check builder configuration.
+ SourceConfiguration sourceConfiguration =
+ configBuilder.build(SOURCE_CONFIG_VALIDATOR, SourceConfiguration::new);
+
+ return new PulsarSource<>(
+ sourceConfiguration,
+ subscriber,
+ rangeGenerator,
+ startCursor,
+ stopCursor,
+ boundedness,
+ deserializationSchema);
+ }
+
+ // ------------- private helpers --------------
+
+ /** Helper method for java compiler recognizes the generic type. */
+ @SuppressWarnings("unchecked")
+ private PulsarSourceBuilder specialized() {
+ return (PulsarSourceBuilder) this;
+ }
+
+ /** Topic name and topic pattern is conflict, make sure they are set only once. */
+ private void ensureSubscriberIsNull(String attemptingSubscribeMode) {
+ if (subscriber != null) {
+ throw new IllegalStateException(
+ String.format(
+ "Cannot use %s for consumption because a %s is already set for consumption.",
+ attemptingSubscribeMode, subscriber.getClass().getSimpleName()));
+ }
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceReaderFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceReaderFactory.java
new file mode 100644
index 00000000000..b3a78628334
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/PulsarSourceReaderFactory.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.source;
+
+import org.apache.inlong.sort.pulsar.source.reader.PulsarOrderedSourceReader;
+import org.apache.inlong.sort.pulsar.source.reader.PulsarUnorderedSourceReader;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
+import org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+
+import java.util.function.Supplier;
+
+import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin;
+import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
+
+/**
+ * This factory class is used for creating different types of source reader for different
+ * subscription type.
+ *
+ *
+ * - Failover, Exclusive: We would create {@link PulsarOrderedSourceReader}.
+ *
- Shared, Key_Shared: We would create {@link PulsarUnorderedSourceReader}.
+ *
+ */
+@Internal
+public final class PulsarSourceReaderFactory {
+
+ private PulsarSourceReaderFactory() {
+ // No public constructor.
+ }
+
+ @SuppressWarnings("java:S2095")
+ public static SourceReader create(
+ SourceReaderContext readerContext,
+ PulsarDeserializationSchema deserializationSchema,
+ SourceConfiguration sourceConfiguration) {
+
+ PulsarClient pulsarClient = createClient(sourceConfiguration);
+ PulsarAdmin pulsarAdmin = createAdmin(sourceConfiguration);
+
+ // Create a message queue with the predefined source option.
+ int queueCapacity = sourceConfiguration.getMessageQueueCapacity();
+ FutureCompletingBlockingQueue>> elementsQueue =
+ new FutureCompletingBlockingQueue<>(queueCapacity);
+
+ // Create different pulsar source reader by subscription type.
+ SubscriptionType subscriptionType = sourceConfiguration.getSubscriptionType();
+ if (subscriptionType == SubscriptionType.Failover
+ || subscriptionType == SubscriptionType.Exclusive) {
+ // Create an ordered split reader supplier.
+ Supplier> splitReaderSupplier =
+ () -> new PulsarOrderedPartitionSplitReader<>(
+ pulsarClient,
+ pulsarAdmin,
+ sourceConfiguration,
+ deserializationSchema);
+
+ return new PulsarOrderedSourceReader<>(
+ elementsQueue,
+ splitReaderSupplier,
+ readerContext,
+ sourceConfiguration,
+ pulsarClient,
+ pulsarAdmin,
+ deserializationSchema);
+ } else if (subscriptionType == SubscriptionType.Shared
+ || subscriptionType == SubscriptionType.Key_Shared) {
+ TransactionCoordinatorClient coordinatorClient =
+ ((PulsarClientImpl) pulsarClient).getTcClient();
+ if (coordinatorClient == null
+ && !sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
+ throw new IllegalStateException("Transaction is required but didn't enabled");
+ }
+
+ Supplier> splitReaderSupplier =
+ () -> new PulsarUnorderedPartitionSplitReader<>(
+ pulsarClient,
+ pulsarAdmin,
+ sourceConfiguration,
+ deserializationSchema,
+ coordinatorClient);
+
+ return new PulsarUnorderedSourceReader<>(
+ elementsQueue,
+ splitReaderSupplier,
+ readerContext,
+ sourceConfiguration,
+ pulsarClient,
+ pulsarAdmin,
+ coordinatorClient,
+ deserializationSchema);
+ } else {
+ throw new UnsupportedOperationException(
+ "This subscription type is not " + subscriptionType + " supported currently.");
+ }
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java
new file mode 100644
index 00000000000..6e90840e507
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.source.reader;
+
+import org.apache.inlong.sort.pulsar.table.PulsarTableDeserializationSchema;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.connector.pulsar.source.reader.fetcher.PulsarOrderedFetcherManager;
+import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.source.reader.split.PulsarOrderedPartitionSplitReader;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/**
+ * The source reader for pulsar subscription Failover and Exclusive, which consumes the ordered
+ * messages.
+ */
+@Internal
+public class PulsarOrderedSourceReader extends PulsarSourceReaderBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PulsarOrderedSourceReader.class);
+
+ @VisibleForTesting
+ final SortedMap> cursorsToCommit;
+ private final ConcurrentMap cursorsOfFinishedSplits;
+ private final AtomicReference cursorCommitThrowable = new AtomicReference<>();
+ private final PulsarDeserializationSchema deserializationSchema;
+ private ScheduledExecutorService cursorScheduler;
+
+ public PulsarOrderedSourceReader(
+ FutureCompletingBlockingQueue>> elementsQueue,
+ Supplier> splitReaderSupplier,
+ SourceReaderContext context,
+ SourceConfiguration sourceConfiguration,
+ PulsarClient pulsarClient,
+ PulsarAdmin pulsarAdmin,
+ PulsarDeserializationSchema deserializationSchema) {
+ super(
+ elementsQueue,
+ new PulsarOrderedFetcherManager<>(elementsQueue, splitReaderSupplier::get),
+ context,
+ sourceConfiguration,
+ pulsarClient,
+ pulsarAdmin);
+
+ this.cursorsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
+ this.cursorsOfFinishedSplits = new ConcurrentHashMap<>();
+ this.deserializationSchema = deserializationSchema;
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ if (sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
+ this.cursorScheduler = Executors.newSingleThreadScheduledExecutor();
+
+ // Auto commit cursor, this could be enabled when checkpoint is also enabled.
+ cursorScheduler.scheduleAtFixedRate(
+ this::cumulativeAcknowledgmentMessage,
+ sourceConfiguration.getMaxFetchTime().toMillis(),
+ sourceConfiguration.getAutoCommitCursorInterval(),
+ TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Override
+ public InputStatus pollNext(ReaderOutput output) throws Exception {
+ checkErrorAndRethrow();
+ return super.pollNext(output);
+ }
+
+ @Override
+ protected void onSplitFinished(Map finishedSplitIds) {
+ // We don't require new splits, all the splits are pre-assigned by source enumerator.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("onSplitFinished event: {}", finishedSplitIds);
+ }
+
+ for (Map.Entry entry : finishedSplitIds.entrySet()) {
+ PulsarPartitionSplitState state = entry.getValue();
+ MessageId latestConsumedId = state.getLatestConsumedId();
+ if (latestConsumedId != null) {
+ cursorsOfFinishedSplits.put(state.getPartition(), latestConsumedId);
+ }
+ }
+ }
+
+ @Override
+ public List snapshotState(long checkpointId) {
+ if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
+ ((PulsarTableDeserializationSchema) deserializationSchema).updateCurrentCheckpointId(checkpointId);
+ }
+ List splits = super.snapshotState(checkpointId);
+
+ // Perform a snapshot for these splits.
+ Map cursors =
+ cursorsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
+ // Put the cursors of the active splits.
+ for (PulsarPartitionSplit split : splits) {
+ MessageId latestConsumedId = split.getLatestConsumedId();
+ if (latestConsumedId != null) {
+ cursors.put(split.getPartition(), latestConsumedId);
+ }
+ }
+ // Put cursors of all the finished splits.
+ cursors.putAll(cursorsOfFinishedSplits);
+
+ return splits;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ LOG.debug("Committing cursors for checkpoint {}", checkpointId);
+
+ Map cursors = cursorsToCommit.get(checkpointId);
+ try {
+ ((PulsarOrderedFetcherManager) splitFetcherManager).acknowledgeMessages(cursors);
+ LOG.debug("Successfully acknowledge cursors for checkpoint {}", checkpointId);
+
+ // Clean up the cursors.
+ cursorsOfFinishedSplits.keySet().removeAll(cursors.keySet());
+ cursorsToCommit.headMap(checkpointId + 1).clear();
+ if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
+ PulsarTableDeserializationSchema pulsarTableDeserializationSchema =
+ (PulsarTableDeserializationSchema) deserializationSchema;
+ pulsarTableDeserializationSchema.flushAudit();
+ pulsarTableDeserializationSchema.updateLastCheckpointId(checkpointId);
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to acknowledge cursors for checkpoint {}", checkpointId, e);
+ cursorCommitThrowable.compareAndSet(null, e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (cursorScheduler != null) {
+ cursorScheduler.shutdown();
+ }
+
+ super.close();
+ }
+
+ // ----------------- helper methods --------------
+
+ private void checkErrorAndRethrow() {
+ Throwable cause = cursorCommitThrowable.get();
+ if (cause != null) {
+ throw new RuntimeException("An error occurred in acknowledge message.", cause);
+ }
+ }
+
+ /** Acknowledge the pulsar topic partition cursor by the last consumed message id. */
+ private void cumulativeAcknowledgmentMessage() {
+ Map cursors = new HashMap<>(cursorsOfFinishedSplits);
+
+ // We reuse snapshotState for acquiring a consume status snapshot.
+ // So the checkpoint didn't really happen, so we just pass a fake checkpoint id.
+ List splits = super.snapshotState(1L);
+ for (PulsarPartitionSplit split : splits) {
+ MessageId latestConsumedId = split.getLatestConsumedId();
+ if (latestConsumedId != null) {
+ cursors.put(split.getPartition(), latestConsumedId);
+ }
+ }
+
+ try {
+ ((PulsarOrderedFetcherManager) splitFetcherManager).acknowledgeMessages(cursors);
+ // Clean up the finish splits.
+ cursorsOfFinishedSplits.keySet().removeAll(cursors.keySet());
+ } catch (Exception e) {
+ LOG.error("Fail in auto cursor commit.", e);
+ cursorCommitThrowable.compareAndSet(null, e);
+ }
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReaderBase.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReaderBase.java
new file mode 100644
index 00000000000..d454038df27
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarSourceReaderBase.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.source.reader;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SourceReaderBase;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.reader.emitter.PulsarRecordEmitter;
+import org.apache.flink.connector.pulsar.source.reader.fetcher.PulsarFetcherManagerBase;
+import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+
+/**
+ * The common pulsar source reader for both ordered & unordered message consuming.
+ *
+ * @param The output message type for flink.
+ */
+abstract class PulsarSourceReaderBase
+ extends
+ SourceReaderBase, OUT, PulsarPartitionSplit, PulsarPartitionSplitState> {
+
+ protected final SourceConfiguration sourceConfiguration;
+ protected final PulsarClient pulsarClient;
+ protected final PulsarAdmin pulsarAdmin;
+
+ protected PulsarSourceReaderBase(
+ FutureCompletingBlockingQueue>> elementsQueue,
+ PulsarFetcherManagerBase splitFetcherManager,
+ SourceReaderContext context,
+ SourceConfiguration sourceConfiguration,
+ PulsarClient pulsarClient,
+ PulsarAdmin pulsarAdmin) {
+ super(
+ elementsQueue,
+ splitFetcherManager,
+ new PulsarRecordEmitter<>(),
+ sourceConfiguration,
+ context);
+
+ this.sourceConfiguration = sourceConfiguration;
+ this.pulsarClient = pulsarClient;
+ this.pulsarAdmin = pulsarAdmin;
+ }
+
+ @Override
+ protected PulsarPartitionSplitState initializedState(PulsarPartitionSplit split) {
+ return new PulsarPartitionSplitState(split);
+ }
+
+ @Override
+ protected PulsarPartitionSplit toSplitType(
+ String splitId, PulsarPartitionSplitState splitState) {
+ return splitState.toPulsarPartitionSplit();
+ }
+
+ @Override
+ public void close() throws Exception {
+ // Close the all the consumers first.
+ super.close();
+
+ // Close shared pulsar resources.
+ pulsarClient.shutdown();
+ pulsarAdmin.close();
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java
new file mode 100644
index 00000000000..cca709e9b68
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.source.reader;
+
+import org.apache.inlong.sort.pulsar.table.PulsarTableDeserializationSchema;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.connector.pulsar.source.reader.fetcher.PulsarUnorderedFetcherManager;
+import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
+import org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Supplier;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * The source reader for pulsar subscription Shared and Key_Shared, which consumes the unordered
+ * messages.
+ */
+@Internal
+public class PulsarUnorderedSourceReader extends PulsarSourceReaderBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PulsarUnorderedSourceReader.class);
+
+ @Nullable
+ private final TransactionCoordinatorClient coordinatorClient;
+ private final SortedMap> transactionsToCommit;
+ private final List transactionsOfFinishedSplits;
+ private final PulsarDeserializationSchema deserializationSchema;
+ private boolean started = false;
+
+ public PulsarUnorderedSourceReader(
+ FutureCompletingBlockingQueue>> elementsQueue,
+ Supplier> splitReaderSupplier,
+ SourceReaderContext context,
+ SourceConfiguration sourceConfiguration,
+ PulsarClient pulsarClient,
+ PulsarAdmin pulsarAdmin,
+ @Nullable TransactionCoordinatorClient coordinatorClient,
+ PulsarDeserializationSchema deserializationSchema) {
+ super(
+ elementsQueue,
+ new PulsarUnorderedFetcherManager<>(elementsQueue, splitReaderSupplier::get),
+ context,
+ sourceConfiguration,
+ pulsarClient,
+ pulsarAdmin);
+ this.coordinatorClient = coordinatorClient;
+ this.transactionsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
+ this.transactionsOfFinishedSplits = Collections.synchronizedList(new ArrayList<>());
+ this.deserializationSchema = deserializationSchema;
+ }
+
+ @Override
+ public void start() {
+ this.started = true;
+ super.start();
+ }
+
+ @Override
+ public void addSplits(List splits) {
+ if (started) {
+ // We only accept splits after this reader is started and registered to the pipeline.
+ // This would ignore the splits from the state.
+ super.addSplits(splits);
+ } else {
+ // Abort the pending transaction in this split.
+ for (PulsarPartitionSplit split : splits) {
+ LOG.info("Ignore the split {} saved in checkpoint.", split);
+
+ TxnID transactionId = split.getUncommittedTransactionId();
+ if (transactionId != null && coordinatorClient != null) {
+ try {
+ coordinatorClient.abort(transactionId);
+ } catch (Exception e) {
+ LOG.debug(
+ "Error in aborting transaction {} from the checkpoint",
+ transactionId,
+ e);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void onSplitFinished(Map finishedSplitIds) {
+ // We don't require new splits, all the splits are pre-assigned by source enumerator.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("onSplitFinished event: {}", finishedSplitIds);
+ }
+
+ if (coordinatorClient != null) {
+ // Commit the uncommitted transaction
+ for (Map.Entry entry : finishedSplitIds.entrySet()) {
+ PulsarPartitionSplitState state = entry.getValue();
+ TxnID uncommittedTransactionId = state.getUncommittedTransactionId();
+ if (uncommittedTransactionId != null) {
+ transactionsOfFinishedSplits.add(uncommittedTransactionId);
+ }
+ }
+ }
+ }
+
+ @Override
+ public List snapshotState(long checkpointId) {
+ LOG.debug("Trigger the new transaction for downstream readers.");
+ if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
+ ((PulsarTableDeserializationSchema) deserializationSchema).updateCurrentCheckpointId(checkpointId);
+ }
+ List splits =
+ ((PulsarUnorderedFetcherManager) splitFetcherManager).snapshotState();
+
+ if (coordinatorClient == null) {
+ return splits;
+ }
+ // Snapshot the transaction status and commit it after checkpoint finishing.
+ List txnIDs =
+ transactionsToCommit.computeIfAbsent(checkpointId, id -> new ArrayList<>());
+ for (PulsarPartitionSplit split : splits) {
+ TxnID uncommittedTransactionId = split.getUncommittedTransactionId();
+ if (uncommittedTransactionId != null) {
+ txnIDs.add(uncommittedTransactionId);
+ }
+ }
+ return splits;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ LOG.debug("Committing transactions for checkpoint {}", checkpointId);
+ if (coordinatorClient == null) {
+ return;
+ }
+ List checkpointIds =
+ transactionsToCommit.keySet().stream()
+ .filter(id -> id <= checkpointId)
+ .collect(toList());
+ for (Long id : checkpointIds) {
+ List transactions = transactionsToCommit.remove(id);
+ if (transactions != null) {
+ for (TxnID transaction : transactions) {
+ coordinatorClient.commit(transaction);
+ transactionsOfFinishedSplits.remove(transaction);
+ }
+ }
+ }
+ if (deserializationSchema instanceof PulsarTableDeserializationSchema) {
+ PulsarTableDeserializationSchema pulsarTableDeserializationSchema =
+ (PulsarTableDeserializationSchema) deserializationSchema;
+ pulsarTableDeserializationSchema.flushAudit();
+ pulsarTableDeserializationSchema.updateLastCheckpointId(checkpointId);
+ }
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
index 2dae39e8283..554c81a33f9 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
@@ -19,7 +19,7 @@
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -59,7 +59,7 @@ public class PulsarTableDeserializationSchema implements PulsarDeserializationSc
private final boolean innerFormat;
- private SourceMetricData sourceMetricData;
+ private SourceExactlyMetric sourceExactlyMetric;
private MetricOption metricOption;
@@ -90,7 +90,7 @@ public void open(DeserializationSchema.InitializationContext context, SourceConf
keyDeserialization.open(context);
}
if (metricOption != null) {
- sourceMetricData = new SourceMetricData(metricOption);
+ sourceExactlyMetric = new SourceExactlyMetric(metricOption);
}
valueDeserialization.open(context);
}
@@ -98,7 +98,6 @@ public void open(DeserializationSchema.InitializationContext context, SourceConf
@Override
public void deserialize(Message message, Collector collector)
throws IOException {
-
// Get the key row data
List keyRowData = new ArrayList<>();
if (keyDeserialization != null) {
@@ -114,7 +113,7 @@ public void deserialize(Message message, Collector collector)
}
MetricsCollector metricsCollector =
- new MetricsCollector<>(new ListCollector<>(valueRowData), sourceMetricData);
+ new MetricsCollector<>(new ListCollector<>(valueRowData), sourceExactlyMetric);
// reset timestamp if the deserialize schema has not inner format
if (!innerFormat) {
@@ -131,4 +130,22 @@ public void deserialize(Message message, Collector collector)
public TypeInformation getProducedType() {
return producedTypeInfo;
}
+
+ public void flushAudit() {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.flushAudit();
+ }
+ }
+
+ public void updateCurrentCheckpointId(long checkpointId) {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.updateCurrentCheckpointId(checkpointId);
+ }
+ }
+
+ public void updateLastCheckpointId(long checkpointId) {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.updateLastCheckpointId(checkpointId);
+ }
+ }
}
\ No newline at end of file
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
index 84a076d5022..e5df5486500 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableSource.java
@@ -17,8 +17,9 @@
package org.apache.inlong.sort.pulsar.table;
+import org.apache.inlong.sort.pulsar.source.PulsarSource;
+
import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;