When using reactive sagas, duplicate message detector causes consumer to crash on a duplicate #101

Open
rrrship opened this issue Feb 12, 2024 · 2 comments


rrrship commented Feb 12, 2024

This happens because the doWithMessage() method of the ReactiveSqlTableBasedDuplicateMessageDetector class runs inside a transaction:

return Mono.defer(() -> isDuplicate(subscriberIdAndMessage)
        .flatMap(dup -> {
          if (dup) return Mono.empty();
          else return Mono.from(processingFlow);
        }))
    .as(transactionalOperator::transactional);

The isDuplicate() method handles the duplicate-key exception correctly, but the Mono in doWithMessage() still fails because the transaction manager throws a PessimisticLockingFailureException. Adding onErrorResume(DataAccessException...) to this Mono as well should fix the issue.
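
To illustrate the failure mode described above, here is a minimal, stdlib-only Java model. It is not the Eventuate or Spring code. It assumes that the failed insert leaves the transaction in an aborted state, so the later commit fails even though isDuplicate() already handled the exception. FakeTransaction, DuplicateKeyException, CommitFailedException, and doWithMessageGuarded are hypothetical names, and a plain try/catch stands in for Reactor's onErrorResume.

```java
import java.util.HashSet;
import java.util.Set;

class DuplicateCommitSketch {

    // Hypothetical stand-ins; these are not Spring or Eventuate classes.
    static class DuplicateKeyException extends RuntimeException {}
    static class CommitFailedException extends RuntimeException {}

    // Assumed model of the database: one failed statement aborts the transaction.
    static final class FakeTransaction {
        private final Set<String> receivedMessages;
        private boolean aborted;

        FakeTransaction(Set<String> receivedMessages) {
            this.receivedMessages = receivedMessages;
        }

        void insert(String messageId) {
            if (!receivedMessages.add(messageId)) {
                aborted = true;
                throw new DuplicateKeyException();
            }
        }

        void commit() {
            if (aborted) {
                throw new CommitFailedException();
            }
        }
    }

    // Models isDuplicate(): the duplicate-key error is handled locally.
    static boolean isDuplicate(FakeTransaction tx, String messageId) {
        try {
            tx.insert(messageId);
            return false;
        } catch (DuplicateKeyException e) {
            return true;
        }
    }

    // Models doWithMessage() as it stands: the commit runs after the body.
    static String doWithMessage(Set<String> table, String messageId) {
        FakeTransaction tx = new FakeTransaction(table);
        String outcome = isDuplicate(tx, messageId) ? "skipped" : "processed";
        tx.commit(); // fails for a duplicate although the error was handled above
        return outcome;
    }

    // Models the proposed fix: a second handler around the transactional step.
    static String doWithMessageGuarded(Set<String> table, String messageId) {
        try {
            return doWithMessage(table, messageId);
        } catch (CommitFailedException e) {
            return "skipped";
        }
    }

    public static void main(String[] args) {
        Set<String> table = new HashSet<>();
        System.out.println(doWithMessageGuarded(table, "m1"));
        System.out.println(doWithMessageGuarded(table, "m1"));
        try {
            doWithMessage(table, "m1");
        } catch (CommitFailedException e) {
            System.out.println("unguarded duplicate failed at commit");
        }
    }
}
```

In this model a commit can only fail because of a duplicate. A real handler would probably need to tell that case apart from other commit failures before treating the message as already processed.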

Collaborator

cer commented Feb 20, 2024

Sorry for the delayed reply.
Please can you provide more information about the failure?
I'm not sure what could be causing a PessimisticLockingFailureException. What database are you using? What is the underlying database error that's causing the PessimisticLockingFailureException?

Author

rrrship commented Mar 12, 2024

> Sorry for the delayed reply. Please can you provide more information about the failure? I'm not sure what could be causing a PessimisticLockingFailureException. What database are you using? What is the underlying database error that's causing the PessimisticLockingFailureException?

We're using Aurora Postgres. The error in the database is: 'duplicate key value violates unique constraint "received_messages_pkey"'. It seems like an outer transaction might be interfering. I'm not sure how that could happen, though, because this code runs before any application code.

Here's some of the stack trace with personal information removed (I tried to include the most important bits, but with the reactive stack it's kind of hard to distinguish them):

java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.springframework.dao.PessimisticLockingFailureException: R2DBC commit; The database returned ROLLBACK, so the transaction cannot be committed. Transaction failure is not known (check server logs?)
	at io.eventuate.messaging.kafka.consumer.MessageConsumerKafkaImpl.handle(MessageConsumerKafkaImpl.java:106) ~[eventuate-messaging-kafka-consumer-0.18.0.RELEASE.jar:na]
	at io.eventuate.messaging.kafka.consumer.MessageConsumerKafkaImpl.lambda$subscribeWithReactiveHandler$1(MessageConsumerKafkaImpl.java:59) ~[eventuate-messaging-kafka-consumer-0.18.0.RELEASE.jar:na]
	at io.eventuate.messaging.kafka.consumer.SwimlaneDispatcher.processQueuedMessage(SwimlaneDispatcher.java:72) ~[eventuate-messaging-kafka-consumer-0.18.0.RELEASE.jar:na]
	at net.personal.info.kafka.CustomReactiveSqlBasedDuplicateMessageDetectorTest$mockExecutorService$2.invoke(CustomReactiveSqlBasedDuplicateMessageDetectorTest.kt:151) ~[test/:na]
	at net.personal.info.kafka.CustomReactiveSqlBasedDuplicateMessageDetectorTest$mockExecutorService$2.invoke(CustomReactiveSqlBasedDuplicateMessageDetectorTest.kt:149) ~[test/:na]
	
Caused by: org.springframework.dao.PessimisticLockingFailureException: R2DBC commit; The database returned ROLLBACK, so the transaction cannot be committed. Transaction failure is not known (check server logs?)
	at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:233) ~[spring-r2dbc-6.1.3.jar:6.1.3]
	at org.springframework.r2dbc.connection.R2dbcTransactionManager.translateException(R2dbcTransactionManager.java:437) ~[spring-r2dbc-6.1.3.jar:6.1.3]
	at org.springframework.r2dbc.connection.R2dbcTransactionManager.lambda$doCommit$9(R2dbcTransactionManager.java:307) ~[spring-r2dbc-6.1.3.jar:6.1.3]
	at reactor.core.publisher.Mono.lambda$onErrorMap$27(Mono.java:3785) ~[reactor-core-3.6.2.jar:3.6.2]
	at reactor.core.publisher.Mono.lambda$onErrorResume$29(Mono.java:3875) ~[reactor-core-3.6.2.jar:3.6.2]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) ~[reactor-core-3.6.2.jar:3.6.2]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222) ~[reactor-core-3.6.2.jar:3.6.2]
	at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84) ~[reactor-core-3.6.2.jar:3.6.2]
    	
Caused by: java.util.concurrent.ExecutionException: org.springframework.dao.PessimisticLockingFailureException: R2DBC commit; The database returned ROLLBACK, so the transaction cannot be committed. Transaction failure is not known (check server logs?)
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) ~[na:na]
	at io.eventuate.messaging.kafka.consumer.MessageConsumerKafkaImpl.handle(MessageConsumerKafkaImpl.java:100) ~[eventuate-messaging-kafka-consumer-0.18.0.RELEASE.jar:na]
	... 25 common frames omitted
    	
Caused by: io.r2dbc.postgresql.ExceptionFactory$PostgresqlRollbackException: The database returned ROLLBACK, so the transaction cannot be committed. Transaction failure is not known (check server logs?)
	at io.r2dbc.postgresql.PostgresqlConnection.lambda$null$3(PostgresqlConnection.java:212) ~[r2dbc-postgresql-1.0.4.RELEASE.jar:1.0.4.RELEASE]
	at reactor.core.publisher.ContextPropagation.lambda$contextRestoreForHandle$2(ContextPropagation.java:173) ~[reactor-core-3.6.2.jar:3.6.2]
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:113) ~[reactor-core-3.6.2.jar:3.6.2]
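
Since nested "Caused by" sections in a reactive stack trace are hard to read, one option is to walk the cause chain in code and log only the exception types. The following is a small stdlib-only sketch; RootCauseSketch, causeChain, and rootCause are hypothetical helper names, and the exceptions built in main are stand-ins rather than the real Spring or R2DBC classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;

class RootCauseSketch {

    // Collects the throwable and all of its causes, outermost first.
    // The identity check stops the walk if a cause chain loops back on itself.
    static List<Throwable> causeChain(Throwable error) {
        Objects.requireNonNull(error, "error");
        List<Throwable> chain = new ArrayList<>();
        Throwable current = error;
        while (current != null && !chain.contains(current)) {
            chain.add(current);
            current = current.getCause();
        }
        return chain;
    }

    // Returns the innermost cause, or the throwable itself if it has none.
    static Throwable rootCause(Throwable error) {
        List<Throwable> chain = causeChain(error);
        return chain.get(chain.size() - 1);
    }

    public static void main(String[] args) {
        // Stand-in chain shaped like the trace above: wrapper, ExecutionException, root.
        Throwable root = new IllegalStateException("database returned ROLLBACK");
        Throwable wrapped = new RuntimeException(new ExecutionException(root));

        for (Throwable t : causeChain(wrapped)) {
            System.out.println(t.getClass().getName() + ": " + t.getMessage());
        }
        System.out.println("root cause type: " + rootCause(wrapped).getClass().getSimpleName());
    }
}
```

A helper like this only reports what is in the chain. An error that was already handled earlier, such as the duplicate-key violation, may not appear in it at all, so the server logs remain the place to confirm the underlying database error.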
