Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TH2-5131] Fixed IndexOutOfBoundsException when read calculates header #19

Merged
merged 5 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Csv Reader User Manual 2.1.0
# Csv Reader User Manual 2.1.1

## Document Information

Expand All @@ -17,22 +17,22 @@ Csv reader read csv files, results are sending to RabbitMQ.
##### Reader configuration

```yaml
apiVersion: th2.exactpro.com/v1
apiVersion: th2.exactpro.com/v2
kind: Th2Box
metadata:
name: read-csv
spec:
image-name: ghcr.io/th2-net/th2-read-csv
image-version: <image version>
imageName: ghcr.io/th2-net/th2-read-csv
imageVersion: <image version>
type: th2-read
custom-config:
customConfig:
sourceDirectory: "dir/with/csv/"
aliases:
A:
nameRegexp: "fileA.*\\.log"
nameRegexp: "fileA.*\\.csv"
# delimiter: ","
B:
nameRegexp: "fileB.*\\.log"
nameRegexp: "fileB.*\\.csv"
delimiter: ";"
header: ['ColumnA', 'ColumnB', 'ColumnC']
common:
Expand All @@ -46,11 +46,15 @@ spec:
pullingInterval: "PT5S"
validateContent: true
validateOnlyExtraData: false
useTransport: true
pins:
- name: read_csv_out
connection-type: mq
attributes: ['raw', 'publish', 'store']
extended-settings:
mq:
publishers:
- name: to_mstore
attributes:
- publish
- transport-group
extendedSettings:
service:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we are here changing the readme how about changing the JAVA_TOOL_OPTIONS to something more realistic?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately I have run read in pico only and doesn't have all parameters for calculation more realistic parameter

enabled: false
envVariables:
Expand Down Expand Up @@ -83,10 +87,21 @@ spec:
The default value is `true`;
+ validateOnlyExtraData - disables validation when the content size is less than the header size (probably some columns were not set on purpose).
Works only with `validateContent` set to `true`. The default value is `false`
+ useTransport - enables using th2 transport protocol
+ useTransport - enables using th2 transport protocol. The default value is `true`

## Changes

### 2.1.1

#### Fixed:
read-csv throws the IndexOutOfBoundsException when it works in `useTransport` mode and calculates header by the first line in a CSV file

#### Changed:
* Default value for `useTransport` option is `true`

#### Added:
* netty-bytebuf-utils: `0.2.0`

### 2.1.0

+ th2 transport protocol support
Expand Down
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ dependencies {
implementation "org.slf4j:slf4j-api"

api "com.exactpro.th2:read-file-common-core:3.0.0-dev"
implementation('com.exactpro.th2:netty-bytebuf-utils:0.2.0') {
because("'asExpandable' method is used")
}

implementation "com.opencsv:opencsv:5.8"

implementation "javax.annotation:javax.annotation-api:1.3.2"
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version=2.1.0
release_version=2.2.0
12 changes: 10 additions & 2 deletions src/main/java/com/exactpro/th2/readcsv/cfg/ReaderConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,20 @@
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.kotlin.KotlinFeature;
import com.fasterxml.jackson.module.kotlin.KotlinModule;

public class ReaderConfig {

public static final ObjectMapper MAPPER = new ObjectMapper()
.registerModule(new KotlinModule())
.registerModule(new KotlinModule.Builder()
.withReflectionCacheSize(512)
.configure(KotlinFeature.NullToEmptyCollection, false)
.configure(KotlinFeature.NullToEmptyMap, false)
.configure(KotlinFeature.NullIsSameAsDefault, false)
.configure(KotlinFeature.SingletonSupport, true)
.configure(KotlinFeature.StrictNullChecks, false)
.build())
.registerModule(new JavaTimeModule());

@JsonProperty(required = true)
Expand All @@ -53,7 +61,7 @@ public class ReaderConfig {
private boolean validateOnlyExtraData = false;

@JsonPropertyDescription("Enables using th2 transport protocol")
private boolean useTransport = false;
private boolean useTransport = true;

public Path getSourceDirectory() {
return sourceDirectory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,14 @@ private static Collection<? extends RawMessage.Builder> attachHeaderOrHold(
private static RawMessage.Builder validateAndAppend(
HeaderHolder<ByteString> headerHolder,
HeaderInfo<ByteString> extractedHeader,
RawMessage.Builder it,
RawMessage.Builder builder,
boolean validate,
boolean validateOnlyExtraData
) {
if (validate) {
headerHolder.validateContentSize(extractedHeader, it.getBody(), validateOnlyExtraData);
headerHolder.validateContentSize(extractedHeader, builder.getBody(), validateOnlyExtraData);
}
return it.setBody(extractedHeader.getContent().concat(it.getBody()));
return builder.setBody(extractedHeader.getContent().concat(builder.getBody()));
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

import com.exactpro.th2.readcsv.cfg.CsvFileConfiguration;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.util.Map;

import static com.exactpro.th2.netty.bytebuf.util.ByteBufUtil.asExpandable;
import static io.netty.buffer.Unpooled.wrappedBuffer;

public class TransportHeaderHolder extends HeaderHolder<ByteBuf> {
private static final byte NEW_LINE = '\n';

Expand All @@ -31,7 +33,7 @@ public TransportHeaderHolder(Map<String, CsvFileConfiguration> configurationMap)

protected HeaderInfo<ByteBuf> formatHeader(String alias, CsvFileConfiguration configuration) {
String headerString = String.join(Character.toString(configuration.getDelimiter()), configuration.getHeader()) + '\n';
ByteBuf content = Unpooled.wrappedBuffer(headerString.getBytes(CHARSET));
ByteBuf content = wrappedBuffer(headerString.getBytes(CHARSET));
return new HeaderInfo<>(alias, configuration.getHeader().size(), content);
}

Expand All @@ -42,6 +44,6 @@ protected String contentToString(ByteBuf content) {

@Override
protected ByteBuf addNewLine(ByteBuf content) {
return content.writeByte(NEW_LINE);
return asExpandable(content).writeByte(NEW_LINE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.exactpro.th2.read.file.common.state.impl.InMemoryReaderState;
import com.exactpro.th2.readcsv.cfg.ReaderConfig;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import kotlin.Unit;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
Expand All @@ -45,6 +44,8 @@
import java.util.List;
import java.util.stream.Collectors;

import static io.netty.buffer.Unpooled.wrappedBuffer;

public class TransportReaderFactory extends AbstractReaderFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(TransportReaderFactory.class);

Expand Down Expand Up @@ -110,15 +111,15 @@ private static Collection<? extends RawMessage.Builder> transportAttachHeaderOrH
private static RawMessage.Builder transportValidateAndAppend(
HeaderHolder<ByteBuf> headerHolder,
HeaderInfo<ByteBuf> extractedHeader,
RawMessage.Builder it,
RawMessage.Builder builder,
boolean validate,
boolean validateOnlyExtraData
) {
if (validate) {
headerHolder.validateContentSize(extractedHeader, it.getBody(), validateOnlyExtraData);
headerHolder.validateContentSize(extractedHeader, builder.getBody(), validateOnlyExtraData);
}

return it.setBody(Unpooled.wrappedBuffer(extractedHeader.getContent(), it.getBody()));
return builder.setBody(wrappedBuffer(extractedHeader.getContent(), builder.getBody()));
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import java.io.BufferedReader

class TestCsvContentParser {
class ProtoCsvContentParserTest {
private val parser = ProtoCsvContentParser(
mapOf(
"test" to CsvFileConfiguration(".*", ",")
Expand Down Expand Up @@ -72,7 +72,7 @@ class TestCsvContentParser {
private fun malformedCsv(): BufferedReader = readerForResource("malformed.csv")

private fun readerForResource(resourceName: String): BufferedReader {
val resourceAsStream = TestCsvContentParser::class.java.classLoader.getResourceAsStream(resourceName)
val resourceAsStream = ProtoCsvContentParserTest::class.java.classLoader.getResourceAsStream(resourceName)
return requireNotNull(resourceAsStream) {
"Unknown resource: $resourceName"
}.bufferedReader()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import com.google.protobuf.ByteString
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test

// TODO: th2 transport version of these tests should be added
class TestHeaderHolder {
class ProtoHeaderHolderTest {
private val holder = ProtoHeaderHolder(
mapOf(
"A" to CsvFileConfiguration(".*", ","),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.readcsv.impl

import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage
import com.exactpro.th2.read.file.common.StreamId
import com.exactpro.th2.readcsv.cfg.CsvFileConfiguration
import com.exactpro.th2.readcsv.exception.MalformedCsvException
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import java.io.BufferedReader

class TransportCsvContentParserTest {
private val parser = TransportCsvContentParser(
mapOf(
"test" to CsvFileConfiguration(".*", ",")
)
)
private val streamId = StreamId("test")

@Test
fun `can parse valid csv`() {
validCsv().use {
it.mark(1024)
val canParse = parser.canParse(streamId, it, true)
assertTrue(canParse) {
"Parse should be able to parse the valid csv"
}
it.reset()

val parsed: Collection<RawMessage.Builder> = parser.parse(streamId, it)
assertEquals(1, parsed.size)
assertEquals("This,is,valid,\"multiline\nCSV file\"", parsed.first().body.toString(Charsets.UTF_8))
}
}

@Test
fun `does not throw malformed exception if file can be changed`() {
malformedCsv().use {
val canParse = parser.canParse(streamId, it, false)
assertFalse(canParse) {
"Parse should not be able to parse the malformed csv"
}
}
}

@Test
fun `throws malformed exception if file can not be changed`() {
malformedCsv().use {
assertThrows(MalformedCsvException::class.java) {
parser.canParse(streamId, it, true)
}
}
}

private fun validCsv(): BufferedReader = readerForResource("valid.csv")

private fun malformedCsv(): BufferedReader = readerForResource("malformed.csv")

private fun readerForResource(resourceName: String): BufferedReader {
val resourceAsStream = TransportCsvContentParserTest::class.java.classLoader.getResourceAsStream(resourceName)
return requireNotNull(resourceAsStream) {
"Unknown resource: $resourceName"
}.bufferedReader()
}
}
Loading
Loading