diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index 16dac30bc..b6e6dc1dd 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -174,26 +174,24 @@ public void executeLifecycle() { if (isShutdownRequested()) { stateChangeFuture = shutdownComplete(); } else if (needsInitialization) { - if (stateChangeFuture != null) { - if (stateChangeFuture.get()) { - // Task rejection during the subscribe() call will not be propagated back as it not executed - // in the context of the Scheduler thread. Hence we should not assume the subscription will - // always be successful. - // But if subscription was not successful, then it will recover - // during healthCheck which will restart subscription. - // From Shardconsumer point of view, initialization after the below subscribe call - // is complete - subscribe(); - needsInitialization = false; - // Initialization is complete, return now, because we dont need to do - // initializeComplete anymore. ShardConsumer is in ProcessingState now and any further activity - // will be driven by publisher pushing data to subscriber which invokes handleInput - // and that triggers ProcessTask. Scheduler is only meant to do health-checks - // to ensure the consumer is not stuck for any reason and to do shutdown handling. - return; - } + if (stateChangeFuture != null && stateChangeFuture.get()) { + // Task rejection during the subscribe() call will not be propagated back as it not executed + // in the context of the Scheduler thread. Hence we should not assume the subscription will + // always be successful. + // But if subscription was not successful, then it will recover + // during healthCheck which will restart subscription. + // From Shardconsumer point of view, initialization after the below subscribe call + // is complete + subscribe(); + needsInitialization = false; + // Initialization is complete, we don't need to do initializeComplete anymore. + // ShardConsumer is already in ProcessingState and any further activity + // will be driven by publisher pushing data to subscriber which invokes handleInput + // and that triggers ProcessTask. Scheduler is only meant to do health-checks + // to ensure the consumer is not stuck for any reason and to do shutdown handling. + } else { + stateChangeFuture = initializeComplete(); } - stateChangeFuture = initializeComplete(); } } catch (InterruptedException e) { //