Skip to content

Commit

Permalink
[FLINK-31980][Connectors/Kinesis] Implement EFO subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
hlteoh37 committed Nov 4, 2024
1 parent ccd4eb5 commit b293b77
Show file tree
Hide file tree
Showing 25 changed files with 2,224 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,22 @@
import org.apache.flink.connector.aws.util.AWSClientUtil;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants;
import org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumerator;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorState;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorStateSerializer;
import org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
import org.apache.flink.connector.kinesis.source.proxy.KinesisAsyncStreamProxy;
import org.apache.flink.connector.kinesis.source.proxy.KinesisStreamProxy;
import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsRecordEmitter;
import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsSourceReader;
import org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSplitReader;
import org.apache.flink.connector.kinesis.source.reader.fanout.StreamConsumerRegistrar;
import org.apache.flink.connector.kinesis.source.reader.polling.PollingKinesisShardSplitReader;
import org.apache.flink.connector.kinesis.source.serialization.KinesisDeserializationSchema;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
Expand All @@ -54,16 +61,32 @@
import software.amazon.awssdk.core.internal.retry.SdkDefaultRetryStrategy;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.retries.StandardRetryStrategy;
import software.amazon.awssdk.retries.api.BackoffStrategy;
import software.amazon.awssdk.retries.api.RetryStrategy;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.utils.AttributeMap;

import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_NAME;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_SUBSCRIPTION_TIMEOUT;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_ATTEMPTS_OPTION;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_DELAY_OPTION;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MIN_DELAY_OPTION;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.READER_TYPE;

/**
* The {@link KinesisStreamsSource} is an exactly-once parallel streaming data source that
* subscribes to a single AWS Kinesis data stream. It is able to handle resharding of streams, and
Expand Down Expand Up @@ -136,22 +159,15 @@ public Boundedness getBoundedness() {
public SourceReader<T, KinesisShardSplit> createReader(SourceReaderContext readerContext)
throws Exception {
setUpDeserializationSchema(readerContext);

Map<String, KinesisShardMetrics> shardMetricGroupMap = new ConcurrentHashMap<>();

// We create a new stream proxy for each split reader since they have their own independent
// lifecycle.
Supplier<PollingKinesisShardSplitReader> splitReaderSupplier =
() ->
new PollingKinesisShardSplitReader(
createKinesisStreamProxy(sourceConfig),
shardMetricGroupMap,
sourceConfig);
KinesisStreamsRecordEmitter<T> recordEmitter =
new KinesisStreamsRecordEmitter<>(deserializationSchema);

return new KinesisStreamsSourceReader<>(
new SingleThreadFetcherManager<>(splitReaderSupplier::get),
new SingleThreadFetcherManager<>(
getKinesisShardSplitReaderSupplier(sourceConfig, shardMetricGroupMap)),
recordEmitter,
sourceConfig,
readerContext,
Expand All @@ -170,14 +186,16 @@ public SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> c
SplitEnumeratorContext<KinesisShardSplit> enumContext,
KinesisStreamsSourceEnumeratorState checkpoint)
throws Exception {
StreamProxy streamProxy = createKinesisStreamProxy(sourceConfig);
return new KinesisStreamsSourceEnumerator(
enumContext,
streamArn,
sourceConfig,
createKinesisStreamProxy(sourceConfig),
streamProxy,
kinesisShardAssigner,
checkpoint,
preserveShardOrder);
preserveShardOrder,
new StreamConsumerRegistrar(sourceConfig, streamArn, streamProxy));
}

@Override
Expand All @@ -191,7 +209,91 @@ public SimpleVersionedSerializer<KinesisShardSplit> getSplitSerializer() {
return new KinesisStreamsSourceEnumeratorStateSerializer(new KinesisShardSplitSerializer());
}

/**
 * Selects the split reader factory matching the {@code READER_TYPE} set in the configuration.
 *
 * <p>The returned supplier builds a new reader, with a new stream proxy, on every call.
 *
 * @param sourceConfig configuration the reader type and reader settings are read from
 * @param shardMetricGroupMap shard metrics map handed to every reader the supplier creates
 * @return supplier producing a new split reader per invocation
 * @throws IllegalArgumentException if the configured reader type is not handled here
 */
private Supplier<SplitReader<Record, KinesisShardSplit>> getKinesisShardSplitReaderSupplier(
        Configuration sourceConfig, Map<String, KinesisShardMetrics> shardMetricGroupMap) {
    final KinesisSourceConfigOptions.ReaderType configuredType = sourceConfig.get(READER_TYPE);
    final Supplier<SplitReader<Record, KinesisShardSplit>> readerFactory;

    switch (configuredType) {
        case POLLING:
            // Every split reader gets a stream proxy of its own because each reader has an
            // independent lifecycle.
            readerFactory =
                    () ->
                            new PollingKinesisShardSplitReader(
                                    createKinesisStreamProxy(sourceConfig),
                                    shardMetricGroupMap,
                                    sourceConfig);
            break;
        case EFO:
            {
                // The consumer ARN is looked up once, here, and shared by all readers the
                // supplier creates; the async proxy and the timeout are resolved per reader.
                final String efoConsumerArn =
                        getConsumerArn(streamArn, sourceConfig.get(EFO_CONSUMER_NAME));
                readerFactory =
                        () ->
                                new FanOutKinesisShardSplitReader(
                                        createKinesisAsyncStreamProxy(streamArn, sourceConfig),
                                        efoConsumerArn,
                                        shardMetricGroupMap,
                                        sourceConfig.get(EFO_CONSUMER_SUBSCRIPTION_TIMEOUT));
                break;
            }
        default:
            throw new IllegalArgumentException("Unsupported reader type: " + configuredType);
    }

    return readerFactory;
}

/**
 * Looks up the ARN of a stream consumer by calling {@code describeStreamConsumer} through a
 * short-lived stream proxy.
 *
 * <p>The proxy uses an exponential backoff retry strategy configured from the
 * {@code EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_*} options, which additionally retries on
 * {@link ResourceNotFoundException} and {@link LimitExceededException}.
 *
 * @param streamArn ARN of the stream the consumer belongs to
 * @param consumerName name of the consumer to describe
 * @return the consumer ARN reported by the describe call
 * @throws KinesisStreamsSourceException if the lookup, or creating/closing the proxy, fails
 */
private String getConsumerArn(final String streamArn, final String consumerName) {
    final Duration minDelay =
            sourceConfig.get(EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MIN_DELAY_OPTION);
    final Duration maxDelay =
            sourceConfig.get(EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_DELAY_OPTION);
    final int maxAttempts =
            sourceConfig.get(EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_ATTEMPTS_OPTION);

    final StandardRetryStrategy.Builder describeRetryBuilder =
            createExpBackoffRetryStrategyBuilder(minDelay, maxDelay, maxAttempts)
                    .retryOnExceptionOrCauseInstanceOf(ResourceNotFoundException.class)
                    .retryOnExceptionOrCauseInstanceOf(LimitExceededException.class);

    // The strategy is built inside the try so that a failure there is wrapped like any other.
    try (StreamProxy describeProxy =
            createKinesisStreamProxy(sourceConfig, describeRetryBuilder.build())) {
        return describeProxy
                .describeStreamConsumer(streamArn, consumerName)
                .consumerDescription()
                .consumerARN();
    } catch (Exception e) {
        final String message =
                "Unable to lookup consumer ARN for stream "
                        + streamArn
                        + " and consumer "
                        + consumerName;
        throw new KinesisStreamsSourceException(message, e);
    }
}

/**
 * Creates a {@link KinesisAsyncStreamProxy} backed by a new Netty async HTTP client and a new
 * {@link KinesisAsyncClient}.
 *
 * <p>The client properties are the given configuration plus the AWS region derived from the
 * stream ARN.
 *
 * @param streamArn ARN of the stream; the AWS region is extracted from it
 * @param consumerConfig configuration copied into the Kinesis client properties
 * @return a new async stream proxy wrapping the created client and HTTP client
 * @throws IllegalStateException if no region can be determined from {@code streamArn}
 */
private KinesisAsyncStreamProxy createKinesisAsyncStreamProxy(
        String streamArn, Configuration consumerConfig) {
    String region =
            AWSGeneralUtil.getRegionFromArn(streamArn)
                    .orElseThrow(
                            () ->
                                    new IllegalStateException(
                                            "Unable to determine region from stream arn"));
    Properties kinesisClientProperties = new Properties();
    consumerConfig.addAllToProperties(kinesisClientProperties);
    kinesisClientProperties.put(AWSConfigConstants.AWS_REGION, region);

    SdkAsyncHttpClient asyncHttpClient =
            AWSGeneralUtil.createAsyncHttpClient(
                    AttributeMap.builder().build(), NettyNioAsyncHttpClient.builder());
    try {
        KinesisAsyncClient kinesisAsyncClient =
                AWSClientUtil.createAwsAsyncClient(
                        kinesisClientProperties,
                        asyncHttpClient,
                        KinesisAsyncClient.builder(),
                        KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT,
                        KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
        return new KinesisAsyncStreamProxy(kinesisAsyncClient, asyncHttpClient);
    } catch (RuntimeException e) {
        // No proxy was returned, so no caller holds a reference that could close the HTTP
        // client. Close it here to avoid leaking it, then surface the original failure.
        try {
            asyncHttpClient.close();
        } catch (RuntimeException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}

/**
 * Creates a {@link KinesisStreamProxy} that uses the default exponential backoff retry
 * strategy, configured from the {@code RETRY_STRATEGY_*} options in {@link AWSConfigOptions}.
 *
 * @param consumerConfig configuration forwarded to the two-argument overload
 * @return a new stream proxy
 */
private KinesisStreamProxy createKinesisStreamProxy(Configuration consumerConfig) {
    // The retry settings are read from the source's own configuration field, not from the
    // consumerConfig argument.
    final Duration minDelay = sourceConfig.get(AWSConfigOptions.RETRY_STRATEGY_MIN_DELAY_OPTION);
    final Duration maxDelay = sourceConfig.get(AWSConfigOptions.RETRY_STRATEGY_MAX_DELAY_OPTION);
    final int maxAttempts = sourceConfig.get(AWSConfigOptions.RETRY_STRATEGY_MAX_ATTEMPTS_OPTION);

    final RetryStrategy defaultRetryStrategy =
            createExpBackoffRetryStrategy(minDelay, maxDelay, maxAttempts);
    return createKinesisStreamProxy(consumerConfig, defaultRetryStrategy);
}

private KinesisStreamProxy createKinesisStreamProxy(
Configuration consumerConfig, RetryStrategy retryStrategy) {
String region =
AWSGeneralUtil.getRegionFromArn(streamArn)
.orElseThrow(
Expand All @@ -203,16 +305,7 @@ private KinesisStreamProxy createKinesisStreamProxy(Configuration consumerConfig
kinesisClientProperties.put(AWSConfigConstants.AWS_REGION, region);

final ClientOverrideConfiguration.Builder overrideBuilder =
ClientOverrideConfiguration.builder()
.retryStrategy(
createExpBackoffRetryStrategy(
sourceConfig.get(
AWSConfigOptions.RETRY_STRATEGY_MIN_DELAY_OPTION),
sourceConfig.get(
AWSConfigOptions.RETRY_STRATEGY_MAX_DELAY_OPTION),
sourceConfig.get(
AWSConfigOptions
.RETRY_STRATEGY_MAX_ATTEMPTS_OPTION)));
ClientOverrideConfiguration.builder().retryStrategy(retryStrategy);

SdkHttpClient httpClient =
AWSGeneralUtil.createSyncHttpClient(
Expand Down Expand Up @@ -248,13 +341,19 @@ public UserCodeClassLoader getUserCodeClassLoader() {

/**
 * Builds an exponential backoff retry strategy from the shared builder configuration.
 *
 * @param initialDelay base delay of the backoff
 * @param maxDelay upper bound of the backoff delay
 * @param maxAttempts maximum number of attempts
 * @return the built retry strategy
 */
private RetryStrategy createExpBackoffRetryStrategy(
        Duration initialDelay, Duration maxDelay, int maxAttempts) {
    final StandardRetryStrategy.Builder strategyBuilder =
            createExpBackoffRetryStrategyBuilder(initialDelay, maxDelay, maxAttempts);
    return strategyBuilder.build();
}

private StandardRetryStrategy.Builder createExpBackoffRetryStrategyBuilder(
Duration initialDelay, Duration maxDelay, int maxAttempts) {
final BackoffStrategy backoffStrategy =
BackoffStrategy.exponentialDelayHalfJitter(initialDelay, maxDelay);

return SdkDefaultRetryStrategy.standardRetryStrategyBuilder()
.backoffStrategy(backoffStrategy)
.throttlingBackoffStrategy(backoffStrategy)
.maxAttempts(maxAttempts)
.build();
.circuitBreakerEnabled(false)
.retryOnExceptionOrCauseInstanceOf(LimitExceededException.class)
.maxAttempts(maxAttempts);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions;
import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
import org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory;
import org.apache.flink.connector.kinesis.source.enumerator.assigner.UniformShardAssigner;
import org.apache.flink.connector.kinesis.source.serialization.KinesisDeserializationSchema;

import java.time.Duration;

import static org.apache.flink.connector.aws.config.AWSConfigOptions.RETRY_STRATEGY_MAX_ATTEMPTS_OPTION;
import static org.apache.flink.connector.aws.config.AWSConfigOptions.RETRY_STRATEGY_MAX_DELAY_OPTION;
import static org.apache.flink.connector.aws.config.AWSConfigOptions.RETRY_STRATEGY_MIN_DELAY_OPTION;

/**
* Builder to construct the {@link KinesisStreamsSource}.
*
Expand Down Expand Up @@ -128,15 +131,9 @@ public KinesisStreamsSource<T> build() {
}

/**
 * Copies the retry strategy settings held by this builder into the source configuration via
 * {@code overrideIfExists}.
 *
 * <p>The options are the statically imported {@code RETRY_STRATEGY_*} constants from
 * {@code AWSConfigOptions}; each setting is applied exactly once.
 */
private void setSourceConfigurations() {
    overrideIfExists(RETRY_STRATEGY_MIN_DELAY_OPTION, this.retryStrategyMinDelay);
    overrideIfExists(RETRY_STRATEGY_MAX_DELAY_OPTION, this.retryStrategyMaxDelay);
    overrideIfExists(RETRY_STRATEGY_MAX_ATTEMPTS_OPTION, this.retryStrategyMaxAttempts);
}

private <E> void overrideIfExists(ConfigOption<E> configOption, E value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,33 @@
package org.apache.flink.connector.kinesis.source.config;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.connector.aws.config.AWSConfigOptions;

import java.time.Duration;

/** Constants to be used with the KinesisStreamsSource. */
@Experimental
@PublicEvolving
public class KinesisSourceConfigOptions extends AWSConfigOptions {
public class KinesisSourceConfigOptions {
/** Marks the initial position to use when reading from the Kinesis stream. */
public enum InitialPosition {
// NOTE(review): presumably starts from the most recent records — confirm against the code
// that consumes STREAM_INITIAL_POSITION.
LATEST,
// NOTE(review): presumably starts from the oldest record still retained — confirm.
TRIM_HORIZON,
// NOTE(review): presumably starts from a configured timestamp; the option carrying that
// timestamp is not visible here — confirm.
AT_TIMESTAMP
}

/** Defines mechanism used to consume records from Kinesis stream. */
public enum ReaderType {
// KinesisStreamsSource creates PollingKinesisShardSplitReader instances for this type.
POLLING,
// KinesisStreamsSource creates FanOutKinesisShardSplitReader instances for this type, after
// resolving the ARN of the consumer named by EFO_CONSUMER_NAME.
EFO
}

/** Defines lifecycle management of EFO consumer on Kinesis stream. */
public enum ConsumerLifecycle {
// Per the EFO_CONSUMER_LIFECYCLE description: the Flink job registers the consumer on
// startup and deregisters it on shutdown.
JOB_MANAGED,
// NOTE(review): presumably the consumer is registered and deregistered outside the Flink
// job — confirm against the consumer registrar.
SELF_MANAGED
}

public static final ConfigOption<InitialPosition> STREAM_INITIAL_POSITION =
ConfigOptions.key("source.init.position")
.enumType(InitialPosition.class)
Expand Down Expand Up @@ -69,4 +78,57 @@ public enum InitialPosition {
.defaultValue(10000)
.withDescription(
"The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard");

/** Selects the {@link ReaderType} used to read from the stream; defaults to {@code POLLING}. */
public static final ConfigOption<ReaderType> READER_TYPE =
ConfigOptions.key("source.reader.type")
.enumType(ReaderType.class)
.defaultValue(ReaderType.POLLING)
.withDescription("The type of reader used to read from the Kinesis stream.");

/**
* Selects the {@link ConsumerLifecycle} applied to the EFO consumer; defaults to
* {@code JOB_MANAGED}.
*/
public static final ConfigOption<ConsumerLifecycle> EFO_CONSUMER_LIFECYCLE =
ConfigOptions.key("source.efo.lifecycle")
.enumType(ConsumerLifecycle.class)
.defaultValue(ConsumerLifecycle.JOB_MANAGED)
.withDescription(
"Setting to control whether the lifecycle of EFO consumer is managed by the Flink job. If JOB_MANAGED, then the Flink job will register the consumer on startup and deregister it on shutdown.");

/**
 * Name of the EFO consumer. There is no default value. The source uses this name to look up
 * the consumer ARN when the reader type is {@code EFO}.
 */
public static final ConfigOption<String> EFO_CONSUMER_NAME =
        ConfigOptions.key("source.efo.consumer.name")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        "The name of the EFO consumer used to read from the Kinesis stream when the reader type is EFO.");

/**
* Subscription timeout handed to each FanOutKinesisShardSplitReader created by the source;
* defaults to 60000 ms.
*/
public static final ConfigOption<Duration> EFO_CONSUMER_SUBSCRIPTION_TIMEOUT =
ConfigOptions.key("source.efo.subscription.timeout")
.durationType()
.defaultValue(Duration.ofMillis(60000))
.withDescription("Timeout for EFO Consumer subscription.");

/**
* Base delay of the exponential backoff the source uses while describing the EFO stream
* consumer to resolve its ARN; defaults to 2000 ms.
*/
public static final ConfigOption<Duration>
EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MIN_DELAY_OPTION =
ConfigOptions.key("source.efo.describe.retry-strategy.delay.min")
.durationType()
.defaultValue(Duration.ofMillis(2000))
.withDescription(
"Base delay for the exponential backoff retry strategy");

/**
* Maximum delay of the exponential backoff the source uses while describing the EFO stream
* consumer to resolve its ARN; defaults to 60000 ms.
*/
public static final ConfigOption<Duration>
EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_DELAY_OPTION =
ConfigOptions.key("source.efo.describe.retry-strategy.delay.max")
.durationType()
.defaultValue(Duration.ofMillis(60000))
.withDescription(
"Max delay for the exponential backoff retry strategy");

/**
* Maximum number of attempts the source makes while describing the EFO stream consumer to
* resolve its ARN; defaults to 100.
*/
public static final ConfigOption<Integer>
EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_ATTEMPTS_OPTION =
ConfigOptions.key("source.efo.describe.retry-strategy.attempts.max")
.intType()
.defaultValue(100)
.withDescription(
"Maximum number of attempts for the exponential backoff retry strategy");

/**
* Timeout for deregistering the EFO consumer; defaults to 10000 ms. Per the description,
* execution continues normally once the timeout is reached.
*/
// NOTE(review): the code that reads this option is not visible here — presumably the consumer
// registrar on shutdown; confirm.
public static final ConfigOption<Duration> EFO_DEREGISTER_CONSUMER_TIMEOUT =
ConfigOptions.key("source.efo.deregister.timeout")
.durationType()
.defaultValue(Duration.ofMillis(10000))
.withDescription(
"Timeout for consumer deregistration. When timeout is reached, code will continue as per normal.");
}
Loading

0 comments on commit b293b77

Please sign in to comment.