Skip to content

Commit

Permalink
feat : adds retry and continue on error
Browse files Browse the repository at this point in the history
  • Loading branch information
rajadilipkolli committed Mar 19, 2024
1 parent b09b508 commit 781c505
Showing 1 changed file with 13 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import reactor.core.publisher.Mono;
import reactor.core.publisher.ParallelFlux;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

@Service
Expand Down Expand Up @@ -73,7 +74,18 @@ public ParallelFlux<IpAddressEvent> process(KinesisClientRecord kinesisClientRec
Duration.ofSeconds(
applicationProperties
.getEventProcessingDelaySeconds())) // Adds artificial latency
.flatMap(ipAddressEventRepository::save);
.flatMap(ipAddressEventRepository::save)
.retryWhen(
Retry.backoff(
3,
Duration.ofSeconds(
1))) // Retry strategy for save errors
.onErrorContinue(
(throwable, obj) ->
log.error(
"Error processing record: {}",
obj,
throwable)); // Continue on error
});
}
}

0 comments on commit 781c505

Please sign in to comment.