diff --git a/.vscode/launch.json b/.vscode/launch.json index 85e93280f..5832099dc 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -4,6 +4,16 @@ // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 "version": "0.2.0", "configurations": [ + { + "type": "java", + "name": "Spring Boot-Application", + "request": "launch", + "cwd": "${workspaceFolder}", + "mainClass": "com.example.cache.Application", + "projectName": "boot-reactive-cache", + "args": "", + "envFile": "${workspaceFolder}/.env" + }, { "type": "java", "name": "Spring Boot-Application", diff --git a/r2dbc/boot-reactive-cache/README.md b/r2dbc/boot-reactive-cache/README.md index d58958980..02a77c184 100644 --- a/r2dbc/boot-reactive-cache/README.md +++ b/r2dbc/boot-reactive-cache/README.md @@ -31,3 +31,4 @@ You can also run the application using Maven as follows: ### Useful Links * Swagger UI: http://localhost:8080/swagger-ui.html * Actuator Endpoint: http://localhost:8080/actuator +* Redis Insights: http://localhost:8001 diff --git a/r2dbc/boot-reactive-cache/src/main/java/com/example/cache/config/Initializer.java b/r2dbc/boot-reactive-cache/src/main/java/com/example/cache/config/Initializer.java index e33b39717..24054e5f6 100644 --- a/r2dbc/boot-reactive-cache/src/main/java/com/example/cache/config/Initializer.java +++ b/r2dbc/boot-reactive-cache/src/main/java/com/example/cache/config/Initializer.java @@ -1,30 +1,33 @@ package com.example.cache.config; import com.example.cache.entities.Movie; -import com.example.cache.repositories.MovieRepository; +import com.example.cache.services.MovieService; +import com.example.cache.utils.AppConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.ApplicationRunner; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; import reactor.core.publisher.Flux; @Configuration(proxyBeanMethods = false) +@Profile(AppConstants.PROFILE_NOT_TEST) class Initializer { private static final Logger log = LoggerFactory.getLogger(Initializer.class); @Bean - public ApplicationRunner saveMovies(MovieRepository repository) { + public ApplicationRunner saveMovies(MovieService movieService) { Flux movies = Flux.just( new Movie(null, "DJ Tillu"), new Movie(null, "Tillu Square"), new Movie(null, " Om Bheem Bush"), new Movie(null, "Aa Okkati Adakku"), new Movie(null, " Bhimaa")); - return args -> repository + return args -> movieService .deleteAll() - .thenMany(repository.saveAll(movies)) + .thenMany(movieService.saveAllMovies(movies)) .subscribe(movie -> log.info(movie.toString())); } } diff --git a/r2dbc/boot-reactive-cache/src/main/java/com/example/cache/services/MovieService.java b/r2dbc/boot-reactive-cache/src/main/java/com/example/cache/services/MovieService.java index 11e06a2de..5bbfb2d24 100644 --- a/r2dbc/boot-reactive-cache/src/main/java/com/example/cache/services/MovieService.java +++ b/r2dbc/boot-reactive-cache/src/main/java/com/example/cache/services/MovieService.java @@ -6,11 +6,14 @@ import com.example.cache.model.response.MovieResponse; import com.example.cache.repositories.MovieRepository; import com.example.cache.utils.AppConstants; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.ReactiveRedisTemplate; +import org.springframework.data.redis.core.ReactiveValueOperations; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; @Service @Transactional(readOnly = true) @@ -19,7 +22,9 @@ public class MovieService { private final MovieRepository movieRepository; private final MovieMapper movieMapper; private final ReactiveRedisTemplate reactiveRedisTemplate; + private final ReactiveValueOperations stringMovieReactiveValueOperations; + @Autowired public MovieService( MovieRepository movieRepository, MovieMapper movieMapper, @@ -27,47 +32,63 @@ public MovieService( this.movieRepository = movieRepository; this.movieMapper = movieMapper; this.reactiveRedisTemplate = reactiveRedisTemplate; + this.stringMovieReactiveValueOperations = reactiveRedisTemplate.opsForValue(); } public Flux findAll() { return reactiveRedisTemplate - .keys("movie:*") + .keys(AppConstants.MOVIE_KEY + "*") // Fetching cached movies. - .flatMap(key -> reactiveRedisTemplate.opsForValue().get(key)) + .flatMap(stringMovieReactiveValueOperations::get) // If cache is empty, fetch the database for movies .switchIfEmpty(movieRepository .findAll() // Persisting the fetched movies in the cache. - .flatMap(movie -> - reactiveRedisTemplate.opsForValue().set(AppConstants.MOVIE_KEY + movie.id(), movie)) - // Fetching the movies from the updated cache. - .thenMany(reactiveRedisTemplate - .keys(AppConstants.MOVIE_KEY + "*") - .flatMap(key -> - reactiveRedisTemplate.opsForValue().get(key)))) + .doOnNext( + movie -> Mono.defer(() -> stringMovieReactiveValueOperations.set( + AppConstants.MOVIE_KEY + movie.id(), movie)) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe() // Asynchronously update the cache + )) .map(movieMapper::toResponse); } public Mono findMovieById(Long id) { - return reactiveRedisTemplate - .opsForValue() + return stringMovieReactiveValueOperations .get(AppConstants.MOVIE_KEY + id) .switchIfEmpty(movieRepository .findById(id) - .flatMap(movie -> - reactiveRedisTemplate.opsForValue().set(AppConstants.MOVIE_KEY + movie.id(), movie)) - .then(reactiveRedisTemplate.opsForValue().get(AppConstants.MOVIE_KEY + id))) + .doOnNext( + movie -> Mono.defer(() -> stringMovieReactiveValueOperations.set( + AppConstants.MOVIE_KEY + movie.id(), movie)) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe() // Asynchronously update the cache + )) .map(movieMapper::toResponse); } + @Transactional + public Flux saveAllMovies(Flux movieFlux) { + return movieRepository + .saveAll(movieFlux) + .doOnNext( + movie -> Mono.defer(() -> stringMovieReactiveValueOperations.set( + AppConstants.MOVIE_KEY + movie.id(), movie)) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe() // Asynchronously update the cache + ); + } + @Transactional public Mono saveMovie(MovieRequest movieRequest) { return Mono.just(movieMapper.toEntity(movieRequest)) .flatMap(movieRepository::save) - .flatMap(movie -> reactiveRedisTemplate - .opsForValue() - .set(AppConstants.MOVIE_KEY + movie.id(), movie) - .then(reactiveRedisTemplate.opsForValue().get(AppConstants.MOVIE_KEY + movie.id()))) + .doOnNext( + movie -> Mono.defer(() -> stringMovieReactiveValueOperations.set( + AppConstants.MOVIE_KEY + movie.id(), movie)) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe() // Asynchronously update the cache + ) .map(movieMapper::toResponse); } @@ -77,10 +98,12 @@ public Mono updateMovie(Long id, MovieRequest movieRequest) { .findById(id) .map(movie -> movieMapper.mapMovieWithRequest(movie, movieRequest)) .flatMap(movieRepository::save) - .flatMap(movie -> reactiveRedisTemplate - .opsForValue() - .set(AppConstants.MOVIE_KEY + movie.id(), movie) - .then(reactiveRedisTemplate.opsForValue().get(AppConstants.MOVIE_KEY + movie.id()))) + .publishOn(Schedulers.boundedElastic()) + .doOnNext( + movie -> Mono.defer(() -> stringMovieReactiveValueOperations.set( + AppConstants.MOVIE_KEY + movie.id(), movie)) + .subscribe() // Asynchronously update the cache + ) .map(movieMapper::toResponse); } @@ -88,4 +111,16 @@ public Mono updateMovie(Long id, MovieRequest movieRequest) { public Mono deleteMovieById(Long id) { return movieRepository.deleteById(id).then(reactiveRedisTemplate.delete(AppConstants.MOVIE_KEY + id)); } + + @Transactional + public Mono deleteAll() { + return movieRepository + .deleteAll() + .then(reactiveRedisTemplate + .getConnectionFactory() + .getReactiveConnection() + .serverCommands() + .flushDb() + .single()); + } } diff --git a/r2dbc/boot-reactive-cache/src/main/java/com/example/cache/utils/AppConstants.java b/r2dbc/boot-reactive-cache/src/main/java/com/example/cache/utils/AppConstants.java index c8f425133..b10d48dd8 100644 --- a/r2dbc/boot-reactive-cache/src/main/java/com/example/cache/utils/AppConstants.java +++ b/r2dbc/boot-reactive-cache/src/main/java/com/example/cache/utils/AppConstants.java @@ -2,6 +2,7 @@ public final class AppConstants { public static final String PROFILE_TEST = "test"; + public static final String PROFILE_NOT_TEST = "!" + PROFILE_TEST; public static final String MOVIE_KEY = "movie:"; private static final String PROFILE_PROD = "prod"; public static final String PROFILE_NOT_PROD = "!" + PROFILE_PROD;