From 8f3653b0dda4475e7497a607f9e8f8bcbfd41e8b Mon Sep 17 00:00:00 2001 From: Hong Teoh Date: Thu, 12 Sep 2024 16:05:44 +0100 Subject: [PATCH] [FLINK-31980][Connectors/Kinesis] Implement EFO subscription --- .../kinesis/source/KinesisStreamsSource.java | 145 ++++++-- .../source/KinesisStreamsSourceBuilder.java | 17 +- .../config/KinesisSourceConfigOptions.java | 70 +++- .../KinesisStreamsSourceEnumerator.java | 18 +- .../source/proxy/AsyncStreamProxy.java | 37 ++ .../source/proxy/KinesisAsyncStreamProxy.java | 66 ++++ .../source/proxy/KinesisStreamProxy.java | 34 ++ .../kinesis/source/proxy/StreamProxy.java | 32 ++ .../reader/KinesisShardSplitReaderBase.java | 10 +- .../fanout/FanOutKinesisShardSplitReader.java | 56 ++- .../FanOutKinesisShardSubscription.java | 335 +++++++++++++++++ .../fanout/StreamConsumerRegistrar.java | 150 ++++++++ .../PollingKinesisShardSplitReader.java | 13 +- .../source/split/StartingPositionUtil.java | 72 ++++ .../source/KinesisStreamsSourceITCase.java | 5 +- .../KinesisStreamsSourceEnumeratorTest.java | 37 +- .../proxy/KinesisAsyncStreamProxyTest.java | 99 +++++ .../PollingKinesisShardSplitReaderTest.java | 9 +- .../FanOutKinesisShardSplitReaderTest.java | 221 +++++++++++ .../fanout/StreamConsumerRegistrarTest.java | 212 +++++++++++ .../split/StartingPositionUtilTest.java | 72 ++++ .../FakeKinesisFanOutBehaviorsFactory.java | 345 ++++++++++++++++++ .../util/KinesisAsyncClientProvider.java | 75 ++++ .../util/KinesisStreamProxyProvider.java | 107 ++++++ .../kinesis/source/util/TestUtil.java | 2 + 25 files changed, 2176 insertions(+), 63 deletions(-) create mode 100644 flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/AsyncStreamProxy.java create mode 100644 flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisAsyncStreamProxy.java create mode 100644 
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java create mode 100644 flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/StreamConsumerRegistrar.java create mode 100644 flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/StartingPositionUtil.java create mode 100644 flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisAsyncStreamProxyTest.java create mode 100644 flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java create mode 100644 flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/StreamConsumerRegistrarTest.java create mode 100644 flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/StartingPositionUtilTest.java create mode 100644 flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/FakeKinesisFanOutBehaviorsFactory.java create mode 100644 flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/KinesisAsyncClientProvider.java diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java index 0fabdf557..5fba71c68 100644 --- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java @@ -32,15 +32,22 @@ import org.apache.flink.connector.aws.util.AWSClientUtil; import org.apache.flink.connector.aws.util.AWSGeneralUtil; import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants; +import org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions; import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner; import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumerator; import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorState; import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorStateSerializer; +import org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException; import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics; +import org.apache.flink.connector.kinesis.source.proxy.KinesisAsyncStreamProxy; import org.apache.flink.connector.kinesis.source.proxy.KinesisStreamProxy; +import org.apache.flink.connector.kinesis.source.proxy.StreamProxy; import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsRecordEmitter; import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsSourceReader; +import org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSplitReader; +import org.apache.flink.connector.kinesis.source.reader.fanout.StreamConsumerRegistrar; import org.apache.flink.connector.kinesis.source.reader.polling.PollingKinesisShardSplitReader; import 
org.apache.flink.connector.kinesis.source.serialization.KinesisDeserializationSchema; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; @@ -54,9 +61,18 @@ import software.amazon.awssdk.core.internal.retry.SdkDefaultRetryStrategy; import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.retries.StandardRetryStrategy; import software.amazon.awssdk.retries.api.BackoffStrategy; import software.amazon.awssdk.retries.api.RetryStrategy; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse; +import software.amazon.awssdk.services.kinesis.model.LimitExceededException; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.awssdk.utils.AttributeMap; import java.time.Duration; import java.util.Map; @@ -64,6 +80,13 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_NAME; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_SUBSCRIPTION_TIMEOUT; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_ATTEMPTS_OPTION; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_DELAY_OPTION; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MIN_DELAY_OPTION; +import 
static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.READER_TYPE; + /** * The {@link KinesisStreamsSource} is an exactly-once parallel streaming data source that * subscribes to a single AWS Kinesis data stream. It is able to handle resharding of streams, and @@ -136,22 +159,15 @@ public Boundedness getBoundedness() { public SourceReader createReader(SourceReaderContext readerContext) throws Exception { setUpDeserializationSchema(readerContext); - Map shardMetricGroupMap = new ConcurrentHashMap<>(); // We create a new stream proxy for each split reader since they have their own independent // lifecycle. - Supplier splitReaderSupplier = - () -> - new PollingKinesisShardSplitReader( - createKinesisStreamProxy(sourceConfig), - shardMetricGroupMap, - sourceConfig); KinesisStreamsRecordEmitter recordEmitter = new KinesisStreamsRecordEmitter<>(deserializationSchema); - return new KinesisStreamsSourceReader<>( - new SingleThreadFetcherManager<>(splitReaderSupplier::get), + new SingleThreadFetcherManager<>( + getKinesisShardSplitReaderSupplier(sourceConfig, shardMetricGroupMap)), recordEmitter, sourceConfig, readerContext, @@ -170,14 +186,16 @@ public SplitEnumerator c SplitEnumeratorContext enumContext, KinesisStreamsSourceEnumeratorState checkpoint) throws Exception { + StreamProxy streamProxy = createKinesisStreamProxy(sourceConfig); return new KinesisStreamsSourceEnumerator( enumContext, streamArn, sourceConfig, - createKinesisStreamProxy(sourceConfig), + streamProxy, kinesisShardAssigner, checkpoint, - preserveShardOrder); + preserveShardOrder, + new StreamConsumerRegistrar(sourceConfig, streamArn, streamProxy)); } @Override @@ -191,7 +209,91 @@ public SimpleVersionedSerializer getSplitSerializer() { return new KinesisStreamsSourceEnumeratorStateSerializer(new KinesisShardSplitSerializer()); } + private Supplier> getKinesisShardSplitReaderSupplier( + Configuration sourceConfig, Map shardMetricGroupMap) { + 
KinesisSourceConfigOptions.ReaderType readerType = sourceConfig.get(READER_TYPE); + switch (readerType) { + // We create a new stream proxy for each split reader since they have their own + // independent lifecycle. + case POLLING: + return () -> + new PollingKinesisShardSplitReader( + createKinesisStreamProxy(sourceConfig), + shardMetricGroupMap, + sourceConfig); + case EFO: + String consumerArn = getConsumerArn(streamArn, sourceConfig.get(EFO_CONSUMER_NAME)); + return () -> + new FanOutKinesisShardSplitReader( + createKinesisAsyncStreamProxy(streamArn, sourceConfig), + consumerArn, + shardMetricGroupMap, + sourceConfig.get(EFO_CONSUMER_SUBSCRIPTION_TIMEOUT)); + default: + throw new IllegalArgumentException("Unsupported reader type: " + readerType); + } + } + + private String getConsumerArn(final String streamArn, final String consumerName) { + StandardRetryStrategy.Builder retryStrategyBuilder = + createExpBackoffRetryStrategyBuilder( + sourceConfig.get(EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MIN_DELAY_OPTION), + sourceConfig.get(EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_DELAY_OPTION), + sourceConfig.get(EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_ATTEMPTS_OPTION)); + retryStrategyBuilder.retryOnExceptionOrCauseInstanceOf(ResourceNotFoundException.class); + retryStrategyBuilder.retryOnExceptionOrCauseInstanceOf(LimitExceededException.class); + + try (StreamProxy streamProxy = + createKinesisStreamProxy(sourceConfig, retryStrategyBuilder.build())) { + DescribeStreamConsumerResponse response = + streamProxy.describeStreamConsumer(streamArn, consumerName); + return response.consumerDescription().consumerARN(); + } catch (Exception e) { + throw new KinesisStreamsSourceException( + "Unable to lookup consumer ARN for stream " + + streamArn + + " and consumer " + + consumerName, + e); + } + } + + private KinesisAsyncStreamProxy createKinesisAsyncStreamProxy( + String streamArn, Configuration consumerConfig) { + String region = + AWSGeneralUtil.getRegionFromArn(streamArn) + 
.orElseThrow( + () -> + new IllegalStateException( + "Unable to determine region from stream arn")); + Properties kinesisClientProperties = new Properties(); + consumerConfig.addAllToProperties(kinesisClientProperties); + kinesisClientProperties.put(AWSConfigConstants.AWS_REGION, region); + + SdkAsyncHttpClient asyncHttpClient = + AWSGeneralUtil.createAsyncHttpClient( + AttributeMap.builder().build(), NettyNioAsyncHttpClient.builder()); + KinesisAsyncClient kinesisAsyncClient = + AWSClientUtil.createAwsAsyncClient( + kinesisClientProperties, + asyncHttpClient, + KinesisAsyncClient.builder(), + KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT, + KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX); + return new KinesisAsyncStreamProxy(kinesisAsyncClient, asyncHttpClient); + } + private KinesisStreamProxy createKinesisStreamProxy(Configuration consumerConfig) { + return createKinesisStreamProxy( + consumerConfig, + createExpBackoffRetryStrategy( + sourceConfig.get(AWSConfigOptions.RETRY_STRATEGY_MIN_DELAY_OPTION), + sourceConfig.get(AWSConfigOptions.RETRY_STRATEGY_MAX_DELAY_OPTION), + sourceConfig.get(AWSConfigOptions.RETRY_STRATEGY_MAX_ATTEMPTS_OPTION))); + } + + private KinesisStreamProxy createKinesisStreamProxy( + Configuration consumerConfig, RetryStrategy retryStrategy) { String region = AWSGeneralUtil.getRegionFromArn(streamArn) .orElseThrow( @@ -203,16 +305,7 @@ private KinesisStreamProxy createKinesisStreamProxy(Configuration consumerConfig kinesisClientProperties.put(AWSConfigConstants.AWS_REGION, region); final ClientOverrideConfiguration.Builder overrideBuilder = - ClientOverrideConfiguration.builder() - .retryStrategy( - createExpBackoffRetryStrategy( - sourceConfig.get( - AWSConfigOptions.RETRY_STRATEGY_MIN_DELAY_OPTION), - sourceConfig.get( - AWSConfigOptions.RETRY_STRATEGY_MAX_DELAY_OPTION), - sourceConfig.get( - AWSConfigOptions - .RETRY_STRATEGY_MAX_ATTEMPTS_OPTION))); + 
ClientOverrideConfiguration.builder().retryStrategy(retryStrategy); SdkHttpClient httpClient = AWSGeneralUtil.createSyncHttpClient( @@ -248,13 +341,19 @@ public UserCodeClassLoader getUserCodeClassLoader() { private RetryStrategy createExpBackoffRetryStrategy( Duration initialDelay, Duration maxDelay, int maxAttempts) { + return createExpBackoffRetryStrategyBuilder(initialDelay, maxDelay, maxAttempts).build(); + } + + private StandardRetryStrategy.Builder createExpBackoffRetryStrategyBuilder( + Duration initialDelay, Duration maxDelay, int maxAttempts) { final BackoffStrategy backoffStrategy = BackoffStrategy.exponentialDelayHalfJitter(initialDelay, maxDelay); return SdkDefaultRetryStrategy.standardRetryStrategyBuilder() .backoffStrategy(backoffStrategy) .throttlingBackoffStrategy(backoffStrategy) - .maxAttempts(maxAttempts) - .build(); + .circuitBreakerEnabled(false) + .retryOnExceptionOrCauseInstanceOf(LimitExceededException.class) + .maxAttempts(maxAttempts); } } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceBuilder.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceBuilder.java index 05a8d77be..8a6a503ee 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceBuilder.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceBuilder.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; -import org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions; import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner; import 
org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory; import org.apache.flink.connector.kinesis.source.enumerator.assigner.UniformShardAssigner; @@ -30,6 +29,10 @@ import java.time.Duration; +import static org.apache.flink.connector.aws.config.AWSConfigOptions.RETRY_STRATEGY_MAX_ATTEMPTS_OPTION; +import static org.apache.flink.connector.aws.config.AWSConfigOptions.RETRY_STRATEGY_MAX_DELAY_OPTION; +import static org.apache.flink.connector.aws.config.AWSConfigOptions.RETRY_STRATEGY_MIN_DELAY_OPTION; + /** * Builder to construct the {@link KinesisStreamsSource}. * @@ -128,15 +131,9 @@ public KinesisStreamsSource build() { } private void setSourceConfigurations() { - overrideIfExists( - KinesisSourceConfigOptions.RETRY_STRATEGY_MIN_DELAY_OPTION, - this.retryStrategyMinDelay); - overrideIfExists( - KinesisSourceConfigOptions.RETRY_STRATEGY_MAX_DELAY_OPTION, - this.retryStrategyMaxDelay); - overrideIfExists( - KinesisSourceConfigOptions.RETRY_STRATEGY_MAX_ATTEMPTS_OPTION, - this.retryStrategyMaxAttempts); + overrideIfExists(RETRY_STRATEGY_MIN_DELAY_OPTION, this.retryStrategyMinDelay); + overrideIfExists(RETRY_STRATEGY_MAX_DELAY_OPTION, this.retryStrategyMaxDelay); + overrideIfExists(RETRY_STRATEGY_MAX_ATTEMPTS_OPTION, this.retryStrategyMaxAttempts); } private void overrideIfExists(ConfigOption configOption, E value) { diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisSourceConfigOptions.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisSourceConfigOptions.java index 122f1396d..92e18896e 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisSourceConfigOptions.java +++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisSourceConfigOptions.java @@ -19,17 +19,14 @@ package org.apache.flink.connector.kinesis.source.config; import org.apache.flink.annotation.Experimental; -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; -import org.apache.flink.connector.aws.config.AWSConfigOptions; import java.time.Duration; /** Constants to be used with the KinesisStreamsSource. */ @Experimental -@PublicEvolving -public class KinesisSourceConfigOptions extends AWSConfigOptions { +public class KinesisSourceConfigOptions { /** Marks the initial position to use when reading from the Kinesis stream. */ public enum InitialPosition { LATEST, @@ -37,6 +34,18 @@ public enum InitialPosition { AT_TIMESTAMP } + /** Defines mechanism used to consume records from Kinesis stream. */ + public enum ReaderType { + POLLING, + EFO + } + + /** Defines lifecycle management of EFO consumer on Kinesis stream. 
*/ + public enum ConsumerLifecycle { + JOB_MANAGED, + SELF_MANAGED + } + public static final ConfigOption STREAM_INITIAL_POSITION = ConfigOptions.key("source.init.position") .enumType(InitialPosition.class) @@ -69,4 +78,57 @@ public enum InitialPosition { .defaultValue(10000) .withDescription( "The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard"); + + public static final ConfigOption READER_TYPE = + ConfigOptions.key("source.reader.type") + .enumType(ReaderType.class) + .defaultValue(ReaderType.POLLING) + .withDescription("The type of reader used to read from the Kinesis stream."); + + public static final ConfigOption EFO_CONSUMER_LIFECYCLE = + ConfigOptions.key("source.efo.lifecycle") + .enumType(ConsumerLifecycle.class) + .defaultValue(ConsumerLifecycle.JOB_MANAGED) + .withDescription( + "Setting to control whether the lifecycle of EFO consumer is managed by the Flink job. If JOB_MANAGED, then the Flink job will register the consumer on startup and deregister it on shutdown."); + + public static final ConfigOption EFO_CONSUMER_NAME = + ConfigOptions.key("source.efo.consumer.name").stringType().noDefaultValue(); + + public static final ConfigOption EFO_CONSUMER_SUBSCRIPTION_TIMEOUT = + ConfigOptions.key("source.efo.subscription.timeout") + .durationType() + .defaultValue(Duration.ofMillis(60000)) + .withDescription("Timeout for EFO Consumer subscription."); + + public static final ConfigOption + EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MIN_DELAY_OPTION = + ConfigOptions.key("source.efo.describe.retry-strategy.delay.min") + .durationType() + .defaultValue(Duration.ofMillis(2000)) + .withDescription( + "Base delay for the exponential backoff retry strategy"); + + public static final ConfigOption + EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_DELAY_OPTION = + ConfigOptions.key("source.efo.describe.retry-strategy.delay.max") + .durationType() + .defaultValue(Duration.ofMillis(60000)) + .withDescription( + "Max delay for the 
exponential backoff retry strategy"); + + public static final ConfigOption + EFO_DESCRIBE_CONSUMER_RETRY_STRATEGY_MAX_ATTEMPTS_OPTION = + ConfigOptions.key("source.efo.describe.retry-strategy.attempts.max") + .intType() + .defaultValue(100) + .withDescription( + "Maximum number of attempts for the exponential backoff retry strategy"); + + public static final ConfigOption EFO_DEREGISTER_CONSUMER_TIMEOUT = + ConfigOptions.key("source.efo.deregister.timeout") + .durationType() + .defaultValue(Duration.ofMillis(10000)) + .withDescription( + "Timeout for consumer deregistration. When timeout is reached, code will continue as per normal."); } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java index 245b945a1..0a39450f2 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java @@ -32,6 +32,7 @@ import org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException; import org.apache.flink.connector.kinesis.source.proxy.ListShardsStartingPosition; import org.apache.flink.connector.kinesis.source.proxy.StreamProxy; +import org.apache.flink.connector.kinesis.source.reader.fanout.StreamConsumerRegistrar; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import org.apache.flink.connector.kinesis.source.split.StartingPosition; @@ -81,6 +82,7 @@ public class KinesisStreamsSourceEnumerator private final Map> splitAssignment = new HashMap<>(); private String lastSeenShardId; + private 
final StreamConsumerRegistrar streamConsumerRegistrar; public KinesisStreamsSourceEnumerator( SplitEnumeratorContext context, @@ -89,13 +91,15 @@ public KinesisStreamsSourceEnumerator( StreamProxy streamProxy, KinesisShardAssigner shardAssigner, KinesisStreamsSourceEnumeratorState state, - boolean preserveShardOrder) { + boolean preserveShardOrder, + StreamConsumerRegistrar streamConsumerRegistrar) { this.context = context; this.streamArn = streamArn; this.sourceConfig = sourceConfig; this.streamProxy = streamProxy; this.shardAssigner = shardAssigner; this.shardAssignerContext = new ShardAssignerContext(splitAssignment, context); + this.streamConsumerRegistrar = streamConsumerRegistrar; if (state == null) { this.lastSeenShardId = null; this.splitTracker = new SplitTracker(preserveShardOrder); @@ -107,6 +111,8 @@ public KinesisStreamsSourceEnumerator( @Override public void start() { + streamConsumerRegistrar.registerStreamConsumer(); + if (lastSeenShardId == null) { context.callAsync(this::initialDiscoverSplits, this::processDiscoveredSplits); } @@ -163,6 +169,7 @@ public KinesisStreamsSourceEnumeratorState snapshotState(long checkpointId) thro @Override public void close() throws IOException { + streamConsumerRegistrar.deregisterStreamConsumer(); streamProxy.close(); } @@ -210,6 +217,10 @@ ListShardsStartingPosition getInitialPositionForShardDiscovery( InitialPosition initialPosition, Instant currentTime) { switch (initialPosition) { case LATEST: + LOG.info( + "Starting consumption from stream {} from LATEST. 
This translates into AT_TIMESTAMP from {}", + streamArn, + currentTime.toString()); return ListShardsStartingPosition.fromTimestamp(currentTime); case AT_TIMESTAMP: Instant timestamp = @@ -218,8 +229,13 @@ ListShardsStartingPosition getInitialPositionForShardDiscovery( () -> new IllegalArgumentException( "Stream initial timestamp must be specified when initial position set to AT_TIMESTAMP")); + LOG.info( + "Starting consumption from stream {} from AT_TIMESTAMP, starting from {}", + streamArn, + timestamp.toString()); return ListShardsStartingPosition.fromTimestamp(timestamp); case TRIM_HORIZON: + LOG.info("Starting consumption from stream {} from TRIM_HORIZON.", streamArn); return ListShardsStartingPosition.fromStart(); } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/AsyncStreamProxy.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/AsyncStreamProxy.java new file mode 100644 index 000000000..9cdcb40df --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/AsyncStreamProxy.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.source.proxy; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.kinesis.source.split.StartingPosition; + +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; + +import java.io.Closeable; +import java.util.concurrent.CompletableFuture; + +/** Interface for interactions with async client of Kinesis streams. */ +@Internal +public interface AsyncStreamProxy extends Closeable { + CompletableFuture subscribeToShard( + String consumerArn, + String shardId, + StartingPosition startingPosition, + SubscribeToShardResponseHandler responseHandler); +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisAsyncStreamProxy.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisAsyncStreamProxy.java new file mode 100644 index 000000000..56d097e99 --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisAsyncStreamProxy.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.source.proxy; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.kinesis.source.split.StartingPosition; + +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.connector.kinesis.source.split.StartingPositionUtil.toSdkStartingPosition; + +/** Implementation of async stream proxy for the Kinesis client. 
*/ +@Internal +public class KinesisAsyncStreamProxy implements AsyncStreamProxy { + private final KinesisAsyncClient kinesisAsyncClient; + private final SdkAsyncHttpClient asyncHttpClient; + + public KinesisAsyncStreamProxy( + KinesisAsyncClient kinesisAsyncClient, SdkAsyncHttpClient asyncHttpClient) { + this.kinesisAsyncClient = kinesisAsyncClient; + this.asyncHttpClient = asyncHttpClient; + } + + @Override + public CompletableFuture subscribeToShard( + String consumerArn, + String shardId, + StartingPosition startingPosition, + SubscribeToShardResponseHandler responseHandler) { + SubscribeToShardRequest request = + SubscribeToShardRequest.builder() + .consumerARN(consumerArn) + .shardId(shardId) + .startingPosition(toSdkStartingPosition(startingPosition)) + .build(); + return kinesisAsyncClient.subscribeToShard(request, responseHandler); + } + + @Override + public void close() throws IOException { + kinesisAsyncClient.close(); + asyncHttpClient.close(); + } +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java index 706c58887..1a64d1336 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java @@ -25,6 +25,10 @@ import org.slf4j.LoggerFactory; import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.DeregisterStreamConsumerRequest; +import software.amazon.awssdk.services.kinesis.model.DeregisterStreamConsumerResponse; +import 
software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse; import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; @@ -33,6 +37,8 @@ import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; +import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerRequest; +import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.StreamDescriptionSummary; @@ -167,6 +173,34 @@ private GetRecordsResponse getRecords( .build()); } + // Enhanced Fan-Out Consumer - related methods + + @Override + public RegisterStreamConsumerResponse registerStreamConsumer( + String streamArn, String consumerName) { + return kinesisClient.registerStreamConsumer( + RegisterStreamConsumerRequest.builder() + .streamARN(streamArn) + .consumerName(consumerName) + .build()); + } + + @Override + public DeregisterStreamConsumerResponse deregisterStreamConsumer(String consumerArn) { + return kinesisClient.deregisterStreamConsumer( + DeregisterStreamConsumerRequest.builder().consumerARN(consumerArn).build()); + } + + @Override + public DescribeStreamConsumerResponse describeStreamConsumer( + String streamArn, String consumerName) { + return kinesisClient.describeStreamConsumer( + DescribeStreamConsumerRequest.builder() + .streamARN(streamArn) + .consumerName(consumerName) + .build()); + } + @Override public void close() throws IOException { kinesisClient.close(); diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/StreamProxy.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/StreamProxy.java index bb449db71..7f6ed46f6 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/StreamProxy.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/StreamProxy.java @@ -21,7 +21,10 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.connector.kinesis.source.split.StartingPosition; +import software.amazon.awssdk.services.kinesis.model.DeregisterStreamConsumerResponse; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.StreamDescriptionSummary; @@ -64,4 +67,33 @@ GetRecordsResponse getRecords( String shardId, StartingPosition startingPosition, int maxRecordsToGet); + + // Enhanced Fan-Out Consumer related methods + /** + * Registers an enhanced fan-out consumer against the stream. + * + * @param streamArn the ARN of the stream + * @param consumerName the consumerName + * @return the register stream consumer response + */ + RegisterStreamConsumerResponse registerStreamConsumer( + final String streamArn, final String consumerName); + + /** + * De-registers an enhanced fan-out consumer against the stream. 
+ * + * @param consumerArn the ARN of the consumer to deregister + * @return the de-register stream consumer response + */ + DeregisterStreamConsumerResponse deregisterStreamConsumer(final String consumerArn); + + /** + * Describe stream consumer. + * + * @param streamArn the ARN of the Kinesis stream + * @param consumerName the name of the Kinesis consumer + * @return the describe stream consumer response + */ + DescribeStreamConsumerResponse describeStreamConsumer( + final String streamArn, final String consumerName); } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java index 94cb50101..3de94484b 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisShardSplitReaderBase.java @@ -19,7 +19,7 @@ package org.apache.flink.connector.kinesis.source.reader; import org.apache.flink.annotation.Internal; -=import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics; @@ -90,6 +90,11 @@ public RecordsWithSplitIds fetch() throws IOException { Collections.emptyIterator(), splitState.getSplitId(), true); } + if (recordBatch == null) { + assignedSplits.add(splitState); + return INCOMPLETE_SHARD_EMPTY_RECORDS; + } + if (!recordBatch.isCompleted()) { 
assignedSplits.add(splitState); } @@ -124,7 +129,8 @@ public RecordsWithSplitIds fetch() throws IOException { * Main method implementations must implement to fetch records from Kinesis. * * @param splitState split to fetch records for - * @return RecordBatch containing the fetched records and metadata + * @return RecordBatch containing the fetched records and metadata. Returns null if there are no + * records but fetching should be retried at a later time. */ protected abstract RecordBatch fetchRecords(KinesisShardSplitState splitState); diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java index 21d0e81ff..8ae20293d 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java @@ -19,10 +19,17 @@ package org.apache.flink.connector.kinesis.source.reader.fanout; import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics; +import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy; import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase; +import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; + +import java.time.Duration; +import java.util.HashMap; import 
java.util.Map; /** @@ -31,15 +38,58 @@ */ @Internal public class FanOutKinesisShardSplitReader extends KinesisShardSplitReaderBase { - protected FanOutKinesisShardSplitReader(Map shardMetricGroupMap) { + private final AsyncStreamProxy asyncStreamProxy; + private final String consumerArn; + private final Duration subscriptionTimeout; + + private final Map splitSubscriptions = new HashMap<>(); + + public FanOutKinesisShardSplitReader( + AsyncStreamProxy asyncStreamProxy, + String consumerArn, + Map shardMetricGroupMap, + Duration subscriptionTimeout) { super(shardMetricGroupMap); + this.asyncStreamProxy = asyncStreamProxy; + this.consumerArn = consumerArn; + this.subscriptionTimeout = subscriptionTimeout; } @Override protected RecordBatch fetchRecords(KinesisShardSplitState splitState) { - return null; + FanOutKinesisShardSubscription subscription = + splitSubscriptions.get(splitState.getShardId()); + + SubscribeToShardEvent event = subscription.nextEvent(); + if (event == null) { + return null; + } + + boolean shardCompleted = event.continuationSequenceNumber() == null; + if (shardCompleted) { + splitSubscriptions.remove(splitState.getShardId()); + } + return new RecordBatch(event.records(), event.millisBehindLatest(), shardCompleted); } @Override - public void close() throws Exception {} + public void handleSplitsChanges(SplitsChange splitsChanges) { + super.handleSplitsChanges(splitsChanges); + for (KinesisShardSplit split : splitsChanges.splits()) { + FanOutKinesisShardSubscription subscription = + new FanOutKinesisShardSubscription( + asyncStreamProxy, + consumerArn, + split.getShardId(), + split.getStartingPosition(), + subscriptionTimeout); + subscription.activateSubscription(); + splitSubscriptions.put(split.splitId(), subscription); + } + } + + @Override + public void close() throws Exception { + asyncStreamProxy.close(); + } } diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java new file mode 100644 index 000000000..9167467cc --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.kinesis.source.reader.fanout; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException; +import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy; +import org.apache.flink.connector.kinesis.source.split.StartingPosition; +import org.apache.flink.util.ExceptionUtils; + +import io.netty.handler.timeout.ReadTimeoutException; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.model.InternalFailureException; +import software.amazon.awssdk.services.kinesis.model.LimitExceededException; +import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * FanOutSubscription class responsible for handling the subscription to a single shard of the + * Kinesis stream. Given a shardId, it will manage the lifecycle of the subscription, and eagerly + * keep the next batch of records available for consumption when next polled. 
+ */ +@Internal +public class FanOutKinesisShardSubscription { + private static final Logger LOG = LoggerFactory.getLogger(FanOutKinesisShardSubscription.class); + private static final List> RECOVERABLE_EXCEPTIONS = + Arrays.asList( + InternalFailureException.class, + ResourceNotFoundException.class, + ResourceInUseException.class, + ReadTimeoutException.class, + TimeoutException.class, + IOException.class, + LimitExceededException.class); + + private final AsyncStreamProxy kinesis; + private final String consumerArn; + private final String shardId; + + private final Duration subscriptionTimeout; + + // Queue is meant for eager retrieval of records from the Kinesis stream. We will always have 2 + // record batches available on next read. + private final BlockingQueue eventQueue = new LinkedBlockingQueue<>(2); + private final AtomicBoolean subscriptionActive = new AtomicBoolean(false); + private final AtomicReference subscriptionException = new AtomicReference<>(); + + // Store the current starting position for this subscription. Will be updated each time new + // batch of records is consumed + private StartingPosition startingPosition; + private FanOutShardSubscriber shardSubscriber; + + public FanOutKinesisShardSubscription( + AsyncStreamProxy kinesis, + String consumerArn, + String shardId, + StartingPosition startingPosition, + Duration subscriptionTimeout) { + this.kinesis = kinesis; + this.consumerArn = consumerArn; + this.shardId = shardId; + this.startingPosition = startingPosition; + this.subscriptionTimeout = subscriptionTimeout; + } + + /** Method to allow eager activation of the subscription. 
*/ + public void activateSubscription() { + LOG.info( + "Activating subscription to shard {} with starting position {} for consumer {}.", + shardId, + startingPosition, + consumerArn); + if (subscriptionActive.get()) { + LOG.warn("Skipping activation of subscription since it is already active."); + return; + } + + // We have to use our own CountDownLatch to wait for subscription to be acquired because + // subscription event is tracked via the handler. + CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1); + shardSubscriber = new FanOutShardSubscriber(waitForSubscriptionLatch); + SubscribeToShardResponseHandler responseHandler = + SubscribeToShardResponseHandler.builder() + .subscriber(() -> shardSubscriber) + .onError( + throwable -> { + // Errors that occur when obtaining a subscription are thrown + // here. + // After subscription is acquired, these errors can be ignored. + if (waitForSubscriptionLatch.getCount() > 0) { + terminateSubscription(throwable); + waitForSubscriptionLatch.countDown(); + } + }) + .build(); + + // We don't need to keep track of the future here because we monitor subscription success + // using our own CountDownLatch + kinesis.subscribeToShard(consumerArn, shardId, startingPosition, responseHandler) + .exceptionally( + throwable -> { + // If consumer exists and is still activating, we want to countdown. + if (ExceptionUtils.findThrowable( + throwable, ResourceInUseException.class) + .isPresent()) { + waitForSubscriptionLatch.countDown(); + return null; + } + LOG.error( + "Error subscribing to shard {} with starting position {} for consumer {}.", + shardId, + startingPosition, + consumerArn, + throwable); + terminateSubscription(throwable); + return null; + }); + + // We have to handle timeout for subscriptions separately because Java 8 does not support a + // fluent orTimeout() methods on CompletableFuture. 
+ CompletableFuture.runAsync( + () -> { + try { + if (waitForSubscriptionLatch.await( + subscriptionTimeout.toMillis(), TimeUnit.MILLISECONDS)) { + LOG.info( + "Successfully subscribed to shard {} with starting position {} for consumer {}.", + shardId, + startingPosition, + consumerArn); + subscriptionActive.set(true); + // Request first batch of records. + shardSubscriber.requestRecords(); + } else { + String errorMessage = + "Timeout when subscribing to shard " + + shardId + + " with starting position " + + startingPosition + + " for consumer " + + consumerArn + + "."; + LOG.error(errorMessage); + terminateSubscription(new TimeoutException(errorMessage)); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for subscription to complete.", e); + terminateSubscription(e); + Thread.currentThread().interrupt(); + } + }); + } + + private void terminateSubscription(Throwable t) { + if (!subscriptionException.compareAndSet(null, t)) { + LOG.warn( + "Another subscription exception has been queued, ignoring subsequent exceptions", + t); + } + shardSubscriber.cancel(); + } + + /** + * This is the main entrypoint for this subscription class. It will retrieve the next batch of + * records from the Kinesis stream shard. It will throw any unrecoverable exceptions encountered + * during the subscription process. + * + * @return next FanOut subscription event containing records. Returns null if subscription is + * not yet active and fetching should be retried at a later time. + */ + public SubscribeToShardEvent nextEvent() { + Throwable throwable = subscriptionException.getAndSet(null); + if (throwable != null) { + // If consumer is still activating, we want to wait. 
+ if (ExceptionUtils.findThrowable(throwable, ResourceInUseException.class).isPresent()) { + return null; + } + // We don't want to wrap ResourceNotFoundExceptions because it is handled via a + // try-catch loop + if (throwable instanceof ResourceNotFoundException) { + throw (ResourceNotFoundException) throwable; + } + Optional recoverableException = + RECOVERABLE_EXCEPTIONS.stream() + .map(clazz -> ExceptionUtils.findThrowable(throwable, clazz)) + .filter(Optional::isPresent) + .map(Optional::get) + .findFirst(); + if (recoverableException.isPresent()) { + LOG.warn( + "Recoverable exception encountered while subscribing to shard. Ignoring.", + recoverableException.get()); + shardSubscriber.cancel(); + activateSubscription(); + return null; + } + LOG.error("Subscription encountered unrecoverable exception.", throwable); + throw new KinesisStreamsSourceException( + "Subscription encountered unrecoverable exception.", throwable); + } + + if (!subscriptionActive.get()) { + LOG.debug( + "Subscription to shard {} for consumer {} is not yet active. Skipping.", + shardId, + consumerArn); + return null; + } + + return eventQueue.poll(); + } + + /** + * Implementation of {@link Subscriber} to retrieve events from Kinesis stream using Reactive + * Streams. + */ + private class FanOutShardSubscriber implements Subscriber { + private final CountDownLatch subscriptionLatch; + + private Subscription subscription; + + private FanOutShardSubscriber(CountDownLatch subscriptionLatch) { + this.subscriptionLatch = subscriptionLatch; + } + + public void requestRecords() { + subscription.request(1); + } + + public void cancel() { + if (!subscriptionActive.get()) { + LOG.warn("Trying to cancel inactive subscription. 
Ignoring."); + return; + } + subscriptionActive.set(false); + if (subscription != null) { + subscription.cancel(); + } + } + + @Override + public void onSubscribe(Subscription subscription) { + LOG.info( + "Successfully subscribed to shard {} at {} using consumer {}.", + shardId, + startingPosition, + consumerArn); + this.subscription = subscription; + subscriptionLatch.countDown(); + } + + @Override + public void onNext(SubscribeToShardEventStream subscribeToShardEventStream) { + subscribeToShardEventStream.accept( + new SubscribeToShardResponseHandler.Visitor() { + @Override + public void visit(SubscribeToShardEvent event) { + try { + LOG.debug( + "Received event: {}, {}", + event.getClass().getSimpleName(), + event); + eventQueue.put(event); + + // Update the starting position in case we have to recreate the + // subscription + startingPosition = + StartingPosition.continueFromSequenceNumber( + event.continuationSequenceNumber()); + + // Replace the record just consumed in the Queue + requestRecords(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new KinesisStreamsSourceException( + "Interrupted while adding Kinesis record to internal buffer.", + e); + } + } + }); + } + + @Override + public void onError(Throwable throwable) { + if (!subscriptionException.compareAndSet(null, throwable)) { + LOG.warn( + "Another subscription exception has been queued, ignoring subsequent exceptions", + throwable); + } + } + + @Override + public void onComplete() { + LOG.info("Subscription complete - {} ({})", shardId, consumerArn); + cancel(); + activateSubscription(); + } + } +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/StreamConsumerRegistrar.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/StreamConsumerRegistrar.java new file mode 100644 index 
000000000..9720d0fe4 --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/StreamConsumerRegistrar.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.kinesis.source.reader.fanout; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.kinesis.source.proxy.StreamProxy; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse; +import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse; +import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; + +import java.time.Instant; + +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.ConsumerLifecycle.JOB_MANAGED; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_LIFECYCLE; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_NAME; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DEREGISTER_CONSUMER_TIMEOUT; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.READER_TYPE; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.ReaderType.EFO; + +/** Responsible for registering and deregistering EFO stream consumers. 
*/ +@Internal +public class StreamConsumerRegistrar { + + private static final Logger LOG = LoggerFactory.getLogger(StreamConsumerRegistrar.class); + + private final Configuration sourceConfig; + private final String streamArn; + private final StreamProxy kinesisStreamProxy; + + private String consumerArn; + + public StreamConsumerRegistrar( + Configuration sourceConfig, String streamArn, StreamProxy kinesisStreamProxy) { + this.sourceConfig = sourceConfig; + this.streamArn = streamArn; + this.kinesisStreamProxy = kinesisStreamProxy; + } + + /** + * Register stream consumer against specified stream. This method does not wait until consumer + * to become active. + */ + public void registerStreamConsumer() { + if (sourceConfig.get(READER_TYPE) != EFO) { + return; + } + + String streamConsumerName = sourceConfig.get(EFO_CONSUMER_NAME); + Preconditions.checkNotNull( + streamConsumerName, "For EFO reader type, EFO consumer name must be specified."); + Preconditions.checkArgument( + !streamConsumerName.isEmpty(), + "For EFO reader type, EFO consumer name cannot be empty."); + + switch (sourceConfig.get(EFO_CONSUMER_LIFECYCLE)) { + case JOB_MANAGED: + try { + LOG.info("Registering stream consumer - {}::{}", streamArn, streamConsumerName); + RegisterStreamConsumerResponse response = + kinesisStreamProxy.registerStreamConsumer( + streamArn, streamConsumerName); + consumerArn = response.consumer().consumerARN(); + LOG.info( + "Registered stream consumer - {}::{}", + streamArn, + response.consumer().consumerARN()); + } catch (ResourceInUseException e) { + LOG.warn( + "Found existing consumer {} on stream {}. Proceeding to read from consumer.", + streamConsumerName, + streamArn, + e); + } + break; + case SELF_MANAGED: + // This helps the job to fail fast if the EFO consumer requested does not exist. 
+ DescribeStreamConsumerResponse response = + kinesisStreamProxy.describeStreamConsumer(streamArn, streamConsumerName); + LOG.info("Discovered stream consumer - {}", response); + break; + default: + throw new IllegalArgumentException( + "Unsupported EFO consumer lifecycle: " + + sourceConfig.get(EFO_CONSUMER_LIFECYCLE)); + } + } + + /** De-registers stream consumer from specified stream, if needed. */ + public void deregisterStreamConsumer() { + if (sourceConfig.get(READER_TYPE) == EFO + && sourceConfig.get(EFO_CONSUMER_LIFECYCLE) == JOB_MANAGED) { + LOG.info("De-registering stream consumer - {}", consumerArn); + if (consumerArn == null) { + LOG.warn( + "Unable to deregister stream consumer as there was no consumer ARN stored in the StreamConsumerRegistrar. There may be leaked EFO consumers on the Kinesis stream."); + return; + } + kinesisStreamProxy.deregisterStreamConsumer(consumerArn); + LOG.info("De-registered stream consumer - {}", consumerArn); + + Instant timeout = Instant.now().plus(sourceConfig.get(EFO_DEREGISTER_CONSUMER_TIMEOUT)); + String consumerName = getConsumerNameFromArn(consumerArn); + while (Instant.now().isBefore(timeout)) { + try { + DescribeStreamConsumerResponse response = + kinesisStreamProxy.describeStreamConsumer(streamArn, consumerName); + LOG.info( + "Waiting for stream consumer to be deregistered - {} {} {}", + streamArn, + consumerName, + response.consumerDescription().consumerStatusAsString()); + + } catch (ResourceNotFoundException e) { + LOG.info("Stream consumer {} has been deregistered", consumerArn); + return; + } + } + LOG.warn( + "Timed out waiting for stream consumer to be deregistered. 
There may be leaked EFO consumers on the Kinesis stream."); + } + } + + private String getConsumerNameFromArn(String consumerArn) { + String consumerQualifier = Arn.fromString(consumerArn).resource().qualifier().orElseThrow(() -> new IllegalArgumentException("Unable to parse consumer name from consumer ARN")); + return StringUtils.substringBetween(consumerQualifier, "/", ":"); + } +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java index 2e1af120e..d2d6cd541 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/polling/PollingKinesisShardSplitReader.java @@ -19,6 +19,8 @@ package org.apache.flink.connector.kinesis.source.reader.polling; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions; import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics; import org.apache.flink.connector.kinesis.source.proxy.StreamProxy; import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase; @@ -35,11 +37,17 @@ @Internal public class PollingKinesisShardSplitReader extends KinesisShardSplitReaderBase { private final StreamProxy kinesis; + private final Configuration configuration; + private final int maxRecordsToGet; public PollingKinesisShardSplitReader( - StreamProxy kinesisProxy, Map shardMetricGroupMap) { + StreamProxy kinesisProxy, + Map shardMetricGroupMap, + Configuration 
configuration) { super(shardMetricGroupMap); this.kinesis = kinesisProxy; + this.configuration = configuration; + this.maxRecordsToGet = configuration.get(KinesisSourceConfigOptions.SHARD_GET_RECORDS_MAX); } @Override @@ -48,7 +56,8 @@ protected RecordBatch fetchRecords(KinesisShardSplitState splitState) { kinesis.getRecords( splitState.getStreamArn(), splitState.getShardId(), - splitState.getNextStartingPosition()); + splitState.getNextStartingPosition(), + this.maxRecordsToGet); boolean isCompleted = getRecordsResponse.nextShardIterator() == null; return new RecordBatch( getRecordsResponse.records(), getRecordsResponse.millisBehindLatest(), isCompleted); diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/StartingPositionUtil.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/StartingPositionUtil.java new file mode 100644 index 000000000..92d234fd6 --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/StartingPositionUtil.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kinesis.source.split; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.kinesis.source.KinesisStreamsSource; +import org.apache.flink.util.Preconditions; + +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; + +import java.time.Instant; + +/** + * Util class with methods relating to {@link KinesisStreamsSource}'s internal representation of + * {@link StartingPosition}. + */ +@Internal +public class StartingPositionUtil { + + private StartingPositionUtil() { + // prevent initialization of util class. + } + + /** + * @param startingPosition {@link KinesisStreamsSource}'s internal representation of {@link + * StartingPosition} + * @return AWS SDK's representation of {@link StartingPosition}. + */ + public static software.amazon.awssdk.services.kinesis.model.StartingPosition + toSdkStartingPosition(StartingPosition startingPosition) { + ShardIteratorType shardIteratorType = startingPosition.getShardIteratorType(); + Object startingMarker = startingPosition.getStartingMarker(); + + software.amazon.awssdk.services.kinesis.model.StartingPosition.Builder builder = + software.amazon.awssdk.services.kinesis.model.StartingPosition.builder() + .type(shardIteratorType); + + switch (shardIteratorType) { + case LATEST: + case TRIM_HORIZON: + return builder.type(shardIteratorType).build(); + case AT_TIMESTAMP: + Preconditions.checkArgument( + startingMarker instanceof Instant, + "Invalid StartingPosition. When ShardIteratorType is AT_TIMESTAMP, startingMarker must be an Instant."); + return builder.timestamp((Instant) startingMarker).build(); + case AT_SEQUENCE_NUMBER: + case AFTER_SEQUENCE_NUMBER: + Preconditions.checkArgument( + startingMarker instanceof String, + "Invalid StartingPosition. 
When ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER, startingMarker must be a String."); + return builder.sequenceNumber((String) startingMarker).build(); + } + throw new IllegalArgumentException("Unsupported shardIteratorType " + shardIteratorType); + } +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java index b859b778e..bf40d6cb5 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceITCase.java @@ -8,7 +8,6 @@ import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils; import org.apache.flink.connector.aws.testutils.LocalstackContainer; import org.apache.flink.connector.aws.util.AWSGeneralUtil; -import org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.junit5.MiniClusterExtension; @@ -53,6 +52,7 @@ import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_SECRET_ACCESS_KEY; import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION; import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.InitialPosition; import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.STREAM_INITIAL_POSITION; /** @@ -146,8 +146,7 @@ private Configuration getDefaultConfiguration() { configuration.setString(AWS_REGION, 
Region.AP_SOUTHEAST_1.toString()); configuration.setString(TRUST_ALL_CERTIFICATES, "true"); configuration.setString(HTTP_PROTOCOL_VERSION, "HTTP1_1"); - configuration.set( - STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); + configuration.set(STREAM_INITIAL_POSITION, InitialPosition.TRIM_HORIZON); return configuration; } diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.java index 42ff68313..d2e6640fd 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumeratorTest.java @@ -26,6 +26,7 @@ import org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory; import org.apache.flink.connector.kinesis.source.proxy.ListShardsStartingPosition; import org.apache.flink.connector.kinesis.source.proxy.StreamProxy; +import org.apache.flink.connector.kinesis.source.reader.fanout.StreamConsumerRegistrar; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import org.apache.flink.connector.kinesis.source.util.TestUtil; import org.apache.flink.util.FlinkRuntimeException; @@ -85,7 +86,8 @@ void testStartWithoutStateDiscoversAndAssignsShards( streamProxy, ShardAssignerFactory.uniformShardAssigner(), null, - true); + true, + new StreamConsumerRegistrar(sourceConfig, STREAM_ARN, streamProxy)); // When enumerator starts enumerator.start(); // Then initial discovery scheduled, with periodic discovery after @@ -182,7 +184,8 @@ void 
testStartWithStateDoesNotAssignCompletedShards( streamProxy, ShardAssignerFactory.uniformShardAssigner(), state, - true); + true, + new StreamConsumerRegistrar(sourceConfig, STREAM_ARN, streamProxy)); // When enumerator starts enumerator.start(); // Then no initial discovery is scheduled, but a periodic discovery is scheduled @@ -243,7 +246,8 @@ void testInitialPositionForListShardsMapping( streamProxy, ShardAssignerFactory.uniformShardAssigner(), null, - true); + true, + new StreamConsumerRegistrar(sourceConfig, STREAM_ARN, streamProxy)); assertThat( enumerator.getInitialPositionForShardDiscovery( @@ -297,7 +301,8 @@ void testAssignSplitsSurfacesThrowableIfUnableToListShards() throws Throwable { streamProxy, ShardAssignerFactory.uniformShardAssigner(), null, - true); + true, + new StreamConsumerRegistrar(sourceConfig, STREAM_ARN, streamProxy)); enumerator.start(); // Given List Shard request throws an Exception @@ -327,7 +332,8 @@ void testAssignSplitsHandlesRepeatSplitsGracefully() throws Throwable { streamProxy, ShardAssignerFactory.uniformShardAssigner(), null, - true); + true, + new StreamConsumerRegistrar(sourceConfig, STREAM_ARN, streamProxy)); enumerator.start(); // Given enumerator is initialised with one registered reader, with 4 shards in stream @@ -381,7 +387,8 @@ void testAssignSplitWithoutRegisteredReaders() throws Throwable { streamProxy, ShardAssignerFactory.uniformShardAssigner(), null, - true); + true, + new StreamConsumerRegistrar(sourceConfig, STREAM_ARN, streamProxy)); enumerator.start(); // Given enumerator is initialised without a reader @@ -440,7 +447,8 @@ void testAssignSplitWithInsufficientRegisteredReaders() throws Throwable { streamProxy, ShardAssignerFactory.uniformShardAssigner(), null, - true); + true, + new StreamConsumerRegistrar(sourceConfig, STREAM_ARN, streamProxy)); enumerator.start(); // Given enumerator is initialised with only one reader @@ -502,7 +510,8 @@ void testRestoreFromStateRemembersLastSeenShardId() throws Throwable 
{ streamProxy, ShardAssignerFactory.uniformShardAssigner(), null, - true); + true, + new StreamConsumerRegistrar(sourceConfig, STREAM_ARN, streamProxy)); enumerator.start(); // Given enumerator is initialised with one registered reader, with 4 shards in stream @@ -530,7 +539,8 @@ void testRestoreFromStateRemembersLastSeenShardId() throws Throwable { streamProxy, ShardAssignerFactory.uniformShardAssigner(), snapshottedState, - true); + true, + new StreamConsumerRegistrar(sourceConfig, STREAM_ARN, streamProxy)); restoredEnumerator.start(); // Given enumerator is initialised with one registered reader, with 4 shards in stream restoredContext.registerReader(TestUtil.getTestReaderInfo(subtaskId)); @@ -561,7 +571,8 @@ void testHandleUnrecognisedSourceEventIsNoOp() throws Throwable { streamProxy, ShardAssignerFactory.uniformShardAssigner(), null, - true); + true, + new StreamConsumerRegistrar(sourceConfig, STREAM_ARN, streamProxy)); assertThatNoException() .isThrownBy(() -> enumerator.handleSourceEvent(1, new SourceEvent() {})); @@ -582,7 +593,8 @@ void testCloseClosesStreamProxy() throws Throwable { streamProxy, ShardAssignerFactory.uniformShardAssigner(), null, - true); + true, + new StreamConsumerRegistrar(sourceConfig, STREAM_ARN, streamProxy)); enumerator.start(); assertThatNoException().isThrownBy(enumerator::close); @@ -601,7 +613,8 @@ private KinesisStreamsSourceEnumerator getSimpleEnumeratorWithNoState( streamProxy, ShardAssignerFactory.uniformShardAssigner(), null, - true); + true, + new StreamConsumerRegistrar(sourceConfig, STREAM_ARN, streamProxy)); enumerator.start(); assertThat(context.getOneTimeCallables()).hasSize(1); assertThat(context.getPeriodicCallables()).hasSize(1); diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisAsyncStreamProxyTest.java 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisAsyncStreamProxyTest.java new file mode 100644 index 000000000..3c780fb39 --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/proxy/KinesisAsyncStreamProxyTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.kinesis.source.proxy; + +import org.apache.flink.connector.kinesis.source.split.StartingPosition; +import org.apache.flink.connector.kinesis.source.util.KinesisAsyncClientProvider.TestingAsyncKinesisClient; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; + +import java.time.Instant; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; + +import static org.apache.flink.connector.kinesis.source.split.StartingPositionUtil.toSdkStartingPosition; +import static org.apache.flink.connector.kinesis.source.util.KinesisAsyncClientProvider.TestingAsyncKinesisClient.SUBSCRIBE_TO_SHARD_RESPONSE_FUTURE; +import static org.apache.flink.connector.kinesis.source.util.TestUtil.CONSUMER_ARN; +import static org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +class KinesisAsyncStreamProxyTest { + private static final SdkAsyncHttpClient HTTP_CLIENT = NettyNioAsyncHttpClient.builder().build(); + + private static final String STREAM_ARN = + "arn:aws:kinesis:us-east-1:123456789012:stream/stream-name"; + + private TestingAsyncKinesisClient testKinesisClient; + private KinesisAsyncStreamProxy kinesisAsyncStreamProxy; + + @BeforeEach + public void setUp() { + testKinesisClient = new TestingAsyncKinesisClient(); + kinesisAsyncStreamProxy = new KinesisAsyncStreamProxy(testKinesisClient, HTTP_CLIENT); + } + + @ParameterizedTest + 
@MethodSource("provideSubscribeToShardStartingPosition") + public void testSubscribeToShard( + final String shardId, final StartingPosition startingPosition) { + // Given subscription arguments + SubscribeToShardResponseHandler noOpResponseHandler = + SubscribeToShardResponseHandler.builder() + .subscriber(event -> {}) + .onError(throwable -> {}) + .onComplete(() -> {}) + .build(); + + // When proxy is invoked + CompletableFuture result = + kinesisAsyncStreamProxy.subscribeToShard( + CONSUMER_ARN, shardId, startingPosition, noOpResponseHandler); + + // Then correct request is passed through to the Kinesis client + SubscribeToShardRequest expectedRequest = + SubscribeToShardRequest.builder() + .consumerARN(CONSUMER_ARN) + .shardId(shardId) + .startingPosition(toSdkStartingPosition(startingPosition)) + .build(); + assertThat(result).isEqualTo(SUBSCRIBE_TO_SHARD_RESPONSE_FUTURE); + assertThat(testKinesisClient.getSubscribeToShardRequest()).isEqualTo(expectedRequest); + assertThat(testKinesisClient.getSubscribeToShardResponseHandler()) + .isEqualTo(noOpResponseHandler); + } + + private static Stream provideSubscribeToShardStartingPosition() { + return Stream.of( + // Randomly generated shardIds + Arguments.of(generateShardId(0), StartingPosition.fromStart()), + Arguments.of(generateShardId(1), StartingPosition.fromStart()), + // Check all starting positions + Arguments.of(generateShardId(0), StartingPosition.fromTimestamp(Instant.now())), + Arguments.of( + generateShardId(0), + StartingPosition.continueFromSequenceNumber("seq-num"))); + } +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java index 6e63d5425..2308b6705 100644 --- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReaderTest.java @@ -21,7 +21,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; -import org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions; import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics; import org.apache.flink.connector.kinesis.source.reader.polling.PollingKinesisShardSplitReader; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; @@ -44,6 +43,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.SHARD_GET_RECORDS_MAX; import static org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider.getTestStreamProxy; import static org.apache.flink.connector.kinesis.source.util.TestUtil.STREAM_ARN; import static org.apache.flink.connector.kinesis.source.util.TestUtil.generateShardId; @@ -67,7 +67,7 @@ public void init() { shardMetricGroupMap = new ConcurrentHashMap<>(); sourceConfig = new Configuration(); - sourceConfig.set(KinesisSourceConfigOptions.SHARD_GET_RECORDS_MAX, 50); + sourceConfig.set(SHARD_GET_RECORDS_MAX, 50); shardMetricGroupMap.put( TEST_SHARD_ID, @@ -374,7 +374,10 @@ void testFetchUpdatesTheMillisBehindLatestMetric() throws IOException { @Test void testMaxRecordsToGetParameterPassed() throws IOException { int maxRecordsToGet = 2; - sourceConfig.set(KinesisSourceConfigOptions.SHARD_GET_RECORDS_MAX, maxRecordsToGet); + sourceConfig.set(SHARD_GET_RECORDS_MAX, maxRecordsToGet); + splitReader = + 
new PollingKinesisShardSplitReader( + testStreamProxy, shardMetricGroupMap, sourceConfig); testStreamProxy.addShards(TEST_SHARD_ID); List sentRecords = Stream.of(getTestRecord("data-1"), getTestRecord("data-2"), getTestRecord("data-3")) diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java new file mode 100644 index 000000000..e065d4d2f --- /dev/null +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReaderTest.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.kinesis.source.reader.fanout; + +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; +import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics; +import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy; +import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; +import org.apache.flink.connector.kinesis.source.util.FakeKinesisFanOutBehaviorsFactory; +import org.apache.flink.connector.kinesis.source.util.FakeKinesisFanOutBehaviorsFactory.TrackCloseStreamProxy; +import org.apache.flink.connector.kinesis.source.util.TestUtil; +import org.apache.flink.metrics.testutils.MetricListener; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.kinesis.model.Record; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.flink.connector.kinesis.source.util.TestUtil.CONSUMER_ARN; +import static org.apache.flink.connector.kinesis.source.util.TestUtil.getTestSplit; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatNoException; +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; + +/** Test for {@link FanOutKinesisShardSplitReader}. 
*/ +public class FanOutKinesisShardSplitReaderTest { + private static final String TEST_SHARD_ID = TestUtil.generateShardId(1); + private static final Duration TEST_SUBSCRIPTION_TIMEOUT = Duration.ofMillis(1000); + + SplitReader splitReader; + + private AsyncStreamProxy testAsyncStreamProxy; + private Map shardMetricGroupMap; + private MetricListener metricListener; + + @BeforeEach + public void init() { + metricListener = new MetricListener(); + + shardMetricGroupMap = new ConcurrentHashMap<>(); + shardMetricGroupMap.put( + TEST_SHARD_ID, + new KinesisShardMetrics( + getTestSplit(TEST_SHARD_ID), metricListener.getMetricGroup())); + } + + @Test + public void testNoAssignedSplitsHandledGracefully() throws Exception { + // Given assigned split with no records + testAsyncStreamProxy = FakeKinesisFanOutBehaviorsFactory.boundedShard().build(); + splitReader = + new FanOutKinesisShardSplitReader( + testAsyncStreamProxy, + CONSUMER_ARN, + shardMetricGroupMap, + TEST_SUBSCRIPTION_TIMEOUT); + RecordsWithSplitIds retrievedRecords = splitReader.fetch(); + + assertThat(retrievedRecords.nextRecordFromSplit()).isNull(); + assertThat(retrievedRecords.nextSplit()).isNull(); + assertThat(retrievedRecords.finishedSplits()).isEmpty(); + } + + @Test + public void testAssignedSplitHasNoRecordsHandledGracefully() throws Exception { + // Given assigned split with no records + testAsyncStreamProxy = FakeKinesisFanOutBehaviorsFactory.boundedShard().build(); + splitReader = + new FanOutKinesisShardSplitReader( + testAsyncStreamProxy, + CONSUMER_ARN, + shardMetricGroupMap, + TEST_SUBSCRIPTION_TIMEOUT); + splitReader.handleSplitsChanges( + new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID)))); + + // When fetching records + RecordsWithSplitIds retrievedRecords = splitReader.fetch(); + + // Then retrieve no records + assertThat(retrievedRecords.nextRecordFromSplit()).isNull(); + assertThat(retrievedRecords.nextSplit()).isNull(); + 
assertThat(retrievedRecords.finishedSplits()).isEmpty(); + } + + @Test + public void testSplitWithExpiredShardHandledAsCompleted() throws Exception { + // Given Kinesis will respond with expired shard + testAsyncStreamProxy = + FakeKinesisFanOutBehaviorsFactory.resourceNotFoundWhenObtainingSubscription(); + splitReader = + new FanOutKinesisShardSplitReader( + testAsyncStreamProxy, + CONSUMER_ARN, + shardMetricGroupMap, + TEST_SUBSCRIPTION_TIMEOUT); + splitReader.handleSplitsChanges( + new SplitsAddition<>(Collections.singletonList(getTestSplit(TEST_SHARD_ID)))); + + // When fetching records + RecordsWithSplitIds retrievedRecords = splitReader.fetch(); + + // Then shard is marked as completed + // Then retrieve no records and mark split as complete + assertThat(retrievedRecords.nextRecordFromSplit()).isNull(); + assertThat(retrievedRecords.nextSplit()).isNull(); + assertThat(retrievedRecords.finishedSplits()).containsExactly(TEST_SHARD_ID); + } + + @Test + public void testWakeUpIsNoOp() { + splitReader = + new FanOutKinesisShardSplitReader( + testAsyncStreamProxy, + CONSUMER_ARN, + shardMetricGroupMap, + TEST_SUBSCRIPTION_TIMEOUT); + + // When wakeup is called + // Then no exception is thrown and no-op + assertThatNoException().isThrownBy(splitReader::wakeUp); + } + + @Test + public void testCloseClosesStreamProxy() throws Exception { + // Given stream proxy + TrackCloseStreamProxy trackCloseStreamProxy = + FakeKinesisFanOutBehaviorsFactory.testCloseStreamProxy(); + splitReader = + new FanOutKinesisShardSplitReader( + trackCloseStreamProxy, + CONSUMER_ARN, + shardMetricGroupMap, + TEST_SUBSCRIPTION_TIMEOUT); + + // When split reader is not closed + // Then stream proxy is still open + assertThat(trackCloseStreamProxy.isClosed()).isFalse(); + + // When close split reader + splitReader.close(); + + // Then stream proxy is also closed + assertThat(trackCloseStreamProxy.isClosed()).isTrue(); + } + + private void consumeAllRecordsFromKinesis( + SplitReader splitReader, 
int numRecords) { + consumeRecordsFromKinesis(splitReader, numRecords, true); + } + + private void consumeSomeRecordsFromKinesis( + SplitReader splitReader, int numRecords) { + consumeRecordsFromKinesis(splitReader, numRecords, false); + } + + private void consumeRecordsFromKinesis( + SplitReader splitReader, + int numRecords, + boolean checkForShardCompletion) { + // Set timeout to prevent infinite loop on failure + assertTimeoutPreemptively( + Duration.ofSeconds(60), + () -> { + int numRetrievedRecords = 0; + RecordsWithSplitIds retrievedRecords = null; + while (numRetrievedRecords < numRecords) { + retrievedRecords = splitReader.fetch(); + List records = readAllRecords(retrievedRecords); + numRetrievedRecords += records.size(); + } + assertThat(numRetrievedRecords).isEqualTo(numRecords); + assertThat(retrievedRecords).isNotNull(); + // Check that the shard has been consumed completely + if (checkForShardCompletion) { + assertThat(retrievedRecords.finishedSplits()) + .containsExactly(TEST_SHARD_ID); + } else { + assertThat(retrievedRecords.finishedSplits()).isEmpty(); + } + }, + "did not receive expected " + numRecords + " records within 60 seconds."); + } + + private List readAllRecords(RecordsWithSplitIds recordsWithSplitIds) { + List outputRecords = new ArrayList<>(); + Record record; + do { + record = recordsWithSplitIds.nextRecordFromSplit(); + if (record != null) { + outputRecords.add(record); + } + } while (record != null); + + return outputRecords; + } +} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/StreamConsumerRegistrarTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/StreamConsumerRegistrarTest.java new file mode 100644 index 000000000..6e0e51018 --- /dev/null +++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/StreamConsumerRegistrarTest.java @@ -0,0 +1,212 @@ +package org.apache.flink.connector.kinesis.source.reader.fanout; + + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider.TestKinesisStreamProxy; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; + +import java.time.Duration; + +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.ConsumerLifecycle.JOB_MANAGED; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.ConsumerLifecycle.SELF_MANAGED; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_LIFECYCLE; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_NAME; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DEREGISTER_CONSUMER_TIMEOUT; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.READER_TYPE; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.ReaderType.EFO; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.ReaderType.POLLING; +import static org.apache.flink.connector.kinesis.source.util.KinesisStreamProxyProvider.getTestStreamProxy; +import static org.apache.flink.connector.kinesis.source.util.TestUtil.STREAM_ARN; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatNoException; +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + +class StreamConsumerRegistrarTest { + + 
private static final String CONSUMER_NAME = "kon-soo-mer"; + + private TestKinesisStreamProxy testKinesisStreamProxy; + private StreamConsumerRegistrar streamConsumerRegistrar; + + private Configuration sourceConfiguration; + + @BeforeEach + void setUp() { + testKinesisStreamProxy = getTestStreamProxy(); + sourceConfiguration = new Configuration(); + streamConsumerRegistrar = new StreamConsumerRegistrar(sourceConfiguration, STREAM_ARN, testKinesisStreamProxy); + } + + @Test + void testRegisterStreamConsumerSkippedWhenNotEfo() { + // Given POLLING reader type + sourceConfiguration.set(READER_TYPE, POLLING); + + // When registerStreamConsumer is called + // Then we skip registering consumer + assertThatNoException().isThrownBy(() -> streamConsumerRegistrar.registerStreamConsumer()); + assertThat(testKinesisStreamProxy.getRegisteredConsumers(STREAM_ARN)).hasSize(0); + } + + @Test + void testConsumerNameMustBeSpecified() { + // Given JOB_MANAGED consumer lifecycle + sourceConfiguration.set(READER_TYPE, EFO); + sourceConfiguration.set(EFO_CONSUMER_LIFECYCLE, JOB_MANAGED); + // Consumer name not provided + + // When registerStreamConsumer is called + // Then exception is thrown + assertThatExceptionOfType(NullPointerException.class) + .isThrownBy(() -> streamConsumerRegistrar.registerStreamConsumer()) + .withMessageContaining("For EFO reader type, EFO consumer name must be specified"); + } + + @Test + void testConsumerNameMustNotBeEmpty() { + // Given JOB_MANAGED consumer lifecycle + sourceConfiguration.set(READER_TYPE, EFO); + sourceConfiguration.set(EFO_CONSUMER_LIFECYCLE, JOB_MANAGED); + // Consumer name is empty + sourceConfiguration.set(EFO_CONSUMER_NAME, ""); + + + // When registerStreamConsumer is called + // Then exception is thrown + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> streamConsumerRegistrar.registerStreamConsumer()) + .withMessageContaining("For EFO reader type, EFO consumer name cannot be empty."); + } + + @Test + 
void testDeregisterStreamConsumerSkippedWhenNotEfo() { + // Given POLLING reader type + sourceConfiguration.set(READER_TYPE, POLLING); + // And consumer is registered + testKinesisStreamProxy.registerStreamConsumer(STREAM_ARN, CONSUMER_NAME); + + // When deregisterStreamConsumer is called + // Then we skip deregistering consumer + // We validate this by showing that no exception is thrown even though consumerName was not specified + assertThatNoException().isThrownBy(() -> streamConsumerRegistrar.deregisterStreamConsumer()); + assertThat(testKinesisStreamProxy.getRegisteredConsumers(STREAM_ARN)).containsExactly(CONSUMER_NAME); + } + + + @Test + void testRegisterStreamConsumerSkippedWhenSelfManaged() { + // Given SELF_MANAGED consumer lifecycle + sourceConfiguration.set(READER_TYPE, EFO); + sourceConfiguration.set(EFO_CONSUMER_LIFECYCLE, SELF_MANAGED); + sourceConfiguration.set(EFO_CONSUMER_NAME, CONSUMER_NAME); + // And consumer is registered + testKinesisStreamProxy.registerStreamConsumer(STREAM_ARN, CONSUMER_NAME); + + // When registerStreamConsumer is called + // Then we skip registering the consumer + assertThatNoException().isThrownBy(() -> streamConsumerRegistrar.registerStreamConsumer()); + } + + @Test + void testRegisterStreamConsumerFailsFastWhenSelfManaged() { + // Given SELF_MANAGED consumer lifecycle + sourceConfiguration.set(READER_TYPE, EFO); + sourceConfiguration.set(EFO_CONSUMER_LIFECYCLE, SELF_MANAGED); + sourceConfiguration.set(EFO_CONSUMER_NAME, CONSUMER_NAME); + // And consumer not registered + + // When registerStreamConsumer is called + // Then we fail fast to indicate consumer doesn't exist + assertThatExceptionOfType(ResourceNotFoundException.class) + .isThrownBy(() -> streamConsumerRegistrar.registerStreamConsumer()) + .withMessageContaining("Consumer " + CONSUMER_NAME) + .withMessageContaining("not found."); + } + + @Test + void testDeregisterStreamConsumerSkippedWhenSelfManaged() { + // Given SELF_MANAGED consumer lifecycle + 
sourceConfiguration.set(READER_TYPE, EFO); + sourceConfiguration.set(EFO_CONSUMER_LIFECYCLE, SELF_MANAGED); + sourceConfiguration.set(EFO_CONSUMER_NAME, CONSUMER_NAME); + // And consumer is registered + testKinesisStreamProxy.registerStreamConsumer(STREAM_ARN, CONSUMER_NAME); + + + // When deregisterStreamConsumer is called + // Then we skip deregistering the consumer + assertThatNoException().isThrownBy(() -> streamConsumerRegistrar.deregisterStreamConsumer()); + assertThat(testKinesisStreamProxy.getRegisteredConsumers(STREAM_ARN)).containsExactly(CONSUMER_NAME); + } + + @Test + void testRegisterStreamConsumerWhenJobManaged() { + // Given JOB_MANAGED consumer lifecycle + sourceConfiguration.set(READER_TYPE, EFO); + sourceConfiguration.set(EFO_CONSUMER_LIFECYCLE, JOB_MANAGED); + sourceConfiguration.set(EFO_CONSUMER_NAME, CONSUMER_NAME); + + // When registerStreamConsumer is called + streamConsumerRegistrar.registerStreamConsumer(); + + // Then consumer is registered + assertThat(testKinesisStreamProxy.getRegisteredConsumers(STREAM_ARN)).containsExactly(CONSUMER_NAME); + } + + @Test + void testRegisterStreamConsumerHandledGracefullyWhenConsumerExists() { + // Given JOB_MANAGED consumer lifecycle + sourceConfiguration.set(READER_TYPE, EFO); + sourceConfiguration.set(EFO_CONSUMER_LIFECYCLE, JOB_MANAGED); + sourceConfiguration.set(EFO_CONSUMER_NAME, CONSUMER_NAME); + // And consumer already exists + testKinesisStreamProxy.registerStreamConsumer(STREAM_ARN, CONSUMER_NAME); + assertThat(testKinesisStreamProxy.getRegisteredConsumers(STREAM_ARN)).containsExactly(CONSUMER_NAME); + + // When registerStreamConsumer is called + assertThatNoException().isThrownBy(() -> streamConsumerRegistrar.registerStreamConsumer()); + + // Then consumer is registered + assertThat(testKinesisStreamProxy.getRegisteredConsumers(STREAM_ARN)).containsExactly(CONSUMER_NAME); + } + + @Test + void testDeregisterStreamConsumerWhenJobManaged() { + // Given JOB_MANAGED consumer lifecycle + 
sourceConfiguration.set(READER_TYPE, EFO); + sourceConfiguration.set(EFO_CONSUMER_LIFECYCLE, JOB_MANAGED); + // And consumer is registered + sourceConfiguration.set(EFO_CONSUMER_NAME, CONSUMER_NAME); + streamConsumerRegistrar.registerStreamConsumer(); + assertThat(testKinesisStreamProxy.getRegisteredConsumers(STREAM_ARN)).containsExactly(CONSUMER_NAME); + + // When deregisterStreamConsumer is called + streamConsumerRegistrar.deregisterStreamConsumer(); + + // Then consumer is deregistered + assertThat(testKinesisStreamProxy.getRegisteredConsumers(STREAM_ARN)).hasSize(0); + } + + @Test + void testDeregisterStreamConsumerProceedsWhenTimeoutDeregistering() { + // Given JOB_MANAGED consumer lifecycle + sourceConfiguration.set(READER_TYPE, EFO); + sourceConfiguration.set(EFO_CONSUMER_LIFECYCLE, JOB_MANAGED); + sourceConfiguration.set(EFO_DEREGISTER_CONSUMER_TIMEOUT, Duration.ofMillis(50)); + // And consumer is registered + sourceConfiguration.set(EFO_CONSUMER_NAME, CONSUMER_NAME); + streamConsumerRegistrar.registerStreamConsumer(); + assertThat(testKinesisStreamProxy.getRegisteredConsumers(STREAM_ARN)).containsExactly(CONSUMER_NAME); + // And consumer is stuck in DELETING + testKinesisStreamProxy.setConsumersCurrentlyDeleting(CONSUMER_NAME); + + // When deregisterStreamConsumer is called + streamConsumerRegistrar.deregisterStreamConsumer(); + + // Then consumer is deregistered + assertThat(testKinesisStreamProxy.getRegisteredConsumers(STREAM_ARN)).hasSize(0); + } +} \ No newline at end of file diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/StartingPositionUtilTest.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/split/StartingPositionUtilTest.java new file mode 100644 index 000000000..f210b8c63 --- /dev/null +++ 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.kinesis.source.split;

import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

import java.time.Instant;

import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;

/**
 * Unit tests for {@link StartingPositionUtil}, verifying that each Flink {@link StartingPosition}
 * variant maps onto the equivalent AWS SDK starting position.
 */
class StartingPositionUtilTest {

    @Test
    void testTrimHorizonFieldsAreTransferredAccurately() {
        // Given TRIM_HORIZON starting position
        StartingPosition startingPosition = StartingPosition.fromStart();

        // When converting to SdkStartingPosition
        software.amazon.awssdk.services.kinesis.model.StartingPosition sdkStartingPosition =
                StartingPositionUtil.toSdkStartingPosition(startingPosition);

        // Then fields are transferred accurately
        assertThat(sdkStartingPosition.type()).isEqualByComparingTo(ShardIteratorType.TRIM_HORIZON);
    }

    @Test
    void testAtTimestampFieldsAreTransferredAccurately() {
        // Given AT_TIMESTAMP starting position
        StartingPosition startingPosition = StartingPosition.fromTimestamp(Instant.EPOCH);

        // When converting to SdkStartingPosition
        software.amazon.awssdk.services.kinesis.model.StartingPosition sdkStartingPosition =
                StartingPositionUtil.toSdkStartingPosition(startingPosition);

        // Then fields are transferred accurately
        assertThat(sdkStartingPosition.type()).isEqualByComparingTo(ShardIteratorType.AT_TIMESTAMP);
        assertThat(sdkStartingPosition.timestamp()).isEqualTo(Instant.EPOCH);
    }

    @Test
    void testAtSequenceNumberFieldsAreTransferredAccurately() {
        // Given AFTER_SEQUENCE_NUMBER starting position (continue from a sequence number)
        StartingPosition startingPosition =
                StartingPosition.continueFromSequenceNumber("some-sequence-number");

        // When converting to SdkStartingPosition
        software.amazon.awssdk.services.kinesis.model.StartingPosition sdkStartingPosition =
                StartingPositionUtil.toSdkStartingPosition(startingPosition);

        // Then fields are transferred accurately
        assertThat(sdkStartingPosition.type())
                .isEqualByComparingTo(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
        assertThat(sdkStartingPosition.sequenceNumber()).isEqualTo("some-sequence-number");
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.kinesis.source.util;

import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.StartingPosition;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponse;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.apache.flink.connector.kinesis.source.split.StartingPositionUtil.toSdkStartingPosition;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * Factory for different kinds of fake Kinesis behaviours using the {@link AsyncStreamProxy}
 * interface.
 */
public class FakeKinesisFanOutBehaviorsFactory {

    // ------------------------------------------------------------------------
    //  Behaviours related to subscribe to shard and consuming data
    // ------------------------------------------------------------------------

    /** A bounded single-shard proxy with configurable batch/record counts per subscription. */
    public static SingleShardFanOutStreamProxy.Builder boundedShard() {
        return new SingleShardFanOutStreamProxy.Builder();
    }

    /** A proxy whose subscription fails immediately with {@link ResourceNotFoundException}. */
    public static AsyncStreamProxy resourceNotFoundWhenObtainingSubscription() {
        return new ExceptionalFanOutStreamProxy(ResourceNotFoundException.builder().build());
    }

    public static TrackCloseStreamProxy testCloseStreamProxy() {
        return new TrackCloseStreamProxy();
    }

    /** Test stream proxy that tracks closure of the {@link AsyncStreamProxy}. */
    public static class TrackCloseStreamProxy implements AsyncStreamProxy {

        private boolean closed = false;

        public boolean isClosed() {
            return closed;
        }

        @Override
        public CompletableFuture<Void> subscribeToShard(
                String consumerArn,
                String shardId,
                org.apache.flink.connector.kinesis.source.split.StartingPosition startingPosition,
                SubscribeToShardResponseHandler responseHandler) {
            // This proxy only tracks close(); subscriptions are deliberately unsupported.
            throw new UnsupportedOperationException(
                    "This method is not supported on the TrackCloseStreamProxy.");
        }

        @Override
        public void close() throws IOException {
            closed = true;
        }
    }

    /**
     * A fake implementation of KinesisProxyV2 SubscribeToShard that provides dummy records for EFO
     * subscriptions. Aggregated and non-aggregated records are supported with various batch and
     * subscription sizes.
     */
    public static class SingleShardFanOutStreamProxy extends AbstractSingleShardFanOutStreamProxy {

        private final int batchesPerSubscription;

        private final int recordsPerBatch;

        private final long millisBehindLatest;

        private final int totalRecords;

        // NOTE(review): captured from the builder but never applied when generating records below
        // — confirm whether record aggregation support is still pending.
        private final int aggregationFactor;

        // Monotonically increasing sequence number shared across all generated records.
        private final AtomicInteger sequenceNumber = new AtomicInteger();

        private SingleShardFanOutStreamProxy(final Builder builder) {
            super(builder.getSubscriptionCount());
            this.batchesPerSubscription = builder.batchesPerSubscription;
            this.recordsPerBatch = builder.recordsPerBatch;
            this.millisBehindLatest = builder.millisBehindLatest;
            this.aggregationFactor = builder.aggregationFactor;
            this.totalRecords = builder.getTotalRecords();
        }

        @Override
        List<SubscribeToShardEvent> getEventsToSend() {
            List<SubscribeToShardEvent> events = new ArrayList<>();

            SubscribeToShardEvent.Builder eventBuilder =
                    SubscribeToShardEvent.builder().millisBehindLatest(millisBehindLatest);

            for (int batchIndex = 0;
                    batchIndex < batchesPerSubscription && sequenceNumber.get() < totalRecords;
                    batchIndex++) {
                List<Record> records = new ArrayList<>();

                for (int i = 0; i < recordsPerBatch; i++) {
                    records.add(createRecord(sequenceNumber));
                }

                eventBuilder.records(records);

                // A null continuation sequence number signals that the shard is exhausted.
                String continuation =
                        sequenceNumber.get() < totalRecords
                                ? String.valueOf(sequenceNumber.get() + 1)
                                : null;
                eventBuilder.continuationSequenceNumber(continuation);

                events.add(eventBuilder.build());
            }

            return events;
        }

        /** A convenience builder for {@link SingleShardFanOutStreamProxy}. */
        public static class Builder {
            private int batchesPerSubscription = 100000;
            private int recordsPerBatch = 10;
            private long millisBehindLatest = 0;
            private int batchCount = 1;
            private int aggregationFactor = 1;

            /** Number of subscriptions required to deliver all configured records. */
            public int getSubscriptionCount() {
                return (int)
                        Math.ceil(
                                (double) getTotalRecords()
                                        / batchesPerSubscription
                                        / recordsPerBatch);
            }

            public int getTotalRecords() {
                return batchCount * recordsPerBatch;
            }

            public Builder withBatchesPerSubscription(final int batchesPerSubscription) {
                this.batchesPerSubscription = batchesPerSubscription;
                return this;
            }

            public Builder withRecordsPerBatch(final int recordsPerBatch) {
                this.recordsPerBatch = recordsPerBatch;
                return this;
            }

            public Builder withBatchCount(final int batchCount) {
                this.batchCount = batchCount;
                return this;
            }

            public Builder withMillisBehindLatest(final long millisBehindLatest) {
                this.millisBehindLatest = millisBehindLatest;
                return this;
            }

            public Builder withAggregationFactor(final int aggregationFactor) {
                this.aggregationFactor = aggregationFactor;
                return this;
            }

            public SingleShardFanOutStreamProxy build() {
                return new SingleShardFanOutStreamProxy(this);
            }
        }
    }

    /** Proxy that reports the configured exception to the response handler on every subscribe. */
    private static final class ExceptionalFanOutStreamProxy implements AsyncStreamProxy {

        private final RuntimeException exception;

        private ExceptionalFanOutStreamProxy(RuntimeException exception) {
            this.exception = exception;
        }

        @Override
        public CompletableFuture<Void> subscribeToShard(
                String consumerArn,
                String shardId,
                org.apache.flink.connector.kinesis.source.split.StartingPosition startingPosition,
                SubscribeToShardResponseHandler responseHandler) {
            responseHandler.exceptionOccurred(exception);
            return CompletableFuture.completedFuture(null);
        }

        @Override
        public void close() throws IOException {
            // no-op
        }
    }

    /**
     * A single shard dummy EFO implementation that provides basic responses and subscription
     * management. Does not provide any records.
     */
    public abstract static class AbstractSingleShardFanOutStreamProxy implements AsyncStreamProxy {

        private final List<SubscribeToShardRequest> requests = new ArrayList<>();
        private int remainingSubscriptions;

        private AbstractSingleShardFanOutStreamProxy(final int remainingSubscriptions) {
            this.remainingSubscriptions = remainingSubscriptions;
        }

        public int getNumberOfSubscribeToShardInvocations() {
            return requests.size();
        }

        /** Returns the SDK starting position captured for the n-th subscribeToShard call. */
        public StartingPosition getStartingPositionForSubscription(final int subscriptionIndex) {
            assertThat(subscriptionIndex).isGreaterThanOrEqualTo(0);
            assertThat(subscriptionIndex).isLessThan(getNumberOfSubscribeToShardInvocations());

            return requests.get(subscriptionIndex).startingPosition();
        }

        @Override
        public CompletableFuture<Void> subscribeToShard(
                final String consumerArn,
                final String shardId,
                final org.apache.flink.connector.kinesis.source.split.StartingPosition
                        startingPosition,
                final SubscribeToShardResponseHandler responseHandler) {

            // Capture the request so tests can assert on the starting positions used.
            requests.add(
                    SubscribeToShardRequest.builder()
                            .consumerARN(consumerArn)
                            .shardId(shardId)
                            .startingPosition(toSdkStartingPosition(startingPosition))
                            .build());

            return CompletableFuture.supplyAsync(
                    () -> {
                        responseHandler.responseReceived(
                                SubscribeToShardResponse.builder().build());
                        responseHandler.onEventStream(
                                subscriber -> {
                                    final List<SubscribeToShardEvent> eventsToSend;

                                    if (remainingSubscriptions > 0) {
                                        eventsToSend = getEventsToSend();
                                        remainingSubscriptions--;
                                    } else {
                                        // Exhausted: emit one empty event with a null
                                        // continuation to signal the end of the shard.
                                        eventsToSend =
                                                Collections.singletonList(
                                                        SubscribeToShardEvent.builder()
                                                                .millisBehindLatest(0L)
                                                                .continuationSequenceNumber(null)
                                                                .build());
                                    }

                                    Iterator<SubscribeToShardEvent> iterator =
                                            eventsToSend.iterator();

                                    Subscription subscription =
                                            new FakeSubscription(
                                                    (n) -> {
                                                        if (iterator.hasNext()) {
                                                            subscriber.onNext(iterator.next());
                                                        } else {
                                                            completeSubscription(subscriber);
                                                        }
                                                    });
                                    subscriber.onSubscribe(subscription);
                                });
                        return null;
                    });
        }

        void completeSubscription(Subscriber<? super SubscribeToShardEventStream> subscriber) {
            subscriber.onComplete();
        }

        abstract List<SubscribeToShardEvent> getEventsToSend();

        @Override
        public void close() throws IOException {
            // no-op
        }
    }

    /** Minimal {@link Subscription} that forwards each request(n) to the supplied callback. */
    private static class FakeSubscription implements Subscription {

        private final java.util.function.Consumer<Long> onRequest;

        public FakeSubscription(java.util.function.Consumer<Long> onRequest) {
            this.onRequest = onRequest;
        }

        @Override
        public void request(long n) {
            onRequest.accept(n);
        }

        @Override
        public void cancel() {
            // Nothing to do
        }
    }

    private static Record createRecord(final AtomicInteger sequenceNumber) {
        return createRecord(randomAlphabetic(32).getBytes(UTF_8), sequenceNumber);
    }

    private static Record createRecord(final byte[] data, final AtomicInteger sequenceNumber) {
        return Record.builder()
                .approximateArrivalTimestamp(Instant.now())
                .data(SdkBytes.fromByteArray(data))
                .sequenceNumber(String.valueOf(sequenceNumber.incrementAndGet()))
                .partitionKey("pk")
                .build();
    }

    // NOTE(review): not referenced within this factory — confirm whether any test relies on it
    // before removing.
    private static List<SubscribeToShardEvent> generateEvents(
            int numberOfEvents, AtomicInteger sequenceNumber) {
        return IntStream.range(0, numberOfEvents)
                .mapToObj(
                        i ->
                                SubscribeToShardEvent.builder()
                                        .records(createRecord(sequenceNumber))
                                        .continuationSequenceNumber(String.valueOf(i))
                                        .build())
                .collect(Collectors.toList());
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.kinesis.source.util;

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;

import java.util.concurrent.CompletableFuture;

/** Provides {@link KinesisClient} with mocked Kinesis Stream behavior. */
public class KinesisAsyncClientProvider {

    /**
     * An implementation of the {@link KinesisAsyncClient} that allows control over Kinesis Service
     * responses: it records the last subscribeToShard request/handler and lets the test decide
     * when the returned future completes.
     */
    public static class TestingAsyncKinesisClient implements KinesisAsyncClient {

        // NOTE(review): static, so this future is shared by all instances — tests relying on it
        // must not run concurrently; confirm whether per-instance state was intended.
        public static final CompletableFuture<Void> SUBSCRIBE_TO_SHARD_RESPONSE_FUTURE =
                new CompletableFuture<>();

        private boolean closed = false;
        private SubscribeToShardRequest subscribeToShardRequest;
        private SubscribeToShardResponseHandler subscribeToShardResponseHandler;

        @Override
        public String serviceName() {
            return "kinesis";
        }

        @Override
        public void close() {
            closed = true;
        }

        public boolean isClosed() {
            return closed;
        }

        @Override
        public CompletableFuture<Void> subscribeToShard(
                SubscribeToShardRequest subscribeToShardRequest,
                SubscribeToShardResponseHandler asyncResponseHandler) {
            // Capture the call so tests can assert on the request and drive the handler manually.
            this.subscribeToShardRequest = subscribeToShardRequest;
            this.subscribeToShardResponseHandler = asyncResponseHandler;
            return SUBSCRIBE_TO_SHARD_RESPONSE_FUTURE;
        }

        public SubscribeToShardRequest getSubscribeToShardRequest() {
            return subscribeToShardRequest;
        }

        public SubscribeToShardResponseHandler getSubscribeToShardResponseHandler() {
            return subscribeToShardResponseHandler;
        }
    }
}
org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.services.kinesis.model.Consumer; +import software.amazon.awssdk.services.kinesis.model.ConsumerDescription; +import software.amazon.awssdk.services.kinesis.model.ConsumerStatus; +import software.amazon.awssdk.services.kinesis.model.DeregisterStreamConsumerResponse; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.HashKeyRange; import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse; +import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.services.kinesis.model.ShardFilterType; @@ -40,14 +50,17 @@ import java.util.Collections; import java.util.Deque; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.flink.connector.kinesis.source.util.TestUtil.ENDING_HASH_KEY_TEST_VALUE; import static org.apache.flink.connector.kinesis.source.util.TestUtil.STARTING_HASH_KEY_TEST_VALUE; +import static org.apache.flink.connector.kinesis.source.util.TestUtil.STREAM_ARN; /** Provides {@link StreamProxy} with mocked Kinesis Streams behavior. 
*/ public class KinesisStreamProxyProvider { @@ -77,6 +90,10 @@ public static class TestKinesisStreamProxy implements StreamProxy { private boolean shouldCompleteNextShard = false; private boolean closed = false; + // RegisterStreamConsumer configuration + private final Map> efoConsumerRegistration = new HashMap<>(); + private final Set consumersCurrentlyDeleting = new HashSet<>(); + @Override public StreamDescriptionSummary getStreamDescriptionSummary(String streamArn) { return StreamDescriptionSummary.builder() @@ -142,6 +159,96 @@ public GetRecordsResponse getRecords( .build(); } + @Override + public RegisterStreamConsumerResponse registerStreamConsumer( + String streamArn, String consumerName) { + Set registeredConsumers = efoConsumerRegistration.computeIfAbsent(streamArn, ignore -> new HashSet<>()); + String streamName = Arn.fromString(streamArn).resourceAsString(); + if (registeredConsumers.contains(consumerName)) { + throw ResourceInUseException.builder() + .message("Consumer " + consumerName + " under stream " + streamName + " already exists for account ") + .build(); + } + registeredConsumers.add(consumerName); + return RegisterStreamConsumerResponse.builder() + .consumer(Consumer.builder() + .consumerName(consumerName) + .consumerARN(getConsumerArnFromName(consumerName)) + .build()) + .build(); + } + + @Override + public DeregisterStreamConsumerResponse deregisterStreamConsumer(String consumerArn) { + String streamArn = convertConsumerArnToStreamArn(consumerArn); + String consumerName = getConsumerNameFromArn(consumerArn); + Set registeredConsumers = efoConsumerRegistration.computeIfAbsent(streamArn, ignore -> new HashSet<>()); + + if (!registeredConsumers.contains(consumerName)) { + throw ResourceNotFoundException.builder() + .message("Consumer " + consumerName + " under stream: " + streamArn + " not found.") + .build(); + } + + registeredConsumers.remove(consumerName); + + return DeregisterStreamConsumerResponse.builder() + .build(); + } + + @Override + 
public DescribeStreamConsumerResponse describeStreamConsumer( + String streamArn, String consumerName) { + if (consumersCurrentlyDeleting.contains(consumerName)) { + return DescribeStreamConsumerResponse.builder() + .consumerDescription(ConsumerDescription.builder() + .consumerName(consumerName) + .consumerStatus(ConsumerStatus.DELETING) + .build()) + .build(); + } + Set registeredConsumers = efoConsumerRegistration.computeIfAbsent(streamArn, ignore -> new HashSet<>()); + if (!registeredConsumers.contains(consumerName)) { + throw ResourceNotFoundException.builder() + .message("Consumer " + consumerName + " under stream: " + streamArn + " not found.") + .build(); + } else { + return DescribeStreamConsumerResponse.builder() + .consumerDescription(ConsumerDescription.builder() + .consumerName(consumerName) + .consumerStatus(ConsumerStatus.ACTIVE) + .build()) + .build(); + } + } + + + public Set getRegisteredConsumers(String streamArn) { + return efoConsumerRegistration.computeIfAbsent(streamArn, ignore -> new HashSet<>()); + } + + public void setConsumersCurrentlyDeleting(String consumerName) { + consumersCurrentlyDeleting.add(consumerName); + } + + private String convertConsumerArnToStreamArn(String consumerArn) { + Arn arn = Arn.fromString(consumerArn); + return arn.toBuilder().resource("stream/" + arn.resource().resource()).build().toString(); + } + + private String getConsumerNameFromArn(String consumerArn) { + String consumerQualifier = Arn.fromString(consumerArn).resource().qualifier().orElseThrow(() -> new IllegalArgumentException("Unable to parse consumer name from consumer ARN")); + return StringUtils.substringBetween(consumerQualifier, "/", ":"); + } + + private String getConsumerArnFromName(String consumerName) { + Arn streamArn = Arn.fromString(STREAM_ARN); + return streamArn.toBuilder() + .resource(streamArn.resourceAsString() + "/consumer/" + consumerName + ":" + Instant.now().getEpochSecond()) + .build() + .toString(); + } + public void 
setStreamSummary(Instant creationTimestamp, int retentionPeriodHours) { this.creationTimestamp = creationTimestamp; this.retentionPeriodHours = retentionPeriodHours; diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java index 3bd58d34a..035d4496e 100644 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java +++ b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/util/TestUtil.java @@ -48,6 +48,8 @@ public class TestUtil { public static final String STREAM_ARN = "arn:aws:kinesis:us-east-1:123456789012:stream/keenesesStream"; public static final String SHARD_ID = "shardId-000000000002"; + public static final String CONSUMER_ARN = + "arn:aws:kinesis:us-east-1:123456789012:stream/stream-name/consumer/consumer-name:1722967044"; public static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema(); public static final long MILLIS_BEHIND_LATEST_TEST_VALUE = 100L; public static final String STARTING_HASH_KEY_TEST_VALUE =