Place offset manager in commons #373
base: s3-source-release
b5278e0 to 69ea274
Unit tests pass; there is an issue with the integration tests not picking up the changes in commons.
s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java
commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java
commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java
...-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3ObjectSummaryIterator.java
if (objectListing.isTruncated()) {
    // get the next set of data and create an iterator on it.
    request.setStartAfter(null);
    request.withContinuationToken(objectListing.getContinuationToken());
I am pretty sure the continuation token is all that is required here; you can create a new request and add only the continuation token (the bucket is possibly also required).
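The idea in this comment can be illustrated without the AWS SDK. The following is a minimal sketch, assuming a hypothetical `PageSource` interface that stands in for the S3 list call; `Page`, `PageSource`, `ContinuationPaging` and `fakeSource` are illustrative names and not part of this PR or of the SDK. Each follow-up request is built from only the bucket and the token returned with the previous page.

```java
import java.util.ArrayList;
import java.util.List;

/** One page of a listing; nextToken is null when no further pages exist. Hypothetical type. */
final class Page {
    final List<String> keys;
    final String nextToken;

    Page(final List<String> keys, final String nextToken) {
        this.keys = keys;
        this.nextToken = nextToken;
    }
}

/** Hypothetical stand-in for the list call: a bucket plus an optional continuation token. */
interface PageSource {
    Page list(String bucket, String continuationToken);
}

final class ContinuationPaging {
    /** Collects all keys; every follow-up request carries only the bucket and the token. */
    static List<String> listAll(final PageSource source, final String bucket) {
        final List<String> result = new ArrayList<>();
        String token = null; // the first request carries no token
        do {
            final Page page = source.list(bucket, token);
            result.addAll(page.keys);
            token = page.nextToken;
        } while (token != null);
        return result;
    }

    /** Fake source serving three keys in pages of two, using the start index as the token. */
    static PageSource fakeSource() {
        final List<String> all = List.of("a", "b", "c");
        return (bucket, token) -> {
            final int start = token == null ? 0 : Integer.parseInt(token);
            final int end = Math.min(start + 2, all.size());
            return new Page(all.subList(start, end), end < all.size() ? String.valueOf(end) : null);
        };
    }
}
```

Whether the real request also needs the prefix or other settings repeated is not settled in this thread, so the sketch only demonstrates the token-driven loop.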
...rce-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
I had a few comments; some are for future follow-ups, but we should create issues for them so we don't miss them.
    throw new AmazonClientException(e);
}
this.s3ObjectIterator = IteratorUtils.filteredIterator(sourceClient.getIteratorOfObjects(null),
        s3Object -> extractOffsetManagerEntry(s3Object));
Lambda can be replaced with a method reference:
-        s3Object -> extractOffsetManagerEntry(s3Object));
+        this::extractOffsetManagerEntry);
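For readers less familiar with the two forms, here is a small standalone sketch showing that the lambda and the method reference produce the same predicate. `MethodReferenceExample` and `hasTextSuffix` are hypothetical names; `hasTextSuffix` merely stands in for a boolean instance method such as `extractOffsetManagerEntry`.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class MethodReferenceExample {
    /** Hypothetical predicate method standing in for extractOffsetManagerEntry. */
    boolean hasTextSuffix(final String key) {
        return key.endsWith(".txt");
    }

    /** Filters with an explicit lambda that only forwards its argument. */
    List<String> filterWithLambda(final List<String> keys) {
        final Predicate<String> predicate = key -> hasTextSuffix(key);
        return keys.stream().filter(predicate).collect(Collectors.toList());
    }

    /** Filters with the equivalent bound method reference. */
    List<String> filterWithMethodReference(final List<String> keys) {
        final Predicate<String> predicate = this::hasTextSuffix;
        return keys.stream().filter(predicate).collect(Collectors.toList());
    }
}
```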
 *            the Abstract Config to use.
 * @return a Stream of SchemaAndValue objects.
 */
public final Stream<SchemaAndValue> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier,
This is looking great, a much simplified version.
Need to find out why no events are pushed to the Kafka offsets topic.
@@ -119,6 +118,7 @@ public List<SourceRecord> poll() throws InterruptedException {

while (!connectorStopped.get()) {
    try {
        waitForObjects();
        extractSourceRecords(results);
        LOGGER.info("Number of records extracted and sent: {}", results.size());
        return results;
I have an extract of what is sent to the Kafka offsets topic, both before this PR and with this PR.
Before this PR:
SourceRecord{
sourcePartition={bucket=test-bucket0, topic=bytesTest, topicPartition=0},
sourceOffset={object_key_s3-source-connector-for-apache-kafka-test-2024-12-20T13:34:01.62052/bytesTest-00000-1734698057527.txt=1}
}
ConnectRecord{topic='bytesTest', kafkaPartition=0, key=[B@6e96f788, keySchema=null, value=[B@49e57a97, valueSchema=null, timestamp=null, headers=ConnectHeaders(headers=)}
With this PR:
SourceRecord{
sourcePartition={partition=0, bucket=test-bucket0, objectKey=s3-source-connector-for-apache-kafka-test-2024-12-20T13:28:08.047694/bytesTest-00000-1734697707480.txt, topic=bytesTest},
sourceOffset={bucket=test-bucket0, topic=bytesTest, partition=0, objectKey=s3-source-connector-for-apache-kafka-test-2024-12-20T13:28:08.047694/bytesTest-00000-1734697707480.txt, recordCount=0}
}
ConnectRecord{topic='bytesTest', kafkaPartition=0, key=[B@67e2252f, keySchema=null, value=[B@1d001ae2, valueSchema=null, timestamp=null, headers=ConnectHeaders(headers=)}
- There are some duplicate keys sent in sourcePartition and sourceOffset, which should be removed.
- I have tested locally, and no events are pushed to the connect-offset-topic- topic.
I am not sure where the problem is; I am going to debug further. It may be something to do with the new structure.
 */
@Override
public OffsetManager.OffsetManagerKey getManagerKey() {
    return () -> Map.of(BUCKET, data.get(BUCKET), OBJECT_KEY, data.get(OBJECT_KEY));
Instead of storing object keys as the keys, it would be better to store partition IDs in the key.
We would then have fewer keys.
I just checked the Lenses S3 source connector and the Adobe S3 source connector, and they store partition IDs.
Can we think about this too?
We have the topic.partitions config. Our earlier implementation was based on it.
@gharris1727, your suggestion would be helpful here.
According to the Javadoc of the OffsetStorageReader offsets() method, I was thinking we would have to store at least the topic and partition ID in the offset storage keys?
@Override
public OffsetManager.OffsetManagerKey getManagerKey() {
    return () -> Map.of(BUCKET, data.get(BUCKET), TOPIC, data.get(TOPIC), PARTITION, data.get(PARTITION));
}
When we have several objects under the specified topics and partitions and need to retrieve the stored offset map, how can we better structure the keys?
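One possible shape, mirroring the "Before this PR" extract quoted earlier in this thread, is to key the stored offsets by bucket, topic and partition and to keep per-object progress inside the offset value. The sketch below is only an illustration of that structure, not a decision for this PR; `PartitionKeyedOffsets`, `partitionKey` and `recordProgress` are hypothetical names, and the map field names are copied from the extract.

```java
import java.util.HashMap;
import java.util.Map;

final class PartitionKeyedOffsets {
    /** Key shared by every object that belongs to the same bucket, topic and partition. */
    static Map<String, Object> partitionKey(final String bucket, final String topic, final int partition) {
        final Map<String, Object> key = new HashMap<>();
        key.put("bucket", bucket);
        key.put("topic", topic);
        key.put("topicPartition", partition);
        return key;
    }

    /** Returns a copy of the offset value with the progress for one object added or replaced. */
    static Map<String, Object> recordProgress(final Map<String, Object> previous, final String objectKey,
            final long recordCount) {
        final Map<String, Object> offset = new HashMap<>(previous);
        offset.put("object_key_" + objectKey, recordCount);
        return offset;
    }
}
```

With this layout the number of keys is bounded by the number of partitions, while the offset value grows with the number of objects seen in a partition, so the trade-off still needs to be weighed.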
IntegrationBase.consumeOffsetMessages(consumer).forEach(s -> {
    offsetRecs.merge(s.getKey(), s.getRecordCount(), (x, y) -> x > y ? x : y);
});
// FIXME after KAFKA-14947 is fixed.
But it is already working in the feature branch. I am not sure it is entirely related.
    for (final ConsumerRecord<K, V> record : records) {
        recordValues.add(record.value());
    }
} while (recordsRetrieved == 500);
- } while (recordsRetrieved == 500);
+ } while (recordsRetrieved > 10);
This should deal with tests we introduce that require smaller numbers being polled at a time, without impacting performance.
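To make the effect of the suggested condition concrete, here is a minimal sketch of a drain loop that keeps polling while the last batch was larger than a threshold. It uses a plain iterator of batches instead of a Kafka consumer; `DrainLoop` and `drain` are hypothetical names, and the threshold of 10 is taken from the suggestion above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class DrainLoop {
    /** Collects batches until one arrives with `threshold` or fewer records. */
    static <V> List<V> drain(final Iterator<List<V>> batches, final int threshold) {
        final List<V> values = new ArrayList<>();
        int retrieved;
        do {
            // An exhausted iterator is treated like an empty poll result.
            final List<V> batch = batches.hasNext() ? batches.next() : List.<V>of();
            retrieved = batch.size();
            values.addAll(batch);
        } while (retrieved > threshold);
        return values;
    }
}
```

Note that a batch of 10 or fewer records ends the loop even if more records are still available, so the condition is a heuristic rather than a guarantee that the topic has been fully read.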
Fix for KCON-57
While this looks like a large change, there are multiple cases where files were migrated from s3-source-connector to common module. Those files are counted twice. This change also removes unused classes/files.
Significant changes are in OffsetManager, S3SourceTask, S3SourceRecord and AWSV2SourceClient.
Made OffsetManager generic to handle multiple OffsetManagerRecord types while simplifying access from sources.
Each source should implement an instance of OffsetManager.OffsetManagerEntry that tracks the specific data for that source.
OffsetManagerEntry is included in the Source specific record (e.g. S3SourceRecord), is updated as processing continues, and is the source of record for many of the S3 and Kafka specific values (e.g. partition, topic, S3Object key) as well as some dynamic data such as the current record number.
Transformer was modified to update the OffsetManagerEntry as records are returned.
Due to a bug in Kafka, this implementation cannot guarantee write-once functionality: https://issues.apache.org/jira/browse/KAFKA-14947
Added javadoc.
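
As a rough illustration of the design described above (a generic offset manager plus a source-specific entry that is updated as records are processed), here is a simplified sketch. It is not the code in this PR: `SketchOffsetManager`, its nested `Key` and `Entry` interfaces, `ObjectEntry`, and all method names are hypothetical, and the in-memory map stands in for Kafka Connect offset storage.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical, simplified offset manager; not the PR's actual class. */
final class SketchOffsetManager<E extends SketchOffsetManager.Entry> {
    /** Identifies one tracked item, for example a bucket and an object key. */
    interface Key {
        Map<String, Object> partitionMap();
    }

    /** Source-specific state that is updated as records are read. */
    interface Entry {
        Key managerKey();

        Map<String, Object> properties();
    }

    // In-memory stand-in for the offset storage.
    private final Map<Map<String, Object>, Map<String, Object>> offsets = new HashMap<>();

    /** Stores a snapshot of the entry's current state under its key. */
    void update(final E entry) {
        offsets.put(entry.managerKey().partitionMap(), new HashMap<>(entry.properties()));
    }

    /** Rebuilds an entry from the stored properties, if any exist for the key. */
    Optional<E> getEntry(final Key key, final Function<Map<String, Object>, E> creator) {
        return Optional.ofNullable(offsets.get(key.partitionMap())).map(creator);
    }
}

/** Hypothetical entry tracking how many records of one object have been processed. */
final class ObjectEntry implements SketchOffsetManager.Entry {
    private final Map<String, Object> data = new HashMap<>();

    ObjectEntry(final String bucket, final String objectKey) {
        data.put("bucket", bucket);
        data.put("objectKey", objectKey);
        data.put("recordCount", 0L);
    }

    ObjectEntry(final Map<String, Object> properties) {
        data.putAll(properties);
    }

    void incrementRecordCount() {
        data.merge("recordCount", 1L, (a, b) -> (Long) a + (Long) b);
    }

    long recordCount() {
        return (Long) data.get("recordCount");
    }

    @Override
    public SketchOffsetManager.Key managerKey() {
        return () -> Map.of("bucket", data.get("bucket"), "objectKey", data.get("objectKey"));
    }

    @Override
    public Map<String, Object> properties() {
        return data;
    }
}
```

In this sketch a transformer would call `incrementRecordCount()` for each record it returns, and the task would call `update(entry)` to persist the progress.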