From aebc405168879452404b0a3389573c24b032c6f4 Mon Sep 17 00:00:00 2001
From: Claude <claude.warren@aiven.io>
Date: Fri, 20 Dec 2024 14:12:09 +0000
Subject: [PATCH] Updated as per reviews

---
 .../common/config/SourceCommonConfig.java     |  2 +-
 .../common/config/SourceConfigFragment.java   |  4 +--
 .../common/source/input/AvroTransformer.java  |  4 +--
 .../source/input/ByteArrayTransformer.java    |  4 +--
 .../common/source/input/JsonTransformer.java  |  4 +--
 .../source/input/ParquetTransformer.java      |  4 +--
 .../common/source/input/Transformer.java      |  9 ++---
 .../source/input/JsonTransformerTest.java     | 33 +++++++++----------
 .../input/TransformerStreamingTest.java       | 30 +++++++----------
 .../kafka/connect/s3/source/S3SourceTask.java | 18 +++++++++-
 .../s3/source/config/S3SourceConfig.java      |  1 +
 .../utils/SourceRecordIteratorTest.java       |  4 +--
 12 files changed, 63 insertions(+), 54 deletions(-)

diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java
index 4adecbc05..954c9151d 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java
@@ -66,7 +66,7 @@ public String getTargetTopicPartitions() {
     }
 
     public ErrorsTolerance getErrorsTolerance() {
-        return ErrorsTolerance.forName(sourceConfigFragment.getErrorsTolerance());
+        return sourceConfigFragment.getErrorsTolerance();
     }
 
     public int getMaxPollRecords() {
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java
index c62431dcb..58befa60e 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java
@@ -88,8 +88,8 @@ public int getExpectedMaxMessageBytes() {
         return cfg.getInt(EXPECTED_MAX_MESSAGE_BYTES);
     }
 
-    public String getErrorsTolerance() {
-        return cfg.getString(ERRORS_TOLERANCE);
+    public ErrorsTolerance getErrorsTolerance() {
+        return ErrorsTolerance.forName(cfg.getString(ERRORS_TOLERANCE));
     }
 
     private static class ErrorsToleranceValidator implements ConfigDef.Validator {
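Note on the two hunks above: SourceConfigFragment now owns the string-to-enum conversion, so callers can compare against the enum directly instead of re-parsing the raw property. A minimal caller sketch; only getErrorsTolerance() and ErrorsTolerance.NONE are taken from this patch, the class and method names below are illustrative:

    import io.aiven.kafka.connect.common.config.SourceCommonConfig;
    import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;

    public final class ToleranceCheckSketch {
        private ToleranceCheckSketch() {
        }

        // NONE mirrors errors.tolerance=none: a data error should stop the task.
        public static boolean shouldStopOnError(final SourceCommonConfig config) {
            return config.getErrorsTolerance() == ErrorsTolerance.NONE;
        }
    }
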
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java
index d8015731b..5c3261205 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java
@@ -20,11 +20,11 @@
 import java.io.InputStream;
 import java.util.function.Consumer;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 
 import io.aiven.kafka.connect.common.OffsetManager;
+import io.aiven.kafka.connect.common.config.SourceCommonConfig;
 
 import io.confluent.connect.avro.AvroData;
 import org.apache.avro.file.DataFileStream;
@@ -62,7 +62,7 @@ public Schema getKeySchema() {
 
     @Override
     public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier,
-            final OffsetManager.OffsetManagerEntry offsetManagerEntry, final AbstractConfig sourceConfig) {
+            final OffsetManager.OffsetManagerEntry offsetManagerEntry, final SourceCommonConfig sourceConfig) {
         return new StreamSpliterator(LOGGER, inputStreamIOSupplier, offsetManagerEntry) {
             private DataFileStream dataFileStream;
             private final DatumReader datumReader = new GenericDatumReader<>();
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java
index 2b1658295..d40c5c0d1 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java
@@ -21,11 +21,11 @@
 import java.util.Arrays;
 import java.util.function.Consumer;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 
 import io.aiven.kafka.connect.common.OffsetManager;
+import io.aiven.kafka.connect.common.config.SourceCommonConfig;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.function.IOSupplier;
@@ -48,7 +48,7 @@ public Schema getKeySchema() {
 
     @Override
     public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier,
-            final OffsetManager.OffsetManagerEntry offsetManagerEntry, final AbstractConfig sourceConfig) {
+            final OffsetManager.OffsetManagerEntry offsetManagerEntry, final SourceCommonConfig sourceConfig) {
         return new StreamSpliterator(LOGGER, inputStreamIOSupplier, offsetManagerEntry) {
 
             @Override
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java
index a5900b318..d81b2ac76 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java
@@ -23,12 +23,12 @@
 import java.nio.charset.StandardCharsets;
 import java.util.function.Consumer;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.json.JsonConverter;
 
 import io.aiven.kafka.connect.common.OffsetManager;
+import io.aiven.kafka.connect.common.config.SourceCommonConfig;
 
 import org.apache.commons.io.function.IOSupplier;
 import org.codehaus.plexus.util.StringUtils;
@@ -62,7 +62,7 @@ public Schema getKeySchema() {
 
     @Override
     public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier,
-            final OffsetManager.OffsetManagerEntry offsetManagerEntry, final AbstractConfig sourceConfig) {
+            final OffsetManager.OffsetManagerEntry offsetManagerEntry, final SourceCommonConfig sourceConfig) {
         return new StreamSpliterator(LOGGER, inputStreamIOSupplier, offsetManagerEntry) {
             BufferedReader reader;
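Note: the JSON transformer builds on Kafka Connect's JsonConverter (imported above); its exact wiring is not visible in this hunk, so the following standalone sketch only illustrates the converter call it relies on. The class name and topic are illustrative:

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;

    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.json.JsonConverter;

    public final class JsonConverterSketch {
        public static void main(final String[] args) {
            final JsonConverter converter = new JsonConverter();
            // Value-side converter with schemas disabled: payloads are treated as schemaless JSON.
            converter.configure(Collections.singletonMap("schemas.enable", "false"), false);

            final byte[] line = "{\"key\":\"value\"}".getBytes(StandardCharsets.UTF_8);
            final SchemaAndValue record = converter.toConnectData("test-topic", line);
            System.out.println(record.value()); // a java.util.Map, e.g. {key=value}
        }
    }
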
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java
index 47c465315..45cfb0733 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java
@@ -24,11 +24,11 @@
 import java.nio.file.Path;
 import java.util.function.Consumer;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 
 import io.aiven.kafka.connect.common.OffsetManager;
+import io.aiven.kafka.connect.common.config.SourceCommonConfig;
 import io.aiven.kafka.connect.common.source.input.parquet.LocalInputFile;
 
 import io.confluent.connect.avro.AvroData;
@@ -68,7 +68,7 @@ public Schema getKeySchema() {
 
     @Override
     public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier,
-            final OffsetManager.OffsetManagerEntry offsetManagerEntry, final AbstractConfig sourceConfig) {
+            final OffsetManager.OffsetManagerEntry offsetManagerEntry, final SourceCommonConfig sourceConfig) {
         return new StreamSpliterator(LOGGER, inputStreamIOSupplier, offsetManagerEntry) {
 
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java
index bca5ecba4..1cbcaa193 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java
@@ -23,11 +23,11 @@
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 
 import io.aiven.kafka.connect.common.OffsetManager;
+import io.aiven.kafka.connect.common.config.SourceCommonConfig;
 
 import org.apache.commons.io.function.IOSupplier;
 import org.slf4j.Logger;
@@ -53,7 +53,7 @@ public abstract class Transformer {
      * @return a Stream of SchemaAndValue objects.
      */
    public final Stream getRecords(final IOSupplier inputStreamIOSupplier,
-            final OffsetManager.OffsetManagerEntry offsetManagerEntry, final AbstractConfig config) {
+            final OffsetManager.OffsetManagerEntry offsetManagerEntry, final SourceCommonConfig config) {
         final StreamSpliterator spliterator = createSpliterator(inputStreamIOSupplier, offsetManagerEntry, config);
         return StreamSupport.stream(spliterator, false)
                 .onClose(spliterator::close)
@@ -61,7 +61,8 @@ public final Stream getRecords(final IOSupplier inp
     }
 
     /**
-     * Get the schema to use for the key.
+     * Get the schema to use for the key. Some keys do not have a schema, in those cases {@code null} should be
+     * returned.
      *
      * @return the Schema to use for the key.
      */
@@ -79,7 +80,7 @@ public final Stream getRecords(final IOSupplier inp
      * @return a StreamSpliterator instance.
      */
     protected abstract StreamSpliterator createSpliterator(IOSupplier inputStreamIOSupplier,
-            OffsetManager.OffsetManagerEntry offsetManagerEntry, AbstractConfig sourceConfig);
+            OffsetManager.OffsetManagerEntry offsetManagerEntry, SourceCommonConfig sourceConfig);
 
     /**
      * A Spliterator that performs various checks on the opening/closing of the input stream.
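Note: with createSpliterator and getRecords narrowed to SourceCommonConfig, calling code looks roughly as below. Only TransformerFactory, InputFormat, getRecords and the package names are taken from this patch; the generic types and the surrounding class are assumptions:

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.stream.Stream;

    import org.apache.kafka.connect.data.SchemaAndValue;

    import io.aiven.kafka.connect.common.OffsetManager;
    import io.aiven.kafka.connect.common.config.SourceCommonConfig;
    import io.aiven.kafka.connect.common.source.input.InputFormat;
    import io.aiven.kafka.connect.common.source.input.Transformer;
    import io.aiven.kafka.connect.common.source.input.TransformerFactory;

    import org.apache.commons.io.function.IOSupplier;

    public final class TransformerUsageSketch {
        // Streams SchemaAndValue records out of one object; closing the Stream closes the input.
        public static long countRecords(final OffsetManager.OffsetManagerEntry entry,
                final SourceCommonConfig config) {
            final Transformer transformer = TransformerFactory.getTransformer(InputFormat.BYTES);
            final IOSupplier<InputStream> supplier = () -> new ByteArrayInputStream(
                    "Hello World".getBytes(StandardCharsets.UTF_8));
            try (Stream<SchemaAndValue> records = transformer.getRecords(supplier, entry, config)) {
                return records.count();
            }
        }
    }
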
diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/JsonTransformerTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/JsonTransformerTest.java
index b4436cb48..03b6b83b2 100644
--- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/JsonTransformerTest.java
+++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/JsonTransformerTest.java
@@ -124,23 +124,22 @@ void testHandleValueDataWithInvalidJson() {
         assertThat(jsonNodes.count()).isEqualTo(0);
         verify(offsetManagerEntry, times(0)).incrementRecordCount();
     }
-    //
-    // @Test
-    // void testSerializeJsonDataValid() throws IOException {
-    // final InputStream validJsonInputStream = new ByteArrayInputStream(
-    // "{\"key\":\"value\"}".getBytes(StandardCharsets.UTF_8));
-    // final IOSupplier inputStreamIOSupplier = () -> validJsonInputStream;
-    // final Stream jsonNodes = jsonTransformer.getRecords(inputStreamIOSupplier, offsetManagerEntry,
-    // sourceCommonConfig);
-    // final Object serializedData = jsonTransformer
-    // .getValueData(
-    // jsonNodes.findFirst().orElseThrow(() -> new AssertionError("No records found in stream!")),
-    // TESTTOPIC, sourceCommonConfig)
-    // .value();
-    //
-    // // Assert: Verify the serialized data
-    // assertThat(serializedData).isInstanceOf(Map.class).extracting("key").isEqualTo("value");
-    // }
+
+    @Test
+    void testSerializeJsonDataValid() throws IOException {
+        final InputStream validJsonInputStream = new ByteArrayInputStream(
+                "{\"key\":\"value\"}".getBytes(StandardCharsets.UTF_8));
+        final IOSupplier inputStreamIOSupplier = () -> validJsonInputStream;
+        final List jsonNodes = jsonTransformer
+                .getRecords(inputStreamIOSupplier, offsetManagerEntry, sourceCommonConfig)
+                .collect(Collectors.toList());
+
+        assertThat(jsonNodes).hasSize(1);
+        final SchemaAndValue result = jsonNodes.get(0);
+
+        // Assert: Verify the serialized data
+        assertThat(result.value()).isInstanceOf(Map.class).extracting("key").isEqualTo("value");
+    }
 
     @Test
     void testGetRecordsWithIOException() throws IOException {
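Note: the restored test navigates into the converted Map with AssertJ's extracting(String). A self-contained sketch of that idiom, mirroring the assertion used above (class name illustrative):

    import static org.assertj.core.api.Assertions.assertThat;

    import java.util.Collections;
    import java.util.Map;

    public final class MapAssertionSketch {
        public static void main(final String[] args) {
            final Object value = Collections.singletonMap("key", "value");
            // extracting("key") reads the Map entry without an explicit cast.
            assertThat(value).isInstanceOf(Map.class).extracting("key").isEqualTo("value");
        }
    }
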
diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java
index 855e553ff..335159eb0 100644
--- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java
+++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java
@@ -25,17 +25,14 @@
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Stream;
 
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.data.SchemaAndValue;
 
 import io.aiven.kafka.connect.common.OffsetManager;
-import io.aiven.kafka.connect.common.config.CommonConfig;
+import io.aiven.kafka.connect.common.config.SourceCommonConfig;
 
 import org.apache.commons.io.function.IOSupplier;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -56,8 +53,8 @@ private static OffsetManager.OffsetManagerEntry getOffsetManagerEntry() {
 
     @ParameterizedTest
     @MethodSource("testData")
-    void verifyExceptionDuringIOOpen(final Transformer transformer, final byte[] testData, final AbstractConfig config,
-            final int expectedCount) throws IOException {
+    void verifyExceptionDuringIOOpen(final Transformer transformer, final byte[] testData,
+            final SourceCommonConfig config, final int expectedCount) throws IOException {
         final IOSupplier ioSupplier = mock(IOSupplier.class);
         when(ioSupplier.get()).thenThrow(new IOException("Test IOException during initialization"));
         final Stream objStream = transformer.getRecords(ioSupplier, getOffsetManagerEntry(), config);
@@ -66,8 +63,8 @@ void verifyExceptionDuringIOOpen(final Transformer transformer, final byte[] tes
 
     @ParameterizedTest
     @MethodSource("testData")
-    void verifyCloseCalledAtEnd(final Transformer transformer, final byte[] testData, final AbstractConfig config,
-            final int expectedCount) throws IOException {
+    void verifyCloseCalledAtEnd(final Transformer transformer, final byte[] testData, final SourceCommonConfig config,
+            final int expectedCount) {
         final CloseTrackingStream stream = new CloseTrackingStream(new ByteArrayInputStream(testData));
         final Stream objStream = transformer.getRecords(() -> stream, getOffsetManagerEntry(), config);
         final long count = objStream.count();
@@ -78,7 +75,7 @@ void verifyCloseCalledAtEnd(final Transformer transformer, final byte[] testData
     @ParameterizedTest
     @MethodSource("testData")
     void verifyCloseCalledAtIteratorEnd(final Transformer transformer, final byte[] testData,
-            final AbstractConfig config, final int expectedCount) throws IOException {
+            final SourceCommonConfig config, final int expectedCount) {
         final CloseTrackingStream stream = new CloseTrackingStream(new ByteArrayInputStream(testData));
         final Stream objStream = transformer.getRecords(() -> stream, getOffsetManagerEntry(), config);
         final Iterator iter = objStream.iterator();
@@ -96,19 +93,14 @@ void verifyCloseCalledAtIteratorEnd(final Transformer transformer, final byte[]
 
     static Stream testData() throws IOException {
         final List lst = new ArrayList<>();
         lst.add(Arguments.of(TransformerFactory.getTransformer(InputFormat.AVRO),
-                AvroTransformerTest.generateMockAvroData(100).toByteArray(),
-                new CommonConfig(new ConfigDef(), new HashMap<>()) {
-                }, 100));
+                AvroTransformerTest.generateMockAvroData(100).toByteArray(), mock(SourceCommonConfig.class), 100));
         lst.add(Arguments.of(TransformerFactory.getTransformer(InputFormat.BYTES),
-                "Hello World".getBytes(StandardCharsets.UTF_8), new CommonConfig(new ConfigDef(), new HashMap<>()) {
-                }, 1));
+                "Hello World".getBytes(StandardCharsets.UTF_8), mock(SourceCommonConfig.class), 1));
         lst.add(Arguments.of(TransformerFactory.getTransformer(InputFormat.JSONL),
-                JsonTransformerTest.getJsonRecs(100).getBytes(StandardCharsets.UTF_8),
-                new CommonConfig(new ConfigDef(), new HashMap<>()) {
-                }, 100));
+                JsonTransformerTest.getJsonRecs(100).getBytes(StandardCharsets.UTF_8), mock(SourceCommonConfig.class),
+                100));
         lst.add(Arguments.of(TransformerFactory.getTransformer(InputFormat.PARQUET),
-                ParquetTransformerTest.generateMockParquetData(), new CommonConfig(new ConfigDef(), new HashMap<>()) {
-                }, 100));
+                ParquetTransformerTest.generateMockParquetData(), mock(SourceCommonConfig.class), 100));
         return lst.stream();
     }
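Note: testData() now supplies Mockito mocks of SourceCommonConfig instead of anonymous CommonConfig instances. A sketch of that fixture style; the two getters stubbed below exist in this patch, but the stubbing itself is optional and purely illustrative:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import io.aiven.kafka.connect.common.config.SourceCommonConfig;
    import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;

    public final class MockConfigSketch {
        public static SourceCommonConfig stubbedConfig() {
            // A bare mock returns type defaults (null/0/false), which is enough when the
            // code under test never reads the config; stub only what it actually calls.
            final SourceCommonConfig config = mock(SourceCommonConfig.class);
            when(config.getErrorsTolerance()).thenReturn(ErrorsTolerance.NONE);
            when(config.getMaxPollRecords()).thenReturn(500);
            return config;
        }
    }
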
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
index de97fb834..b8315775c 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
@@ -31,6 +31,7 @@
 import org.apache.kafka.connect.source.SourceTask;
 
 import io.aiven.kafka.connect.common.OffsetManager;
+import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
 import io.aiven.kafka.connect.common.source.input.Transformer;
 import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
 import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient;
@@ -145,6 +146,21 @@ public List poll() throws InterruptedException {
         }
     }
 
+    /**
+     * Handle an exception that may result in the task shutdown.
+     *
+     * @param s3SourceRecord
+     *            the source record that was being read.
+     */
+    private void handleException(final S3SourceRecord s3SourceRecord) {
+        if (s3SourceConfig.getErrorsTolerance() == ErrorsTolerance.NONE) {
+            LOGGER.error("Stopping Task");
+            connectorStopped.set(true);
+        } else {
+            awsv2SourceClient.addFailedObjectKeys(s3SourceRecord.getObjectKey());
+        }
+
+    }
     /**
      * Create a list of source records. Package private for testing.
      *
@@ -168,7 +184,7 @@ List extractSourceRecords(final List results) {
                 lastRecord = entry.getRecordCount();
             } catch (DataException e) {
                 LOGGER.error("Error in reading s3 object stream {}", e.getMessage(), e);
-                awsv2SourceClient.addFailedObjectKeys(s3SourceRecord.getObjectKey());
+                handleException(s3SourceRecord);
             }
         }
     }
diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java
index f4b83c440..68b9b2f98 100644
--- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java
+++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java
@@ -138,4 +138,5 @@ public AWSCredentialsProvider getCustomCredentialsProvider() {
     public S3ConfigFragment getS3ConfigFragment() {
         return s3ConfigFragment;
     }
+
 }
diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java
index 8199e3edc..c099343cc 100644
--- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java
+++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java
@@ -29,7 +29,6 @@
 import java.util.Collections;
 import java.util.function.Consumer;
 
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.source.SourceTaskContext;
@@ -37,6 +36,7 @@
 import io.aiven.kafka.connect.common.ClosableIterator;
 import io.aiven.kafka.connect.common.OffsetManager;
+import io.aiven.kafka.connect.common.config.SourceCommonConfig;
 import io.aiven.kafka.connect.common.source.input.ByteArrayTransformer;
 import io.aiven.kafka.connect.common.source.input.Transformer;
 import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
@@ -143,7 +143,7 @@ public Schema getKeySchema() {
 
         @Override
         protected StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier,
-                final OffsetManager.OffsetManagerEntry offsetManagerEntry, final AbstractConfig sourceConfig) {
+                final OffsetManager.OffsetManagerEntry offsetManagerEntry, final SourceCommonConfig sourceConfig) {
             return new StreamSpliterator(LOGGER, inputStreamIOSupplier, offsetManagerEntry) {
                 private boolean wasRead;
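Note: the behaviour added in handleException above is driven by the connector's error-tolerance setting. A hypothetical properties sketch, assuming the fragment registers the standard errors.tolerance key ("none"/"all"); the connector class name is illustrative:

    import java.util.HashMap;
    import java.util.Map;

    public final class ToleranceConfigSketch {
        public static Map<String, String> strictProps() {
            final Map<String, String> props = new HashMap<>();
            // Assumed connector class name, for illustration only.
            props.put("connector.class", "io.aiven.kafka.connect.s3.source.S3SourceConnector");
            // "none": the task stops on the first bad object.
            // "all": the failed object key is recorded and polling continues.
            props.put("errors.tolerance", "none");
            return props;
        }
    }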