
Commit

added AWS Integration test
Claude committed Dec 31, 2024
1 parent 9ccb1a8 commit d3ce578
Showing 7 changed files with 276 additions and 59 deletions.
@@ -149,6 +149,7 @@ private boolean tryAdd(final List<SourceRecord> results, final Iterator<SourceRe
results.add(sourceRecordIterator.next());
return true;
}
logger.info("No records found in tryAdd call");
return false;
}

@@ -158,28 +159,35 @@ private boolean tryAdd(final List<SourceRecord> results, final Iterator<SourceRe
* @return {@code true} if the connector is not stopped and the timer has not expired.
*/
protected boolean stillPolling() {
return !connectorStopped.get() && !timer.expired();
boolean result = !connectorStopped.get() && !timer.expired();
logger.info("Still polling: {}", result);
return result;
}

@Override
public final List<SourceRecord> poll() {
logger.debug("Polling");
if (connectorStopped.get()) {
logger.info("Stopping");
closeResources();
return Collections.emptyList();
} else {
timer.start();
try {
final List<SourceRecord> result = populateList();
if (logger.isDebugEnabled()) {
logger.debug("Poll() returning {} SourceRecords.", result == null ? null : result.size());
try {
logger.debug("Polling");
if (connectorStopped.get()) {
logger.info("Stopping");
closeResources();
return Collections.emptyList();
} else {
timer.start();
try {
final List<SourceRecord> result = populateList();
if (logger.isInfoEnabled()) { // TODO reset this to debug
logger.info("Poll() returning {} SourceRecords.", result == null ? null : result.size());
}
return result;
} finally {
timer.stop();
timer.reset();
}
return result;
} finally {
timer.stop();
timer.reset();
}
} catch (RuntimeException e) {
logger.error("******************** " + e.getMessage(), e);
throw e;
}
}

@@ -195,11 +203,11 @@ private List<SourceRecord> populateList() {
while (stillPolling() && results.size() < maxPollRecords) {
if (!tryAdd(results, sourceRecordIterator)) {
if (!results.isEmpty()) {
logger.debug("tryAdd() did not add to the list, returning current results.");
logger.info("tryAdd() did not add to the list, returning current results.");
// if we could not get a record and the results are not empty return them
break;
}
logger.debug("Attempting {}", backoff);
logger.info("Attempting {}", backoff);
backoff.cleanDelay();
}
}
@@ -0,0 +1,158 @@
package io.aiven.kafka.connect.s3.source;

import io.aiven.kafka.connect.common.source.input.InputFormat;
import io.aiven.kafka.connect.common.source.input.TransformerFactory;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor;
import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient;
import io.aiven.kafka.connect.s3.source.utils.OffsetManager;
import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord;
import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import software.amazon.awssdk.services.s3.S3Client;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY;
import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS;
import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_PREFIX_CONFIG;
import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG;
import static io.aiven.kafka.connect.s3.source.S3SourceTask.OBJECT_KEY;
import static io.aiven.kafka.connect.s3.source.utils.OffsetManager.SEPARATOR;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@Testcontainers
public class AwsIntegrationTest implements IntegrationBase {

private static final String COMMON_PREFIX = "s3-source-connector-for-apache-kafka-AWS-test-";

@Container
public static final LocalStackContainer LOCALSTACK = IntegrationBase.createS3Container();

private static String s3Prefix;

private S3Client s3Client;
private String s3Endpoint;

private BucketAccessor testBucketAccessor;


@Override
public String getS3Prefix() {
return s3Prefix;
}

@Override
public S3Client getS3Client() {
return s3Client;
}

@BeforeAll
static void setUpAll() throws IOException, InterruptedException {
s3Prefix = COMMON_PREFIX + ZonedDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + "/";
}

@BeforeEach
void setupAWS() {
s3Client = IntegrationBase.createS3Client(LOCALSTACK);
s3Endpoint = LOCALSTACK.getEndpoint().toString();
testBucketAccessor = new BucketAccessor(s3Client, TEST_BUCKET_NAME);
testBucketAccessor.createBucket();
}

@AfterEach
void tearDownAWS() {
testBucketAccessor.removeBucket();
s3Client.close();
}

private Map<String, String> getConfig(final String topics, final int maxTasks) {
final Map<String, String> config = new HashMap<>();
config.put(AWS_ACCESS_KEY_ID_CONFIG, S3_ACCESS_KEY_ID);
config.put(AWS_SECRET_ACCESS_KEY_CONFIG, S3_SECRET_ACCESS_KEY);
config.put(AWS_S3_ENDPOINT_CONFIG, s3Endpoint);
config.put(AWS_S3_BUCKET_NAME_CONFIG, TEST_BUCKET_NAME);
config.put(AWS_S3_PREFIX_CONFIG, getS3Prefix());
config.put(TARGET_TOPIC_PARTITIONS, "0,1");
config.put(TARGET_TOPICS, topics);
config.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter");
config.put(VALUE_CONVERTER_KEY, "org.apache.kafka.connect.converters.ByteArrayConverter");
config.put("tasks.max", String.valueOf(maxTasks));
return config;
}

/**
 * Test the integration with the Amazon S3 source connector.
 *
 * @param testInfo the current test information, used to derive the topic name
 */
@Test
void sourceRecordIteratorTest(final TestInfo testInfo) {
final var topicName = IntegrationBase.topicName(testInfo);
final Map<String, String> configData = getConfig(topicName, 1);

configData.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue());

final String testData1 = "Hello, Kafka Connect S3 Source! object 1";
final String testData2 = "Hello, Kafka Connect S3 Source! object 2";

final List<String> offsetKeys = new ArrayList<>();
final List<String> expectedKeys = new ArrayList<>();
// write 2 objects to s3
expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00000"));
expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00000"));
expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00001"));
expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00001"));

// we don't expect the empty one.
offsetKeys.addAll(expectedKeys);
offsetKeys.add(writeToS3(topicName, new byte[0], "00003"));

assertThat(testBucketAccessor.listObjects()).hasSize(5);

S3SourceConfig s3SourceConfig = new S3SourceConfig(configData);
SourceTaskContext context = mock(SourceTaskContext.class);
OffsetStorageReader offsetStorageReader = mock(OffsetStorageReader.class);
when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
when(offsetStorageReader.offsets(any())).thenReturn(new HashMap<>());

OffsetManager offsetManager = new OffsetManager(context, s3SourceConfig);

AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig, new HashSet<>());

SourceRecordIterator sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager,
TransformerFactory.getTransformer(InputFormat.BYTES), sourceClient);

HashSet<String> seenKeys = new HashSet<>();
while (sourceRecordIterator.hasNext()) {
S3SourceRecord s3SourceRecord = sourceRecordIterator.next();
String key = OBJECT_KEY + SEPARATOR + s3SourceRecord.getObjectKey();
assertThat(offsetKeys).contains(key);
seenKeys.add(key);
}
assertThat(seenKeys).containsAll(expectedKeys);
}
}
@@ -16,6 +16,8 @@

package io.aiven.kafka.connect.s3.source;

import static io.aiven.kafka.connect.s3.source.S3SourceTask.OBJECT_KEY;
import static io.aiven.kafka.connect.s3.source.utils.OffsetManager.SEPARATOR;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

@@ -59,13 +61,31 @@
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

public interface IntegrationBase {
String PLUGINS_S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/";
String S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST = "s3-source-connector-for-apache-kafka-test-";
ObjectMapper OBJECT_MAPPER = new ObjectMapper();
String TEST_BUCKET_NAME = "test-bucket0";
String S3_ACCESS_KEY_ID = "test-key-id0";
String VALUE_CONVERTER_KEY = "value.converter";
String S3_SECRET_ACCESS_KEY = "test_secret_key0";

S3Client getS3Client();

String getS3Prefix();

default String writeToS3(final String topicName, final byte[] testDataBytes, final String partitionId) {
final String objectKey = org.apache.commons.lang3.StringUtils.defaultIfBlank(getS3Prefix(), "") + topicName + "-" + partitionId + "-"
+ System.currentTimeMillis() + ".txt";
final PutObjectRequest request = PutObjectRequest.builder().bucket(IntegrationTest.TEST_BUCKET_NAME).key(objectKey).build();
getS3Client().putObject(request, RequestBody.fromBytes(testDataBytes));
return OBJECT_KEY + SEPARATOR + objectKey;
}
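    // Illustrative usage sketch, not part of this commit; the key layout below is an assumption
    // derived from the concatenation above (OBJECT_KEY + SEPARATOR + objectKey):
    //   String offsetKey = writeToS3("bytes-test", "hello".getBytes(StandardCharsets.UTF_8), "00000");
    //   // uploads "<prefix>bytes-test-00000-<currentTimeMillis>.txt" to TEST_BUCKET_NAME and returns
    //   // OBJECT_KEY + SEPARATOR + <that object key>, which AwsIntegrationTest compares against the
    //   // keys reported by the SourceRecordIterator.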

default AdminClient newAdminClient(final String bootstrapServers) {
final Properties adminClientConfig = new Properties();
@@ -32,6 +32,9 @@
import static java.util.Map.entry;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -53,6 +56,12 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import io.aiven.kafka.connect.common.source.input.TransformerFactory;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient;
import io.aiven.kafka.connect.s3.source.utils.OffsetManager;
import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord;
import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -68,18 +77,18 @@
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.platform.commons.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

@@ -92,13 +101,6 @@ final class IntegrationTest implements IntegrationBase {
private static final String COMMON_PREFIX = "s3-source-connector-for-apache-kafka-test-";
private static final int OFFSET_FLUSH_INTERVAL_MS = 500;

private static final String S3_ACCESS_KEY_ID = "test-key-id0";
private static final String S3_SECRET_ACCESS_KEY = "test_secret_key0";

private static final String VALUE_CONVERTER_KEY = "value.converter";

private static final String TEST_BUCKET_NAME = "test-bucket0";

private static String s3Endpoint;
private static String s3Prefix;
private static BucketAccessor testBucketAccessor;
@@ -112,6 +114,16 @@ final class IntegrationTest implements IntegrationBase {

private static S3Client s3Client;

public S3Client getS3Client() {
return s3Client;
}

public String getS3Prefix() {
return s3Prefix;
}


@BeforeAll
static void setUpAll() throws IOException, InterruptedException {
s3Prefix = COMMON_PREFIX + ZonedDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + "/";
@@ -159,7 +171,7 @@ void tearDown() {
@Test
void bytesTest(final TestInfo testInfo) {
final var topicName = IntegrationBase.topicName(testInfo);
final Map<String, String> connectorConfig = getConfig(CONNECTOR_NAME, topicName, 2);
final Map<String, String> connectorConfig = getConfig(CONNECTOR_NAME, topicName, 1);

connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue());
connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig);
@@ -253,7 +265,7 @@ void parquetTest(final TestInfo testInfo) throws IOException {
final var topicName = IntegrationBase.topicName(testInfo);

final String partition = "00000";
final String fileName = addPrefixOrDefault("") + topicName + "-" + partition + "-" + System.currentTimeMillis()
final String fileName = org.apache.commons.lang3.StringUtils.defaultIfBlank(getS3Prefix(), "") + topicName + "-" + partition + "-" + System.currentTimeMillis()
+ ".txt";
final String name = "testuser";

@@ -337,18 +349,6 @@ private static byte[] generateNextAvroMessagesStartingFromId(final int messageId
}
}

private static String writeToS3(final String topicName, final byte[] testDataBytes, final String partitionId) {
final String objectKey = addPrefixOrDefault("") + topicName + "-" + partitionId + "-"
+ System.currentTimeMillis() + ".txt";
final PutObjectRequest request = PutObjectRequest.builder().bucket(TEST_BUCKET_NAME).key(objectKey).build();
s3Client.putObject(request, RequestBody.fromBytes(testDataBytes));
return OBJECT_KEY + SEPARATOR + objectKey;
}

private static String addPrefixOrDefault(final String defaultValue) {
return StringUtils.isNotBlank(s3Prefix) ? s3Prefix : defaultValue;
}

private Map<String, String> getConfig(final String connectorName, final String topics, final int maxTasks) {
final Map<String, String> config = new HashMap<>(basicS3ConnectorConfig());
config.put("name", connectorName);
@@ -146,9 +146,10 @@ public void commit() {
public void commitRecord(final SourceRecord record) {
if (LOGGER.isInfoEnabled()) {
final Map<String, Object> map = (Map<String, Object>) record.sourceOffset();
LOGGER.info("Committed individual record {} {} {} committed", map.get(BUCKET), map.get(OBJECT_KEY),
offsetManager.recordsProcessedForObjectKey((Map<String, Object>) record.sourcePartition(),
map.get(OBJECT_KEY).toString()));
// LOGGER.info("Committed individual record {} {} {} committed", map.get(BUCKET), map.get(OBJECT_KEY),
// offsetManager.recordsProcessedForObjectKey((Map<String, Object>) record.sourcePartition(),
// map.get(OBJECT_KEY).toString()));
LOGGER.info("Committed individual record {} committed", map);
}
}

