
Commit aebc405

Updated as per reviews

Claude committed Dec 20, 2024
1 parent 6fd112f commit aebc405
Showing 12 changed files with 63 additions and 54 deletions.
@@ -66,7 +66,7 @@ public String getTargetTopicPartitions() {
     }
 
     public ErrorsTolerance getErrorsTolerance() {
-        return ErrorsTolerance.forName(sourceConfigFragment.getErrorsTolerance());
+        return sourceConfigFragment.getErrorsTolerance();
     }
 
     public int getMaxPollRecords() {
@@ -88,8 +88,8 @@ public int getExpectedMaxMessageBytes() {
         return cfg.getInt(EXPECTED_MAX_MESSAGE_BYTES);
     }
 
-    public String getErrorsTolerance() {
-        return cfg.getString(ERRORS_TOLERANCE);
+    public ErrorsTolerance getErrorsTolerance() {
+        return ErrorsTolerance.forName(cfg.getString(ERRORS_TOLERANCE));
     }
 
     private static class ErrorsToleranceValidator implements ConfigDef.Validator {
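Together with the previous hunk, this moves the string-to-enum conversion out of the caller and into the config fragment, so every consumer of the fragment receives a typed ErrorsTolerance value and the parse happens in exactly one place. For orientation, a minimal sketch of what an enum with a forName(String) factory typically looks like — only the NONE constant is confirmed by this commit; the "all" constant and the lookup details are assumptions:

// Hypothetical sketch of an enum with a forName(String) factory, mirroring the
// ErrorsTolerance.forName(...) call above. Only NONE is confirmed by this
// commit; the ALL constant and the lookup logic are assumptions.
public enum ErrorsTolerance {
    NONE("none"),
    ALL("all");

    private final String name;

    ErrorsTolerance(final String name) {
        this.name = name;
    }

    public static ErrorsTolerance forName(final String name) {
        for (final ErrorsTolerance tolerance : values()) {
            if (tolerance.name.equalsIgnoreCase(name)) {
                return tolerance; // match the configured string case-insensitively
            }
        }
        throw new IllegalArgumentException("Unknown errors.tolerance value: " + name);
    }
}

Centralizing the parse here also helps keep the getter and the ErrorsToleranceValidator below agreeing on the same set of legal values.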
@@ -20,11 +20,11 @@
 import java.io.InputStream;
 import java.util.function.Consumer;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 
 import io.aiven.kafka.connect.common.OffsetManager;
+import io.aiven.kafka.connect.common.config.SourceCommonConfig;
 
 import io.confluent.connect.avro.AvroData;
 import org.apache.avro.file.DataFileStream;
@@ -62,7 +62,7 @@ public Schema getKeySchema() {
 
     @Override
     public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
-            final OffsetManager.OffsetManagerEntry<?> offsetManagerEntry, final AbstractConfig sourceConfig) {
+            final OffsetManager.OffsetManagerEntry<?> offsetManagerEntry, final SourceCommonConfig sourceConfig) {
         return new StreamSpliterator(LOGGER, inputStreamIOSupplier, offsetManagerEntry) {
             private DataFileStream<GenericRecord> dataFileStream;
             private final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
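The dataFileStream and datumReader fields retained by this anonymous StreamSpliterator follow the standard Avro container-file reading pattern. A standalone sketch of that pattern using only the plain Avro API, not the connector's wrapper types:

import java.io.IOException;
import java.io.InputStream;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;

// Standalone sketch of the Avro container-file reading pattern used above:
// pull GenericRecords one at a time from an InputStream.
public final class AvroReadSketch {
    static void readAll(final InputStream input) throws IOException {
        final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
        try (DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(input, datumReader)) {
            while (dataFileStream.hasNext()) {
                final GenericRecord record = dataFileStream.next();
                System.out.println(record); // the real code converts each record to a SchemaAndValue
            }
        }
    }
}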
@@ -21,11 +21,11 @@
 import java.util.Arrays;
 import java.util.function.Consumer;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 
 import io.aiven.kafka.connect.common.OffsetManager;
+import io.aiven.kafka.connect.common.config.SourceCommonConfig;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.function.IOSupplier;
@@ -48,7 +48,7 @@ public Schema getKeySchema() {
 
     @Override
     public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
-            final OffsetManager.OffsetManagerEntry<?> offsetManagerEntry, final AbstractConfig sourceConfig) {
+            final OffsetManager.OffsetManagerEntry<?> offsetManagerEntry, final SourceCommonConfig sourceConfig) {
         return new StreamSpliterator(LOGGER, inputStreamIOSupplier, offsetManagerEntry) {
 
             @Override
@@ -23,12 +23,12 @@
 import java.nio.charset.StandardCharsets;
 import java.util.function.Consumer;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.json.JsonConverter;
 
 import io.aiven.kafka.connect.common.OffsetManager;
+import io.aiven.kafka.connect.common.config.SourceCommonConfig;
 
 import org.apache.commons.io.function.IOSupplier;
 import org.codehaus.plexus.util.StringUtils;
@@ -62,7 +62,7 @@ public Schema getKeySchema() {
 
     @Override
     public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
-            final OffsetManager.OffsetManagerEntry<?> offsetManagerEntry, final AbstractConfig sourceConfig) {
+            final OffsetManager.OffsetManagerEntry<?> offsetManagerEntry, final SourceCommonConfig sourceConfig) {
         return new StreamSpliterator(LOGGER, inputStreamIOSupplier, offsetManagerEntry) {
             BufferedReader reader;
 
@@ -24,11 +24,11 @@
 import java.nio.file.Path;
 import java.util.function.Consumer;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 
 import io.aiven.kafka.connect.common.OffsetManager;
+import io.aiven.kafka.connect.common.config.SourceCommonConfig;
 import io.aiven.kafka.connect.common.source.input.parquet.LocalInputFile;
 
 import io.confluent.connect.avro.AvroData;
@@ -68,7 +68,7 @@ public Schema getKeySchema() {
 
     @Override
     public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
-            final OffsetManager.OffsetManagerEntry<?> offsetManagerEntry, final AbstractConfig sourceConfig) {
+            final OffsetManager.OffsetManagerEntry<?> offsetManagerEntry, final SourceCommonConfig sourceConfig) {
 
         return new StreamSpliterator(LOGGER, inputStreamIOSupplier, offsetManagerEntry) {
 
@@ -23,11 +23,11 @@
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 
 import io.aiven.kafka.connect.common.OffsetManager;
+import io.aiven.kafka.connect.common.config.SourceCommonConfig;
 
 import org.apache.commons.io.function.IOSupplier;
 import org.slf4j.Logger;
@@ -53,15 +53,16 @@ public abstract class Transformer {
      * @return a Stream of SchemaAndValue objects.
      */
     public final Stream<SchemaAndValue> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier,
-            final OffsetManager.OffsetManagerEntry<?> offsetManagerEntry, final AbstractConfig config) {
+            final OffsetManager.OffsetManagerEntry<?> offsetManagerEntry, final SourceCommonConfig config) {
         final StreamSpliterator spliterator = createSpliterator(inputStreamIOSupplier, offsetManagerEntry, config);
         return StreamSupport.stream(spliterator, false)
                 .onClose(spliterator::close)
                 .skip(offsetManagerEntry.skipRecords());
     }
 
     /**
-     * Get the schema to use for the key.
+     * Get the schema to use for the key. Some keys do not have a schema; in those cases {@code null} should be
+     * returned.
      *
      * @return the Schema to use for the key.
      */
@@ -79,7 +80,7 @@ public final Stream<SchemaAndValue> getRecords(final IOSupplier<InputStream> inp
      * @return a StreamSpliterator instance.
      */
     protected abstract StreamSpliterator createSpliterator(IOSupplier<InputStream> inputStreamIOSupplier,
-            OffsetManager.OffsetManagerEntry<?> offsetManagerEntry, AbstractConfig sourceConfig);
+            OffsetManager.OffsetManagerEntry<?> offsetManagerEntry, SourceCommonConfig sourceConfig);
 
     /**
      * A Spliterator that performs various checks on the opening/closing of the input stream.
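The getRecords body above is the pattern worth noting: wrap the spliterator in a sequential Stream, attach a close hook so the underlying input is released, then skip records already accounted for by the offset manager. A self-contained, JDK-only demo of the same pattern — the record source and skip count here are stand-ins, not the connector's classes:

import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Standalone demo of the getRecords() pattern: a lazy Stream over a
// Spliterator, with a close hook and a skip of already-processed records.
public class SpliteratorStreamDemo {

    public static void main(final String[] args) {
        final Spliterator<String> spliterator = new Spliterators.AbstractSpliterator<String>(Long.MAX_VALUE,
                Spliterator.ORDERED) {
            private int next;

            @Override
            public boolean tryAdvance(final Consumer<? super String> action) {
                if (next >= 5) {
                    return false; // end of the "input stream"
                }
                action.accept("record-" + next++);
                return true;
            }
        };

        try (Stream<String> records = StreamSupport.stream(spliterator, false)
                .onClose(() -> System.out.println("stream closed")) // stand-in for closing the input stream
                .skip(2)) { // stand-in for offsetManagerEntry.skipRecords()
            records.forEach(System.out::println); // prints record-2 .. record-4
        }
    }
}

Because the skip happens on the Stream rather than inside each transformer, every format (Avro, bytes, JSONL, Parquet) gets restart semantics for free.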
@@ -124,23 +124,22 @@ void testHandleValueDataWithInvalidJson() {
         assertThat(jsonNodes.count()).isEqualTo(0);
         verify(offsetManagerEntry, times(0)).incrementRecordCount();
     }
-    //
-    // @Test
-    // void testSerializeJsonDataValid() throws IOException {
-    // final InputStream validJsonInputStream = new ByteArrayInputStream(
-    // "{\"key\":\"value\"}".getBytes(StandardCharsets.UTF_8));
-    // final IOSupplier<InputStream> inputStreamIOSupplier = () -> validJsonInputStream;
-    // final Stream<SchemaAndValue> jsonNodes = jsonTransformer.getRecords(inputStreamIOSupplier, offsetManagerEntry,
-    // sourceCommonConfig);
-    // final Object serializedData = jsonTransformer
-    // .getValueData(
-    // jsonNodes.findFirst().orElseThrow(() -> new AssertionError("No records found in stream!")),
-    // TESTTOPIC, sourceCommonConfig)
-    // .value();
-    //
-    // // Assert: Verify the serialized data
-    // assertThat(serializedData).isInstanceOf(Map.class).extracting("key").isEqualTo("value");
-    // }
+
+    @Test
+    void testSerializeJsonDataValid() throws IOException {
+        final InputStream validJsonInputStream = new ByteArrayInputStream(
+                "{\"key\":\"value\"}".getBytes(StandardCharsets.UTF_8));
+        final IOSupplier<InputStream> inputStreamIOSupplier = () -> validJsonInputStream;
+        final List<SchemaAndValue> jsonNodes = jsonTransformer
+                .getRecords(inputStreamIOSupplier, offsetManagerEntry, sourceCommonConfig)
+                .collect(Collectors.toList());
+
+        assertThat(jsonNodes).hasSize(1);
+        final SchemaAndValue result = jsonNodes.get(0);
+
+        // Assert: Verify the serialized data
+        assertThat(result.value()).isInstanceOf(Map.class).extracting("key").isEqualTo("value");
+    }
 
     @Test
     void testGetRecordsWithIOException() throws IOException {
@@ -25,17 +25,14 @@
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Stream;
 
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.data.SchemaAndValue;
 
 import io.aiven.kafka.connect.common.OffsetManager;
-import io.aiven.kafka.connect.common.config.CommonConfig;
+import io.aiven.kafka.connect.common.config.SourceCommonConfig;
 
 import org.apache.commons.io.function.IOSupplier;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -56,8 +53,8 @@ private static OffsetManager.OffsetManagerEntry<?> getOffsetManagerEntry() {
 
     @ParameterizedTest
     @MethodSource("testData")
-    void verifyExceptionDuringIOOpen(final Transformer transformer, final byte[] testData, final AbstractConfig config,
-            final int expectedCount) throws IOException {
+    void verifyExceptionDuringIOOpen(final Transformer transformer, final byte[] testData,
+            final SourceCommonConfig config, final int expectedCount) throws IOException {
         final IOSupplier<InputStream> ioSupplier = mock(IOSupplier.class);
         when(ioSupplier.get()).thenThrow(new IOException("Test IOException during initialization"));
         final Stream<SchemaAndValue> objStream = transformer.getRecords(ioSupplier, getOffsetManagerEntry(), config);
@@ -66,8 +63,8 @@ void verifyExceptionDuringIOOpen(final Transformer transformer, final byte[] tes
 
     @ParameterizedTest
     @MethodSource("testData")
-    void verifyCloseCalledAtEnd(final Transformer transformer, final byte[] testData, final AbstractConfig config,
-            final int expectedCount) throws IOException {
+    void verifyCloseCalledAtEnd(final Transformer transformer, final byte[] testData, final SourceCommonConfig config,
+            final int expectedCount) {
         final CloseTrackingStream stream = new CloseTrackingStream(new ByteArrayInputStream(testData));
         final Stream<?> objStream = transformer.getRecords(() -> stream, getOffsetManagerEntry(), config);
         final long count = objStream.count();
@@ -78,7 +75,7 @@ void verifyCloseCalledAtEnd(final Transformer transformer, final byte[] testData
     @ParameterizedTest
     @MethodSource("testData")
     void verifyCloseCalledAtIteratorEnd(final Transformer transformer, final byte[] testData,
-            final AbstractConfig config, final int expectedCount) throws IOException {
+            final SourceCommonConfig config, final int expectedCount) {
         final CloseTrackingStream stream = new CloseTrackingStream(new ByteArrayInputStream(testData));
         final Stream<SchemaAndValue> objStream = transformer.getRecords(() -> stream, getOffsetManagerEntry(), config);
         final Iterator<SchemaAndValue> iter = objStream.iterator();
@@ -96,19 +93,14 @@ void verifyCloseCalledAtIteratorEnd(final Transformer transformer, final byte[]
     static Stream<Arguments> testData() throws IOException {
         final List<Arguments> lst = new ArrayList<>();
         lst.add(Arguments.of(TransformerFactory.getTransformer(InputFormat.AVRO),
-                AvroTransformerTest.generateMockAvroData(100).toByteArray(),
-                new CommonConfig(new ConfigDef(), new HashMap<>()) {
-                }, 100));
+                AvroTransformerTest.generateMockAvroData(100).toByteArray(), mock(SourceCommonConfig.class), 100));
         lst.add(Arguments.of(TransformerFactory.getTransformer(InputFormat.BYTES),
-                "Hello World".getBytes(StandardCharsets.UTF_8), new CommonConfig(new ConfigDef(), new HashMap<>()) {
-                }, 1));
+                "Hello World".getBytes(StandardCharsets.UTF_8), mock(SourceCommonConfig.class), 1));
         lst.add(Arguments.of(TransformerFactory.getTransformer(InputFormat.JSONL),
-                JsonTransformerTest.getJsonRecs(100).getBytes(StandardCharsets.UTF_8),
-                new CommonConfig(new ConfigDef(), new HashMap<>()) {
-                }, 100));
+                JsonTransformerTest.getJsonRecs(100).getBytes(StandardCharsets.UTF_8), mock(SourceCommonConfig.class),
+                100));
         lst.add(Arguments.of(TransformerFactory.getTransformer(InputFormat.PARQUET),
-                ParquetTransformerTest.generateMockParquetData(), new CommonConfig(new ConfigDef(), new HashMap<>()) {
-                }, 100));
+                ParquetTransformerTest.generateMockParquetData(), mock(SourceCommonConfig.class), 100));
         return lst.stream();
     }
 
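Replacing the anonymous CommonConfig subclasses with mock(SourceCommonConfig.class) removes the ConfigDef and HashMap plumbing from the test-data table. When a transformer under test needs an actual config value, the mock can stub the typed accessor directly — a small sketch, assuming the typed getErrorsTolerance() getter from the first hunk of this commit lives on SourceCommonConfig:

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.aiven.kafka.connect.common.config.SourceCommonConfig;
import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;

// Sketch: stub only the accessor a test needs instead of wiring a real ConfigDef.
public final class StubConfigs {
    static SourceCommonConfig failFastConfig() {
        final SourceCommonConfig config = mock(SourceCommonConfig.class);
        when(config.getErrorsTolerance()).thenReturn(ErrorsTolerance.NONE);
        return config;
    }
}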
@@ -31,6 +31,7 @@
 import org.apache.kafka.connect.source.SourceTask;
 
 import io.aiven.kafka.connect.common.OffsetManager;
+import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
 import io.aiven.kafka.connect.common.source.input.Transformer;
 import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
 import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient;
@@ -145,6 +146,21 @@ public List<SourceRecord> poll() throws InterruptedException {
         }
     }
 
+    /**
+     * Handle an exception that may result in the task shutdown.
+     *
+     * @param s3SourceRecord
+     *            the source record that was being read.
+     */
+    private void handleException(final S3SourceRecord s3SourceRecord) {
+        if (s3SourceConfig.getErrorsTolerance() == ErrorsTolerance.NONE) {
+            LOGGER.error("Stopping Task");
+            connectorStopped.set(true);
+        } else {
+            awsv2SourceClient.addFailedObjectKeys(s3SourceRecord.getObjectKey());
+        }
+
+    }
     /**
      * Create a list of source records. Package private for testing.
      *
@@ -168,7 +184,7 @@ List<SourceRecord> extractSourceRecords(final List<SourceRecord> results) {
                     lastRecord = entry.getRecordCount();
                 } catch (DataException e) {
                     LOGGER.error("Error in reading s3 object stream {}", e.getMessage(), e);
-                    awsv2SourceClient.addFailedObjectKeys(s3SourceRecord.getObjectKey());
+                    handleException(s3SourceRecord);
                 }
             }
         }
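With tolerance NONE the task now stops on the first DataException instead of only recording the failed object key; any other tolerance keeps the previous skip-and-continue behavior. A hypothetical connector configuration exercising the lenient path — the errors.tolerance key follows the usual Kafka Connect naming, and the connector class name and the "all" value are assumptions, not taken from this commit:

import java.util.HashMap;
import java.util.Map;

// Hypothetical connector properties; only the errors.tolerance semantics are
// described by this commit, the other keys and values are illustrative.
public final class ToleranceExample {
    public static void main(final String[] args) {
        final Map<String, String> props = new HashMap<>();
        props.put("connector.class", "io.aiven.kafka.connect.s3.source.S3SourceConnector"); // assumed name
        props.put("errors.tolerance", "all");  // skip bad objects, record their keys
        // props.put("errors.tolerance", "none"); // fail fast: the next poll() stops the task
        System.out.println(props);
    }
}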
@@ -138,4 +138,5 @@ public AWSCredentialsProvider getCustomCredentialsProvider() {
     public S3ConfigFragment getS3ConfigFragment() {
         return s3ConfigFragment;
     }
+
 }
@@ -29,14 +29,14 @@
 import java.util.Collections;
 import java.util.function.Consumer;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.source.SourceTaskContext;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 
 import io.aiven.kafka.connect.common.ClosableIterator;
 import io.aiven.kafka.connect.common.OffsetManager;
+import io.aiven.kafka.connect.common.config.SourceCommonConfig;
 import io.aiven.kafka.connect.common.source.input.ByteArrayTransformer;
 import io.aiven.kafka.connect.common.source.input.Transformer;
 import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
@@ -143,7 +143,7 @@ public Schema getKeySchema() {
 
     @Override
     protected StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
-            final OffsetManager.OffsetManagerEntry<?> offsetManagerEntry, final AbstractConfig sourceConfig) {
+            final OffsetManager.OffsetManagerEntry<?> offsetManagerEntry, final SourceCommonConfig sourceConfig) {
 
         return new StreamSpliterator(LOGGER, inputStreamIOSupplier, offsetManagerEntry) {
             private boolean wasRead;
