diff --git a/aws-kinesis-project/producer/.localstack/init-aws.sh b/aws-kinesis-project/producer/.localstack/init-aws.sh index fc53c3a3..418c3036 100755 --- a/aws-kinesis-project/producer/.localstack/init-aws.sh +++ b/aws-kinesis-project/producer/.localstack/init-aws.sh @@ -1,4 +1,4 @@ -#!/usr/bin/env bash +#!/bin/bash awslocal kinesis create-stream --stream-name my-test-stream --shard-count 1 diff --git a/aws-kinesis-project/producer/pom.xml b/aws-kinesis-project/producer/pom.xml index b092219a..62409039 100644 --- a/aws-kinesis-project/producer/pom.xml +++ b/aws-kinesis-project/producer/pom.xml @@ -223,7 +223,7 @@ - 1.25.0 + 1.25.2 diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/ApplicationIntegrationTest.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/ApplicationIntegrationTest.java index 5aea7f5d..1ec74378 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/ApplicationIntegrationTest.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/ApplicationIntegrationTest.java @@ -1,10 +1,60 @@ package com.learning.aws.spring; +import static org.assertj.core.api.Assertions.assertThat; + +import com.amazonaws.util.BinaryUtils; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.learning.aws.spring.common.AbstractIntegrationTest; +import com.learning.aws.spring.model.IpAddressDTO; +import java.time.Instant; +import java.util.List; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; +import org.springframework.integration.aws.support.AwsHeaders; +import org.springframework.messaging.Message; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.kinesis.retrieval.KinesisClientRecord; class ApplicationIntegrationTest extends AbstractIntegrationTest { @Test - void contextLoads() {} + void contextLoads() throws InterruptedException, JsonProcessingException { + + assertThat(this.messageBarrier.await(30, TimeUnit.SECONDS)).isTrue(); + + Message> message = this.messageHolder.get(); + assertThat(message.getHeaders()) + .containsKeys(AwsHeaders.SHARD, AwsHeaders.RECEIVED_STREAM) + .doesNotContainKeys( + AwsHeaders.RECEIVED_PARTITION_KEY, + AwsHeaders.RECEIVED_SEQUENCE_NUMBER, + AwsHeaders.STREAM, + AwsHeaders.PARTITION_KEY, + AwsHeaders.CHECKPOINTER); + + List payloadList = message.getPayload(); + + assertThat(payloadList).isNotEmpty().hasSizeGreaterThan(1); + + Record item = payloadList.getFirst(); + assertThat(item).isNotNull(); + + KinesisClientRecord kinesisClientRecord = KinesisClientRecord.fromRecord(item); + + String sequenceNumber = kinesisClientRecord.sequenceNumber(); + assertThat(sequenceNumber).isNotBlank(); + + Instant approximateArrivalTimestamp = kinesisClientRecord.approximateArrivalTimestamp(); + assertThat(approximateArrivalTimestamp).isNotNull().isInstanceOf(Instant.class); + + String partitionKey = kinesisClientRecord.partitionKey(); + assertThat(partitionKey).isNotBlank(); + + String dataAsString = new String(BinaryUtils.copyBytesFrom(kinesisClientRecord.data())); + String payload = dataAsString.substring(dataAsString.indexOf("[{")); + List ipAddressDTOS = + objectMapper.readValue(payload, new TypeReference<>() {}); + assertThat(ipAddressDTOS).isNotEmpty().hasSize(254); + } } diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/TestKinesisProducerApplication.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/TestKinesisProducerApplication.java index 78d4451a..1b9f48d4 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/TestKinesisProducerApplication.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/TestKinesisProducerApplication.java @@ -1,5 +1,6 @@ package com.learning.aws.spring; +import com.learning.aws.spring.common.ConsumerConfig; import com.learning.aws.spring.common.ContainerConfig; import org.springframework.boot.SpringApplication; @@ -7,7 +8,7 @@ public class TestKinesisProducerApplication { public static void main(String[] args) { SpringApplication.from(KinesisProducerApplication::main) - .with(ContainerConfig.class) + .with(ContainerConfig.class, ConsumerConfig.class) .run(args); } } diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java index 6297ba0b..9a56e260 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/AbstractIntegrationTest.java @@ -4,18 +4,32 @@ import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.messaging.Message; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.web.servlet.MockMvc; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.Record; @ActiveProfiles({PROFILE_TEST}) -@SpringBootTest(webEnvironment = RANDOM_PORT, classes = ContainerConfig.class) +@SpringBootTest( + webEnvironment = RANDOM_PORT, + classes = {ContainerConfig.class, ConsumerConfig.class}) @AutoConfigureMockMvc public abstract class AbstractIntegrationTest { @Autowired protected MockMvc mockMvc; @Autowired protected ObjectMapper objectMapper; + + @Autowired protected KinesisAsyncClient amazonKinesis; + + @Autowired protected CountDownLatch messageBarrier; + + @Autowired protected AtomicReference>> messageHolder; } diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ConsumerConfig.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ConsumerConfig.java new file mode 100644 index 00000000..ae96ea24 --- /dev/null +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ConsumerConfig.java @@ -0,0 +1,60 @@ +package com.learning.aws.spring.common; + +import static org.testcontainers.containers.localstack.LocalStackContainer.Service.DYNAMODB; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.testcontainers.containers.localstack.LocalStackContainer; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; +import software.amazon.awssdk.services.kinesis.model.Record; + +@TestConfiguration +public class ConsumerConfig { + + @Bean + DynamoDbAsyncClient dynamoDbAsyncClient(LocalStackContainer localStackContainer) { + return DynamoDbAsyncClient.builder() + .endpointOverride(localStackContainer.getEndpointOverride(DYNAMODB)) + .region(Region.of(localStackContainer.getRegion())) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create( + localStackContainer.getAccessKey(), + localStackContainer.getSecretKey()))) + .overrideConfiguration( + ClientOverrideConfiguration.builder() + .apiCallTimeout(Duration.ofSeconds(10)) + .retryPolicy(RetryPolicy.builder().numRetries(3).build()) + .build()) + .build(); + } + + @Bean + public AtomicReference>> messageHolder() { + return new AtomicReference<>(); + } + + @Bean + public CountDownLatch messageBarrier() { + return new CountDownLatch(1); + } + + @Bean + public Consumer>> consumeEvent() { + return eventMessages -> { + messageHolder().set(eventMessages); + messageBarrier().countDown(); + }; + } +} diff --git a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ContainerConfig.java b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ContainerConfig.java index 01f1e6b3..1e9274fc 100644 --- a/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ContainerConfig.java +++ b/aws-kinesis-project/producer/src/test/java/com/learning/aws/spring/common/ContainerConfig.java @@ -25,8 +25,7 @@ public class ContainerConfig { @Bean LocalStackContainer localStackContainer() { return new LocalStackContainer( - DockerImageName.parse("localstack/localstack").withTag("4.0.3")) - .withServices(KINESIS); + DockerImageName.parse("localstack/localstack").withTag("4.0.3")); } @Bean diff --git a/aws-kinesis-project/producer/src/test/resources/application-test.properties b/aws-kinesis-project/producer/src/test/resources/application-test.properties index 949d8220..593816b1 100644 --- a/aws-kinesis-project/producer/src/test/resources/application-test.properties +++ b/aws-kinesis-project/producer/src/test/resources/application-test.properties @@ -1,8 +1,23 @@ -cloud.aws.region.use-default-aws-region-chain=false -cloud.aws.credentials.use-default-aws-credentials-chain=false -cloud.aws.stack.auto=false -cloud.aws.region.auto=false -cloud.aws.region.static=us-east-1 - -application.endpoint-uri=http://localhost:4566 -application.region=us-east-1 + +# events inbound +spring.cloud.stream.bindings.consumeEvent-in-0.destination=my-test-stream +spring.cloud.stream.bindings.consumeEvent-in-0.group=my-test-stream-Consumer-Group-1 +spring.cloud.stream.bindings.consumeEvent-in-0.content-type=application/json +spring.cloud.stream.bindings.consumeEvent-in-0.consumer.header-mode=none +spring.cloud.stream.bindings.consumeEvent-in-0.consumer.use-native-decoding=true +#defaults to 1, this will process upto 5 messages concurrently, in reactive mode this is not necessary +spring.cloud.stream.bindings.consumeEvent-in-0.consumer.concurrency=5 +spring.cloud.stream.kinesis.bindings.consumeEvent-in-0.consumer.listenerMode=batch + +spring.cloud.function.definition=consumeEvent;producerSupplier; + +#Kinesis-dynamodb-checkpoint +spring.cloud.stream.kinesis.binder.checkpoint.table=spring-stream-metadata +spring.cloud.stream.kinesis.binder.checkpoint.billingMode=provisioned +spring.cloud.stream.kinesis.binder.checkpoint.readCapacity=5 +spring.cloud.stream.kinesis.binder.checkpoint.writeCapacity=5 + +spring.cloud.stream.kinesis.binder.locks.table=spring-stream-lock-registry +spring.cloud.stream.kinesis.binder.locks.billingMode=provisioned +spring.cloud.stream.kinesis.binder.locks.readCapacity=5 +spring.cloud.stream.kinesis.binder.locks.writeCapacity=5