Skip to content

Commit

Permalink
fixed pmd errors
Browse files Browse the repository at this point in the history
  • Loading branch information
¨Claude committed Dec 23, 2024
1 parent 994bc9d commit a67f7cb
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.function.Function;

import org.apache.kafka.connect.source.SourceTaskContext;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -66,8 +67,8 @@ protected OffsetManager(final SourceTaskContext context,

/**
* Get an entry from the offset manager. This method will return the local copy if it has been created otherwise
* will get the data from Kafka. If there is not a local copy and not one from Kafka then an empty Optional
* is returned
* will get the data from Kafka. If there is not a local copy and not one from Kafka then an empty Optional is
* returned
*
* @param key
* the key for the entry.
Expand Down Expand Up @@ -162,7 +163,7 @@ public interface OffsetManagerEntry<T extends OffsetManagerEntry<T>> extends Com
* @throws NullPointerException
* if a {@code null} key is not supported.
*/
default int getInt(String key) {
default int getInt(final String key) {
return ((Number) getProperty(key)).intValue();
}

Expand All @@ -175,7 +176,7 @@ default int getInt(String key) {
* @throws NullPointerException
* if a {@code null} key is not supported.
*/
default long getLong(String key) {
default long getLong(final String key) {
return ((Number) getProperty(key)).longValue();
}

Expand All @@ -188,7 +189,7 @@ default long getLong(String key) {
* @throws NullPointerException
* if a {@code null} key is not supported.
*/
default String getString(String key) {
default String getString(final String key) {
return getProperty(key).toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,11 @@

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import io.aiven.kafka.connect.common.source.input.InputFormat;
import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor;
import io.aiven.kafka.connect.s3.source.testutils.ContentUtils;
import io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectMetadata;
Expand Down Expand Up @@ -190,8 +186,6 @@ void bytesTest(final TestInfo testInfo) {
final Map<String, Long> expectedOffsetRecords = offsetKeys.subList(0, offsetKeys.size())
.stream()
.collect(Collectors.toMap(Function.identity(), s -> 0L));

sendExtraMessages(topicName, (Map) connectorConfig);
verifyOffsetPositions(expectedOffsetRecords, connectRunner.getBootstrapServers());
}

Expand Down Expand Up @@ -249,8 +243,6 @@ void avroTest(final TestInfo testInfo) throws IOException {

final Map<String, Long> expectedOffsetRecords = offsetKeys.stream()
.collect(Collectors.toMap(Function.identity(), s -> (long) numOfRecsFactor));

sendExtraMessages(topicName, (Map) connectorConfig);
verifyOffsetPositions(expectedOffsetRecords, connectRunner.getBootstrapServers());
}

Expand Down Expand Up @@ -320,32 +312,10 @@ void jsonTest(final TestInfo testInfo) {
assertThat(jsonNode.get("id").asText()).contains(Integer.toString(messageCount - 1));
});

sendExtraMessages(topicName, (Map) connectorConfig);
// Verify offset positions -- 0 based counting.
verifyOffsetPositions(Map.of(offsetKey, (long) messageCount), connectRunner.getBootstrapServers());
}

private void sendExtraMessages(String topic, Map<String, Object> props) {
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
int i = 1;
String key = Integer.toString(i);
String message = "this is message " + Integer.toString(i);

producer.send(new ProducerRecord<String, String>(topic, key, message));

// log a confirmation once the message is written
System.out.println("sent msg " + key);
try {
// Sleep for a second
Thread.sleep(1000);
} catch (Exception e) {
// do nothin;
}

} catch (Exception e) {
System.out.println("Could not start producer: " + e);
}
}

private static byte[] generateNextAvroMessagesStartingFromId(final int messageId, final int noOfAvroRecs,
final Schema schema) throws IOException {
final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
Expand Down Expand Up @@ -406,26 +376,13 @@ static void verifyOffsetPositions(final Map<String, Long> expectedRecords, final
final Map<String, Long> offsetRecs = new HashMap<>();
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProperties)) {
consumer.subscribe(Collections.singletonList("connect-offset-topic-" + CONNECTOR_NAME));

await().atMost(Duration.ofMinutes(1)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
readOffsetPositions(consumer, offsetRecs);
LOGGER.error(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> offset record count: {}",
offsetRecs.size());
await().atMost(Duration.ofMinutes(2)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
IntegrationBase.consumeOffsetMessages(consumer).forEach(s -> {
offsetRecs.merge(s.getKey(), s.getRecordCount(), (x, y) -> x > y ? x : y);
LOGGER.info("Read Offset Position: {} {} ", s.getKey(), s.getRecordCount());
}); // TODO remove tis line
assertThat(offsetRecs).containsExactlyInAnyOrderEntriesOf(expectedRecords);
});
}
}

static void readOffsetPositions(final KafkaConsumer<byte[], byte[]> consumer, final Map<String, Long> offsetRecs)
throws IOException {
boolean read = true;
while (read) {
List<S3OffsetManagerEntry> entries = IntegrationBase.consumeOffsetMessages(consumer);
if (!entries.isEmpty()) {
entries.forEach(s -> offsetRecs.merge(s.getKey(), s.getRecordCount(), (x, y) -> x > y ? x : y));
} else {
read = false;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ private boolean extractOffsetManagerEntry(final S3Object s3Object) {
final S3OffsetManagerEntry keyEntry = new S3OffsetManagerEntry(s3SourceConfig.getAwsS3BucketName(),
s3Object.getKey(), fileMatcher.group(PATTERN_TOPIC_KEY),
Integer.parseInt(fileMatcher.group(PATTERN_PARTITION_KEY)));
offsetManagerEntry = offsetManager.getEntry(keyEntry.getManagerKey(), keyEntry::fromProperties).orElse(keyEntry);
offsetManagerEntry = offsetManager.getEntry(keyEntry.getManagerKey(), keyEntry::fromProperties)
.orElse(keyEntry);
return !checkBytesTransformation(transformer, offsetManagerEntry.getRecordCount());
}
LOGGER.error("File naming doesn't match to any topic. {}", s3Object.getKey());
Expand Down Expand Up @@ -145,7 +146,7 @@ private Iterator<S3SourceRecord> getS3SourceRecordIterator(final S3Object s3Obje
.of(new SchemaAndValue(transformer.getKeySchema(), s3Object.getKey().getBytes(StandardCharsets.UTF_8)));
// Do not stream and map as the offsetManagerEntry updates in getRecords() will not be seen in S3SourceRecord
// constructor.
Iterator<SchemaAndValue> iter = transformer
final Iterator<SchemaAndValue> iter = transformer
.getRecords(s3Object::getObjectContent, offsetManagerEntry, s3SourceConfig)
.iterator();
return IteratorUtils.transformedIterator(iter, value -> new S3SourceRecord(offsetManagerEntry, key, value));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ void testGetEntry() {
when(offsetStorageReader.offset(any())).thenReturn(storedData);

final S3OffsetManagerEntry keyEntry = newEntry();
final Optional<S3OffsetManagerEntry> entry = offsetManager.getEntry(keyEntry.getManagerKey(), keyEntry::fromProperties);
final Optional<S3OffsetManagerEntry> entry = offsetManager.getEntry(keyEntry.getManagerKey(),
keyEntry::fromProperties);
assertThat(entry).isPresent();
assertThat(entry.get().getPartition()).isEqualTo(PARTITION);
assertThat(entry.get().getRecordCount()).isEqualTo(0);
Expand All @@ -92,7 +93,8 @@ void testGetEntry() {

// verify second read reads from local data

final Optional<S3OffsetManagerEntry> entry2 = offsetManager.getEntry(entry.get().getManagerKey(), entry.get()::fromProperties);
final Optional<S3OffsetManagerEntry> entry2 = offsetManager.getEntry(entry.get().getManagerKey(),
entry.get()::fromProperties);
assertThat(entry2).isPresent();
assertThat(entry2.get().getPartition()).isEqualTo(PARTITION);
assertThat(entry2.get().getRecordCount()).isEqualTo(0);
Expand All @@ -116,7 +118,8 @@ void testUpdate() {

offsetManager.updateCurrentOffsets(entry);

final Optional<S3OffsetManagerEntry> entry2 = offsetManager.getEntry(entry.getManagerKey(), entry::fromProperties);
final Optional<S3OffsetManagerEntry> entry2 = offsetManager.getEntry(entry.getManagerKey(),
entry::fromProperties);
assertThat(entry2).isPresent();
assertThat(entry2.get().getPartition()).isEqualTo(PARTITION);
assertThat(entry2.get().getRecordCount()).isEqualTo(1L);
Expand Down

0 comments on commit a67f7cb

Please sign in to comment.