
fixed testing errors
Claude committed Jan 6, 2025
1 parent 4dd9ad8 commit f401478
Showing 12 changed files with 134 additions and 66 deletions.
@@ -285,7 +285,7 @@ public boolean isExpired() {
}

/**
* Aborts the timer. The timer will report that it has expired until reset is called.
*/
public void abort() {
hasAborted = true;
@@ -322,6 +322,7 @@ public void reset() {

/**
* Gets a Backoff Config for this timer.
*
* @return a backoff Configuration.
*/
public BackoffConfig getBackoffConfig() {
@@ -476,16 +477,16 @@ public interface SupplierOfLong {
}

/**
* A functional interface that will abort the timer. After being called, the timer will indicate that it is expired,
* until it is reset.
*/
@FunctionalInterface
public interface AbortTrigger {
void apply();
}

/**
* An interface to define the Backoff configuration. Used for convenience with Timer.
*/
public interface BackoffConfig {
SupplierOfLong getSupplierOfTimeRemaining();
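For orientation, the interfaces touched above compose as follows. This is a minimal sketch, not the connector's code: the interface shapes mirror the diff, while the getAbortTrigger() accessor (truncated out of the hunk above) and the runBackoff() loop are illustrative assumptions.

// Sketch only: interface shapes mirror the diff; getAbortTrigger() and the
// backoff loop are illustrative assumptions, not the project's code.
final class BackoffSketch {

    @FunctionalInterface
    interface SupplierOfLong {
        long get();
    }

    @FunctionalInterface
    interface AbortTrigger {
        void apply();
    }

    interface BackoffConfig {
        SupplierOfLong getSupplierOfTimeRemaining();

        AbortTrigger getAbortTrigger(); // assumed accessor, not shown in the hunk above
    }

    /** Sleeps in doubling steps until time runs out; aborts the timer if interrupted. */
    static void runBackoff(final BackoffConfig config) {
        long delayMs = 100;
        while (config.getSupplierOfTimeRemaining().get() > 0) {
            try {
                Thread.sleep(Math.min(delayMs, config.getSupplierOfTimeRemaining().get()));
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                config.getAbortTrigger().apply(); // timer now reports expired until reset is called
                return;
            }
            delayMs *= 2; // exponential step
        }
    }
}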
@@ -65,7 +65,8 @@ void verifyExceptionDuringRead(final Transformer transformer, final byte[] testD
try (InputStream inputStream = mock(InputStream.class)) {
when(inputStream.read()).thenThrow(new IOException("Test IOException during read"));
when(inputStream.read(any())).thenThrow(new IOException("Test IOException during read"));
when(inputStream.read(any(), anyInt(), anyInt()))
.thenThrow(new IOException("Test IOException during read"));
when(inputStream.readNBytes(any(), anyInt(), anyInt()))
.thenThrow(new IOException("Test IOException during read"));
when(inputStream.readNBytes(anyInt())).thenThrow(new IOException("Test IOException during read"));
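The rewrapped stubbing above belongs to a fixture that makes every read overload of a mocked InputStream fail, so the transformer under test can be checked for clean IOException handling. A self-contained sketch of the same pattern, assuming Mockito on the classpath:

// Every read path on the mock throws, mirroring the stubbing in the test above.
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.io.InputStream;

final class FailingStreamFixture {
    static InputStream failingInputStream() throws IOException {
        final InputStream inputStream = mock(InputStream.class);
        when(inputStream.read()).thenThrow(new IOException("Test IOException during read"));
        when(inputStream.read(any())).thenThrow(new IOException("Test IOException during read"));
        when(inputStream.read(any(), anyInt(), anyInt()))
                .thenThrow(new IOException("Test IOException during read"));
        when(inputStream.readNBytes(anyInt())).thenThrow(new IOException("Test IOException during read"));
        when(inputStream.readNBytes(any(), anyInt(), anyInt()))
                .thenThrow(new IOException("Test IOException during read"));
        return inputStream;
    }
}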
@@ -68,7 +68,7 @@
import software.amazon.awssdk.services.s3.S3Client;

@Testcontainers
public class AwsIntegrationTest implements IntegrationBase {
class AwsIntegrationTest implements IntegrationBase {

private static final String COMMON_PREFIX = "s3-source-connector-for-apache-kafka-AWS-test-";

@@ -156,23 +156,23 @@ void sourceRecordIteratorBytesTest(final TestInfo testInfo) {

assertThat(testBucketAccessor.listObjects()).hasSize(5);

S3SourceConfig s3SourceConfig = new S3SourceConfig(configData);
SourceTaskContext context = mock(SourceTaskContext.class);
OffsetStorageReader offsetStorageReader = mock(OffsetStorageReader.class);
final S3SourceConfig s3SourceConfig = new S3SourceConfig(configData);
final SourceTaskContext context = mock(SourceTaskContext.class);
final OffsetStorageReader offsetStorageReader = mock(OffsetStorageReader.class);
when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
when(offsetStorageReader.offsets(any())).thenReturn(new HashMap<>());

OffsetManager offsetManager = new OffsetManager(context, s3SourceConfig);
final OffsetManager offsetManager = new OffsetManager(context, s3SourceConfig);

AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig, new HashSet<>());
final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig, new HashSet<>());

Iterator<S3SourceRecord> sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager,
final Iterator<S3SourceRecord> sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager,
TransformerFactory.getTransformer(InputFormat.BYTES), sourceClient);

HashSet<String> seenKeys = new HashSet<>();
final HashSet<String> seenKeys = new HashSet<>();
while (sourceRecordIterator.hasNext()) {
S3SourceRecord s3SourceRecord = sourceRecordIterator.next();
String key = OBJECT_KEY + SEPARATOR + s3SourceRecord.getObjectKey();
final S3SourceRecord s3SourceRecord = sourceRecordIterator.next();
final String key = OBJECT_KEY + SEPARATOR + s3SourceRecord.getObjectKey();
assertThat(offsetKeys).contains(key);
seenKeys.add(key);
}
@@ -219,26 +219,26 @@ void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException {

assertThat(testBucketAccessor.listObjects()).hasSize(5);

S3SourceConfig s3SourceConfig = new S3SourceConfig(configData);
SourceTaskContext context = mock(SourceTaskContext.class);
OffsetStorageReader offsetStorageReader = mock(OffsetStorageReader.class);
final S3SourceConfig s3SourceConfig = new S3SourceConfig(configData);
final SourceTaskContext context = mock(SourceTaskContext.class);
final OffsetStorageReader offsetStorageReader = mock(OffsetStorageReader.class);
when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
when(offsetStorageReader.offsets(any())).thenReturn(new HashMap<>());

OffsetManager offsetManager = new OffsetManager(context, s3SourceConfig);
final OffsetManager offsetManager = new OffsetManager(context, s3SourceConfig);

AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig, new HashSet<>());
final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig, new HashSet<>());

Iterator<S3SourceRecord> sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager,
final Iterator<S3SourceRecord> sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager,
TransformerFactory.getTransformer(InputFormat.AVRO), sourceClient);

HashSet<String> seenKeys = new HashSet<>();
Map<String, List<Long>> seenRecords = new HashMap<>();
final HashSet<String> seenKeys = new HashSet<>();
final Map<String, List<Long>> seenRecords = new HashMap<>();
while (sourceRecordIterator.hasNext()) {
S3SourceRecord s3SourceRecord = sourceRecordIterator.next();
String key = OBJECT_KEY + SEPARATOR + s3SourceRecord.getObjectKey();
final S3SourceRecord s3SourceRecord = sourceRecordIterator.next();
final String key = OBJECT_KEY + SEPARATOR + s3SourceRecord.getObjectKey();
seenRecords.compute(key, (k, v) -> {
List<Long> lst = v == null ? new ArrayList<>() : v;
final List<Long> lst = v == null ? new ArrayList<>() : v; // NOPMD new object inside loop
lst.add(s3SourceRecord.getRecordNumber());
return lst;
});
@@ -247,12 +247,12 @@ void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException {
}
assertThat(seenKeys).containsAll(offsetKeys);
assertThat(seenRecords).hasSize(5);
List<Long> expected = new ArrayList<>();
final List<Long> expected = new ArrayList<>();
for (long l = 0; l < numOfRecsFactor; l++) {
expected.add(l + 1);
}
for (String key : offsetKeys) {
List<Long> seen = seenRecords.get(key);
for (final String key : offsetKeys) {
final List<Long> seen = seenRecords.get(key);
assertThat(seen).as("Count for " + key).containsExactlyInAnyOrderElementsOf(expected);
}
}
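The loop above accumulates record numbers per object key with Map.compute, then checks each key's list against the expected sequence. The same group-then-verify pattern in isolation, a sketch assuming AssertJ and inline data rather than records read from S3:

// Sketch of the group-then-verify pattern used in the test above.
import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class GroupingCheck {
    static void verify(final List<Map.Entry<String, Long>> observed, final List<Long> expected) {
        final Map<String, List<Long>> seenRecords = new HashMap<>();
        for (final Map.Entry<String, Long> entry : observed) {
            // compute() creates the list on first sight of a key, then appends
            seenRecords.compute(entry.getKey(), (k, v) -> {
                final List<Long> lst = v == null ? new ArrayList<>() : v;
                lst.add(entry.getValue());
                return lst;
            });
        }
        for (final Map.Entry<String, List<Long>> entry : seenRecords.entrySet()) {
            assertThat(entry.getValue()).as("Count for " + entry.getKey())
                    .containsExactlyInAnyOrderElementsOf(expected);
        }
    }
}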
@@ -72,6 +72,7 @@
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

@SuppressWarnings("PMD.ExcessiveImports")
public interface IntegrationBase {
String PLUGINS_S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/";
String S3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA_TEST = "s3-source-connector-for-apache-kafka-test-";
@@ -96,10 +96,12 @@ final class IntegrationTest implements IntegrationBase {

private static S3Client s3Client;

@Override
public S3Client getS3Client() {
return s3Client;
}

@Override
public String getS3Prefix() {
return s3Prefix;
}
@@ -182,7 +184,7 @@ void bytesTest(final TestInfo testInfo) {
final Map<String, Object> expectedOffsetRecords = offsetKeys.subList(0, offsetKeys.size() - 1)
.stream()
.collect(Collectors.toMap(Function.identity(), s -> 1));
// verifyOffsetPositions(expectedOffsetRecords, connectRunner.getBootstrapServers());
verifyOffsetPositions(expectedOffsetRecords, connectRunner.getBootstrapServers());
}

@Test
@@ -238,9 +240,8 @@ void avroTest(final TestInfo testInfo) throws IOException {
entry(4 * numOfRecsFactor, "Hello, Kafka Connect S3 Source! object " + (4 * numOfRecsFactor)),
entry(5 * numOfRecsFactor, "Hello, Kafka Connect S3 Source! object " + (5 * numOfRecsFactor)));

// verifyOffsetPositions(offsetKeys.stream().collect(Collectors.toMap(Function.identity(), s ->
// numOfRecsFactor)),
// connectRunner.getBootstrapServers());
verifyOffsetPositions(offsetKeys.stream().collect(Collectors.toMap(Function.identity(), s -> numOfRecsFactor)),
connectRunner.getBootstrapServers());
}

@Test
@@ -115,8 +115,8 @@ public boolean hasNext() {
@Override
public SourceRecord next() {
final S3SourceRecord s3SourceRecord = s3SourceRecordIterator.next();
offsetManager.updateAndReturnCurrentOffsets(s3SourceRecord.getPartitionMap(),
s3SourceRecord.getObjectKey(), s3SourceRecord.getRecordNumber());
return RecordProcessor.createSourceRecord(s3SourceRecord, s3SourceConfig, awsv2SourceClient,
offsetManager);
}
@@ -145,7 +145,7 @@ public void commit() {
@Override
public void commitRecord(final SourceRecord record) {
if (LOGGER.isInfoEnabled()) {
// final Map<String, Object> map = (Map<String, Object>) record.sourceOffset();
// LOGGER.info("Committed individual record {} {} {} committed", map.get(BUCKET), map.get(OBJECT_KEY),
// offsetManager.recordsProcessedForObjectKey((Map<String, Object>) record.sourcePartition(),
// map.get(OBJECT_KEY).toString()));
@@ -116,7 +116,7 @@ public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String>
this.taskId = taskId;
}

public Stream<S3Object> getS3ObjectStream(final String startToken) {
private Stream<S3Object> getS3ObjectStream(final String startToken) {
final ListObjectsV2Request request = ListObjectsV2Request.builder()
.bucket(bucketName)
.maxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR)
@@ -125,24 +125,28 @@ public Stream<S3Object> getS3ObjectStream(final String startToken) {
.build();

return Stream.iterate(s3Client.listObjectsV2(request), Objects::nonNull, response -> {
// This is called every time next() is called on the iterator.
if (response.isTruncated()) {
return s3Client.listObjectsV2(ListObjectsV2Request.builder()
.maxKeys(s3SourceConfig.getS3ConfigFragment().getFetchPageSize() * PAGE_SIZE_FACTOR)
.continuationToken(response.nextContinuationToken())
.build());
} else {
return null;
}

})
.flatMap(response -> response.contents()
.stream()
.filter(filterPredicate)
.filter(objectSummary -> assignObjectToTask(objectSummary.key()))
.filter(objectSummary -> !failedObjectKeys.contains(objectSummary.key())));
}

public Iterator<S3Object> getS3ObjectIterator(final String startToken) {
return new S3ObjectIterator(startToken);
}

public Iterator<String> getListOfObjectKeys(final String startToken) {
return getS3ObjectStream(startToken).map(S3Object::key).iterator();
}
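For reference, the Stream.iterate(seed, hasNext, next) shape in getS3ObjectStream above is the lazy-pagination idiom for ListObjectsV2: each stream element is one response page, and returning null from the next-function ends the stream. A stripped-down sketch of the idiom, assuming only AWS SDK v2; note that it re-specifies the bucket on the continuation request, since every ListObjectsV2 call requires one.

// Stripped-down lazy pagination: one ListObjectsV2 page per stream element,
// ending when a page is no longer truncated. Assumes AWS SDK v2.
import java.util.Objects;
import java.util.stream.Stream;

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.S3Object;

final class S3Paging {
    static Stream<S3Object> listAllObjects(final S3Client s3Client, final String bucket) {
        final ListObjectsV2Request first = ListObjectsV2Request.builder().bucket(bucket).build();
        return Stream
                .iterate(s3Client.listObjectsV2(first), Objects::nonNull,
                        // fetch the next page while truncated; null ends the stream
                        response -> response.isTruncated()
                                ? s3Client.listObjectsV2(ListObjectsV2Request.builder()
                                        .bucket(bucket) // the bucket is required on every call
                                        .continuationToken(response.nextContinuationToken())
                                        .build())
                                : null)
                .flatMap(response -> response.contents().stream());
    }
}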
@@ -177,4 +181,39 @@ public void shutdown() {
s3Client.close();
}

/**
* An iterator over S3Objects that restarts the underlying object stream from the last seen
* object key when the inner iterator is exhausted.
*/
public class S3ObjectIterator implements Iterator<S3Object> {

/** The current iterator. */
private Iterator<S3Object> inner;
/** The last object key that was seen. */
private String lastSeenObjectKey;

private S3ObjectIterator(final String initialKey) {
lastSeenObjectKey = initialKey;
inner = getS3ObjectStream(lastSeenObjectKey).iterator();
}

@Override
public boolean hasNext() {
if (!inner.hasNext()) {
inner = getS3ObjectStream(lastSeenObjectKey).iterator();
}
return inner.hasNext();
}

@Override
public S3Object next() {
final S3Object result = inner.next();
lastSeenObjectKey = result.key();
return result;
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
}

}
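The new S3ObjectIterator exists for resumability: when the inner listing is exhausted, hasNext() rebuilds the stream from lastSeenObjectKey, so objects that land in the bucket after iteration began can still surface; if the rebuilt listing is also empty, hasNext() returns false and the iterator ends cleanly. A hypothetical usage sketch (construction of the client is assumed, not shown in this diff):

// Hypothetical usage; AWSV2SourceClient construction is assumed.
import java.util.Iterator;

import software.amazon.awssdk.services.s3.model.S3Object;

final class IteratorUsage {
    static void drain(final AWSV2SourceClient sourceClient) {
        // a null start token begins at the start of the bucket listing
        final Iterator<S3Object> objects = sourceClient.getS3ObjectIterator(null);
        while (objects.hasNext()) {
            final S3Object object = objects.next();
            System.out.println(object.key()); // placeholder for per-object work
        }
    }
}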
@@ -82,8 +82,8 @@ public long incrementAndUpdateOffsetMap(final Map<String, Object> partitionMap,
return startOffset;
}

public Map<String, Object> updateAndReturnCurrentOffsets(final Map<String, Object> partitionMap,
final String currentObjectKey, final long offset) {
final Map<String, Object> offsetMap = offsets.compute(partitionMap, (k, v) -> {
final Map<String, Object> map = v == null ? new Hashtable<>() : v;
map.put(getObjectMapKey(currentObjectKey), offset);
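Map.compute gives this update its create-or-modify behavior in one call: the remapping function sees null the first time a partition appears and the existing inner map afterwards. A minimal sketch of the pattern, with an assumed offsets field and an assumed key prefix standing in for getObjectMapKey():

// Minimal sketch of the compute-based update above. The offsets field and the
// "object_key_" prefix are assumptions standing in for the class's real state
// and its getObjectMapKey() helper.
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class OffsetMapSketch {
    private final Map<Map<String, Object>, Map<String, Object>> offsets = new ConcurrentHashMap<>();

    Map<String, Object> updateAndReturnCurrentOffsets(final Map<String, Object> partitionMap,
            final String currentObjectKey, final long offset) {
        return offsets.compute(partitionMap, (k, v) -> {
            // v == null means this partition has not been seen yet
            final Map<String, Object> map = v == null ? new Hashtable<>() : v;
            map.put("object_key_" + currentObjectKey, offset);
            return map;
        });
    }
}

Hashtable here mirrors the original and gives the inner map its own locking; a plain HashMap would also serve if the inner map is never shared across threads.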
@@ -75,8 +75,8 @@ public SchemaAndValue getValue() {
}

public SourceRecord getSourceRecord(final OffsetManager offsetManager) {
final Map<String, Object> offsetMap = offsetManager.updateAndReturnCurrentOffsets(getPartitionMap(),
getObjectKey(), getRecordNumber());
return new SourceRecord(getPartitionMap(), offsetMap, topic, partition(), keyData.schema(), keyData.value(),
valueData.schema(), valueData.value());
}
@@ -84,13 +84,17 @@ public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final OffsetMan
this.transformer = transformer;
this.sourceClient = sourceClient;

inner = IteratorUtils.filteredIterator(sourceClient.getS3ObjectStream(null).iterator(),
inner = IteratorUtils.filteredIterator(sourceClient.getS3ObjectIterator(null),
s3Object -> this.fileNamePredicate.test(s3Object)); // filter out bad file names and extract
// topic/partition
}

@Override
protected Iterator<? extends S3SourceRecord> nextIterator(final int count) {
/*
* Gets the next result iterator from the inner object iterator if one exists; the inner
* S3ObjectIterator restarts its listing from the last seen key when it is exhausted.
*/
return inner.hasNext() ? convert(inner.next()).iterator() : null;
}

@@ -99,6 +103,13 @@ public void remove() {
throw new UnsupportedOperationException("This iterator is unmodifiable");
}

/**
* Converts the S3Object into stream of S3SourceRecords.
*
* @param s3Object
* the S3Object to read data from.
* @return a stream of S3SourceRecords created from the input stream of the S3Object.
*/
private Stream<S3SourceRecord> convert(final S3Object s3Object) {

final Map<String, Object> partitionMap = ConnectUtils.getPartitionMap(topic, partitionId, bucketName);
@@ -116,14 +127,29 @@ private Stream<S3SourceRecord> convert(final S3Object s3Object) {
.map(new Mapper(partitionMap, recordCount, keyData, s3Object.key()));
}

/**
* Maps the data from the {@link Transformer} stream to an S3SourceRecord, given all the additional data
* required.
*/
class Mapper implements Function<SchemaAndValue, S3SourceRecord> {
/**
* The partition map
*/
private final Map<String, Object> partitionMap;
/**
* The record number for the record being created.
*/
private long recordCount;
/**
* The schema and value for the key
*/
private final SchemaAndValue keyData;

/**
* The object key from S3
*/
private final String objectKey;

public Mapper(final Map<String, Object> partitionMap, final long recordCount, final SchemaAndValue keyData,
final String objectKey) {
this.partitionMap = partitionMap;
this.recordCount = recordCount;
this.keyData = keyData;
@@ -136,5 +136,4 @@ public S3SourceRecord apply(final SchemaAndValue value) {
return new S3SourceRecord(partitionMap, recordCount, topic, partitionId, objectKey, keyData, value);
}
}

}
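Mapper is a plain java.util.function.Function so it can slot straight into the transformer's stream via map(...), carrying the per-object constants (partition map, key data, object key) plus a mutable record counter. A simplified sketch of that shape, with stand-in types replacing S3SourceRecord and SchemaAndValue:

// Simplified sketch of the stateful-mapper shape above; NumberedRecord and the
// String payload are stand-ins for S3SourceRecord and SchemaAndValue.
import java.util.function.Function;
import java.util.stream.Stream;

final class MapperSketch {
    record NumberedRecord(long recordNumber, String objectKey, String value) {
    }

    static final class Mapper implements Function<String, NumberedRecord> {
        private final String objectKey;
        private long recordCount; // advances once per mapped value

        Mapper(final String objectKey, final long startingCount) {
            this.objectKey = objectKey;
            this.recordCount = startingCount;
        }

        @Override
        public NumberedRecord apply(final String value) {
            recordCount++;
            return new NumberedRecord(recordCount, objectKey, value);
        }
    }

    static Stream<NumberedRecord> convert(final Stream<String> values, final String objectKey) {
        return values.map(new Mapper(objectKey, 0));
    }
}

Because the counter is mutable, the mapper is only deterministic on a sequential stream, which is what a per-object record stream is.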
@@ -262,7 +262,7 @@ public boolean hasNext() {
public S3SourceRecord next() {
final S3SourceRecord result = inner.next();
if (!inner.hasNext()) {
inner = null; // NOPMD null assignment
}
return result;
}
Expand Down