diff --git a/README.md b/README.md index 46af4b9..6faa572 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Csv Reader User Manual 2.1.0 +# Csv Reader User Manual 2.2.0 ## Document Information @@ -17,22 +17,22 @@ Csv reader read csv files, results are sending to RabbitMQ. ##### Reader configuration ```yaml -apiVersion: th2.exactpro.com/v1 +apiVersion: th2.exactpro.com/v2 kind: Th2Box metadata: name: read-csv spec: - image-name: ghcr.io/th2-net/th2-read-csv - image-version: + imageName: ghcr.io/th2-net/th2-read-csv + imageVersion: type: th2-read - custom-config: + customConfig: sourceDirectory: "dir/with/csv/" aliases: A: - nameRegexp: "fileA.*\\.log" + nameRegexp: "fileA.*\\.csv" # delimiter: "," B: - nameRegexp: "fileB.*\\.log" + nameRegexp: "fileB.*\\.csv" delimiter: ";" header: ['ColumnA', 'ColumnB', 'ColumnC'] common: @@ -46,11 +46,15 @@ spec: pullingInterval: "PT5S" validateContent: true validateOnlyExtraData: false + useTransport: true pins: - - name: read_csv_out - connection-type: mq - attributes: ['raw', 'publish', 'store'] - extended-settings: + mq: + publishers: + - name: to_mstore + attributes: + - publish + - transport-group + extendedSettings: service: enabled: false envVariables: @@ -83,10 +87,26 @@ spec: The default value is `true`; + validateOnlyExtraData - disables validation when the content size is less than the header size (probably some columns were not set on purpose). Works only with `validateContent` set to `true`. The default value is `false` -+ useTransport - enables using th2 transport protocol ++ useTransport - enables using th2 transport protocol. The default value is `true` ## Changes +### 2.2.0 + +#### Fixed: +read-csv throws the IndexOutOfBoundsException when it works in `useTransport` mode and calculates header by the first line in a CSV file + +#### Changed: +* Default value for `useTransport` option is `true` + +#### Added: +* netty-bytebuf-utils: `0.2.0` + +#### Updated: +* common: `5.7.1-dev` +* read-file-common-core: `3.1.0-dev` +* opencsv: `5.9` + ### 2.1.0 + th2 transport protocol support diff --git a/build.gradle b/build.gradle index e2e14cd..8778d2e 100644 --- a/build.gradle +++ b/build.gradle @@ -82,14 +82,17 @@ repositories { dependencies { api platform("com.exactpro.th2:bom:4.5.0") - implementation ("com.exactpro.th2:common:5.4.2-dev") + implementation ("com.exactpro.th2:common:5.7.1-dev") implementation "org.slf4j:slf4j-api" - api "com.exactpro.th2:read-file-common-core:3.0.0-dev" - implementation "com.opencsv:opencsv:5.8" + api "com.exactpro.th2:read-file-common-core:3.1.0-dev" + implementation('com.exactpro.th2:netty-bytebuf-utils:0.2.0') { + because("'asExpandable' method is used") + } + + implementation "com.opencsv:opencsv:5.9" implementation "javax.annotation:javax.annotation-api:1.3.2" - implementation "org.jetbrains.kotlin:kotlin-stdlib" implementation "org.jetbrains.kotlin:kotlin-reflect" implementation "com.fasterxml.jackson.core:jackson-databind" diff --git a/gradle.properties b/gradle.properties index ef8f9fb..f34a6c6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -release_version=2.1.0 \ No newline at end of file +release_version=2.2.0 \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/readcsv/cfg/ReaderConfig.java b/src/main/java/com/exactpro/th2/readcsv/cfg/ReaderConfig.java index 8d626d1..ec2e301 100644 --- a/src/main/java/com/exactpro/th2/readcsv/cfg/ReaderConfig.java +++ b/src/main/java/com/exactpro/th2/readcsv/cfg/ReaderConfig.java @@ -26,12 +26,20 @@ import com.fasterxml.jackson.annotation.JsonPropertyDescription; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.fasterxml.jackson.module.kotlin.KotlinFeature; import com.fasterxml.jackson.module.kotlin.KotlinModule; public class ReaderConfig { public static final ObjectMapper MAPPER = new ObjectMapper() - .registerModule(new KotlinModule()) + .registerModule(new KotlinModule.Builder() + .withReflectionCacheSize(512) + .configure(KotlinFeature.NullToEmptyCollection, false) + .configure(KotlinFeature.NullToEmptyMap, false) + .configure(KotlinFeature.NullIsSameAsDefault, false) + .configure(KotlinFeature.SingletonSupport, true) + .configure(KotlinFeature.StrictNullChecks, false) + .build()) .registerModule(new JavaTimeModule()); @JsonProperty(required = true) @@ -53,7 +61,7 @@ public class ReaderConfig { private boolean validateOnlyExtraData = false; @JsonPropertyDescription("Enables using th2 transport protocol") - private boolean useTransport = false; + private boolean useTransport = true; public Path getSourceDirectory() { return sourceDirectory; diff --git a/src/main/java/com/exactpro/th2/readcsv/impl/ProtoReaderFactory.java b/src/main/java/com/exactpro/th2/readcsv/impl/ProtoReaderFactory.java index d4e395f..d28f992 100644 --- a/src/main/java/com/exactpro/th2/readcsv/impl/ProtoReaderFactory.java +++ b/src/main/java/com/exactpro/th2/readcsv/impl/ProtoReaderFactory.java @@ -108,14 +108,14 @@ private static Collection attachHeaderOrHold( private static RawMessage.Builder validateAndAppend( HeaderHolder headerHolder, HeaderInfo extractedHeader, - RawMessage.Builder it, + RawMessage.Builder builder, boolean validate, boolean validateOnlyExtraData ) { if (validate) { - headerHolder.validateContentSize(extractedHeader, it.getBody(), validateOnlyExtraData); + headerHolder.validateContentSize(extractedHeader, builder.getBody(), validateOnlyExtraData); } - return it.setBody(extractedHeader.getContent().concat(it.getBody())); + return builder.setBody(extractedHeader.getContent().concat(builder.getBody())); } @NotNull diff --git a/src/main/java/com/exactpro/th2/readcsv/impl/TransportHeaderHolder.java b/src/main/java/com/exactpro/th2/readcsv/impl/TransportHeaderHolder.java index 1a0fff9..2d952c6 100644 --- a/src/main/java/com/exactpro/th2/readcsv/impl/TransportHeaderHolder.java +++ b/src/main/java/com/exactpro/th2/readcsv/impl/TransportHeaderHolder.java @@ -18,10 +18,12 @@ import com.exactpro.th2.readcsv.cfg.CsvFileConfiguration; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import java.util.Map; +import static com.exactpro.th2.netty.bytebuf.util.ByteBufUtil.asExpandable; +import static io.netty.buffer.Unpooled.wrappedBuffer; + public class TransportHeaderHolder extends HeaderHolder { private static final byte NEW_LINE = '\n'; @@ -31,7 +33,7 @@ public TransportHeaderHolder(Map configurationMap) protected HeaderInfo formatHeader(String alias, CsvFileConfiguration configuration) { String headerString = String.join(Character.toString(configuration.getDelimiter()), configuration.getHeader()) + '\n'; - ByteBuf content = Unpooled.wrappedBuffer(headerString.getBytes(CHARSET)); + ByteBuf content = wrappedBuffer(headerString.getBytes(CHARSET)); return new HeaderInfo<>(alias, configuration.getHeader().size(), content); } @@ -42,6 +44,6 @@ protected String contentToString(ByteBuf content) { @Override protected ByteBuf addNewLine(ByteBuf content) { - return content.writeByte(NEW_LINE); + return asExpandable(content).writeByte(NEW_LINE); } } \ No newline at end of file diff --git a/src/main/java/com/exactpro/th2/readcsv/impl/TransportReaderFactory.java b/src/main/java/com/exactpro/th2/readcsv/impl/TransportReaderFactory.java index 6624c62..e1f1839 100644 --- a/src/main/java/com/exactpro/th2/readcsv/impl/TransportReaderFactory.java +++ b/src/main/java/com/exactpro/th2/readcsv/impl/TransportReaderFactory.java @@ -32,7 +32,6 @@ import com.exactpro.th2.read.file.common.state.impl.InMemoryReaderState; import com.exactpro.th2.readcsv.cfg.ReaderConfig; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import kotlin.Unit; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; @@ -45,6 +44,8 @@ import java.util.List; import java.util.stream.Collectors; +import static io.netty.buffer.Unpooled.wrappedBuffer; + public class TransportReaderFactory extends AbstractReaderFactory { private static final Logger LOGGER = LoggerFactory.getLogger(TransportReaderFactory.class); @@ -110,15 +111,15 @@ private static Collection transportAttachHeaderOrH private static RawMessage.Builder transportValidateAndAppend( HeaderHolder headerHolder, HeaderInfo extractedHeader, - RawMessage.Builder it, + RawMessage.Builder builder, boolean validate, boolean validateOnlyExtraData ) { if (validate) { - headerHolder.validateContentSize(extractedHeader, it.getBody(), validateOnlyExtraData); + headerHolder.validateContentSize(extractedHeader, builder.getBody(), validateOnlyExtraData); } - return it.setBody(Unpooled.wrappedBuffer(extractedHeader.getContent(), it.getBody())); + return builder.setBody(wrappedBuffer(extractedHeader.getContent(), builder.getBody())); } @NotNull diff --git a/src/test/kotlin/com/exactpro/th2/readcsv/impl/TestCsvContentParser.kt b/src/test/kotlin/com/exactpro/th2/readcsv/impl/ProtoCsvContentParserTest.kt similarity index 94% rename from src/test/kotlin/com/exactpro/th2/readcsv/impl/TestCsvContentParser.kt rename to src/test/kotlin/com/exactpro/th2/readcsv/impl/ProtoCsvContentParserTest.kt index 05c6225..e9f5db2 100644 --- a/src/test/kotlin/com/exactpro/th2/readcsv/impl/TestCsvContentParser.kt +++ b/src/test/kotlin/com/exactpro/th2/readcsv/impl/ProtoCsvContentParserTest.kt @@ -24,7 +24,7 @@ import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test import java.io.BufferedReader -class TestCsvContentParser { +class ProtoCsvContentParserTest { private val parser = ProtoCsvContentParser( mapOf( "test" to CsvFileConfiguration(".*", ",") @@ -72,7 +72,7 @@ class TestCsvContentParser { private fun malformedCsv(): BufferedReader = readerForResource("malformed.csv") private fun readerForResource(resourceName: String): BufferedReader { - val resourceAsStream = TestCsvContentParser::class.java.classLoader.getResourceAsStream(resourceName) + val resourceAsStream = ProtoCsvContentParserTest::class.java.classLoader.getResourceAsStream(resourceName) return requireNotNull(resourceAsStream) { "Unknown resource: $resourceName" }.bufferedReader() diff --git a/src/test/kotlin/com/exactpro/th2/readcsv/impl/TestHeaderHolder.kt b/src/test/kotlin/com/exactpro/th2/readcsv/impl/ProtoHeaderHolderTest.kt similarity index 97% rename from src/test/kotlin/com/exactpro/th2/readcsv/impl/TestHeaderHolder.kt rename to src/test/kotlin/com/exactpro/th2/readcsv/impl/ProtoHeaderHolderTest.kt index fb0541d..bee752d 100644 --- a/src/test/kotlin/com/exactpro/th2/readcsv/impl/TestHeaderHolder.kt +++ b/src/test/kotlin/com/exactpro/th2/readcsv/impl/ProtoHeaderHolderTest.kt @@ -21,8 +21,7 @@ import com.google.protobuf.ByteString import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Test -// TODO: th2 transport version of these tests should be added -class TestHeaderHolder { +class ProtoHeaderHolderTest { private val holder = ProtoHeaderHolder( mapOf( "A" to CsvFileConfiguration(".*", ","), diff --git a/src/test/kotlin/com/exactpro/th2/readcsv/impl/TransportCsvContentParserTest.kt b/src/test/kotlin/com/exactpro/th2/readcsv/impl/TransportCsvContentParserTest.kt new file mode 100644 index 0000000..bb77461 --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/readcsv/impl/TransportCsvContentParserTest.kt @@ -0,0 +1,80 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.readcsv.impl + +import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage +import com.exactpro.th2.read.file.common.StreamId +import com.exactpro.th2.readcsv.cfg.CsvFileConfiguration +import com.exactpro.th2.readcsv.exception.MalformedCsvException +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import java.io.BufferedReader + +class TransportCsvContentParserTest { + private val parser = TransportCsvContentParser( + mapOf( + "test" to CsvFileConfiguration(".*", ",") + ) + ) + private val streamId = StreamId("test") + + @Test + fun `can parse valid csv`() { + validCsv().use { + it.mark(1024) + val canParse = parser.canParse(streamId, it, true) + assertTrue(canParse) { + "Parse should be able to parse the valid csv" + } + it.reset() + + val parsed: Collection = parser.parse(streamId, it) + assertEquals(1, parsed.size) + assertEquals("This,is,valid,\"multiline\nCSV file\"", parsed.first().body.toString(Charsets.UTF_8)) + } + } + + @Test + fun `does not throw malformed exception if file can be changed`() { + malformedCsv().use { + val canParse = parser.canParse(streamId, it, false) + assertFalse(canParse) { + "Parse should not be able to parse the malformed csv" + } + } + } + + @Test + fun `throws malformed exception if file can not be changed`() { + malformedCsv().use { + assertThrows(MalformedCsvException::class.java) { + parser.canParse(streamId, it, true) + } + } + } + + private fun validCsv(): BufferedReader = readerForResource("valid.csv") + + private fun malformedCsv(): BufferedReader = readerForResource("malformed.csv") + + private fun readerForResource(resourceName: String): BufferedReader { + val resourceAsStream = TransportCsvContentParserTest::class.java.classLoader.getResourceAsStream(resourceName) + return requireNotNull(resourceAsStream) { + "Unknown resource: $resourceName" + }.bufferedReader() + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/exactpro/th2/readcsv/impl/TransportHeaderHolderTest.kt b/src/test/kotlin/com/exactpro/th2/readcsv/impl/TransportHeaderHolderTest.kt new file mode 100644 index 0000000..8f5b050 --- /dev/null +++ b/src/test/kotlin/com/exactpro/th2/readcsv/impl/TransportHeaderHolderTest.kt @@ -0,0 +1,99 @@ +/* + * Copyright 2023 Exactpro (Exactpro Systems Limited) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.exactpro.th2.readcsv.impl + +import com.exactpro.th2.readcsv.cfg.CsvFileConfiguration +import io.netty.buffer.Unpooled.wrappedBuffer +import org.junit.jupiter.api.Assertions.assertDoesNotThrow +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Assertions.assertThrows +import org.junit.jupiter.api.Test + +class TransportHeaderHolderTest { + private val holder = TransportHeaderHolder( + mapOf( + "A" to CsvFileConfiguration(".*", ","), + "B" to CsvFileConfiguration(".*", ",").apply { + header = listOf("A", "B", "C") + }, + ) + ) + + @Test + fun `holds the header from cfg`() { + val headerInfo = holder.getHeaderForAlias("B") + assertNotNull(headerInfo) { "Cannot find info for alias B" } + assertEquals(3, headerInfo!!.size) { + "Unexpected size for header: " + headerInfo.content.toString(Charsets.UTF_8) + } + } + + @Test + fun `does not remove the header from cfg`() { + holder.clearHeaderForAlias("B") + + val headerInfo = holder.getHeaderForAlias("B") + assertNotNull(headerInfo) { "Cannot find info for alias B" } + } + + @Test + fun `holds header from file`() { + holder.setHeaderForAlias("A", wrappedBuffer("Header,with,\"Multi\nline\"".toByteArray())) + + val headerInfo = holder.getHeaderForAlias("A") + assertNotNull(headerInfo) { "Cannot find info for alias A" } + assertEquals(3, headerInfo!!.size) { + "Unexpected size for header: " + headerInfo.content.toString(Charsets.UTF_8) + } + assertEquals("Header,with,\"Multi\nline\"\n", headerInfo.content.toString(Charsets.UTF_8)) + } + + @Test + fun `clears header from file`() { + holder.setHeaderForAlias("A", wrappedBuffer("Header,with,\"Multi\nline\"".toByteArray())) + holder.clearHeaderForAlias("A") + + val headerInfo = holder.getHeaderForAlias("A") + assertNull(headerInfo) { "Header info for A was not cleared: ${headerInfo!!.content.toString(Charsets.UTF_8)}" } + } + + @Test + fun `reports error if content size does not match header`() { + val headerForAlias = holder.getHeaderForAlias("B") + assertThrows(IllegalStateException::class.java) { + holder.validateContentSize(headerForAlias, wrappedBuffer("1,2,3,4".toByteArray()), false) + } + } + + @Test + fun `does not report error if content size is less than header and reporting disabled`() { + val headerForAlias = holder.getHeaderForAlias("B") + assertDoesNotThrow { + holder.validateContentSize(headerForAlias, wrappedBuffer("1,2".toByteArray()), true) + } + } + + @Test + fun `does not report error if content size matches the header`() { + val headerForAlias = holder.getHeaderForAlias("B") + assertDoesNotThrow { + holder.validateContentSize(headerForAlias, wrappedBuffer("1,2,3".toByteArray()), false) + } + } +} \ No newline at end of file