Skip to content

Commit

Permalink
Wait on consumer de-registration
Browse files Browse the repository at this point in the history
  • Loading branch information
hlteoh37 committed Nov 3, 2024
1 parent c3b8931 commit f720c9c
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -347,13 +347,11 @@ private StandardRetryStrategy.Builder createExpBackoffRetryStrategyBuilder(
final BackoffStrategy backoffStrategy =
BackoffStrategy.exponentialDelayHalfJitter(initialDelay, maxDelay);

StandardRetryStrategy.Builder retryStrategyBuilder =
SdkDefaultRetryStrategy.standardRetryStrategyBuilder()
.backoffStrategy(backoffStrategy)
.throttlingBackoffStrategy(backoffStrategy)
.circuitBreakerEnabled(false)
.maxAttempts(maxAttempts);

return retryStrategyBuilder;
return SdkDefaultRetryStrategy.standardRetryStrategyBuilder()
.backoffStrategy(backoffStrategy)
.throttlingBackoffStrategy(backoffStrategy)
.circuitBreakerEnabled(false)
.retryOnExceptionOrCauseInstanceOf(LimitExceededException.class)
.maxAttempts(maxAttempts);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,11 @@ public enum ConsumerLifecycle {
.defaultValue(100)
.withDescription(
"Maximum number of attempts for the exponential backoff retry strategy");

public static final ConfigOption<Duration> EFO_DEREGISTER_CONSUMER_TIMEOUT =
ConfigOptions.key("efo.consumer.deregister.timeout")
.durationType()
.defaultValue(Duration.ofMillis(10000))
.withDescription(
"Timeout for consumer deregistration. When timeout is reached, code will continue as per normal.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,18 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse;
import software.amazon.awssdk.services.kinesis.model.RegisterStreamConsumerResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;

import java.time.Instant;

import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.ConsumerLifecycle.JOB_MANAGED;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_LIFECYCLE;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_NAME;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DEREGISTER_CONSUMER_TIMEOUT;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.READER_TYPE;
import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.ReaderType.EFO;

Expand Down Expand Up @@ -102,10 +107,7 @@ public void registerStreamConsumer() {
}
}

/**
* De-registers stream consumer from specified stream, if needed. This method does not wait for
* the consumer to be deregistered.
*/
/** De-registers stream consumer from specified stream, if needed. */
public void deregisterStreamConsumer() {
if (sourceConfig.get(READER_TYPE) == EFO
&& sourceConfig.get(EFO_CONSUMER_LIFECYCLE) == JOB_MANAGED) {
Expand All @@ -117,6 +119,26 @@ public void deregisterStreamConsumer() {
}
kinesisStreamProxy.deregisterStreamConsumer(consumerArn);
LOG.info("De-registered stream consumer - {}", consumerArn);

Instant timeout = Instant.now().plus(sourceConfig.get(EFO_DEREGISTER_CONSUMER_TIMEOUT));
String consumerName = Arn.fromString(consumerArn).resourceAsString();
while (Instant.now().isBefore(timeout)) {
try {
DescribeStreamConsumerResponse response =
kinesisStreamProxy.describeStreamConsumer(streamArn, consumerName);
LOG.info(
"Waiting for stream consumer to be deregistered - {} {} {}",
streamArn,
consumerName,
response.consumerDescription().consumerStatusAsString());

} catch (ResourceNotFoundException e) {
LOG.info("Stream consumer {} has been deregistered", consumerArn);
return;
}
}
LOG.warn(
"Timed out waiting for stream consumer to be deregistered. There may be leaked EFO consumers on the Kinesis stream.");
}
}
}

0 comments on commit f720c9c

Please sign in to comment.