diff --git a/aws-kinesis-project/consumer/README.md b/aws-kinesis-project/consumer/README.md index f9cb4990..14325e1c 100644 --- a/aws-kinesis-project/consumer/README.md +++ b/aws-kinesis-project/consumer/README.md @@ -13,6 +13,7 @@ $ ./mvnw spring-boot:run -Dspring-boot.run.profiles=local - Starting 4.0.0 kinesis steam binder works with aws v2 of dynamodb, cloudwatch and kinesis - The maximum size of a data blob (the data payload before Base64-encoding) is 1 megabyte (MB). + - Set DEFAULT_BOUNDED_ELASTIC_ON_VIRTUAL_THREADS=true for using virtual threads ### Useful Links diff --git a/aws-kinesis-project/consumer/src/main/java/com/learning/aws/spring/config/ApplicationProperties.java b/aws-kinesis-project/consumer/src/main/java/com/learning/aws/spring/config/ApplicationProperties.java index 34243892..9b2870f8 100644 --- a/aws-kinesis-project/consumer/src/main/java/com/learning/aws/spring/config/ApplicationProperties.java +++ b/aws-kinesis-project/consumer/src/main/java/com/learning/aws/spring/config/ApplicationProperties.java @@ -8,6 +8,8 @@ public class ApplicationProperties { @NestedConfigurationProperty private Cors cors = new Cors(); + long eventProcessingDelaySeconds; + public Cors getCors() { return cors; } @@ -17,6 +19,14 @@ public ApplicationProperties setCors(Cors cors) { return this; } + public long getEventProcessingDelaySeconds() { + return eventProcessingDelaySeconds; + } + + public void setEventProcessingDelaySeconds(long eventProcessingDelaySeconds) { + this.eventProcessingDelaySeconds = eventProcessingDelaySeconds; + } + public static class Cors { private String pathPattern = "/api/**"; private String allowedMethods = "*"; diff --git a/aws-kinesis-project/consumer/src/main/java/com/learning/aws/spring/consumer/IpConsumer.java b/aws-kinesis-project/consumer/src/main/java/com/learning/aws/spring/consumer/IpConsumer.java index f4988951..74eb1c8a 100644 --- a/aws-kinesis-project/consumer/src/main/java/com/learning/aws/spring/consumer/IpConsumer.java +++ b/aws-kinesis-project/consumer/src/main/java/com/learning/aws/spring/consumer/IpConsumer.java @@ -3,11 +3,11 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.learning.aws.spring.config.ApplicationProperties; import com.learning.aws.spring.entities.IpAddressEvent; import com.learning.aws.spring.model.IpAddressDTO; import com.learning.aws.spring.repository.IpAddressEventRepository; import java.time.Duration; -import java.time.LocalDateTime; import java.util.List; import java.util.function.Consumer; import org.slf4j.Logger; @@ -15,6 +15,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import software.amazon.awssdk.services.kinesis.model.Record; @Configuration(proxyBeanMethods = false) @@ -24,11 +26,15 @@ public class IpConsumer { private final ObjectMapper objectMapper; private final IpAddressEventRepository ipAddressEventRepository; + private final ApplicationProperties applicationProperties; public IpConsumer( - ObjectMapper objectMapper, IpAddressEventRepository ipAddressEventRepository) { + ObjectMapper objectMapper, + IpAddressEventRepository ipAddressEventRepository, + ApplicationProperties applicationProperties) { this.objectMapper = objectMapper; this.ipAddressEventRepository = ipAddressEventRepository; + this.applicationProperties = applicationProperties; } // As we are using useNativeDecoding = true along with the listenerMode = batch, @@ -38,53 +44,45 @@ public IpConsumer( Consumer>> consumeEvent() { return recordFlux -> recordFlux - .flatMap(Flux::fromIterable) - .map( - kinessRecord -> { + .flatMapIterable(list -> list) + .flatMap( + kinesisRecord -> { log.info( "Sequence Number :{}, partitionKey :{} and expected ArrivalTime :{}", - kinessRecord.sequenceNumber(), - kinessRecord.partitionKey(), - kinessRecord.approximateArrivalTimestamp()); + kinesisRecord.sequenceNumber(), + kinesisRecord.partitionKey(), + kinesisRecord.approximateArrivalTimestamp()); String dataAsString = - new String(kinessRecord.data().asByteArray()); + new String(kinesisRecord.data().asByteArray()); String payload = dataAsString.substring(dataAsString.indexOf("[{")); - List ipAddressDTOS; + try { - ipAddressDTOS = + List ipAddressDTOS = objectMapper.readValue( payload, new TypeReference<>() {}); + return Flux.fromIterable(ipAddressDTOS); } catch (JsonProcessingException e) { - throw new RuntimeException(e); + return Flux.error(e); } - return Flux.fromIterable(ipAddressDTOS); }) - .doOnNext( - ipAddressDTOsList -> { - log.info( - "IpAddress processed at {} and value is:{}", - LocalDateTime.now(), - ipAddressDTOsList); - this.processEvents(ipAddressDTOsList); + .parallel() // Parallelize processing + .runOn(Schedulers.boundedElastic()) // Run processing on boundedElastic + // Scheduler + .flatMap( + ipAddressDTO -> { + IpAddressEvent ipAddressEvent = + new IpAddressEvent( + ipAddressDTO.ipAddress(), + ipAddressDTO.eventProducedTime()); + return Mono.just(ipAddressEvent) + .delayElement( + Duration.ofSeconds( + applicationProperties + .getEventProcessingDelaySeconds())) // Adds artificial latency + .flatMap(ipAddressEventRepository::save); }) - .subscribe(); - } - - private void processEvents(Flux ipAddressDTOFlux) { - ipAddressDTOFlux - .map( - ipAddressDTO -> - new IpAddressEvent( - ipAddressDTO.ipAddress(), ipAddressDTO.eventProducedTime())) - .delayElements(Duration.ofSeconds(1)) // Adds artificial latency - .subscribe( - ipAddressEvent -> - ipAddressEventRepository - .save(ipAddressEvent) - .subscribe( - savedEvent -> - log.info("Saved Event :{}", savedEvent))); + .subscribe(savedEvent -> log.info("Saved Event :{}", savedEvent)); } } diff --git a/aws-kinesis-project/consumer/src/main/resources/application.properties b/aws-kinesis-project/consumer/src/main/resources/application.properties index 9442fa7f..70db736b 100644 --- a/aws-kinesis-project/consumer/src/main/resources/application.properties +++ b/aws-kinesis-project/consumer/src/main/resources/application.properties @@ -1,14 +1,8 @@ spring.application.name=aws-kinesis-consumer-project server.port=8080 server.shutdown=graceful -spring.main.allow-bean-definition-overriding=true spring.jmx.enabled=false - -################ Logging ##################### -logging.file.name=logs/aws-kinesis-consumer-project.log -logging.level.web=INFO -logging.level.com.amazonaws.util.EC2MetadataUtils=ERROR -logging.level.com.amazonaws.internal.InstanceMetadataServiceResourceFetcher=ERROR +spring.threads.virtual.enabled=true ################ Actuator ##################### management.endpoints.web.exposure.include=configprops,env,health,info,logfile,loggers,metrics @@ -24,9 +18,10 @@ spring.cloud.stream.bindings.consumeEvent-in-0.group=my-test-stream-Consumer-Gro spring.cloud.stream.bindings.consumeEvent-in-0.content-type=application/json spring.cloud.stream.bindings.consumeEvent-in-0.consumer.header-mode=none spring.cloud.stream.bindings.consumeEvent-in-0.consumer.use-native-decoding=true -#defaults to 1, this will process upto 5 messages concurrently +#defaults to 1, this will process upto 5 messages concurrently, in reactive mode this is not necessary #spring.cloud.stream.bindings.consumeEvent-in-0.consumer.concurrency=5 spring.cloud.stream.kinesis.bindings.consumeEvent-in-0.consumer.listenerMode=batch + spring.cloud.function.definition=consumeEvent; #Kinesis-dynamodb-checkpoint @@ -34,7 +29,12 @@ spring.cloud.stream.kinesis.binder.checkpoint.table=spring-stream-metadata spring.cloud.stream.kinesis.binder.checkpoint.billingMode=provisioned spring.cloud.stream.kinesis.binder.checkpoint.readCapacity=5 spring.cloud.stream.kinesis.binder.checkpoint.writeCapacity=5 + spring.cloud.stream.kinesis.binder.locks.table=spring-stream-lock-registry spring.cloud.stream.kinesis.binder.locks.billingMode=provisioned spring.cloud.stream.kinesis.binder.locks.readCapacity=5 spring.cloud.stream.kinesis.binder.locks.writeCapacity=5 + +spring.reactor.schedulers.defaultBoundedElasticOnVirtualThreads=true + +application.event-processing-delay-seconds=1 \ No newline at end of file diff --git a/aws-kinesis-project/consumer/src/main/resources/logback-spring.xml b/aws-kinesis-project/consumer/src/main/resources/logback-spring.xml index 78689fa9..ca737e39 100644 --- a/aws-kinesis-project/consumer/src/main/resources/logback-spring.xml +++ b/aws-kinesis-project/consumer/src/main/resources/logback-spring.xml @@ -1,14 +1,20 @@ + + + + + + @@ -17,7 +23,6 @@ - - + diff --git a/aws-kinesis-project/consumer/src/test/resources/application-test.properties b/aws-kinesis-project/consumer/src/test/resources/application-test.properties index 143b486f..3474c707 100644 --- a/aws-kinesis-project/consumer/src/test/resources/application-test.properties +++ b/aws-kinesis-project/consumer/src/test/resources/application-test.properties @@ -1,5 +1,6 @@ spring.cloud.function.definition=producerSupplier;consumeEvent spring.cloud.stream.bindings.producerSupplier-out-0.destination=my-test-stream -spring.cloud.stream.bindings.producerSupplier-out-0.contentType=application/json +spring.cloud.stream.bindings.producerSupplier-out-0.content-type=application/json +spring.cloud.stream.bindings.producerSupplier-out-0.producer.header-mode=none spring.testcontainers.beans.startup=parallel \ No newline at end of file