diff --git a/.vscode/launch.json b/.vscode/launch.json index 4d8a521d8..7d7746e98 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -170,6 +170,16 @@ "projectName": "multidatasource-multitenancy", "args": "", "envFile": "${workspaceFolder}/.env" + }, + { + "type": "java", + "name": "Spring Boot-TestMongoESApplication", + "request": "launch", + "cwd": "${workspaceFolder}", + "mainClass": "com.example.mongoes.TestMongoESApplication", + "projectName": "boot-mongodb-elasticsearch", + "args": "", + "envFile": "${workspaceFolder}/.env" } ] } \ No newline at end of file diff --git a/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/config/GlobalExceptionHandler.java b/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/config/GlobalExceptionHandler.java new file mode 100644 index 000000000..b70d45cab --- /dev/null +++ b/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/config/GlobalExceptionHandler.java @@ -0,0 +1,18 @@ +package com.example.mongoes.config; + +import com.example.mongoes.response.GenericMessage; +import com.example.mongoes.web.exception.DuplicateRestaurantException; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.bind.annotation.RestControllerAdvice; + +@RestControllerAdvice +public class GlobalExceptionHandler { + + @ExceptionHandler(DuplicateRestaurantException.class) + public ResponseEntity handleDuplicateRestaurantException( + DuplicateRestaurantException ex) { + return ResponseEntity.status(HttpStatus.CONFLICT).body(new GenericMessage(ex.getMessage())); + } +} diff --git a/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/utils/DateUtility.java b/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/utils/DateUtility.java index 7d8216185..ff36143f5 100644 --- a/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/utils/DateUtility.java +++ b/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/utils/DateUtility.java @@ -5,7 +5,17 @@ import java.util.Date; public class DateUtility { + + /** + * Converts a {@link Date} to {@link LocalDateTime} using the system default timezone. + * + * @param dateToConvert the date to convert, can be null + * @return the converted {@link LocalDateTime} or null if input is null + */ public static LocalDateTime convertToLocalDateViaInstant(Date dateToConvert) { + if (dateToConvert == null) { + return null; + } return dateToConvert.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime(); } } diff --git a/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/controller/RestaurantController.java b/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/controller/RestaurantController.java index 40ccdc8bc..caede0a4c 100644 --- a/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/controller/RestaurantController.java +++ b/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/controller/RestaurantController.java @@ -9,6 +9,8 @@ import jakarta.validation.Valid; import jakarta.validation.constraints.Size; import java.net.URI; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import org.springframework.data.elasticsearch.core.SearchPage; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; @@ -77,18 +79,24 @@ public Mono> addNotesToRestaurant( } @PostMapping - public ResponseEntity createRestaurant( - @RequestBody RestaurantRequest restaurantRequest) { - return ResponseEntity.created( - URI.create( - String.format( - "/restaurant/%s", - this.restaurantService - .createRestaurant(restaurantRequest) - .map(Restaurant::getName)))) - .body( - new GenericMessage( - "restaurant with name %s created" - .formatted(restaurantRequest.name()))); + public Mono> createRestaurant( + @RequestBody @Valid RestaurantRequest restaurantRequest) { + return this.restaurantService + .createRestaurant(restaurantRequest) + .map( + restaurant -> + ResponseEntity.created( + URI.create( + String.format( + "/api/restaurant/name/%s", + URLEncoder.encode( + restaurantRequest.name(), + StandardCharsets.UTF_8)))) + .body( + new GenericMessage( + "restaurant with name %s created" + .formatted( + restaurantRequest + .name())))); } } diff --git a/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/controller/SearchController.java b/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/controller/SearchController.java index d9871708f..388c4afe4 100644 --- a/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/controller/SearchController.java +++ b/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/controller/SearchController.java @@ -5,6 +5,15 @@ import com.example.mongoes.response.ResultData; import com.example.mongoes.web.service.SearchService; import io.micrometer.core.annotation.Timed; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import jakarta.validation.constraints.Max; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.Pattern; +import jakarta.validation.constraints.Positive; import java.util.List; import org.springframework.data.elasticsearch.core.SearchPage; import org.springframework.http.ResponseEntity; @@ -145,12 +154,43 @@ public Mono> aggregateSearch( .map(ResponseEntity::ok); } + @Operation( + summary = "Search restaurants within range", + description = "Find restaurants within specified distance from given coordinates", + responses = { + @ApiResponse( + responseCode = "200", + description = "Successfully retrieved restaurants", + content = @Content(schema = @Schema(implementation = ResultData.class))), + @ApiResponse(responseCode = "400", description = "Invalid parameters provided") + }) @GetMapping("/search/restaurant/withInRange") public Flux searchRestaurantsWithInRange( - @RequestParam Double lat, - @RequestParam Double lon, - @RequestParam Double distance, - @RequestParam(defaultValue = "km", required = false) String unit) { + @Parameter( + description = "Latitude coordinate (between -90 and 90)", + example = "40.7128") + @RequestParam + @Min(-90) + @Max(90) + Double lat, + @Parameter( + description = "Longitude coordinate (between -180 and 180)", + example = "-74.0060") + @RequestParam + @Min(-180) + @Max(180) + Double lon, + @Parameter(description = "Distance from coordinates (must be positive)") + @RequestParam + @Positive + Double distance, + @Parameter( + description = "Unit of distance", + example = "km", + schema = @Schema(allowableValues = {"km", "mi"})) + @RequestParam(defaultValue = "km", required = false) + @Pattern(regexp = "^(km|mi)$", message = "Unit must be either 'km' or 'mi'") + String unit) { return this.searchService.searchRestaurantsWithInRange(lat, lon, distance, unit); } } diff --git a/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/exception/DuplicateRestaurantException.java b/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/exception/DuplicateRestaurantException.java new file mode 100644 index 000000000..fadc4ce82 --- /dev/null +++ b/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/exception/DuplicateRestaurantException.java @@ -0,0 +1,8 @@ +package com.example.mongoes.web.exception; + +public class DuplicateRestaurantException extends RuntimeException { + + public DuplicateRestaurantException(String message) { + super(message); + } +} diff --git a/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/service/AggregationProcessor.java b/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/service/AggregationProcessor.java new file mode 100644 index 000000000..9f4c341c1 --- /dev/null +++ b/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/service/AggregationProcessor.java @@ -0,0 +1,76 @@ +package com.example.mongoes.web.service; + +import co.elastic.clients.elasticsearch._types.aggregations.Aggregate; +import java.util.HashMap; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.elasticsearch.client.elc.ElasticsearchAggregation; +import org.springframework.stereotype.Service; + +/** + * Processes Elasticsearch aggregations and transforms them into a structured map format. Supports + * 'terms' and 'dateRange' aggregation types. + * + *

Example output format: { "termAggregation": {"term1": 10, "term2": 20}, + * "dateRangeAggregation": {"2023-01-01 - 2023-12-31": 100} } + */ +@Service +class AggregationProcessor { + + private static final Logger log = LoggerFactory.getLogger(AggregationProcessor.class); + + /** + * Processes Elasticsearch aggregations and returns a structured map of results. + * + * @param aggregationMap Map of aggregation key to ElasticsearchAggregation + * @return Map of aggregation key to counts, where counts is a map of bucket key to document + * count + * @throws IllegalArgumentException if aggregationMap is null + */ + public Map> processAggregations( + Map aggregationMap) { + if (aggregationMap == null) { + throw new IllegalArgumentException("aggregationMap must not be null"); + } + Map> resultMap = new HashMap<>(); + aggregationMap.forEach( + (String aggregateKey, ElasticsearchAggregation aggregation) -> { + Map countMap = new HashMap<>(); + Aggregate aggregate = aggregation.aggregation().getAggregate(); + processAggregate(aggregate, countMap); + resultMap.put(aggregateKey, countMap); + }); + return resultMap; + } + + private void processAggregate(Aggregate aggregate, Map countMap) { + if (aggregate.isSterms()) { + processTermsAggregate(aggregate, countMap); + } else if (aggregate.isDateRange()) { + processDateRangeAggregate(aggregate, countMap); + } else { + log.debug( + "Unsupported aggregation type encountered: {}", + aggregate.getClass().getSimpleName()); + } + } + + private void processTermsAggregate(Aggregate aggregate, Map countMap) { + aggregate + .sterms() + .buckets() + .array() + .forEach(bucket -> countMap.put(bucket.key().stringValue(), bucket.docCount())); + } + + private void processDateRangeAggregate(Aggregate aggregate, Map countMap) { + aggregate.dateRange().buckets().array().stream() + .filter(bucket -> bucket.docCount() != 0) + .forEach( + bucket -> + countMap.put( + bucket.fromAsString() + " - " + bucket.toAsString(), + bucket.docCount())); + } +} diff --git a/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/service/RestaurantService.java b/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/service/RestaurantService.java index b6f64500d..76a910c8f 100644 --- a/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/service/RestaurantService.java +++ b/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/service/RestaurantService.java @@ -9,14 +9,14 @@ import com.example.mongoes.mongodb.repository.RestaurantRepository; import com.example.mongoes.utils.AppConstants; import com.example.mongoes.utils.DateUtility; +import com.example.mongoes.web.exception.DuplicateRestaurantException; import com.example.mongoes.web.model.RestaurantRequest; import com.mongodb.client.model.changestream.ChangeStreamDocument; import com.mongodb.client.model.changestream.OperationType; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; +import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Objects; @@ -25,7 +25,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.io.ClassPathResource; -import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Sort; @@ -38,6 +39,7 @@ import org.springframework.transaction.annotation.Transactional; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; @Service public class RestaurantService { @@ -59,46 +61,54 @@ public RestaurantService( this.reactiveMongoTemplate = reactiveMongoTemplate; } - public Flux loadData() throws IOException { - Resource input = new ClassPathResource("restaurants.json"); - Path path = input.getFile().toPath(); - var restaurantArray = Files.readAllLines(path); - return this.saveAll(restaurantArray); + public Flux loadData() { + return DataBufferUtils.join( + DataBufferUtils.read( + new ClassPathResource("restaurants.json"), + new DefaultDataBufferFactory(), + 4096)) + .map( + dataBuffer -> { + byte[] bytes = new byte[dataBuffer.readableByteCount()]; + dataBuffer.read(bytes); + DataBufferUtils.release(dataBuffer); + return new String(bytes, StandardCharsets.UTF_8); + }) + .flatMapMany( + fileContent -> { + List restaurantArray = Arrays.asList(fileContent.split("\n")); + return this.saveAll(restaurantArray); + }); } private Flux saveAll(List restaurantStringList) { - List restaurantList = - restaurantStringList.stream() - .map(Document::parse) - .map( - document -> { - Restaurant restaurant = new Restaurant(); - restaurant.setRestaurantId( - Long.valueOf( - document.get("restaurant_id", String.class))); - restaurant.setName(document.get("name", String.class)); - restaurant.setCuisine(document.get("cuisine", String.class)); - restaurant.setBorough(document.get("borough", String.class)); - Address address = new Address(); - Document addressDoc = (Document) document.get("address"); - address.setBuilding(addressDoc.get("building", String.class)); - address.setStreet(addressDoc.get("street", String.class)); - address.setZipcode( - Integer.valueOf( - addressDoc.get("zipcode", String.class))); - List obj = addressDoc.getList("coord", Double.class); - Point geoJsonPoint = new Point(obj.getFirst(), obj.get(1)); - address.setLocation(geoJsonPoint); - restaurant.setAddress(address); - List gradesList = - getGradesList( - document.getList("grades", Document.class)); - restaurant.setGrades(gradesList); - - return restaurant; - }) - .toList(); - return restaurantRepository.saveAll(restaurantList); + return Flux.fromIterable(restaurantStringList) + .map(Document::parse) + .map(this::documentToRestaurant) + .flatMap(restaurantRepository::save); + } + + private Restaurant documentToRestaurant(Document document) { + Restaurant restaurant = new Restaurant(); + restaurant.setRestaurantId(Long.valueOf(document.get("restaurant_id", String.class))); + restaurant.setName(document.get("name", String.class)); + restaurant.setCuisine(document.get("cuisine", String.class)); + restaurant.setBorough(document.get("borough", String.class)); + + Address address = new Address(); + Document addressDoc = document.get("address", Document.class); + address.setBuilding(addressDoc.get("building", String.class)); + address.setStreet(addressDoc.get("street", String.class)); + address.setZipcode(Integer.valueOf(addressDoc.get("zipcode", String.class))); + List coord = addressDoc.getList("coord", Double.class); + Point geoJsonPoint = new Point(coord.get(0), coord.get(1)); + address.setLocation(geoJsonPoint); + restaurant.setAddress(address); + + List gradesList = getGradesList(document.getList("grades", Document.class)); + restaurant.setGrades(gradesList); + + return restaurant; } private List getGradesList(List gradeDocumentList) { @@ -150,6 +160,7 @@ public Flux> changeStreamProcessor() { .resumeAt(getChangeStreamOption()) .listen() .delayElements(Duration.ofMillis(5)) + .publishOn(Schedulers.boundedElastic()) .doOnNext( restaurantChangeStreamEvent -> { log.info( @@ -218,9 +229,17 @@ public Mono> findAllRestaurants(int offset, int limit) { return this.restaurantESRepository.findAll(pageable); } - public Mono createRestaurant(RestaurantRequest restaurantRequest) { - - return save(restaurantRequest.toRestaurant()); + public Mono createRestaurant(RestaurantRequest restaurantRequest) { + return restaurantESRepository + .findByName(restaurantRequest.name()) + .flatMap( + existingRestaurant -> + Mono.error( + new DuplicateRestaurantException( + "Restaurant with name " + + restaurantRequest.name() + + " already exists"))) + .switchIfEmpty(restaurantRepository.save(restaurantRequest.toRestaurant())); } public Mono deleteAll() { diff --git a/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/service/SearchService.java b/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/service/SearchService.java index e65779c04..ac65c8e10 100644 --- a/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/service/SearchService.java +++ b/boot-mongodb-elasticsearch/src/main/java/com/example/mongoes/web/service/SearchService.java @@ -1,7 +1,5 @@ package com.example.mongoes.web.service; -import co.elastic.clients.elasticsearch._types.aggregations.Aggregate; -import co.elastic.clients.elasticsearch._types.aggregations.RangeBucket; import com.example.mongoes.document.Restaurant; import com.example.mongoes.elasticsearch.repository.RestaurantESRepository; import com.example.mongoes.response.AggregationSearchResponse; @@ -9,11 +7,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.Function; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Sort; -import org.springframework.data.elasticsearch.client.elc.ElasticsearchAggregation; import org.springframework.data.elasticsearch.client.elc.ElasticsearchAggregations; import org.springframework.data.elasticsearch.core.SearchPage; import org.springframework.data.elasticsearch.core.geo.GeoPoint; @@ -26,9 +22,13 @@ public class SearchService { private final RestaurantESRepository restaurantESRepository; + private final AggregationProcessor aggregationProcessor; - public SearchService(RestaurantESRepository restaurantESRepository) { + public SearchService( + RestaurantESRepository restaurantESRepository, + AggregationProcessor aggregationProcessor) { this.restaurantESRepository = restaurantESRepository; + this.aggregationProcessor = aggregationProcessor; } public Mono> searchMatchBorough(String query, Integer offset, Integer limit) { @@ -123,7 +123,7 @@ public Mono aggregateSearch( Map> map = new HashMap<>(); if (elasticsearchAggregations != null) { map = - aggregationFunction.apply( + aggregationProcessor.processAggregations( elasticsearchAggregations.aggregationsAsMap()); } return new AggregationSearchResponse( @@ -135,47 +135,6 @@ public Mono aggregateSearch( }); } - final Function, Map>> - aggregationFunction = - aggregationMap -> { - Map> resultMap = new HashMap<>(); - aggregationMap.forEach( - (String aggregateKey, ElasticsearchAggregation aggregation) -> { - Map countMap = new HashMap<>(); - Aggregate aggregate = aggregation.aggregation().getAggregate(); - if (aggregate.isSterms()) { - aggregate - .sterms() - .buckets() - .array() - .forEach( - stringTermsBucket -> - countMap.put( - stringTermsBucket - .key() - .stringValue(), - stringTermsBucket - .docCount())); - } else if (aggregate.isDateRange()) { - List bucketList = - aggregate.dateRange().buckets().array(); - bucketList.forEach( - rangeBucket -> { - if (rangeBucket.docCount() != 0) { - countMap.put( - rangeBucket.fromAsString() - + " - " - + rangeBucket.toAsString(), - rangeBucket.docCount()); - } - }); - } - resultMap.put(aggregateKey, countMap); - }); - - return resultMap; - }; - public Flux searchRestaurantsWithInRange( Double lat, Double lon, Double distance, String unit) { GeoPoint location = new GeoPoint(lat, lon);