Skip to content

Commit

Permalink
Merge pull request #19 from th2-net/TH2-5131
Browse files Browse the repository at this point in the history
Updated libs
* common: `5.7.1-dev`
* read-file-common-core: `3.1.0-dev`
* opencsv: `5.9`
  • Loading branch information
Nikita-Smirnov-Exactpro authored Nov 27, 2023
2 parents 85e9b4e + 954ebca commit e8309df
Show file tree
Hide file tree
Showing 11 changed files with 245 additions and 33 deletions.
44 changes: 32 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Csv Reader User Manual 2.1.0
# Csv Reader User Manual 2.2.0

## Document Information

Expand All @@ -17,22 +17,22 @@ Csv reader read csv files, results are sending to RabbitMQ.
##### Reader configuration

```yaml
apiVersion: th2.exactpro.com/v1
apiVersion: th2.exactpro.com/v2
kind: Th2Box
metadata:
name: read-csv
spec:
image-name: ghcr.io/th2-net/th2-read-csv
image-version: <image version>
imageName: ghcr.io/th2-net/th2-read-csv
imageVersion: <image version>
type: th2-read
custom-config:
customConfig:
sourceDirectory: "dir/with/csv/"
aliases:
A:
nameRegexp: "fileA.*\\.log"
nameRegexp: "fileA.*\\.csv"
# delimiter: ","
B:
nameRegexp: "fileB.*\\.log"
nameRegexp: "fileB.*\\.csv"
delimiter: ";"
header: ['ColumnA', 'ColumnB', 'ColumnC']
common:
Expand All @@ -46,11 +46,15 @@ spec:
pullingInterval: "PT5S"
validateContent: true
validateOnlyExtraData: false
useTransport: true
pins:
- name: read_csv_out
connection-type: mq
attributes: ['raw', 'publish', 'store']
extended-settings:
mq:
publishers:
- name: to_mstore
attributes:
- publish
- transport-group
extendedSettings:
service:
enabled: false
envVariables:
Expand Down Expand Up @@ -83,10 +87,26 @@ spec:
The default value is `true`;
+ validateOnlyExtraData - disables validation when the content size is less than the header size (probably some columns were not set on purpose).
Works only with `validateContent` set to `true`. The default value is `false`
+ useTransport - enables using th2 transport protocol
+ useTransport - enables using th2 transport protocol. The default value is `true`

## Changes

### 2.2.0

#### Fixed:
read-csv throws the IndexOutOfBoundsException when it works in `useTransport` mode and calculates header by the first line in a CSV file

#### Changed:
* Default value for `useTransport` option is `true`

#### Added:
* netty-bytebuf-utils: `0.2.0`

#### Updated:
* common: `5.7.1-dev`
* read-file-common-core: `3.1.0-dev`
* opencsv: `5.9`

### 2.1.0

+ th2 transport protocol support
Expand Down
11 changes: 7 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,17 @@ repositories {

dependencies {
api platform("com.exactpro.th2:bom:4.5.0")
implementation ("com.exactpro.th2:common:5.4.2-dev")
implementation ("com.exactpro.th2:common:5.7.1-dev")
implementation "org.slf4j:slf4j-api"

api "com.exactpro.th2:read-file-common-core:3.0.0-dev"
implementation "com.opencsv:opencsv:5.8"
api "com.exactpro.th2:read-file-common-core:3.1.0-dev"
implementation('com.exactpro.th2:netty-bytebuf-utils:0.2.0') {
because("'asExpandable' method is used")
}

implementation "com.opencsv:opencsv:5.9"

implementation "javax.annotation:javax.annotation-api:1.3.2"
implementation "org.jetbrains.kotlin:kotlin-stdlib"
implementation "org.jetbrains.kotlin:kotlin-reflect"

implementation "com.fasterxml.jackson.core:jackson-databind"
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
release_version=2.1.0
release_version=2.2.0
12 changes: 10 additions & 2 deletions src/main/java/com/exactpro/th2/readcsv/cfg/ReaderConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,20 @@
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.kotlin.KotlinFeature;
import com.fasterxml.jackson.module.kotlin.KotlinModule;

public class ReaderConfig {

public static final ObjectMapper MAPPER = new ObjectMapper()
.registerModule(new KotlinModule())
.registerModule(new KotlinModule.Builder()
.withReflectionCacheSize(512)
.configure(KotlinFeature.NullToEmptyCollection, false)
.configure(KotlinFeature.NullToEmptyMap, false)
.configure(KotlinFeature.NullIsSameAsDefault, false)
.configure(KotlinFeature.SingletonSupport, true)
.configure(KotlinFeature.StrictNullChecks, false)
.build())
.registerModule(new JavaTimeModule());

@JsonProperty(required = true)
Expand All @@ -53,7 +61,7 @@ public class ReaderConfig {
private boolean validateOnlyExtraData = false;

@JsonPropertyDescription("Enables using th2 transport protocol")
private boolean useTransport = false;
private boolean useTransport = true;

public Path getSourceDirectory() {
return sourceDirectory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,14 @@ private static Collection<? extends RawMessage.Builder> attachHeaderOrHold(
private static RawMessage.Builder validateAndAppend(
HeaderHolder<ByteString> headerHolder,
HeaderInfo<ByteString> extractedHeader,
RawMessage.Builder it,
RawMessage.Builder builder,
boolean validate,
boolean validateOnlyExtraData
) {
if (validate) {
headerHolder.validateContentSize(extractedHeader, it.getBody(), validateOnlyExtraData);
headerHolder.validateContentSize(extractedHeader, builder.getBody(), validateOnlyExtraData);
}
return it.setBody(extractedHeader.getContent().concat(it.getBody()));
return builder.setBody(extractedHeader.getContent().concat(builder.getBody()));
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

import com.exactpro.th2.readcsv.cfg.CsvFileConfiguration;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.util.Map;

import static com.exactpro.th2.netty.bytebuf.util.ByteBufUtil.asExpandable;
import static io.netty.buffer.Unpooled.wrappedBuffer;

public class TransportHeaderHolder extends HeaderHolder<ByteBuf> {
private static final byte NEW_LINE = '\n';

Expand All @@ -31,7 +33,7 @@ public TransportHeaderHolder(Map<String, CsvFileConfiguration> configurationMap)

protected HeaderInfo<ByteBuf> formatHeader(String alias, CsvFileConfiguration configuration) {
String headerString = String.join(Character.toString(configuration.getDelimiter()), configuration.getHeader()) + '\n';
ByteBuf content = Unpooled.wrappedBuffer(headerString.getBytes(CHARSET));
ByteBuf content = wrappedBuffer(headerString.getBytes(CHARSET));
return new HeaderInfo<>(alias, configuration.getHeader().size(), content);
}

Expand All @@ -42,6 +44,6 @@ protected String contentToString(ByteBuf content) {

@Override
protected ByteBuf addNewLine(ByteBuf content) {
return content.writeByte(NEW_LINE);
return asExpandable(content).writeByte(NEW_LINE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.exactpro.th2.read.file.common.state.impl.InMemoryReaderState;
import com.exactpro.th2.readcsv.cfg.ReaderConfig;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import kotlin.Unit;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
Expand All @@ -45,6 +44,8 @@
import java.util.List;
import java.util.stream.Collectors;

import static io.netty.buffer.Unpooled.wrappedBuffer;

public class TransportReaderFactory extends AbstractReaderFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(TransportReaderFactory.class);

Expand Down Expand Up @@ -110,15 +111,15 @@ private static Collection<? extends RawMessage.Builder> transportAttachHeaderOrH
private static RawMessage.Builder transportValidateAndAppend(
HeaderHolder<ByteBuf> headerHolder,
HeaderInfo<ByteBuf> extractedHeader,
RawMessage.Builder it,
RawMessage.Builder builder,
boolean validate,
boolean validateOnlyExtraData
) {
if (validate) {
headerHolder.validateContentSize(extractedHeader, it.getBody(), validateOnlyExtraData);
headerHolder.validateContentSize(extractedHeader, builder.getBody(), validateOnlyExtraData);
}

return it.setBody(Unpooled.wrappedBuffer(extractedHeader.getContent(), it.getBody()));
return builder.setBody(wrappedBuffer(extractedHeader.getContent(), builder.getBody()));
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import java.io.BufferedReader

class TestCsvContentParser {
class ProtoCsvContentParserTest {
private val parser = ProtoCsvContentParser(
mapOf(
"test" to CsvFileConfiguration(".*", ",")
Expand Down Expand Up @@ -72,7 +72,7 @@ class TestCsvContentParser {
private fun malformedCsv(): BufferedReader = readerForResource("malformed.csv")

private fun readerForResource(resourceName: String): BufferedReader {
val resourceAsStream = TestCsvContentParser::class.java.classLoader.getResourceAsStream(resourceName)
val resourceAsStream = ProtoCsvContentParserTest::class.java.classLoader.getResourceAsStream(resourceName)
return requireNotNull(resourceAsStream) {
"Unknown resource: $resourceName"
}.bufferedReader()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import com.google.protobuf.ByteString
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test

// TODO: th2 transport version of these tests should be added
class TestHeaderHolder {
class ProtoHeaderHolderTest {
private val holder = ProtoHeaderHolder(
mapOf(
"A" to CsvFileConfiguration(".*", ","),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2023 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.readcsv.impl

import com.exactpro.th2.common.schema.message.impl.rabbitmq.transport.RawMessage
import com.exactpro.th2.read.file.common.StreamId
import com.exactpro.th2.readcsv.cfg.CsvFileConfiguration
import com.exactpro.th2.readcsv.exception.MalformedCsvException
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import java.io.BufferedReader

class TransportCsvContentParserTest {
private val parser = TransportCsvContentParser(
mapOf(
"test" to CsvFileConfiguration(".*", ",")
)
)
private val streamId = StreamId("test")

@Test
fun `can parse valid csv`() {
validCsv().use {
it.mark(1024)
val canParse = parser.canParse(streamId, it, true)
assertTrue(canParse) {
"Parse should be able to parse the valid csv"
}
it.reset()

val parsed: Collection<RawMessage.Builder> = parser.parse(streamId, it)
assertEquals(1, parsed.size)
assertEquals("This,is,valid,\"multiline\nCSV file\"", parsed.first().body.toString(Charsets.UTF_8))
}
}

@Test
fun `does not throw malformed exception if file can be changed`() {
malformedCsv().use {
val canParse = parser.canParse(streamId, it, false)
assertFalse(canParse) {
"Parse should not be able to parse the malformed csv"
}
}
}

@Test
fun `throws malformed exception if file can not be changed`() {
malformedCsv().use {
assertThrows(MalformedCsvException::class.java) {
parser.canParse(streamId, it, true)
}
}
}

private fun validCsv(): BufferedReader = readerForResource("valid.csv")

private fun malformedCsv(): BufferedReader = readerForResource("malformed.csv")

private fun readerForResource(resourceName: String): BufferedReader {
val resourceAsStream = TransportCsvContentParserTest::class.java.classLoader.getResourceAsStream(resourceName)
return requireNotNull(resourceAsStream) {
"Unknown resource: $resourceName"
}.bufferedReader()
}
}
Loading

0 comments on commit e8309df

Please sign in to comment.