Commit

fixed PMD errors
Claude committed Jan 6, 2025
1 parent a7c2570 commit 4dd9ad8
Showing 13 changed files with 67 additions and 83 deletions.

@@ -435,18 +435,16 @@ private long timeWithJitter() {
      * If any thread interrupts this thread.
      */
     public void delay() throws InterruptedException {
-        long sleepTime = timeRemaining.get();
-        if (sleepTime > 0) {
-            if (waitCount < maxCount) {
-                waitCount++;
-                long nextSleep = timeWithJitter();
-                // don't sleep negative time. Jitter can introduce negative time.
-                if (nextSleep > 0) {
-                    if (nextSleep >= sleepTime) {
-                        abortTrigger.apply();
-                    } else {
-                        Thread.sleep(nextSleep);
-                    }
+        final long sleepTime = timeRemaining.get();
+        if (sleepTime > 0 && waitCount < maxCount) {
+            waitCount++;
+            final long nextSleep = timeWithJitter();
+            // don't sleep negative time. Jitter can introduce negative time.
+            if (nextSleep > 0) {
+                if (nextSleep >= sleepTime) {
+                    abortTrigger.apply();
+                } else {
+                    Thread.sleep(nextSleep);
+                }
             }
         }
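
The collapsed condition above merges two nested `if`s and finalizes the locals, the usual shape PMD asks for. For reference, a minimal, self-contained sketch of the same jittered-backoff `delay()` pattern; the supplier, trigger, and jitter formula here are stand-ins, not the repository's actual implementation:

```java
import java.util.Random;
import java.util.function.LongSupplier;

// Sketch of exponential backoff with jitter, assuming a time-remaining
// supplier and an abort trigger like the ones in the diff (hypothetical).
public class JitterBackoff {
    private final LongSupplier timeRemaining; // ms until the overall deadline
    private final Runnable abortTrigger;      // fired instead of oversleeping
    private final int maxCount;
    private final Random random = new Random();
    private int waitCount;

    public JitterBackoff(final LongSupplier timeRemaining, final Runnable abortTrigger, final int maxCount) {
        this.timeRemaining = timeRemaining;
        this.abortTrigger = abortTrigger;
        this.maxCount = maxCount;
    }

    // Delay grows as 2^waitCount ms; jitter can cancel it entirely, so the
    // caller must guard against a non-positive result.
    private long timeWithJitter() {
        final long base = 1L << waitCount;
        final long jitter = (long) ((random.nextDouble() * 2 - 1) * base);
        return base + jitter; // in [0, 2 * base)
    }

    public void delay() throws InterruptedException {
        final long sleepTime = timeRemaining.getAsLong();
        if (sleepTime > 0 && waitCount < maxCount) {
            waitCount++;
            final long nextSleep = timeWithJitter();
            if (nextSleep > 0) {
                if (nextSleep >= sleepTime) {
                    abortTrigger.run(); // sleeping would overshoot the deadline
                } else {
                    Thread.sleep(nextSleep);
                }
            }
        }
    }
}
```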

@@ -115,10 +115,10 @@ void backoffTest() throws InterruptedException {

     @Test
     void backoffIncrementalTimeTest() throws InterruptedException {
-        AtomicBoolean abortTrigger = new AtomicBoolean();
+        final AtomicBoolean abortTrigger = new AtomicBoolean();
         // delay increases in powers of 2.
         final long maxDelay = 1000; // not a power of 2
-        AbstractSourceTask.BackoffConfig config = new AbstractSourceTask.BackoffConfig() {
+        final AbstractSourceTask.BackoffConfig config = new AbstractSourceTask.BackoffConfig() {
             @Override
             public AbstractSourceTask.SupplierOfLong getSupplierOfTimeRemaining() {
                 return () -> maxDelay;

@@ -90,7 +90,7 @@ void testReadAvroRecords() throws Exception {
         final ByteArrayOutputStream avroData = generateMockAvroData(25);
         final InputStream inputStream = new ByteArrayInputStream(avroData.toByteArray());

-        List<String> expected = new ArrayList<>();
+        final List<String> expected = new ArrayList<>();
         for (int i = 0; i < 25; i++) {
             expected.add("Hello, Kafka Connect S3 Source! object " + i);
         }
@@ -108,7 +108,7 @@ void testReadAvroRecordsSkipFew() throws Exception {
         final ByteArrayOutputStream avroData = generateMockAvroData(20);
         final InputStream inputStream = new ByteArrayInputStream(avroData.toByteArray());

-        List<String> expected = new ArrayList<>();
+        final List<String> expected = new ArrayList<>();
         for (int i = 5; i < 20; i++) {
             expected.add("Hello, Kafka Connect S3 Source! object " + i);
         }

@@ -78,7 +78,7 @@ void testHandleValueDataWithValidJson() {
         final InputStream validJsonInputStream = new ByteArrayInputStream(
                 getJsonRecs(100).getBytes(StandardCharsets.UTF_8));

-        List<String> expected = new ArrayList<>();
+        final List<String> expected = new ArrayList<>();
         for (int i = 0; i < 100; i++) {
             expected.add("value" + i);
         }
@@ -88,15 +88,15 @@ void testHandleValueDataWithValidJson() {

         assertThat(records).extracting(SchemaAndValue::value)
                 .extracting(sv -> ((Map) sv).get("key"))
-                .containsExactlyElementsOf(expected);;
+                .containsExactlyElementsOf(expected);
     }

     @Test
     void testHandleValueDataWithValidJsonSkipFew() {
         final InputStream validJsonInputStream = new ByteArrayInputStream(
                 getJsonRecs(100).getBytes(StandardCharsets.UTF_8));

-        List<String> expected = new ArrayList<>();
+        final List<String> expected = new ArrayList<>();
         for (int i = 25; i < 100; i++) {
             expected.add("value" + i);
         }

@@ -62,18 +62,20 @@ void verifyExceptionDuringIOOpen(final Transformer transformer, final byte[] testData
     @MethodSource("testData")
     void verifyExceptionDuringRead(final Transformer transformer, final byte[] testData, final AbstractConfig config,
             final int expectedCount) throws IOException {
-        final InputStream inputStream = mock(InputStream.class);
-        when(inputStream.read()).thenThrow(new IOException("Test IOException during read"));
-        when(inputStream.read(any())).thenThrow(new IOException("Test IOException during read"));
-        when(inputStream.read(any(), anyInt(), anyInt())).thenThrow(new IOException("Test IOException during read"));
-        when(inputStream.readNBytes(any(), anyInt(), anyInt()))
-                .thenThrow(new IOException("Test IOException during read"));
-        when(inputStream.readNBytes(anyInt())).thenThrow(new IOException("Test IOException during read"));
-        when(inputStream.readAllBytes()).thenThrow(new IOException("Test IOException during read"));
-        final CloseTrackingStream stream = new CloseTrackingStream(inputStream);
-        final Stream<?> objStream = transformer.getRecords(() -> stream, "topic", 1, config, 0);
-        assertThat(objStream).isEmpty();
-        assertThat(stream.closeCount).isGreaterThan(0);
+        try (InputStream inputStream = mock(InputStream.class)) {
+            when(inputStream.read()).thenThrow(new IOException("Test IOException during read"));
+            when(inputStream.read(any())).thenThrow(new IOException("Test IOException during read"));
+            when(inputStream.read(any(), anyInt(), anyInt())).thenThrow(new IOException("Test IOException during read"));
+            when(inputStream.readNBytes(any(), anyInt(), anyInt()))
+                    .thenThrow(new IOException("Test IOException during read"));
+            when(inputStream.readNBytes(anyInt())).thenThrow(new IOException("Test IOException during read"));
+            when(inputStream.readAllBytes()).thenThrow(new IOException("Test IOException during read"));
+            try (CloseTrackingStream stream = new CloseTrackingStream(inputStream)) {
+                final Stream<?> objStream = transformer.getRecords(() -> stream, "topic", 1, config, 0);
+                assertThat(objStream).isEmpty();
+                assertThat(stream.closeCount).isGreaterThan(0);
+            }
+        }
     }

     @ParameterizedTest
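
Wrapping the mocked stream in try-with-resources, as the hunk above does, is the usual fix for PMD's CloseResource rule; `close()` on a Mockito mock is a harmless no-op, so the mock can safely sit inside the resource block. A minimal sketch of the pattern, with an invented test class and message:

```java
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.io.InputStream;

import org.junit.jupiter.api.Test;

class FailingStreamSketchTest {

    @Test
    void readFailuresPropagate() throws IOException {
        // The mock itself is the resource; closing it does nothing, but the
        // try-with-resources shape keeps PMD's CloseResource rule quiet.
        try (InputStream inputStream = mock(InputStream.class)) {
            // Stub every read overload so any caller path sees the failure.
            when(inputStream.read()).thenThrow(new IOException("boom"));
            when(inputStream.read(any())).thenThrow(new IOException("boom"));
            when(inputStream.read(any(), anyInt(), anyInt())).thenThrow(new IOException("boom"));

            assertThatThrownBy(inputStream::read).isInstanceOf(IOException.class);
        }
    }
}
```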

@@ -80,7 +80,7 @@ public String version() {

     @Override
     protected Iterator<SourceRecord> getIterator(BackoffConfig config) { // NOPMD cognitive complexity
-        Iterator<SourceRecord> inner = new Iterator<>() {
+        final Iterator<SourceRecord> inner = new Iterator<>() {
            /**
             * The backoff for Amazon retryable exceptions
             */
@@ -115,7 +115,7 @@ public boolean hasNext() {
            @Override
            public SourceRecord next() {
                final S3SourceRecord s3SourceRecord = s3SourceRecordIterator.next();
-                offsetManager.setCurrentOffsets(s3SourceRecord.getPartitionMap(), s3SourceRecord.getObjectKey(),
+                offsetManager.updateAndReturnCurrentOffsets(s3SourceRecord.getPartitionMap(), s3SourceRecord.getObjectKey(),
                        s3SourceRecord.getRecordNumber());
                return RecordProcessor.createSourceRecord(s3SourceRecord, s3SourceConfig, awsv2SourceClient,
                        offsetManager);
@@ -145,11 +145,11 @@ public void commit() {
     @Override
     public void commitRecord(final SourceRecord record) {
         if (LOGGER.isInfoEnabled()) {
-            final Map<String, Object> map = (Map<String, Object>) record.sourceOffset();
+            //final Map<String, Object> map = (Map<String, Object>) record.sourceOffset();
             // LOGGER.info("Committed individual record {} {} {} committed", map.get(BUCKET), map.get(OBJECT_KEY),
             // offsetManager.recordsProcessedForObjectKey((Map<String, Object>) record.sourcePartition(),
             // map.get(OBJECT_KEY).toString()));
-            LOGGER.info("Committed individual record {} committed", map);
+            LOGGER.info("Committed individual record {} committed", (Map<String, Object>) record.sourceOffset());
         }
     }
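
The `commitRecord` change folds the offset-map cast into the logging call so that all work sits inside the `isInfoEnabled()` guard, the shape PMD's GuardLogStatement rule expects. A tiny sketch of that guard pattern, with illustrative class and message names:

```java
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class GuardedLogging {

    private static final Logger LOGGER = LoggerFactory.getLogger(GuardedLogging.class);

    private GuardedLogging() {
    }

    // The cast only runs when INFO is enabled, so the hot path pays
    // nothing for disabled logging.
    @SuppressWarnings("unchecked")
    public static void logOffset(final Object rawOffset) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Committed individual record {}", (Map<String, Object>) rawOffset);
        }
    }
}
```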

@@ -124,8 +124,7 @@ public Stream<S3Object> getS3ObjectStream(final String startToken) {
                 .startAfter(optionalKey(startToken))
                 .build();

-        final Stream<S3Object> s3ObjectKeyStream = Stream
-                .iterate(s3Client.listObjectsV2(request), Objects::nonNull, response -> {
+        return Stream.iterate(s3Client.listObjectsV2(request), Objects::nonNull, response -> {
             // This is called every time next() is called on the iterator.
             if (response.isTruncated()) {
                 return s3Client.listObjectsV2(ListObjectsV2Request.builder()
@@ -142,7 +141,6 @@ public Stream<S3Object> getS3ObjectStream(final String startToken) {
                 .filter(filterPredicate)
                 .filter(objectSummary -> assignObjectToTask(objectSummary.key()))
                 .filter(objectSummary -> !failedObjectKeys.contains(objectSummary.key())));
-        return s3ObjectKeyStream;
     }

     public Iterator<String> getListOfObjectKeys(final String startToken) {
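
The hunk above returns the stream expression directly instead of binding it to a local first (PMD's UnnecessaryLocalBeforeReturn). For context, a minimal sketch of the three-argument `Stream.iterate` pagination idiom the method relies on; the class, bucket handling, and flattening are simplified assumptions rather than the repository's code:

```java
import java.util.Objects;
import java.util.stream.Stream;

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.S3Object;

public final class S3Pages {

    private S3Pages() {
    }

    // Walks every page of a bucket listing: the seed is the first response,
    // the predicate stops at the first null, and the step either follows the
    // continuation token or yields null once the listing is exhausted.
    public static Stream<S3Object> objects(final S3Client s3Client, final String bucket) {
        final ListObjectsV2Request first = ListObjectsV2Request.builder().bucket(bucket).build();
        return Stream
                .iterate(s3Client.listObjectsV2(first), Objects::nonNull,
                        response -> response.isTruncated()
                                ? s3Client.listObjectsV2(ListObjectsV2Request.builder()
                                        .bucket(bucket)
                                        .continuationToken(response.nextContinuationToken())
                                        .build())
                                : null)
                .flatMap(response -> response.contents().stream());
    }
}
```

Because `Stream.iterate` is lazy, no page beyond the first is fetched until a consumer actually pulls past it.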

@@ -82,10 +82,10 @@ public long incrementAndUpdateOffsetMap(final Map<String, Object> partitionMap,
         return startOffset;
     }

-    public Map<String, Object> setCurrentOffsets(final Map<String, Object> partitionMap, final String currentObjectKey,
-            final long offset) {
-        Map<String, Object> offsetMap = offsets.compute(partitionMap, (k, v) -> {
-            Map<String, Object> map = v == null ? new Hashtable<>() : v;
+    public Map<String, Object> updateAndReturnCurrentOffsets(final Map<String, Object> partitionMap, final String currentObjectKey,
+            final long offset) {
+        final Map<String, Object> offsetMap = offsets.compute(partitionMap, (k, v) -> {
+            final Map<String, Object> map = v == null ? new Hashtable<>() : v;
             map.put(getObjectMapKey(currentObjectKey), offset);
             return map;
         });
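
The renamed `updateAndReturnCurrentOffsets` is a compute-based upsert: create the inner map on first sight of a partition, then record the latest offset for the object key. A small isolated sketch of the same `Map.compute` idiom; the sketch substitutes `ConcurrentHashMap` for the `Hashtable` used in the diff, purely for illustration:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class OffsetStoreSketch {

    private final Map<Map<String, Object>, Map<String, Object>> offsets = new ConcurrentHashMap<>();

    // compute() runs the remapping atomically on ConcurrentHashMap, so the
    // create-if-absent and put happen as one step per partition key.
    public Map<String, Object> updateAndReturnCurrentOffsets(final Map<String, Object> partitionMap,
            final String objectKey, final long offset) {
        return offsets.compute(partitionMap, (k, v) -> {
            final Map<String, Object> map = v == null ? new ConcurrentHashMap<>() : v;
            map.put(objectKey, offset);
            return map;
        });
    }
}
```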

@@ -74,8 +74,8 @@ public SchemaAndValue getValue() {
         return new SchemaAndValue(valueData.schema(), valueData.value());
     }

-    public SourceRecord getSourceRecord(OffsetManager offsetManager) {
-        Map<String, Object> offsetMap = offsetManager.setCurrentOffsets(getPartitionMap(), getObjectKey(),
+    public SourceRecord getSourceRecord(final OffsetManager offsetManager) {
+        final Map<String, Object> offsetMap = offsetManager.updateAndReturnCurrentOffsets(getPartitionMap(), getObjectKey(),
                 getRecordNumber());
         return new SourceRecord(getPartitionMap(), offsetMap, topic, partition(), keyData.schema(), keyData.value(),
                 valueData.schema(), valueData.value());

@@ -16,7 +16,6 @@

 package io.aiven.kafka.connect.s3.source.utils;

-import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.function.Function;
@@ -33,26 +32,20 @@

 import org.apache.commons.collections4.IteratorUtils;
 import org.apache.commons.collections4.iterators.LazyIteratorChain;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.s3.model.S3Object;

 /**
  * Iterator that processes S3 files and creates Kafka source records. Supports different output formats (Avro, JSON,
  * Parquet).
  */
 public final class SourceRecordIterator extends LazyIteratorChain<S3SourceRecord> implements Iterator<S3SourceRecord> {

-    private static final Logger LOGGER = LoggerFactory.getLogger(SourceRecordIterator.class);
     public static final String PATTERN_TOPIC_KEY = "topicName";
     public static final String PATTERN_PARTITION_KEY = "partitionId";

     public static final Pattern FILE_DEFAULT_PATTERN = Pattern.compile("(?<topicName>[^/]+?)-"
             + "(?<partitionId>\\d{5})-" + "(?<uniqueId>[a-zA-Z0-9]+)" + "\\.(?<fileExtension>[^.]+)$"); // topic-00001.txt
     public static final long BYTES_TRANSFORMATION_NUM_OF_RECS = 1L;

-    private Iterator<S3SourceRecord> recordIterator = Collections.emptyIterator();
-
     private final OffsetManager offsetManager;

     private final S3SourceConfig s3SourceConfig;
@@ -83,6 +76,7 @@ public final class SourceRecordIterator extends LazyIteratorChain<S3SourceRecord

     public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final OffsetManager offsetManager,
             final Transformer transformer, final AWSV2SourceClient sourceClient) {
+        super();
         this.s3SourceConfig = s3SourceConfig;
         this.offsetManager = offsetManager;

@@ -107,15 +101,15 @@ public void remove() {

     private Stream<S3SourceRecord> convert(final S3Object s3Object) {

-        Map<String, Object> partitionMap = ConnectUtils.getPartitionMap(topic, partitionId, bucketName);
-        long recordCount = offsetManager.recordsProcessedForObjectKey(partitionMap, s3Object.key());
+        final Map<String, Object> partitionMap = ConnectUtils.getPartitionMap(topic, partitionId, bucketName);
+        final long recordCount = offsetManager.recordsProcessedForObjectKey(partitionMap, s3Object.key());

         // Optimizing without reading stream again.
         if (transformer instanceof ByteArrayTransformer && recordCount > 0) {
             return Stream.empty();
         }

-        SchemaAndValue keyData = transformer.getKeyData(s3Object.key(), topic, s3SourceConfig);
+        final SchemaAndValue keyData = transformer.getKeyData(s3Object.key(), topic, s3SourceConfig);

         return transformer
                 .getRecords(sourceClient.getObject(s3Object.key()), topic, partitionId, s3SourceConfig, recordCount)
@@ -129,15 +123,15 @@ class Mapper implements Function<SchemaAndValue, S3SourceRecord> {

     private final String objectKey;

-    public Mapper(Map<String, Object> partitionMap, long recordCount, SchemaAndValue keyData, String objectKey) {
+    public Mapper(final Map<String, Object> partitionMap, final long recordCount, final SchemaAndValue keyData, final String objectKey) {
         this.partitionMap = partitionMap;
         this.recordCount = recordCount;
         this.keyData = keyData;
         this.objectKey = objectKey;
     }

     @Override
-    public S3SourceRecord apply(SchemaAndValue value) {
+    public S3SourceRecord apply(final SchemaAndValue value) {
         recordCount++;
         return new S3SourceRecord(partitionMap, recordCount, topic, partitionId, objectKey, keyData, value);
     }
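
`SourceRecordIterator` extends commons-collections' `LazyIteratorChain`, which the explicit `super()` call above initializes; the chain asks a subclass for its next iterator only when the previous one is exhausted. A minimal sketch of that contract, with invented data:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.collections4.iterators.LazyIteratorChain;

public class ChainSketch {

    public static void main(final String[] args) {
        final List<List<String>> batches = Arrays.asList(
                Arrays.asList("a", "b"), Arrays.asList("c"));

        // nextIterator is called lazily with a 1-based count; returning
        // null tells the chain there is nothing left.
        final Iterator<String> chain = new LazyIteratorChain<String>() {
            @Override
            protected Iterator<? extends String> nextIterator(final int count) {
                return count <= batches.size() ? batches.get(count - 1).iterator() : null;
            }
        };

        chain.forEachRemaining(System.out::println); // prints a, b, c
    }
}
```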

@@ -201,7 +201,7 @@ void testPollWithNoDataReturned() {
     private void assertEquals(final S3SourceRecord s3Record, final SourceRecord sourceRecord) {
         assertThat(sourceRecord).isNotNull();
         assertThat(sourceRecord.sourcePartition()).isEqualTo(s3Record.getPartitionMap());
-        Map<String, Object> map = (Map<String, Object>) sourceRecord.sourceOffset();
+        final Map<String, Object> map = (Map<String, Object>) sourceRecord.sourceOffset();

         assertThat(map.get(OffsetManager.getObjectMapKey(s3Record.getObjectKey())))
                 .isEqualTo(s3Record.getRecordNumber());
@@ -227,7 +227,7 @@ void testPollsWithRecords() {
         assertThat(stopWatch.getTime()).isLessThan(AbstractSourceTask.MAX_POLL_TIME.toMillis());
     }

-    private List<S3SourceRecord> createS3SourceRecords(int count) {
+    private List<S3SourceRecord> createS3SourceRecords(final int count) {
         final List<S3SourceRecord> lst = new ArrayList<>();
         if (count > 0) {
             lst.add(createS3SourceRecord(TOPIC, PARTITION, TEST_BUCKET, OBJECT_KEY,
@@ -243,11 +243,11 @@ private List<S3SourceRecord> createS3SourceRecords(final int count) {

     @Test
     void testPollWithInterruptedIterator() {
-        List<S3SourceRecord> lst = createS3SourceRecords(3);
+        final List<S3SourceRecord> lst = createS3SourceRecords(3);

-        Iterator<S3SourceRecord> inner1 = lst.subList(0, 2).iterator();
-        Iterator<S3SourceRecord> inner2 = lst.subList(2, 3).iterator();
-        Iterator<S3SourceRecord> sourceRecordIterator = new Iterator<>() {
+        final Iterator<S3SourceRecord> inner1 = lst.subList(0, 2).iterator();
+        final Iterator<S3SourceRecord> inner2 = lst.subList(2, 3).iterator();
+        final Iterator<S3SourceRecord> sourceRecordIterator = new Iterator<>() {
             Iterator<S3SourceRecord> inner = inner1;
             @Override
             public boolean hasNext() {
@@ -260,9 +260,9 @@ public boolean hasNext() {

             @Override
             public S3SourceRecord next() {
-                S3SourceRecord result = inner.next();
+                final S3SourceRecord result = inner.next();
                 if (!inner.hasNext()) {
-                    inner = null;
+                    inner = null; //NOPMD null assignment
                 }
                 return result;
             }
@@ -288,9 +288,9 @@ public S3SourceRecord next() {

     @Test
     void testPollWithSlowProducer() {
-        List<S3SourceRecord> lst = createS3SourceRecords(3);
+        final List<S3SourceRecord> lst = createS3SourceRecords(3);

-        Iterator<S3SourceRecord> sourceRecordIterator = new Iterator<>() {
+        final Iterator<S3SourceRecord> sourceRecordIterator = new Iterator<>() {
             Iterator<S3SourceRecord> inner = lst.iterator();
             @Override
             public boolean hasNext() {