Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds integration test #467

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
908d51e
adds integration test
rajadilipkolli Jun 6, 2023
5824c61
Merge branch 'main' into add-integration-tests
rajadilipkolli Jun 6, 2023
9dcbf80
Merge branch 'main' into add-integration-tests
rajadilipkolli Jun 7, 2023
e0c1dd9
fixes script
rajadilipkolli Jun 7, 2023
77291ee
Another way to set data
rajadilipkolli Jun 8, 2023
840f68f
Merge branch 'main' into add-integration-tests
rajadilipkolli Jun 14, 2023
8201406
Merge branch 'main' into add-integration-tests
rajadilipkolli Jul 18, 2023
daa3969
Merge branch 'main' into add-integration-tests
rajadilipkolli Jul 25, 2023
893b396
removes unnecessary declaration
rajadilipkolli Jul 25, 2023
09415eb
Merge branch 'main' into add-integration-tests
rajadilipkolli Jul 26, 2023
25724ec
Merge branch 'main' into add-integration-tests
rajadilipkolli Nov 6, 2023
3519513
Merge branch 'main' into add-integration-tests
rajadilipkolli Mar 12, 2024
e1656f4
fix : compile issue
rajadilipkolli Mar 12, 2024
e1f2dfb
adds kinesis asyncClient Bean
rajadilipkolli Mar 12, 2024
4b5a412
Merge branch 'main' into add-integration-tests
rajadilipkolli Mar 12, 2024
66c5a35
Merge remote-tracking branch 'origin/main' into add-integration-tests
rajadilipkolli Mar 13, 2024
5d66616
Merge branch 'main' into add-integration-tests
rajadilipkolli Mar 13, 2024
59d0013
Merge branch 'main' into add-integration-tests
rajadilipkolli Mar 18, 2024
081fcf5
Merge branch 'main' into add-integration-tests
rajadilipkolli Apr 22, 2024
5b1b2aa
Merge branch 'main' into add-integration-tests
rajadilipkolli Jul 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aws-kinesis-project/producer/.localstack/init-aws.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env bash
#!/bin/bash

awslocal kinesis create-stream --stream-name my-test-stream --shard-count 1

Expand Down
3 changes: 1 addition & 2 deletions aws-kinesis-project/producer/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ services:
ports:
- "4566:4566"
environment:
- SERVICES=kinesis, dynamodb
- SERVICES=kinesis, dynamodb, iam
- DEFAULT_REGION=us-east-1
volumes:
- "../.localstack/init-aws.sh:/etc/localstack/init/ready.d/init-aws.sh" # ready hook
- localstack:/tmp/localstack

volumes:
Expand Down
2 changes: 1 addition & 1 deletion aws-kinesis-project/producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@
<configuration>
<java>
<googleJavaFormat>
<version>1.18.1</version>
<version>1.19.2</version>
<style>AOSP</style>
</googleJavaFormat>
</java>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,38 @@
package com.learning.aws.spring;

import static org.assertj.core.api.Assertions.assertThat;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.learning.aws.spring.common.AbstractIntegrationTest;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

class ApplicationIntegrationTest extends AbstractIntegrationTest {

@Test
void contextLoads() {}
void contextLoads() throws InterruptedException, JsonProcessingException {

assertThat(this.messageBarrier.await(30, TimeUnit.SECONDS)).isTrue();

Message<List<?>> message = this.messageHolder.get();
assertThat(message.getHeaders())
.containsKeys(AwsHeaders.CHECKPOINTER, AwsHeaders.SHARD, AwsHeaders.RECEIVED_STREAM)
.doesNotContainKeys(AwsHeaders.STREAM, AwsHeaders.PARTITION_KEY);

List<?> payload = message.getPayload();
assertThat(payload).hasSize(10);

Object item = payload.get(0);

assertThat(item).isInstanceOf(GenericMessage.class);

Message<?> messageFromBatch = (Message<?>) item;

assertThat(messageFromBatch.getPayload()).isEqualTo("Message0");
assertThat(messageFromBatch.getHeaders()).containsEntry("event.eventType", "createEvent");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,40 @@
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Import;
import org.springframework.messaging.Message;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.web.servlet.MockMvc;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;

@ActiveProfiles({PROFILE_TEST})
@SpringBootTest(webEnvironment = RANDOM_PORT)
@SpringBootTest(
webEnvironment = RANDOM_PORT,
properties = {
"spring.cloud.stream.kinesis.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.idleBetweenPolls = 1",
"spring.cloud.stream.kinesis.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.listenerMode = batch",
"spring.cloud.stream.kinesis.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.checkpointMode = manual",
"spring.cloud.stream.bindings.eventConsumerBatchProcessingWithHeaders-in-0.consumer.useNativeDecoding = true",
"spring.cloud.stream.kinesis.binder.headers = event.eventType",
"spring.cloud.stream.kinesis.binder.autoAddShards = true"
})
@Import(LocalStackConfig.class)
@AutoConfigureMockMvc
public abstract class AbstractIntegrationTest {

@Autowired protected MockMvc mockMvc;

@Autowired protected ObjectMapper objectMapper;

@Autowired protected KinesisAsyncClient amazonKinesis;

@Autowired protected CountDownLatch messageBarrier;

@Autowired protected AtomicReference<Message<List<?>>> messageHolder;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,53 @@

import static org.testcontainers.containers.localstack.LocalStackContainer.Service.KINESIS;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.Message;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;

@TestConfiguration
@TestConfiguration(proxyBeanMethods = false)
public class LocalStackConfig {

static LocalStackContainer localStackContainer;

static {
System.setProperty("com.amazonaws.sdk.disableCbor", "true");
localStackContainer =
new LocalStackContainer(
DockerImageName.parse("localstack/localstack").withTag("3.1.0"))
.withEnv("EAGER_SERVICE_LOADING", "1")
DockerImageName.parse("localstack/localstack").withTag("3.2.0"))
.withServices(KINESIS)
.withExposedPorts(4566);
localStackContainer.start();
System.setProperty(
"spring.cloud.aws.endpoint", localStackContainer.getEndpoint().toString());
System.setProperty(
"spring.cloud.aws.credentials.access-key", localStackContainer.getAccessKey());
System.setProperty(
"spring.cloud.aws.credentials.secret-key", localStackContainer.getSecretKey());
System.setProperty("spring.cloud.aws.region.static", localStackContainer.getRegion());
}

@Bean
public AtomicReference<Message<List<Message<?>>>> messageHolder() {
return new AtomicReference<>();
}

@Bean
public CountDownLatch messageBarrier() {
return new CountDownLatch(1);
}

@Bean
@Primary
public KinesisAsyncClient amazonKinesis() {
return KinesisAsyncClient.builder()
.endpointOverride(localStackContainer.getEndpointOverride(KINESIS))
.region(Region.of(localStackContainer.getRegion()))
.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(
localStackContainer.getAccessKey(),
localStackContainer.getSecretKey())))
.build();
public Consumer<Message<List<Message<?>>>> eventConsumerBatchProcessingWithHeaders() {
return eventMessages -> {
messageHolder().set(eventMessages);
messageBarrier().countDown();
};
}
}
Loading