Skip to content

Commit

Permalink
adds integration test (#467)
Browse files Browse the repository at this point in the history
* adds integration test

* fixes script

* Another way to set data

* removes unnecessary declaration

* fix : compile issue

* adds kinesis asyncClient Bean

* fix : compilation issue

* implement integration test
  • Loading branch information
rajadilipkolli authored Dec 27, 2024
1 parent b7c3730 commit 6dc56a3
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 15 deletions.
2 changes: 1 addition & 1 deletion aws-kinesis-project/producer/.localstack/init-aws.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env bash
#!/bin/bash

awslocal kinesis create-stream --stream-name my-test-stream --shard-count 1

Expand Down
2 changes: 1 addition & 1 deletion aws-kinesis-project/producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@
<configuration>
<java>
<googleJavaFormat>
<version>1.25.0</version>
<version>1.25.2</version>
<style>AOSP</style>
</googleJavaFormat>
</java>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,60 @@
package com.learning.aws.spring;

import static org.assertj.core.api.Assertions.assertThat;

import com.amazonaws.util.BinaryUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.learning.aws.spring.common.AbstractIntegrationTest;
import com.learning.aws.spring.model.IpAddressDTO;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.messaging.Message;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

class ApplicationIntegrationTest extends AbstractIntegrationTest {

@Test
void contextLoads() {}
void contextLoads() throws InterruptedException, JsonProcessingException {

assertThat(this.messageBarrier.await(30, TimeUnit.SECONDS)).isTrue();

Message<List<Record>> message = this.messageHolder.get();
assertThat(message.getHeaders())
.containsKeys(AwsHeaders.SHARD, AwsHeaders.RECEIVED_STREAM)
.doesNotContainKeys(
AwsHeaders.RECEIVED_PARTITION_KEY,
AwsHeaders.RECEIVED_SEQUENCE_NUMBER,
AwsHeaders.STREAM,
AwsHeaders.PARTITION_KEY,
AwsHeaders.CHECKPOINTER);

List<Record> payloadList = message.getPayload();

assertThat(payloadList).isNotEmpty().hasSizeGreaterThan(1);

Record item = payloadList.getFirst();
assertThat(item).isNotNull();

KinesisClientRecord kinesisClientRecord = KinesisClientRecord.fromRecord(item);

String sequenceNumber = kinesisClientRecord.sequenceNumber();
assertThat(sequenceNumber).isNotBlank();

Instant approximateArrivalTimestamp = kinesisClientRecord.approximateArrivalTimestamp();
assertThat(approximateArrivalTimestamp).isNotNull().isInstanceOf(Instant.class);

String partitionKey = kinesisClientRecord.partitionKey();
assertThat(partitionKey).isNotBlank();

String dataAsString = new String(BinaryUtils.copyBytesFrom(kinesisClientRecord.data()));
String payload = dataAsString.substring(dataAsString.indexOf("[{"));
List<IpAddressDTO> ipAddressDTOS =
objectMapper.readValue(payload, new TypeReference<>() {});
assertThat(ipAddressDTOS).isNotEmpty().hasSize(254);
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package com.learning.aws.spring;

import com.learning.aws.spring.common.ConsumerConfig;
import com.learning.aws.spring.common.ContainerConfig;
import org.springframework.boot.SpringApplication;

public class TestKinesisProducerApplication {

public static void main(String[] args) {
SpringApplication.from(KinesisProducerApplication::main)
.with(ContainerConfig.class)
.with(ContainerConfig.class, ConsumerConfig.class)
.run(args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,32 @@
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.web.servlet.MockMvc;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.Record;

@ActiveProfiles({PROFILE_TEST})
@SpringBootTest(webEnvironment = RANDOM_PORT, classes = ContainerConfig.class)
@SpringBootTest(
webEnvironment = RANDOM_PORT,
classes = {ContainerConfig.class, ConsumerConfig.class})
@AutoConfigureMockMvc
public abstract class AbstractIntegrationTest {

@Autowired protected MockMvc mockMvc;

@Autowired protected ObjectMapper objectMapper;

@Autowired protected KinesisAsyncClient amazonKinesis;

@Autowired protected CountDownLatch messageBarrier;

@Autowired protected AtomicReference<Message<List<Record>>> messageHolder;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.learning.aws.spring.common;

import static org.testcontainers.containers.localstack.LocalStackContainer.Service.DYNAMODB;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.testcontainers.containers.localstack.LocalStackContainer;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.model.Record;

@TestConfiguration
public class ConsumerConfig {

@Bean
DynamoDbAsyncClient dynamoDbAsyncClient(LocalStackContainer localStackContainer) {
return DynamoDbAsyncClient.builder()
.endpointOverride(localStackContainer.getEndpointOverride(DYNAMODB))
.region(Region.of(localStackContainer.getRegion()))
.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(
localStackContainer.getAccessKey(),
localStackContainer.getSecretKey())))
.overrideConfiguration(
ClientOverrideConfiguration.builder()
.apiCallTimeout(Duration.ofSeconds(10))
.retryPolicy(RetryPolicy.builder().numRetries(3).build())
.build())
.build();
}

@Bean
public AtomicReference<Message<List<Record>>> messageHolder() {
return new AtomicReference<>();
}

@Bean
public CountDownLatch messageBarrier() {
return new CountDownLatch(1);
}

@Bean
public Consumer<Message<List<Record>>> consumeEvent() {
return eventMessages -> {
messageHolder().set(eventMessages);
messageBarrier().countDown();
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ public class ContainerConfig {
@Bean
LocalStackContainer localStackContainer() {
return new LocalStackContainer(
DockerImageName.parse("localstack/localstack").withTag("4.0.3"))
.withServices(KINESIS);
DockerImageName.parse("localstack/localstack").withTag("4.0.3"));
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
cloud.aws.region.use-default-aws-region-chain=false
cloud.aws.credentials.use-default-aws-credentials-chain=false
cloud.aws.stack.auto=false
cloud.aws.region.auto=false
cloud.aws.region.static=us-east-1

application.endpoint-uri=http://localhost:4566
application.region=us-east-1

# events inbound
spring.cloud.stream.bindings.consumeEvent-in-0.destination=my-test-stream
spring.cloud.stream.bindings.consumeEvent-in-0.group=my-test-stream-Consumer-Group-1
spring.cloud.stream.bindings.consumeEvent-in-0.content-type=application/json
spring.cloud.stream.bindings.consumeEvent-in-0.consumer.header-mode=none
spring.cloud.stream.bindings.consumeEvent-in-0.consumer.use-native-decoding=true
#defaults to 1, this will process upto 5 messages concurrently, in reactive mode this is not necessary
spring.cloud.stream.bindings.consumeEvent-in-0.consumer.concurrency=5
spring.cloud.stream.kinesis.bindings.consumeEvent-in-0.consumer.listenerMode=batch

spring.cloud.function.definition=consumeEvent;producerSupplier;

#Kinesis-dynamodb-checkpoint
spring.cloud.stream.kinesis.binder.checkpoint.table=spring-stream-metadata
spring.cloud.stream.kinesis.binder.checkpoint.billingMode=provisioned
spring.cloud.stream.kinesis.binder.checkpoint.readCapacity=5
spring.cloud.stream.kinesis.binder.checkpoint.writeCapacity=5

spring.cloud.stream.kinesis.binder.locks.table=spring-stream-lock-registry
spring.cloud.stream.kinesis.binder.locks.billingMode=provisioned
spring.cloud.stream.kinesis.binder.locks.readCapacity=5
spring.cloud.stream.kinesis.binder.locks.writeCapacity=5

0 comments on commit 6dc56a3

Please sign in to comment.