From 79d672372b0938362e0ed9ecc8a8409a783a27f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C2=A8Claude?= <¨claude.warren@aiven.io¨> Date: Thu, 19 Dec 2024 12:26:45 +0000 Subject: [PATCH] fixed byte integration test --- .../kafka/connect/common/OffsetManager.java | 16 ++++++++++++++ .../source/input/ByteArrayTransformer.java | 1 + .../common/source/input/Transformer.java | 2 +- .../input/TransformerStreamingTest.java | 2 ++ .../connect/s3/source/IntegrationBase.java | 7 ++++--- .../connect/s3/source/IntegrationTest.java | 21 ++++++++++--------- .../s3/source/utils/S3OffsetManagerEntry.java | 7 ++++--- .../s3/source/utils/S3SourceRecord.java | 5 ++++- .../utils/S3OffsetManagerEntryTest.java | 19 +++++++++++++++++ 9 files changed, 62 insertions(+), 18 deletions(-) diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/OffsetManager.java b/commons/src/main/java/io/aiven/kafka/connect/common/OffsetManager.java index be7e5b353..59f0431da 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/OffsetManager.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/OffsetManager.java @@ -152,14 +152,30 @@ public interface OffsetManagerEntry> extends Com */ OffsetManagerKey getManagerKey(); + /** + * Gets the Kafka topic for this entry. + * @return The Kafka topic for this entry. + */ String getTopic(); + /** + * Gets the Kafka partition for this entry. + * @return The Kafka partition for this entry. + */ Integer getPartition(); + /** + * Gets the number of records to skip to get to this record. + * This is the same as the zero-based index of this record if all records were in an array. + * @return The number of records to skip to get to this record. + */ default long skipRecords() { return 0; } + /** + * Increments the record count. + */ void incrementRecordCount(); } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java index 6f2d3ede7..2b1658295 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java @@ -50,6 +50,7 @@ public Schema getKeySchema() { public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, final OffsetManager.OffsetManagerEntry offsetManagerEntry, final AbstractConfig sourceConfig) { return new StreamSpliterator(LOGGER, inputStreamIOSupplier, offsetManagerEntry) { + @Override protected InputStream inputOpened(final InputStream input) { return input; diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java index 27ba132a1..653b7dd22 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java @@ -173,7 +173,7 @@ public final void close() { public final boolean tryAdvance(final Consumer action) { boolean result = false; if (closed) { - logger.error("Attempt to advance after closed"); + return false; } try { if (inputStream == null) { diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java index b8345a8ad..855e553ff 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java @@ -89,6 +89,8 @@ void verifyCloseCalledAtIteratorEnd(final Transformer transformer, final byte[] } assertThat(count).isEqualTo(expectedCount); assertThat(stream.closeCount).isGreaterThan(0); + + assertThat(iter).as("Calling hasNext() after last item should return false").isExhausted(); } static Stream testData() throws IOException { diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java index 9ce09172b..5cdb3ac69 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java @@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; @@ -170,14 +171,14 @@ static List consumeMessages(final String topic, final int expectedMess } } - static Map consumeOffsetMessages(KafkaConsumer consumer) throws IOException { + static List consumeOffsetMessages(KafkaConsumer consumer) throws IOException { // Poll messages from the topic - final Map messages = new HashMap<>(); + final List messages = new ArrayList<>(); final ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); for (final ConsumerRecord record : records) { Map offsetRec = OBJECT_MAPPER.readValue(record.value(), new TypeReference<>() { // NOPMD }); - messages.putAll(offsetRec); + messages.add(new S3OffsetManagerEntry(offsetRec)); } return messages; } diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java index f88b53928..ac0f5ae97 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java @@ -48,6 +48,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -187,7 +188,7 @@ void bytesTest(final TestInfo testInfo) { // Verify offset positions final Map expectedOffsetRecords = offsetKeys.subList(0, offsetKeys.size() - 1) .stream() - .collect(Collectors.toMap(Function.identity(), s -> 1)); + .collect(Collectors.toMap(Function.identity(), s -> 0L)); verifyOffsetPositions(expectedOffsetRecords, connectRunner.getBootstrapServers()); } @@ -289,7 +290,8 @@ private Map getAvroConfig(final String topicName, final InputFor @Test void jsonTest(final TestInfo testInfo) { - final var topicName = IntegrationBase.topicName(testInfo); + final int messageCount = 500; + final String topicName = IntegrationBase.topicName(testInfo); final Map connectorConfig = getConfig(CONNECTOR_NAME, topicName, 1); connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.JSONL.getValue()); connectorConfig.put(VALUE_CONVERTER_KEY, "org.apache.kafka.connect.json.JsonConverter"); @@ -297,16 +299,15 @@ void jsonTest(final TestInfo testInfo) { connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig); final String testMessage = "This is a test "; final StringBuilder jsonBuilder = new StringBuilder(); - for (int i = 0; i < 500; i++) { - final String jsonContent = "{\"message\": \"" + testMessage + "\", \"id\":\"" + i + "\"}"; - jsonBuilder.append(jsonContent).append("\n"); // NOPMD + for (int i = 0; i < messageCount; i++) { + jsonBuilder.append(String.format("{\"message\": \"%s\", \"id\":\"%s\"}%n", testMessage, i)); // NOPMD } final byte[] jsonBytes = jsonBuilder.toString().getBytes(StandardCharsets.UTF_8); final String offsetKey = writeToS3(topicName, jsonBytes, "00001"); - + await().atMost(1, TimeUnit.SECONDS); // Poll Json messages from the Kafka topic and deserialize them - final List records = IntegrationBase.consumeJsonMessages(topicName, 500, + final List records = IntegrationBase.consumeJsonMessages(topicName, 1, connectRunner.getBootstrapServers()); assertThat(records).map(jsonNode -> jsonNode.get("payload")).anySatisfy(jsonNode -> { @@ -314,8 +315,8 @@ void jsonTest(final TestInfo testInfo) { assertThat(jsonNode.get("id").asText()).contains("1"); }); - // Verify offset positions - verifyOffsetPositions(Map.of(offsetKey, 500), connectRunner.getBootstrapServers()); + // Verify offset positions -- 0 based counting. + verifyOffsetPositions(Map.of(offsetKey, (long)messageCount-1), connectRunner.getBootstrapServers()); } private static byte[] generateNextAvroMessagesStartingFromId(final int messageId, final int noOfAvroRecs, @@ -379,7 +380,7 @@ static void verifyOffsetPositions(final Map expectedRecords, fin try (KafkaConsumer consumer = new KafkaConsumer<>(consumerProperties)) { consumer.subscribe(Collections.singletonList("connect-offset-topic-" + CONNECTOR_NAME)); await().atMost(Duration.ofMinutes(1)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> { - offsetRecs.putAll(IntegrationBase.consumeOffsetMessages(consumer)); + IntegrationBase.consumeOffsetMessages(consumer).forEach(s -> offsetRecs.put(s.getKey(), s.getRecordCount())); assertThat(offsetRecs).containsExactlyInAnyOrderEntriesOf(expectedRecords); }); } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntry.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntry.java index b6248934b..676d568f2 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntry.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntry.java @@ -65,13 +65,14 @@ public S3OffsetManagerEntry(final String bucket, final String s3ObjectKey, final } /** - * Constructs an OffsetManagerEntry from an existing map. used by {@link #fromProperties(Map)}. Package private for - * testing + * Constructs an OffsetManagerEntry from an existing map. Used to reconstitute previously serialized + * S3OffsetManagerEntries. + * used by {@link #fromProperties(Map)} * * @param properties * the property map. */ - private S3OffsetManagerEntry(final Map properties) { + public S3OffsetManagerEntry(final Map properties) { data = new HashMap<>(properties); for (final String field : RESTRICTED_KEYS) { if (data.get(field) == null) { diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java index 0856145ea..dcf336833 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3SourceRecord.java @@ -21,6 +21,9 @@ import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.source.SourceRecord; +/** + * The S3SourceRecord creates an immutable copy of the offsetManagerEntry, the recordKey and the recordValue. + */ public final class S3SourceRecord { /** The S3OffsetManagerEntry for this source record */ @@ -41,7 +44,7 @@ public Object key() { } public SchemaAndValue value() { - return recordValue; + return new SchemaAndValue(recordValue.schema(), recordValue.value()); } public S3OffsetManagerEntry getOffsetManagerEntry() { diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java index 8176676ba..4dd79a797 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/S3OffsetManagerEntryTest.java @@ -122,4 +122,23 @@ void testUpdate() { assertThat(entry2.getProperty("random_entry")).isEqualTo(5L); verify(sourceTaskContext, times(0)).offsetStorageReader(); } + + @Test + void testFromProperties() { + final S3OffsetManagerEntry entry = new S3OffsetManagerEntry(TEST_BUCKET, OBJECT_KEY, TOPIC, PARTITION); + assertThat(entry.getRecordCount()).isEqualTo(0L); + assertThat(entry.getProperty("random_entry")).isNull(); + + entry.setProperty("random_entry", 5L); + entry.incrementRecordCount(); + assertThat(entry.getRecordCount()).isEqualTo(1L); + assertThat(entry.getProperty("random_entry")).isEqualTo(5L); + + S3OffsetManagerEntry other = entry.fromProperties(entry.getProperties()); + assertThat(other.getRecordCount()).isEqualTo(1L); + assertThat(other.getProperty("random_entry")).isEqualTo(5L); + + + + } }