diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanoutv2/FanOutKinesisShardSplitReader.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanoutv2/FanOutKinesisShardSplitReader.java deleted file mode 100644 index 5d110fcf..00000000 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanoutv2/FanOutKinesisShardSplitReader.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.kinesis.source.reader.fanoutv2; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; -import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics; -import org.apache.flink.connector.kinesis.source.proxy.KinesisAsyncStreamProxy; -import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase; -import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; -import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState; - -import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; - -import java.util.HashMap; -import java.util.Map; - -/** - * An implementation of the KinesisShardSplitReader that consumes from Kinesis using Enhanced - * Fan-Out and HTTP/2. - */ -@Internal -public class FanOutKinesisShardSplitReader extends KinesisShardSplitReaderBase { - private final KinesisAsyncStreamProxy asyncStreamProxy; - private final String consumerArn; - - private final Map splitSubscriptions = new HashMap<>(); - - public FanOutKinesisShardSplitReader( - KinesisAsyncStreamProxy asyncStreamProxy, - String consumerArn, - Map shardMetricGroupMap) { - super(shardMetricGroupMap); - this.asyncStreamProxy = asyncStreamProxy; - this.consumerArn = consumerArn; - } - - @Override - protected RecordBatch fetchRecords(KinesisShardSplitState splitState) { - FanOutKinesisShardSubscription subscription = - splitSubscriptions.get(splitState.getShardId()); - - SubscribeToShardEvent event = subscription.nextEvent(); - if (event == null) { - return null; - } - - boolean shardCompleted = event.continuationSequenceNumber() == null; - if (shardCompleted) { - splitSubscriptions.remove(splitState.getShardId()); - } - return new RecordBatch(event.records(), event.millisBehindLatest(), shardCompleted); - } - - @Override - public void handleSplitsChanges(SplitsChange splitsChanges) { - super.handleSplitsChanges(splitsChanges); - for (KinesisShardSplit split : splitsChanges.splits()) { - FanOutKinesisShardSubscription subscription = - new FanOutKinesisShardSubscription( - asyncStreamProxy, - consumerArn, - split.getShardId(), - split.getStartingPosition()); - subscription.activateSubscription(); - splitSubscriptions.put(split.splitId(), subscription); - } - } - - @Override - public void close() throws Exception { - asyncStreamProxy.close(); - } -} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanoutv2/FanOutKinesisShardSubscription.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanoutv2/FanOutKinesisShardSubscription.java deleted file mode 100644 index 0bd6e6f1..00000000 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanoutv2/FanOutKinesisShardSubscription.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.kinesis.source.reader.fanoutv2; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException; -import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy; -import org.apache.flink.connector.kinesis.source.split.StartingPosition; -import org.apache.flink.util.ExceptionUtils; - -import io.netty.handler.timeout.ReadTimeoutException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.services.kinesis.model.InternalFailureException; -import software.amazon.awssdk.services.kinesis.model.LimitExceededException; -import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; -import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; -import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; -import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; - -import java.io.IOException; -import java.time.Duration; -import java.util.Arrays; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.TimeoutException; - -/** - * FanOutSubscription class responsible for handling the subscription to a single shard of the - * Kinesis stream. Given a shardId, it will manage the lifecycle of the subscription, and eagerly - * keep the next batch of records available for consumption when next polled. - */ -@Internal -class FanOutKinesisShardSubscription { - private static final Logger LOG = LoggerFactory.getLogger(FanOutKinesisShardSubscription.class); - private static final List> RECOVERABLE_EXCEPTIONS = - Arrays.asList( - InternalFailureException.class, - ResourceNotFoundException.class, - ResourceInUseException.class, - ReadTimeoutException.class, - TimeoutException.class, - IOException.class, - LimitExceededException.class); - - private final AsyncStreamProxy kinesis; - private final String consumerArn; - private final String shardId; - - private final Duration subscriptionTimeout = Duration.ofSeconds(60); - - private FanOutShardSubscriber subscriber; - - // Store the current starting position for this subscription. Will be updated each time new - // batch of records is consumed - private StartingPosition startingPosition; - private Throwable subscriptionFailure; - - FanOutKinesisShardSubscription( - AsyncStreamProxy kinesis, - String consumerArn, - String shardId, - StartingPosition startingPosition) { - this.kinesis = kinesis; - this.consumerArn = consumerArn; - this.shardId = shardId; - this.startingPosition = startingPosition; - } - - /** Method to allow eager activation of the subscription. */ - public void activateSubscription() { - LOG.info( - "Activating subscription to shard {} with starting position {} for consumer {}.", - shardId, - startingPosition, - consumerArn); - - subscriber = new FanOutShardSubscriber(consumerArn, shardId, startingPosition); - subscriptionFailure = null; - - SubscribeToShardResponseHandler responseHandler = - SubscribeToShardResponseHandler.builder() - .subscriber(() -> subscriber) - .onError( - throwable -> { - // After subscription is acquired, these errors can be ignored. - if (!subscriber.isSubscribed()) { - - terminateSubscription(throwable); - } - }) - .build(); - - // We don't need to keep track of the future here because we monitor subscription success - // using our own CountDownLatch - kinesis.subscribeToShard(consumerArn, shardId, startingPosition, responseHandler) - .exceptionally( - throwable -> { - LOG.error( - "Error subscribing to shard {} with starting position {} for consumer {}.", - shardId, - startingPosition, - consumerArn, - throwable); - terminateSubscription(throwable); - return null; - }); - } - - private void terminateSubscription(Throwable t) { - this.subscriptionFailure = t; - if (subscriber != null) { - subscriber.cancel(); - } - subscriber = null; - } - - /** - * This is the main entrypoint for this subscription class. It will retrieve the next batch of - * records from the Kinesis stream shard. It will throw any unrecoverable exceptions encountered - * during the subscription process. - * - * @return next FanOut subscription event containing records. Returns null if subscription is - * not yet active and fetching should be retried at a later time. - */ - public SubscribeToShardEvent nextEvent() { - // TODO handle race conditions between nextEvent and - - if (subscriber == null) { - activateSubscription(); - return null; - } - - if (subscriptionFailure != null) { - Optional recoverableException = - getRecoverableException(subscriptionFailure); - if (recoverableException.isPresent()) { - LOG.warn( - "Encountered recoverable exception while subscribing to shard {}.", - shardId, - recoverableException.get()); - activateSubscription(); - return null; - } - LOG.error("Subscription encountered unrecoverable exception.", subscriptionFailure); - throw new KinesisStreamsSourceException( - "Subscription encountered unrecoverable exception.", subscriptionFailure); - } - - if (subscriber.isFailed()) { - subscriber - .getFailure() - .map( - (failure) -> { - Optional recoverableException = - getRecoverableException(failure); - if (recoverableException.isPresent()) { - LOG.warn( - "Encountered recoverable exception while reading from to shard {}.", - shardId, - recoverableException.get()); - activateSubscription(); - return null; - } else { - LOG.error( - "Subscription encountered unrecoverable exception.", - failure); - throw new KinesisStreamsSourceException( - "Subscription encountered unrecoverable exception.", - failure); - } - }) - .orElseThrow( - () -> - new KinesisStreamsSourceException( - "Subscription encountered unexpected failure.", null)); - } - - if (!subscriber.isSubscribed()) { - LOG.debug( - "Subscription to shard {} for consumer {} is not yet active. Skipping.", - shardId, - consumerArn); - // TODO handle timeouts ... somewhere - - return null; - } - - SubscribeToShardEvent event = subscriber.poll(); - if (event != null) { - startingPosition = - StartingPosition.continueFromSequenceNumber(event.continuationSequenceNumber()); - return event; - } - - if (subscriber.isTerminated()) { - LOG.debug( - "Subscription to shard {} for consumer {} is terminated. Skipping.", - shardId, - consumerArn); - activateSubscription(); - } - - return null; - } - - private Optional getRecoverableException(Throwable throwable) { - return RECOVERABLE_EXCEPTIONS.stream() - .map(clazz -> ExceptionUtils.findThrowable(throwable, clazz)) - .filter(Optional::isPresent) - .map(Optional::get) - .findFirst(); - } -} diff --git a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanoutv2/FanOutShardSubscriber.java b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanoutv2/FanOutShardSubscriber.java deleted file mode 100644 index 8fba7b8a..00000000 --- a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanoutv2/FanOutShardSubscriber.java +++ /dev/null @@ -1,119 +0,0 @@ -package org.apache.flink.connector.kinesis.source.reader.fanoutv2; - -import org.apache.flink.connector.kinesis.source.reader.fanout.FanOutKinesisShardSubscription; -import org.apache.flink.connector.kinesis.source.split.StartingPosition; - -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; -import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; -import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; - -import java.util.ArrayDeque; -import java.util.Optional; -import java.util.Queue; - -/** - * Implementation of {@link Subscriber} to retrieve events from Kinesis stream using Reactive - * Streams. - */ -class FanOutShardSubscriber implements Subscriber { - - private static final Logger LOG = LoggerFactory.getLogger(FanOutKinesisShardSubscription.class); - private static final int BUFFER_SIZE = 2; - - // these are here just for debugging purposes - private final String consumerArn; - private final String shardId; - private final StartingPosition startingPosition; - - // our buffer - private final Queue eventQueue = new ArrayDeque<>(BUFFER_SIZE); - - // subscription to shard - private Subscription subscription; - - // maybe we can use an enum for this? - private boolean subscribed; - private boolean terminated; - private boolean failed; - private Throwable subscriptionException; - - FanOutShardSubscriber(String consumerArn, String shardId, StartingPosition startingPosition) { - this.consumerArn = consumerArn; - this.shardId = shardId; - this.startingPosition = startingPosition; - } - - boolean isSubscribed() { - return subscribed; - } - - boolean isTerminated() { - return terminated; - } - - boolean isFailed() { - return failed; - } - - Optional getFailure() { - return Optional.ofNullable(subscriptionException); - } - - @Override - public void onSubscribe(Subscription acquiredSubscription) { - LOG.info( - "Successfully subscribed to shard {} at {} using consumer {}.", - shardId, - startingPosition, - consumerArn); - subscription = acquiredSubscription; - subscribed = true; - subscription.request(BUFFER_SIZE); - } - - public void cancel() { - if (terminated) { - LOG.warn("Trying to cancel inactive subscription. Ignoring."); - return; - } - terminated = true; - subscription.cancel(); - } - - @Override - public void onNext(SubscribeToShardEventStream subscribeToShardEventStream) { - subscribeToShardEventStream.accept( - new SubscribeToShardResponseHandler.Visitor() { - @Override - public void visit(SubscribeToShardEvent event) { - LOG.debug( - "Received event: {}, {}", event.getClass().getSimpleName(), event); - eventQueue.add(event); - } - }); - } - - @Override - public void onError(Throwable throwable) { - failed = true; - terminated = true; - subscriptionException = throwable; - } - - @Override - public void onComplete() { - terminated = true; - } - - public SubscribeToShardEvent poll() { - SubscribeToShardEvent event = eventQueue.poll(); - if (event != null && !terminated) { - subscription.request(1); - } - return event; - } -}