diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/OffsetManager.java b/commons/src/main/java/io/aiven/kafka/connect/common/OffsetManager.java
index 17b10dae..e4fbf86a 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/OffsetManager.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/OffsetManager.java
@@ -23,6 +23,7 @@
 import java.util.function.Function;
 
 import org.apache.kafka.connect.source.SourceTaskContext;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,8 +67,8 @@ protected OffsetManager(final SourceTaskContext context,
 
     /**
      * Get an entry from the offset manager. This method will return the local copy if it has been created otherwise
-     * will get the data from Kafka. If there is not a local copy and not one from Kafka then an empty Optional
-     * is returned
+     * will get the data from Kafka. If there is not a local copy and not one from Kafka then an empty Optional is
+     * returned
      *
      * @param key
      *            the key for the entry.
@@ -162,7 +163,7 @@ public interface OffsetManagerEntry> extends Com
         * @throws NullPointerException
         *             if a {@code null} key is not supported.
         */
-        default int getInt(String key) {
+        default int getInt(final String key) {
            return ((Number) getProperty(key)).intValue();
        }
 
@@ -175,7 +176,7 @@ default int getInt(String key) {
         * @throws NullPointerException
         *             if a {@code null} key is not supported.
         */
-        default long getLong(String key) {
+        default long getLong(final String key) {
            return ((Number) getProperty(key)).longValue();
        }
 
@@ -188,7 +189,7 @@ default long getLong(String key) {
         * @throws NullPointerException
         *             if a {@code null} key is not supported.
         */
-        default String getString(String key) {
+        default String getString(final String key) {
            return getProperty(key).toString();
        }
 
diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java
index 219b3f38..b9b398d4 100644
--- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java
+++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java
@@ -52,15 +52,11 @@
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 
 import io.aiven.kafka.connect.common.source.input.InputFormat;
 import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor;
 import io.aiven.kafka.connect.s3.source.testutils.ContentUtils;
-import io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry;
 
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 
@@ -190,8 +186,6 @@ void bytesTest(final TestInfo testInfo) {
         final Map expectedOffsetRecords = offsetKeys.subList(0, offsetKeys.size())
                 .stream()
                 .collect(Collectors.toMap(Function.identity(), s -> 0L));
-
-        sendExtraMessages(topicName, (Map) connectorConfig);
         verifyOffsetPositions(expectedOffsetRecords, connectRunner.getBootstrapServers());
     }
 
@@ -249,8 +243,6 @@ void avroTest(final TestInfo testInfo) throws IOException {
 
         final Map expectedOffsetRecords = offsetKeys.stream()
                 .collect(Collectors.toMap(Function.identity(), s -> (long) numOfRecsFactor));
-
-        sendExtraMessages(topicName, (Map) connectorConfig);
         verifyOffsetPositions(expectedOffsetRecords, connectRunner.getBootstrapServers());
     }
 
@@ -320,32 +312,10 @@ void jsonTest(final TestInfo testInfo) {
             assertThat(jsonNode.get("id").asText()).contains(Integer.toString(messageCount - 1));
         });
 
-        sendExtraMessages(topicName, (Map) connectorConfig);
+        // Verify offset positions -- 0 based counting.
         verifyOffsetPositions(Map.of(offsetKey, (long) messageCount), connectRunner.getBootstrapServers());
     }
 
-    private void sendExtraMessages(String topic, Map props) {
-        try (Producer producer = new KafkaProducer<>(props)) {
-            int i = 1;
-            String key = Integer.toString(i);
-            String message = "this is message " + Integer.toString(i);
-
-            producer.send(new ProducerRecord(topic, key, message));
-
-            // log a confirmation once the message is written
-            System.out.println("sent msg " + key);
-            try {
-                // Sleep for a second
-                Thread.sleep(1000);
-            } catch (Exception e) {
-                // do nothin;
-            }
-
-        } catch (Exception e) {
-            System.out.println("Could not start producer: " + e);
-        }
-    }
-
     private static byte[] generateNextAvroMessagesStartingFromId(final int messageId, final int noOfAvroRecs,
             final Schema schema) throws IOException {
         final DatumWriter datumWriter = new GenericDatumWriter<>(schema);
@@ -406,26 +376,13 @@ static void verifyOffsetPositions(final Map expectedRecords, final
         final Map offsetRecs = new HashMap<>();
         try (KafkaConsumer consumer = new KafkaConsumer<>(consumerProperties)) {
             consumer.subscribe(Collections.singletonList("connect-offset-topic-" + CONNECTOR_NAME));
-
-            await().atMost(Duration.ofMinutes(1)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
-                readOffsetPositions(consumer, offsetRecs);
-                LOGGER.error(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> offset record count: {}",
-                        offsetRecs.size());
+            await().atMost(Duration.ofMinutes(2)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
+                IntegrationBase.consumeOffsetMessages(consumer).forEach(s -> {
+                    offsetRecs.merge(s.getKey(), s.getRecordCount(), (x, y) -> x > y ? x : y);
+                    LOGGER.info("Read Offset Position: {} {} ", s.getKey(), s.getRecordCount());
+                }); // TODO remove tis line
                 assertThat(offsetRecs).containsExactlyInAnyOrderEntriesOf(expectedRecords);
             });
         }
     }
-
-    static void readOffsetPositions(final KafkaConsumer consumer, final Map offsetRecs)
-            throws IOException {
-        boolean read = true;
-        while (read) {
-            List entries = IntegrationBase.consumeOffsetMessages(consumer);
-            if (!entries.isEmpty()) {
-                entries.forEach(s -> offsetRecs.merge(s.getKey(), s.getRecordCount(), (x, y) -> x > y ? x : y));
-            } else {
-                read = false;
-            }
-        }
-    }
 }
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
index 777c5b44..773e0ef6 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
@@ -107,7 +107,8 @@ private boolean extractOffsetManagerEntry(final S3Object s3Object) {
             final S3OffsetManagerEntry keyEntry = new S3OffsetManagerEntry(s3SourceConfig.getAwsS3BucketName(),
                     s3Object.getKey(), fileMatcher.group(PATTERN_TOPIC_KEY),
                     Integer.parseInt(fileMatcher.group(PATTERN_PARTITION_KEY)));
-            offsetManagerEntry = offsetManager.getEntry(keyEntry.getManagerKey(), keyEntry::fromProperties).orElse(keyEntry);
+            offsetManagerEntry = offsetManager.getEntry(keyEntry.getManagerKey(), keyEntry::fromProperties)
+                    .orElse(keyEntry);
             return !checkBytesTransformation(transformer, offsetManagerEntry.getRecordCount());
         }
         LOGGER.error("File naming doesn't match to any topic. {}", s3Object.getKey());
@@ -145,7 +146,7 @@ private Iterator getS3SourceRecordIterator(final S3Object s3Obje
                 .of(new SchemaAndValue(transformer.getKeySchema(), s3Object.getKey().getBytes(StandardCharsets.UTF_8)));
         // Do not stream and map as the offsetManagerEntry updates in getRecords() will not be seen in S3SourceRecord
         // constructor.
-        Iterator iter = transformer
+        final Iterator iter = transformer
                 .getRecords(s3Object::getObjectContent, offsetManagerEntry, s3SourceConfig)
                 .iterator();
         return IteratorUtils.transformedIterator(iter, value -> new S3SourceRecord(offsetManagerEntry, key, value));
diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java
index e6c7ffbb..0a4c31a1 100644
--- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java
+++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java
@@ -81,7 +81,8 @@ void testGetEntry() {
         when(offsetStorageReader.offset(any())).thenReturn(storedData);
 
         final S3OffsetManagerEntry keyEntry = newEntry();
-        final Optional entry = offsetManager.getEntry(keyEntry.getManagerKey(), keyEntry::fromProperties);
+        final Optional entry = offsetManager.getEntry(keyEntry.getManagerKey(),
+                keyEntry::fromProperties);
         assertThat(entry).isPresent();
         assertThat(entry.get().getPartition()).isEqualTo(PARTITION);
         assertThat(entry.get().getRecordCount()).isEqualTo(0);
@@ -92,7 +93,8 @@ void testGetEntry() {
 
         // verify second read reads from local data
 
-        final Optional entry2 = offsetManager.getEntry(entry.get().getManagerKey(), entry.get()::fromProperties);
+        final Optional entry2 = offsetManager.getEntry(entry.get().getManagerKey(),
+                entry.get()::fromProperties);
         assertThat(entry2).isPresent();
         assertThat(entry2.get().getPartition()).isEqualTo(PARTITION);
         assertThat(entry2.get().getRecordCount()).isEqualTo(0);
@@ -116,7 +118,8 @@ void testUpdate() {
 
         offsetManager.updateCurrentOffsets(entry);
 
-        final Optional entry2 = offsetManager.getEntry(entry.getManagerKey(), entry::fromProperties);
+        final Optional entry2 = offsetManager.getEntry(entry.getManagerKey(),
+                entry::fromProperties);
         assertThat(entry2).isPresent();
         assertThat(entry2.get().getPartition()).isEqualTo(PARTITION);
         assertThat(entry2.get().getRecordCount()).isEqualTo(1L);
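
Note on the reworked verifyOffsetPositions above: it now polls the connect offset topic with Awaitility, folding each batch of consumed offset messages into an accumulator that keeps the highest record count seen per key, and retries until the accumulated map equals the expected offsets. The snippet below is a minimal, self-contained sketch of that polling pattern, not the project's code: OffsetEntry and readOffsetBatch are hypothetical stand-ins for IntegrationBase.consumeOffsetMessages(consumer) and the objects it returns, while the Awaitility/AssertJ calls and the Map.merge reduction mirror what the diff adds.

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class OffsetPollingSketch {

    // Hypothetical stand-in for the entries returned by IntegrationBase.consumeOffsetMessages(consumer).
    record OffsetEntry(String key, long recordCount) {
    }

    static void verifyOffsets(final Supplier<List<OffsetEntry>> readOffsetBatch,
            final Map<String, Long> expectedRecords) {
        final Map<String, Long> offsetRecs = new HashMap<>();
        // Poll once per second for up to two minutes; each poll folds the newly consumed
        // entries into the accumulator, keeping the highest record count seen per key.
        await().atMost(Duration.ofMinutes(2)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
            readOffsetBatch.get().forEach(e -> offsetRecs.merge(e.key(), e.recordCount(), Long::max));
            assertThat(offsetRecs).containsExactlyInAnyOrderEntriesOf(expectedRecords);
        });
    }
}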