
changes to speed up and verify processing
Claude committed Dec 20, 2024
1 parent 3bf46ae commit a81b01a
Showing 3 changed files with 49 additions and 25 deletions.
@@ -28,7 +28,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -160,17 +159,25 @@ static <K, V> List<V> consumeMessages(final String topic, final int expectedMess
consumer.subscribe(Collections.singletonList(topic));

final List<V> recordValues = new ArrayList<>();
await().atMost(maxTime).pollInterval(Duration.ofSeconds(5)).untilAsserted(() -> {
final ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(500L));
for (final ConsumerRecord<K, V> record : records) {
recordValues.add(record.value());
}
assertThat(recordValues).hasSize(expectedMessageCount);
await().atMost(maxTime).pollInterval(Duration.ofSeconds(2)).untilAsserted(() -> {
assertThat(assertAllRecordsConsumed(consumer, recordValues)).hasSize(expectedMessageCount);
});
return recordValues;
}
}

private static <K, V> List<V> assertAllRecordsConsumed(KafkaConsumer<K, V> consumer, List<V> recordValues) {
int recordsRetrieved = 0;
do {
final ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(500L));
recordsRetrieved = records.count();
for (final ConsumerRecord<K, V> record : records) {
recordValues.add(record.value());
}
} while (recordsRetrieved == 500); // keep draining while each poll returns a full batch (500 = consumer default max.poll.records)
return recordValues;
}

static List<S3OffsetManagerEntry> consumeOffsetMessages(KafkaConsumer<byte[], byte[]> consumer) throws IOException {
// Poll messages from the topic
final List<S3OffsetManagerEntry> messages = new ArrayList<>();
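The drain loop above exits when a poll returns fewer than 500 records, which relies on the consumer's default max.poll.records of 500. A minimal sketch (hypothetical helper, not part of this commit) of pinning the batch size explicitly so the loop condition and the consumer configuration stay in sync:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    // Sketch only: pin max.poll.records so the "full batch" check in
    // assertAllRecordsConsumed compares against the configured value rather
    // than a hard-coded 500.
    final class ConsumerBatchSize {
        static final int MAX_POLL_RECORDS = 500;

        static Properties withPinnedBatchSize(final Properties base) {
            final Properties props = new Properties();
            props.putAll(base);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
            return props;
        }
    }
    // The loop exit test would then read: while (recordsRetrieved == ConsumerBatchSize.MAX_POLL_RECORDS)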
@@ -43,11 +43,9 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -89,7 +87,7 @@ final class IntegrationTest implements IntegrationBase {
private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationTest.class);
private static final String CONNECTOR_NAME = "aiven-s3-source-connector";
private static final String COMMON_PREFIX = "s3-source-connector-for-apache-kafka-test-";
private static final int OFFSET_FLUSH_INTERVAL_MS = 500;
private static final int OFFSET_FLUSH_INTERVAL_MS = 50;

private static final String S3_ACCESS_KEY_ID = "test-key-id0";
private static final String S3_SECRET_ACCESS_KEY = "test_secret_key0";
@@ -185,7 +183,7 @@ void bytesTest(final TestInfo testInfo) {
assertThat(records).containsOnly(testData1, testData2);

// Verify offset positions
final Map<String, Object> expectedOffsetRecords = offsetKeys.subList(0, offsetKeys.size() - 1)
final Map<String, Long> expectedOffsetRecords = offsetKeys.subList(0, offsetKeys.size() - 1)
.stream()
.collect(Collectors.toMap(Function.identity(), s -> 0L));
verifyOffsetPositions(expectedOffsetRecords, connectRunner.getBootstrapServers());
@@ -217,7 +215,7 @@ void avroTest(final TestInfo testInfo) throws IOException {
final byte[] outputStream5 = generateNextAvroMessagesStartingFromId(4 * numOfRecsFactor + 1, numOfRecsFactor,
schema);

final Set<String> offsetKeys = new HashSet<>();
final List<String> offsetKeys = new ArrayList<>();

offsetKeys.add(writeToS3(topicName, outputStream1, "00001"));
offsetKeys.add(writeToS3(topicName, outputStream2, "00001"));
@@ -243,8 +241,8 @@
entry(4 * numOfRecsFactor, "Hello, Kafka Connect S3 Source! object " + (4 * numOfRecsFactor)),
entry(5 * numOfRecsFactor, "Hello, Kafka Connect S3 Source! object " + (5 * numOfRecsFactor)));

verifyOffsetPositions(offsetKeys.stream().collect(Collectors.toMap(Function.identity(), s -> numOfRecsFactor)),
connectRunner.getBootstrapServers());
final Map<String, Long> expectedOffsetRecords = offsetKeys.stream().collect(Collectors.toMap(Function.identity(), s -> (long) numOfRecsFactor - 1));
verifyOffsetPositions(expectedOffsetRecords, connectRunner.getBootstrapServers());
}

@Test
@@ -289,7 +287,6 @@ private Map<String, String> getAvroConfig(final String topicName, final InputFor

@Test
void jsonTest(final TestInfo testInfo) {
List<String> test = testBucketAccessor.listObjects();
final int messageCount = 500;
final String topicName = IntegrationBase.topicName(testInfo);
final Map<String, String> connectorConfig = getConfig(CONNECTOR_NAME, topicName, 1);
@@ -311,11 +308,11 @@ void jsonTest(final TestInfo testInfo) {

assertThat(records).map(jsonNode -> jsonNode.get("payload")).anySatisfy(jsonNode -> {
assertThat(jsonNode.get("message").asText()).contains(testMessage);
assertThat(jsonNode.get("id").asText()).contains(Integer.toString(messageCount-1));
assertThat(jsonNode.get("id").asText()).contains(Integer.toString(messageCount - 1));
});

// Verify offset positions -- 0 based counting.
verifyOffsetPositions(Map.of(offsetKey, (long)messageCount-1), connectRunner.getBootstrapServers());
verifyOffsetPositions(Map.of(offsetKey, (long)messageCount - 1), connectRunner.getBootstrapServers());
}

private static byte[] generateNextAvroMessagesStartingFromId(final int messageId, final int noOfAvroRecs,
@@ -371,15 +368,17 @@ private static Map<String, String> basicS3ConnectorConfig() {
return config;
}

static void verifyOffsetPositions(final Map<String, Object> expectedRecords, final String bootstrapServers) {
static void verifyOffsetPositions(final Map<String, Long> expectedRecords, final String bootstrapServers) {
final Properties consumerProperties = IntegrationBase.getConsumerProperties(bootstrapServers,
ByteArrayDeserializer.class, ByteArrayDeserializer.class);

final Map<String, Object> offsetRecs = new HashMap<>();
final Map<String, Long> offsetRecs = new HashMap<>();
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProperties)) {
consumer.subscribe(Collections.singletonList("connect-offset-topic-" + CONNECTOR_NAME));
await().atMost(Duration.ofMinutes(1)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
IntegrationBase.consumeOffsetMessages(consumer).forEach(s -> offsetRecs.put(s.getKey(), s.getRecordCount()));
await().atMost(Duration.ofMinutes(2)).pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
IntegrationBase.consumeOffsetMessages(consumer).forEach(s -> {
offsetRecs.merge(s.getKey(), s.getRecordCount(), (x, y) -> x > y ? x : y);
LOGGER.info("Read Offset Position: {} {} ", s.getKey(), s.getRecordCount());}); // TODO remove tis line
assertThat(offsetRecs).containsExactlyInAnyOrderEntriesOf(expectedRecords);
});
}
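In verifyOffsetPositions above, the merge keeps the highest record count observed for each offset key, since the connect offset topic can hold several entries per key as the connector flushes repeatedly. The custom lambda is equivalent to the standard maximum, for example (an equivalent form, not what this commit uses):

    // Equivalent merge using the built-in maximum:
    offsetRecs.merge(s.getKey(), s.getRecordCount(), Long::max);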
@@ -90,6 +90,20 @@ public void start(final Map<String, String> props) {
this.taskInitialized = true;
}

@Override
public void commit() throws InterruptedException {
LOGGER.info("Committed all records through last poll()");
}

@Override
public void commitRecord(SourceRecord record) throws InterruptedException {
if (LOGGER.isInfoEnabled()) {
Map<String, Object> map = (Map<String, Object>) record.sourceOffset();
S3OffsetManagerEntry entry = S3OffsetManagerEntry.wrap(map);
LOGGER.info("Committed individual record {} {} {} committed", entry.getBucket(), entry.getKey(), entry.getRecordCount());
}
}

@Override
public List<SourceRecord> poll() throws InterruptedException {
LOGGER.info("Polling for new records...");
@@ -119,12 +133,13 @@ public List<SourceRecord> poll() throws InterruptedException {
}
} catch (DataException exception) {
LOGGER.warn("DataException occurred during polling. No retries will be attempted.", exception);
} catch (final Throwable t) { // NOPMD
LOGGER.error("Unexpected error encountered. Closing resources and stopping task.", t);
} catch (final RuntimeException t) { // NOPMD
LOGGER.error("Unexpected Exception encountered. Closing resources and stopping task.", t);
closeResources();
throw t;
}
}
LOGGER.debug("Poll returning {} records.", results.size());
return results;
}
}
@@ -141,19 +156,22 @@ List<SourceRecord> extractSourceRecords(final List<SourceRecord> results) {
return results;
}
final int maxPollRecords = s3SourceConfig.getMaxPollRecords();

long lastRecord = 0;
for (int i = 0; sourceRecordIterator.hasNext() && i < maxPollRecords && !connectorStopped.get(); i++) {
final S3SourceRecord s3SourceRecord = sourceRecordIterator.next();
if (s3SourceRecord != null) {
try {
offsetManager.updateCurrentOffsets(s3SourceRecord.getOffsetManagerEntry());
S3OffsetManagerEntry entry = s3SourceRecord.getOffsetManagerEntry();
offsetManager.updateCurrentOffsets(entry);
results.add(s3SourceRecord.getSourceRecord());
lastRecord = entry.getRecordCount();
} catch (DataException e) {
LOGGER.error("Error in reading s3 object stream {}", e.getMessage(), e);
awsv2SourceClient.addFailedObjectKeys(s3SourceRecord.getObjectKey());
}
}
}
LOGGER.info("Last record in batch: {}", lastRecord);
return results;
}

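The single-argument commitRecord(SourceRecord) overridden above is deprecated in newer Kafka Connect versions, which provide commitRecord(SourceRecord, RecordMetadata) instead. A sketch of the same per-record logging on the two-argument overload, assuming the same S3OffsetManagerEntry.wrap helper used in this task:

    // Sketch only (within the same task class): the logging from the deprecated
    // single-argument override above, moved onto the two-argument signature
    // (org.apache.kafka.clients.producer.RecordMetadata).
    @Override
    public void commitRecord(final SourceRecord record, final RecordMetadata metadata) throws InterruptedException {
        if (LOGGER.isInfoEnabled()) {
            final S3OffsetManagerEntry entry = S3OffsetManagerEntry.wrap((Map<String, Object>) record.sourceOffset());
            LOGGER.info("Committed record {} {} {}", entry.getBucket(), entry.getKey(), entry.getRecordCount());
        }
    }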
