fixed byte integration test
Claude committed Dec 19, 2024
1 parent 934ab93 commit 79d6723
Showing 9 changed files with 62 additions and 18 deletions.
@@ -152,14 +152,30 @@ public interface OffsetManagerEntry<T extends OffsetManagerEntry<T>> extends Com
*/
OffsetManagerKey getManagerKey();

/**
* Gets the Kafka topic for this entry.
* @return The Kafka topic for this entry.
*/
String getTopic();

/**
* Gets the Kafka partition for this entry.
* @return The Kafka partition for this entry.
*/
Integer getPartition();

/**
* Gets the number of records to skip to get to this record.
* This is the same as the zero-based index of this record if all records were in an array.
* @return The number of records to skip to get to this record.
*/
default long skipRecords() {
return 0;
}

/**
* Increments the record count.
*/
void incrementRecordCount();
}
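A hedged sketch (not part of this commit) of how a caller might apply the skipRecords() contract: fast-forward past the records a previous run already consumed, so reading resumes at the zero-based index the entry reports.

import java.util.Iterator;

// Hypothetical helper for illustration only; its name and shape are assumptions.
final class SkipRecordsSketch {
    static <T> Iterator<T> resumeFrom(final Iterator<T> records, final long skipRecords) {
        for (long i = 0; i < skipRecords && records.hasNext(); i++) {
            records.next(); // discard a record that an earlier run already processed
        }
        return records;
    }
}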

@@ -50,6 +50,7 @@ public Schema getKeySchema() {
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
final OffsetManager.OffsetManagerEntry<?> offsetManagerEntry, final AbstractConfig sourceConfig) {
return new StreamSpliterator(LOGGER, inputStreamIOSupplier, offsetManagerEntry) {

@Override
protected InputStream inputOpened(final InputStream input) {
return input;
@@ -173,7 +173,7 @@ public final void close() {
public final boolean tryAdvance(final Consumer<? super SchemaAndValue> action) {
boolean result = false;
if (closed) {
logger.error("Attempt to advance after closed");
return false;
}
try {
if (inputStream == null) {
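The early return brings tryAdvance in line with the java.util.Spliterator contract: once traversal cannot continue, it must return false without invoking the action. A minimal illustration, assuming a spliterator built by createSpliterator() (names are assumptions, not from this commit):

spliterator.close();
final boolean advanced = spliterator.tryAdvance(
        schemaAndValue -> { throw new AssertionError("action must not run after close"); });
// advanced is false: a closed spliterator reports no further elements.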
@@ -89,6 +89,8 @@ void verifyCloseCalledAtIteratorEnd(final Transformer transformer, final byte[]
}
assertThat(count).isEqualTo(expectedCount);
assertThat(stream.closeCount).isGreaterThan(0);

assertThat(iter).as("Calling hasNext() after last item should return false").isExhausted();
}
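The added assertion uses AssertJ's iterator support; a self-contained illustration of its semantics (not part of the commit):

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Collections;
import java.util.Iterator;

// isExhausted() asserts that hasNext() is false, guarding against iterators
// that still report elements after the underlying stream has been closed.
final Iterator<Object> done = Collections.emptyIterator();
assertThat(done).isExhausted();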

static Stream<Arguments> testData() throws IOException {
@@ -35,6 +35,7 @@
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import io.aiven.kafka.connect.s3.source.utils.S3OffsetManagerEntry;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
@@ -170,14 +171,14 @@ static <K, V> List<V> consumeMessages(final String topic, final int expectedMess
}
}

static Map<String, Object> consumeOffsetMessages(KafkaConsumer<byte[], byte[]> consumer) throws IOException {
static List<S3OffsetManagerEntry> consumeOffsetMessages(KafkaConsumer<byte[], byte[]> consumer) throws IOException {
// Poll messages from the topic
final Map<String, Object> messages = new HashMap<>();
final List<S3OffsetManagerEntry> messages = new ArrayList<>();
final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
for (final ConsumerRecord<byte[], byte[]> record : records) {
final Map<String, Object> offsetRec = OBJECT_MAPPER.readValue(record.value(), new TypeReference<>() { // NOPMD
});
messages.putAll(offsetRec);
messages.add(new S3OffsetManagerEntry(offsetRec));
}
return messages;
}
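Since the return type is now a List of entries rather than a merged Map, callers build whatever view they need. A sketch of the keyed view that verifyOffsetPositions assembles further down, using the getKey() and getRecordCount() accessors this commit relies on:

// Latest record count per S3 object key; a later poll result wins on collision.
final Map<String, Long> latestCounts = consumeOffsetMessages(consumer).stream()
        .collect(Collectors.toMap(S3OffsetManagerEntry::getKey,
                S3OffsetManagerEntry::getRecordCount, (first, second) -> second));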
@@ -48,6 +48,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -187,7 +188,7 @@ void bytesTest(final TestInfo testInfo) {
// Verify offset positions
final Map<String, Object> expectedOffsetRecords = offsetKeys.subList(0, offsetKeys.size() - 1)
.stream()
.collect(Collectors.toMap(Function.identity(), s -> 1));
.collect(Collectors.toMap(Function.identity(), s -> 0L));
verifyOffsetPositions(expectedOffsetRecords, connectRunner.getBootstrapServers());
}

@@ -289,33 +290,33 @@ private Map<String, String> getAvroConfig(final String topicName, final InputFor

@Test
void jsonTest(final TestInfo testInfo) {
final var topicName = IntegrationBase.topicName(testInfo);
final int messageCount = 500;
final String topicName = IntegrationBase.topicName(testInfo);
final Map<String, String> connectorConfig = getConfig(CONNECTOR_NAME, topicName, 1);
connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.JSONL.getValue());
connectorConfig.put(VALUE_CONVERTER_KEY, "org.apache.kafka.connect.json.JsonConverter");

connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig);
final String testMessage = "This is a test ";
final StringBuilder jsonBuilder = new StringBuilder();
for (int i = 0; i < 500; i++) {
final String jsonContent = "{\"message\": \"" + testMessage + "\", \"id\":\"" + i + "\"}";
jsonBuilder.append(jsonContent).append("\n"); // NOPMD
for (int i = 0; i < messageCount; i++) {
jsonBuilder.append(String.format("{\"message\": \"%s\", \"id\":\"%s\"}%n", testMessage, i)); // NOPMD
}
final byte[] jsonBytes = jsonBuilder.toString().getBytes(StandardCharsets.UTF_8);

final String offsetKey = writeToS3(topicName, jsonBytes, "00001");

await().atMost(1, TimeUnit.SECONDS);
// Poll Json messages from the Kafka topic and deserialize them
final List<JsonNode> records = IntegrationBase.consumeJsonMessages(topicName, 500,
final List<JsonNode> records = IntegrationBase.consumeJsonMessages(topicName, 1,
connectRunner.getBootstrapServers());

assertThat(records).map(jsonNode -> jsonNode.get("payload")).anySatisfy(jsonNode -> {
assertThat(jsonNode.get("message").asText()).contains(testMessage);
assertThat(jsonNode.get("id").asText()).contains("1");
});

// Verify offset positions
verifyOffsetPositions(Map.of(offsetKey, 500), connectRunner.getBootstrapServers());
// Verify offset positions -- record counts use zero-based indexing.
verifyOffsetPositions(Map.of(offsetKey, (long) messageCount - 1), connectRunner.getBootstrapServers());
}
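Illustrative arithmetic behind the expected value (not part of the commit): with zero-based counting, the 500 messages occupy indices 0 through 499, so the offset entry for the object reports 499.

final int messageCount = 500;
final long expectedOffset = (long) messageCount - 1; // indices 0..499, last index is 499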

private static byte[] generateNextAvroMessagesStartingFromId(final int messageId, final int noOfAvroRecs,
@@ -379,7 +380,7 @@ static void verifyOffsetPositions(final Map<String, Object> expectedRecords, fin
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProperties)) {
consumer.subscribe(Collections.singletonList("connect-offset-topic-" + CONNECTOR_NAME));
await().atMost(Duration.ofMinutes(1)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
offsetRecs.putAll(IntegrationBase.consumeOffsetMessages(consumer));
IntegrationBase.consumeOffsetMessages(consumer).forEach(s -> offsetRecs.put(s.getKey(), s.getRecordCount()));
assertThat(offsetRecs).containsExactlyInAnyOrderEntriesOf(expectedRecords);
});
}
@@ -65,13 +65,14 @@ public S3OffsetManagerEntry(final String bucket, final String s3ObjectKey, final
}

/**
* Constructs an OffsetManagerEntry from an existing map. used by {@link #fromProperties(Map)}. Package private for
* testing
* Constructs an OffsetManagerEntry from an existing map. Used to reconstitute previously serialized
* S3OffsetManagerEntry instances, e.g. by {@link #fromProperties(Map)}.
*
* @param properties
* the property map.
*/
private S3OffsetManagerEntry(final Map<String, Object> properties) {
public S3OffsetManagerEntry(final Map<String, Object> properties) {
data = new HashMap<>(properties);
for (final String field : RESTRICTED_KEYS) {
if (data.get(field) == null) {
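Widening the constructor to public enables reconstitution outside the class, e.g. after Jackson deserialization as in consumeOffsetMessages above. A hedged round-trip sketch; the literal values are assumptions:

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;

// Serialize an entry's backing properties, then rebuild an equivalent entry.
final ObjectMapper mapper = new ObjectMapper();
final S3OffsetManagerEntry entry = new S3OffsetManagerEntry("test-bucket", "object-key", "topic", 0);
final byte[] serialized = mapper.writeValueAsBytes(entry.getProperties());
final Map<String, Object> properties = mapper.readValue(serialized, new TypeReference<>() { });
final S3OffsetManagerEntry restored = new S3OffsetManagerEntry(properties); // now-public constructor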
@@ -21,6 +21,9 @@
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.source.SourceRecord;

/**
* The S3SourceRecord creates an immutable copy of the offsetManagerEntry, the recordKey and the recordValue.
*/
public final class S3SourceRecord {

/** The S3OffsetManagerEntry for this source record */
@@ -41,7 +44,7 @@ public Object key() {
}

public SchemaAndValue value() {
return recordValue;
return new SchemaAndValue(recordValue.schema(), recordValue.value());
}

public S3OffsetManagerEntry getOffsetManagerEntry() {
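Returning a fresh SchemaAndValue on each call keeps the stored reference from leaking to callers; an illustrative check, assuming an S3SourceRecord named sourceRecord (not from the commit):

// Each call yields a distinct wrapper over the same schema and value, so a
// caller cannot swap out the record's own copy through the returned object.
final SchemaAndValue first = sourceRecord.value();
final SchemaAndValue second = sourceRecord.value();
assert first != second;
assert first.schema() == second.schema();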
@@ -122,4 +122,23 @@ void testUpdate() {
assertThat(entry2.getProperty("random_entry")).isEqualTo(5L);
verify(sourceTaskContext, times(0)).offsetStorageReader();
}

@Test
void testFromProperties() {
final S3OffsetManagerEntry entry = new S3OffsetManagerEntry(TEST_BUCKET, OBJECT_KEY, TOPIC, PARTITION);
assertThat(entry.getRecordCount()).isEqualTo(0L);
assertThat(entry.getProperty("random_entry")).isNull();

entry.setProperty("random_entry", 5L);
entry.incrementRecordCount();
assertThat(entry.getRecordCount()).isEqualTo(1L);
assertThat(entry.getProperty("random_entry")).isEqualTo(5L);

final S3OffsetManagerEntry other = entry.fromProperties(entry.getProperties());
assertThat(other.getRecordCount()).isEqualTo(1L);
assertThat(other.getProperty("random_entry")).isEqualTo(5L);
}
}
