diff --git a/aws-kinesis-project/consumer/src/main/java/com/learning/aws/spring/consumer/IpConsumer.java b/aws-kinesis-project/consumer/src/main/java/com/learning/aws/spring/consumer/IpConsumer.java index 07b738b3..46d8bd37 100644 --- a/aws-kinesis-project/consumer/src/main/java/com/learning/aws/spring/consumer/IpConsumer.java +++ b/aws-kinesis-project/consumer/src/main/java/com/learning/aws/spring/consumer/IpConsumer.java @@ -17,6 +17,7 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.ParallelFlux; import reactor.core.scheduler.Schedulers; +import reactor.util.retry.Retry; import software.amazon.kinesis.retrieval.KinesisClientRecord; @Service @@ -73,7 +74,18 @@ public ParallelFlux process(KinesisClientRecord kinesisClientRec Duration.ofSeconds( applicationProperties .getEventProcessingDelaySeconds())) // Adds artificial latency - .flatMap(ipAddressEventRepository::save); + .flatMap(ipAddressEventRepository::save) + .retryWhen( + Retry.backoff( + 3, + Duration.ofSeconds( + 1))) // Retry strategy for save errors + .onErrorContinue( + (throwable, obj) -> + log.error( + "Error processing record: {}", + obj, + throwable)); // Continue on error }); } }