Skip to content

Commit

Permalink
fixed spotlessApply issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Claude committed Dec 20, 2024
1 parent a81b01a commit db868ac
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,19 +154,22 @@ public interface OffsetManagerEntry<T extends OffsetManagerEntry<T>> extends Com

/**
* Gets the Kafka topic for this entry.
*
* @return The Kafka topic for this entry.
*/
String getTopic();

/**
* Gets the Kafka partition for this entry.
*
* @return The Kafka partition for this entry.
*/
Integer getPartition();

/**
* Gets the number of records to skip to get to this record.
* This is the same as the zero-based index of this record if all records were in an array.
* Gets the number of records to skip to get to this record. This is the same as the zero-based index of this
* record if all records were in an array.
*
* @return The number of records to skip to get to this record.
*/
default long skipRecords() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
Expand All @@ -47,6 +46,8 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.json.JsonDeserializer;

import io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
Expand Down Expand Up @@ -135,7 +136,8 @@ static List<String> consumeByteMessages(final String topic, final int expectedMe
String bootstrapServers) {
final Properties consumerProperties = getConsumerProperties(bootstrapServers, ByteArrayDeserializer.class,
ByteArrayDeserializer.class);
final List<byte[]> objects = consumeMessages(topic, expectedMessageCount, consumerProperties, Duration.ofMinutes(2));
final List<byte[]> objects = consumeMessages(topic, expectedMessageCount, consumerProperties,
Duration.ofMinutes(2));
return objects.stream().map(String::new).collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ void avroTest(final TestInfo testInfo) throws IOException {
entry(4 * numOfRecsFactor, "Hello, Kafka Connect S3 Source! object " + (4 * numOfRecsFactor)),
entry(5 * numOfRecsFactor, "Hello, Kafka Connect S3 Source! object " + (5 * numOfRecsFactor)));

final Map<String, Long> expectedOffsetRecords = offsetKeys.stream().collect(Collectors.toMap(Function.identity(), s -> (long) numOfRecsFactor - 1));
final Map<String, Long> expectedOffsetRecords = offsetKeys.stream()
.collect(Collectors.toMap(Function.identity(), s -> (long) numOfRecsFactor - 1));
verifyOffsetPositions(expectedOffsetRecords, connectRunner.getBootstrapServers());
}

Expand Down Expand Up @@ -312,7 +313,7 @@ void jsonTest(final TestInfo testInfo) {
});

// Verify offset positions -- 0 based counting.
verifyOffsetPositions(Map.of(offsetKey, (long)messageCount - 1), connectRunner.getBootstrapServers());
verifyOffsetPositions(Map.of(offsetKey, (long) messageCount - 1), connectRunner.getBootstrapServers());
}

private static byte[] generateNextAvroMessagesStartingFromId(final int messageId, final int noOfAvroRecs,
Expand Down Expand Up @@ -377,8 +378,9 @@ static void verifyOffsetPositions(final Map<String, Long> expectedRecords, final
consumer.subscribe(Collections.singletonList("connect-offset-topic-" + CONNECTOR_NAME));
await().atMost(Duration.ofMinutes(2)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
IntegrationBase.consumeOffsetMessages(consumer).forEach(s -> {
offsetRecs.merge(s.getKey(), s.getRecordCount(), (x, y) -> x > y ? x : y);
LOGGER.info("Read Offset Position: {} {} ", s.getKey(), s.getRecordCount());}); // TODO remove tis line
offsetRecs.merge(s.getKey(), s.getRecordCount(), (x, y) -> x > y ? x : y);
LOGGER.info("Read Offset Position: {} {} ", s.getKey(), s.getRecordCount());
}); // TODO remove tis line
assertThat(offsetRecs).containsExactlyInAnyOrderEntriesOf(expectedRecords);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public void commitRecord(SourceRecord record) throws InterruptedException {
if (LOGGER.isInfoEnabled()) {
Map<String, Object> map = (Map<String, Object>) record.sourceOffset();
S3OffsetManagerEntry entry = S3OffsetManagerEntry.wrap(map);
LOGGER.info("Committed individual record {} {} {} committed", entry.getBucket(), entry.getKey(), entry.getRecordCount());
LOGGER.info("Committed individual record {} {} {} committed", entry.getBucket(), entry.getKey(),
entry.getRecordCount());
}
}

Expand All @@ -126,7 +127,7 @@ public List<SourceRecord> poll() throws InterruptedException {
LOGGER.warn("Retryable error encountered during polling. Waiting before retrying...",
exception);
pollLock.wait(ERROR_BACKOFF);
// TODO validate that the iterator does not lose an S3Object. Add test to S3ObjectIterator.
// TODO validate that the iterator does not lose an S3Object. Add test to S3ObjectIterator.
} else {
LOGGER.warn("Non-retryable AmazonS3Exception occurred. Stopping polling.", exception);
return null; // NOPMD
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,13 @@ public S3OffsetManagerEntry(final String bucket, final String s3ObjectKey, final
}

/**
* Wraps an existing property map as an S3OffsetManagerEntry.
* Creates a copy of the map for its internal use.
* @param properties the map of properties to wrap.
* Wraps an existing property map as an S3OffsetManagerEntry. Creates a copy of the map for its internal use.
*
* @param properties
* the map of properties to wrap.
* @return an S3OffsetManagerEntry.
* @throws IllegalArgumentException if all the required fields are not present.
* @throws IllegalArgumentException
* if all the required fields are not present.
*/
public static S3OffsetManagerEntry wrap(final Map<String, Object> properties) {
if (properties == null) {
Expand All @@ -79,17 +81,16 @@ public static S3OffsetManagerEntry wrap(final Map<String, Object> properties) {
long recordCount = 0;
Object recordCountProperty = ourProperties.computeIfAbsent(RECORD_COUNT, s -> 0L);
if (recordCountProperty instanceof Number) {
recordCount = ((Number)recordCountProperty).longValue();
recordCount = ((Number) recordCountProperty).longValue();
}
final S3OffsetManagerEntry result = new S3OffsetManagerEntry(ourProperties);
result.recordCount = recordCount;
return result;
}

/**
* Constructs an OffsetManagerEntry from an existing map. Used to reconstitute previously serialized
* S3OffsetManagerEntries.
* used by {@link #fromProperties(Map)}
* Constructs an OffsetManagerEntry from an existing map. Used to reconstitute previously serialized
* S3OffsetManagerEntries. used by {@link #fromProperties(Map)}
*
* @param properties
* the property map.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,5 @@ void testFromProperties() {
assertThat(other.getRecordCount()).isEqualTo(1L);
assertThat(other.getProperty("random_entry")).isEqualTo(5L);



}
}

0 comments on commit db868ac

Please sign in to comment.