Skip to content

Commit

Permalink
Additional check to make sure all old com.amazon entries are now remo…
Browse files Browse the repository at this point in the history
…ved from the source connector

Signed-off-by: Aindriu Lavelle <[email protected]>
  • Loading branch information
aindriu-aiven committed Dec 17, 2024
1 parent d6b69fb commit 7c4aea9
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -144,11 +143,9 @@ private List<S3SourceRecord> readNext() {
return sourceRecords;
}


try (Stream<Object> recordStream = transformer.getRecords(s3Object, topic, topicPartition,
s3SourceConfig, numberOfRecsAlreadyProcessed)) {

final byte[] keyBytes = currentObjectKey.getBytes(StandardCharsets.UTF_8);
final Iterator<Object> recordIterator = recordStream.iterator();
while (recordIterator.hasNext()) {
final Object record = recordIterator.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.aiven.kafka.connect.common.source.input.InputFormat;
import io.aiven.kafka.connect.config.s3.S3ConfigFragment;

import com.amazonaws.regions.Regions;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.regions.Region;

Expand All @@ -42,7 +41,7 @@ void correctFullConfig() {
props.put(S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG, "the-bucket");
props.put(S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG, "AWS_S3_ENDPOINT");
props.put(S3ConfigFragment.AWS_S3_PREFIX_CONFIG, "AWS_S3_PREFIX");
props.put(S3ConfigFragment.AWS_S3_REGION_CONFIG, Regions.US_EAST_1.getName());
props.put(S3ConfigFragment.AWS_S3_REGION_CONFIG, Region.US_EAST_1.id());

// record, topic specific props
props.put(INPUT_FORMAT_KEY, InputFormat.AVRO.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

import io.aiven.kafka.connect.common.config.CompressionType;

import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.github.luben.zstd.ZstdInputStream;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.slf4j.Logger;
Expand All @@ -47,6 +46,7 @@
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Object;

public class BucketAccessor {
Expand Down Expand Up @@ -79,11 +79,9 @@ public final void removeBucket() {
.bucket(bucketName)
.delete(Delete.builder().objects(deleteIds).build())
.build());
} catch (final MultiObjectDeleteException e) {
for (final var err : e.getErrors()) {
LOGGER.warn(String.format("Couldn't delete object: %s. Reason: [%s] %s", err.getKey(), err.getCode(),
err.getMessage()));
}
} catch (final S3Exception e) {
LOGGER.warn(
String.format("Couldn't delete objects. Reason: [%s] %s", e.awsErrorDetails().errorMessage(), e));
} catch (final SdkException e) {

LOGGER.error("Couldn't delete objects: {}, Exception{} ", deleteIds, e.getMessage());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.mockito.Mockito.when;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Collections;
import java.util.stream.Stream;

Expand All @@ -38,7 +39,6 @@
import io.aiven.kafka.connect.common.source.input.Transformer;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;

import com.amazonaws.services.s3.model.S3ObjectInputStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand All @@ -64,8 +64,7 @@ void testIteratorProcessesS3Objects() throws Exception {
final String key = "topic-00001-abc123.txt";

// Mock InputStream
try (S3ObjectInputStream mockInputStream = new S3ObjectInputStream(new ByteArrayInputStream(new byte[] {}),
null);) {
try (InputStream mockInputStream = new ByteArrayInputStream(new byte[] {})) {
when(mockSourceApiClient.getObject(anyString())).thenReturn(() -> mockInputStream);

when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong()))
Expand Down Expand Up @@ -96,8 +95,7 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception {
final String key = "topic-00001-abc123.txt";

// Mock InputStream
try (S3ObjectInputStream mockInputStream = new S3ObjectInputStream(new ByteArrayInputStream(new byte[] {}),
null);) {
try (InputStream mockInputStream = new ByteArrayInputStream(new byte[] {})) {
when(mockSourceApiClient.getObject(anyString())).thenReturn(() -> mockInputStream);

// With ByteArrayTransformer
Expand Down

0 comments on commit 7c4aea9

Please sign in to comment.