From 5d7b5947d7abf9896bd46945eea03af833513ec2 Mon Sep 17 00:00:00 2001 From: Julien Ruaux Date: Wed, 6 Nov 2024 22:57:07 -0800 Subject: [PATCH] build: moved connectors to core --- core/riot-faker/gradle.properties | 19 ++ core/riot-faker/riot-faker.gradle | 33 +++ .../com/redis/riot/faker/FakerItemReader.java | 74 ++++++ .../redis/riot/faker/FakerReaderTests.java | 41 +++ core/riot-file/gradle.properties | 19 ++ core/riot-file/riot-file.gradle | 36 +++ .../com/redis/riot/file/AbstractRegistry.java | 42 ++++ .../com/redis/riot/file/AwsCredentials.java | 28 +++ .../java/com/redis/riot/file/AwsOptions.java | 40 +++ .../java/com/redis/riot/file/FileOptions.java | 69 +++++ .../redis/riot/file/FileReaderFactories.java | 157 ++++++++++++ .../redis/riot/file/FileReaderFactory.java | 10 + .../redis/riot/file/FileReaderOptions.java | 108 ++++++++ .../redis/riot/file/FileReaderRegistry.java | 15 ++ .../java/com/redis/riot/file/FileType.java | 63 +++++ .../redis/riot/file/FileWriterFactories.java | 124 +++++++++ .../redis/riot/file/FileWriterFactory.java | 10 + .../redis/riot/file/FileWriterOptions.java | 123 +++++++++ .../redis/riot/file/FileWriterRegistry.java | 15 ++ .../file/FilenameInputStreamResource.java | 21 ++ .../main/java/com/redis/riot/file/Files.java | 225 +++++++++++++++++ .../java/com/redis/riot/file/GcpOptions.java | 52 ++++ .../riot/file/HeaderCallbackHandler.java | 41 +++ .../redis/riot/file/JsonLineAggregator.java | 29 +++ .../redis/riot/file/MapFieldSetMapper.java | 25 ++ .../redis/riot/file/MapToFieldFunction.java | 25 ++ .../riot/file/ObjectMapperLineMapper.java | 22 ++ .../redis/riot/file/OutputStreamResource.java | 61 +++++ .../com/redis/riot/file/ResourceOptions.java | 33 +++ .../com/redis/riot/file/SystemInResource.java | 12 + .../redis/riot/file/SystemOutResource.java | 12 + .../riot/file/UncustomizedUrlResource.java | 30 +++ .../riot/file/UnknownFileTypeException.java | 18 ++ .../file/UnsupportedFileTypeException.java | 18 ++ .../redis/riot/file/xml/XmlItemReader.java | 108 ++++++++ .../riot/file/xml/XmlItemReaderBuilder.java | 139 +++++++++++ .../redis/riot/file/xml/XmlObjectReader.java | 81 ++++++ .../riot/file/xml/XmlResourceItemWriter.java | 115 +++++++++ .../xml/XmlResourceItemWriterBuilder.java | 236 ++++++++++++++++++ 39 files changed, 2329 insertions(+) create mode 100644 core/riot-faker/gradle.properties create mode 100644 core/riot-faker/riot-faker.gradle create mode 100644 core/riot-faker/src/main/java/com/redis/riot/faker/FakerItemReader.java create mode 100644 core/riot-faker/src/test/java/com/redis/riot/faker/FakerReaderTests.java create mode 100644 core/riot-file/gradle.properties create mode 100644 core/riot-file/riot-file.gradle create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/AbstractRegistry.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/AwsCredentials.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/AwsOptions.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/FileOptions.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/FileReaderFactories.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/FileReaderFactory.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/FileReaderOptions.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/FileReaderRegistry.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/FileType.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/FileWriterFactories.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/FileWriterFactory.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/FileWriterOptions.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/FileWriterRegistry.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/FilenameInputStreamResource.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/Files.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/GcpOptions.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/HeaderCallbackHandler.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/JsonLineAggregator.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/MapFieldSetMapper.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/MapToFieldFunction.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/ObjectMapperLineMapper.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/OutputStreamResource.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/ResourceOptions.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/SystemInResource.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/SystemOutResource.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/UncustomizedUrlResource.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/UnknownFileTypeException.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/UnsupportedFileTypeException.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/xml/XmlItemReader.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/xml/XmlItemReaderBuilder.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/xml/XmlObjectReader.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/xml/XmlResourceItemWriter.java create mode 100644 core/riot-file/src/main/java/com/redis/riot/file/xml/XmlResourceItemWriterBuilder.java diff --git a/core/riot-faker/gradle.properties b/core/riot-faker/gradle.properties new file mode 100644 index 000000000..a8e27f49d --- /dev/null +++ b/core/riot-faker/gradle.properties @@ -0,0 +1,19 @@ +# +# SPDX-License-Identifier: Apache-2.0 +# +# Copyright 2022-2023 The RIOT authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +project_description = RIOT Faker +automatic.module.name = com.redis.riot.faker diff --git a/core/riot-faker/riot-faker.gradle b/core/riot-faker/riot-faker.gradle new file mode 100644 index 000000000..6e417753b --- /dev/null +++ b/core/riot-faker/riot-faker.gradle @@ -0,0 +1,33 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2020-2023 The RIOT authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +dependencies { + implementation 'org.springframework.batch:spring-batch-infrastructure' + api group: 'net.datafaker', name: 'datafaker', version: datafakerVersion +} + +compileJava { + options.compilerArgs += ["-AprojectPath=${project.group}/${project.name}"] +} + +if (!(project.findProperty('automatic.module.name.skip') ?: false).toBoolean()) { + jar { + manifest { + attributes('Automatic-Module-Name': project.findProperty('automatic.module.name')) + } + } +} diff --git a/core/riot-faker/src/main/java/com/redis/riot/faker/FakerItemReader.java b/core/riot-faker/src/main/java/com/redis/riot/faker/FakerItemReader.java new file mode 100644 index 000000000..19855daad --- /dev/null +++ b/core/riot-faker/src/main/java/com/redis/riot/faker/FakerItemReader.java @@ -0,0 +1,74 @@ +package com.redis.riot.faker; + +import java.util.AbstractMap; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; + +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader; +import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; + +import net.datafaker.Faker; + +/** + * {@link ItemReader} that generates HashMaps using Faker. + * + * @author Julien Ruaux + */ +public class FakerItemReader extends AbstractItemCountingItemStreamItemReader> { + + public static final Locale DEFAULT_LOCALE = Locale.getDefault(); + + private Map expressions = new LinkedHashMap<>(); + private Locale locale = DEFAULT_LOCALE; + + private Faker faker; + private Map fields; + + public FakerItemReader() { + setName(ClassUtils.getShortName(getClass())); + } + + public void setLocale(Locale locale) { + this.locale = locale; + } + + public void setExpressions(Map fields) { + this.expressions = fields; + } + + @Override + protected synchronized void doOpen() throws Exception { + Assert.notEmpty(expressions, "No field specified"); + if (fields == null) { + fields = expressions.entrySet().stream().map(this::normalizeField) + .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); + } + faker = new Faker(locale); + } + + private Entry normalizeField(Entry field) { + if (field.getValue().startsWith("#{")) { + return field; + } + return new AbstractMap.SimpleEntry<>(field.getKey(), "#{" + field.getValue() + "}"); + } + + @Override + protected Map doRead() throws Exception { + Map map = new HashMap<>(); + fields.forEach((k, v) -> map.put(k, faker.expression(v))); + return map; + } + + @Override + protected synchronized void doClose() { + faker = null; + } + +} diff --git a/core/riot-faker/src/test/java/com/redis/riot/faker/FakerReaderTests.java b/core/riot-faker/src/test/java/com/redis/riot/faker/FakerReaderTests.java new file mode 100644 index 000000000..9e1d14791 --- /dev/null +++ b/core/riot-faker/src/test/java/com/redis/riot/faker/FakerReaderTests.java @@ -0,0 +1,41 @@ +package com.redis.riot.faker; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.batch.item.ItemReader; + +class FakerReaderTests { + + public static List readAll(ItemReader reader) throws Exception { + List list = new ArrayList<>(); + T element; + while ((element = reader.read()) != null) { + list.add(element); + } + return list; + } + + @Test + void fakerReader() throws Exception { + int count = 100; + FakerItemReader reader = new FakerItemReader(); + Map fields = new LinkedHashMap<>(); + fields.put("firstName", "Name.first_name"); + fields.put("lastName", "Name.last_name"); + reader.setExpressions(fields); + reader.setMaxItemCount(count); + reader.open(new ExecutionContext()); + List> items = readAll(reader); + reader.close(); + Assertions.assertEquals(count, items.size()); + Assertions.assertTrue(items.get(0).containsKey("firstName")); + Assertions.assertTrue(items.get(0).containsKey("lastName")); + } + +} diff --git a/core/riot-file/gradle.properties b/core/riot-file/gradle.properties new file mode 100644 index 000000000..ad625f43b --- /dev/null +++ b/core/riot-file/gradle.properties @@ -0,0 +1,19 @@ +# +# SPDX-License-Identifier: Apache-2.0 +# +# Copyright 2022-2023 The RIOT authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +project_description = RIOT File +automatic.module.name = com.redis.riot.file diff --git a/core/riot-file/riot-file.gradle b/core/riot-file/riot-file.gradle new file mode 100644 index 000000000..0c5b1f154 --- /dev/null +++ b/core/riot-file/riot-file.gradle @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * Copyright 2020-2023 The RIOT authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +dependencies { + implementation 'org.springframework.batch:spring-batch-infrastructure' + api 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml' + api 'org.springframework:spring-oxm' + api group: 'io.awspring.cloud', name: 'spring-cloud-aws-starter-s3', version: awsVersion + api group: 'com.google.cloud', name: 'spring-cloud-gcp-starter-storage', version: gcpVersion +} + +compileJava { + options.compilerArgs += ["-AprojectPath=${project.group}/${project.name}"] +} + +if (!(project.findProperty('automatic.module.name.skip') ?: false).toBoolean()) { + jar { + manifest { + attributes('Automatic-Module-Name': project.findProperty('automatic.module.name')) + } + } +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/AbstractRegistry.java b/core/riot-file/src/main/java/com/redis/riot/file/AbstractRegistry.java new file mode 100644 index 000000000..4f78d1a48 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/AbstractRegistry.java @@ -0,0 +1,42 @@ +package com.redis.riot.file; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.springframework.core.io.Resource; + +public abstract class AbstractRegistry { + + private final Map extensions = new HashMap<>(); + private final Map factories = new HashMap<>(); + + public void register(FileType fileType, T factory) { + factories.put(fileType, factory); + if (fileType != null) { + for (String extension : fileType.getExtensions()) { + extensions.put(extension, fileType); + } + } + } + + protected T factory(Resource resource, FileOptions options) throws IOException { + FileType fileType = fileType(resource, options); + if (fileType == null) { + throw new UnknownFileTypeException(resource.getFilename()); + } + T factory = factories.get(fileType); + if (factory == null) { + new UnsupportedFileTypeException(fileType); + } + return factory; + } + + private FileType fileType(Resource resource, FileOptions options) { + if (options.getFileType() == null) { + return extensions.get(Files.extension(resource.getFilename())); + } + return options.getFileType(); + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/AwsCredentials.java b/core/riot-file/src/main/java/com/redis/riot/file/AwsCredentials.java new file mode 100644 index 000000000..60a34ca74 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/AwsCredentials.java @@ -0,0 +1,28 @@ +package com.redis.riot.file; + +import lombok.ToString; + +@ToString +public class AwsCredentials { + + private String accessKey; + + private String secretKey; + + public String getAccessKey() { + return accessKey; + } + + public void setAccessKey(String accessKey) { + this.accessKey = accessKey; + } + + public String getSecretKey() { + return secretKey; + } + + public void setSecretKey(String secretKey) { + this.secretKey = secretKey; + } + +} \ No newline at end of file diff --git a/core/riot-file/src/main/java/com/redis/riot/file/AwsOptions.java b/core/riot-file/src/main/java/com/redis/riot/file/AwsOptions.java new file mode 100644 index 000000000..97885dc37 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/AwsOptions.java @@ -0,0 +1,40 @@ +package com.redis.riot.file; + +import java.net.URI; +import java.util.Optional; + +import lombok.ToString; +import software.amazon.awssdk.regions.Region; + +@ToString +public class AwsOptions { + + private AwsCredentials credentials = new AwsCredentials(); + private Optional region = Optional.empty(); + private Optional endpoint = Optional.empty(); + + public AwsCredentials getCredentials() { + return credentials; + } + + public void setCredentials(AwsCredentials credentials) { + this.credentials = credentials; + } + + public Optional getRegion() { + return region; + } + + public void setRegion(Region region) { + this.region = Optional.ofNullable(region); + } + + public Optional getEndpoint() { + return endpoint; + } + + public void setEndpoint(URI endpoint) { + this.endpoint = Optional.ofNullable(endpoint); + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/FileOptions.java b/core/riot-file/src/main/java/com/redis/riot/file/FileOptions.java new file mode 100644 index 000000000..0c421f5da --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/FileOptions.java @@ -0,0 +1,69 @@ +package com.redis.riot.file; + +import java.nio.charset.StandardCharsets; +import java.util.Optional; + +import lombok.ToString; + +@ToString +public class FileOptions { + + public static final String DEFAULT_ENCODING = StandardCharsets.UTF_8.name(); + public static final char DEFAULT_QUOTE_CHARACTER = '"'; + + private ResourceOptions resourceOptions = new ResourceOptions(); + private FileType fileType; + private String encoding = DEFAULT_ENCODING; + private boolean header; + private Optional delimiter = Optional.empty(); + private char quoteCharacter = DEFAULT_QUOTE_CHARACTER; + + public ResourceOptions getResourceOptions() { + return resourceOptions; + } + + public void setResourceOptions(ResourceOptions resourceOptions) { + this.resourceOptions = resourceOptions; + } + + public FileType getFileType() { + return fileType; + } + + public void setFileType(FileType type) { + this.fileType = type; + } + + public Optional getDelimiter() { + return delimiter; + } + + public void setDelimiter(String delimiter) { + this.delimiter = Optional.ofNullable(delimiter); + } + + public char getQuoteCharacter() { + return quoteCharacter; + } + + public void setQuoteCharacter(char character) { + this.quoteCharacter = character; + } + + public boolean isHeader() { + return header; + } + + public void setHeader(boolean header) { + this.header = header; + } + + public String getEncoding() { + return encoding; + } + + public void setEncoding(String encoding) { + this.encoding = encoding; + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/FileReaderFactories.java b/core/riot-file/src/main/java/com/redis/riot/file/FileReaderFactories.java new file mode 100644 index 000000000..730def0ed --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/FileReaderFactories.java @@ -0,0 +1,157 @@ +package com.redis.riot.file; + +import java.util.List; +import java.util.Map; + +import org.springframework.batch.item.file.FlatFileItemReader; +import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder; +import org.springframework.batch.item.file.mapping.JsonLineMapper; +import org.springframework.batch.item.file.separator.DefaultRecordSeparatorPolicy; +import org.springframework.batch.item.file.separator.RecordSeparatorPolicy; +import org.springframework.batch.item.file.transform.AbstractLineTokenizer; +import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; +import org.springframework.batch.item.file.transform.FixedLengthTokenizer; +import org.springframework.batch.item.file.transform.Range; +import org.springframework.batch.item.file.transform.RangeArrayPropertyEditor; +import org.springframework.batch.item.json.JacksonJsonObjectReader; +import org.springframework.batch.item.json.JsonItemReader; +import org.springframework.batch.item.json.builder.JsonItemReaderBuilder; +import org.springframework.core.io.Resource; +import org.springframework.util.Assert; +import org.springframework.util.ObjectUtils; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.dataformat.xml.XmlMapper; +import com.redis.riot.file.xml.XmlItemReader; +import com.redis.riot.file.xml.XmlItemReaderBuilder; +import com.redis.riot.file.xml.XmlObjectReader; + +public class FileReaderFactories { + + public static FlatFileItemReader> delimited(Resource resource, FileReaderOptions options) { + String delimiter = options.getFileOptions().getDelimiter().orElseGet(() -> Files.delimiter(resource)); + return flatFileReader(resource, delimitedLineTokenizer(delimiter, options), options); + } + + public static FlatFileItemReader> fixedWidth(Resource resource, FileReaderOptions options) { + FixedLengthTokenizer tokenizer = new FixedLengthTokenizer(); + RangeArrayPropertyEditor editor = new RangeArrayPropertyEditor(); + List columnRanges = options.getColumnRanges(); + Assert.notEmpty(columnRanges, "Column ranges are required"); + editor.setAsText(String.join(",", columnRanges)); + Range[] ranges = (Range[]) editor.getValue(); + Assert.notEmpty(ranges, "Invalid ranges specified: " + columnRanges); + tokenizer.setColumns(ranges); + return flatFileReader(resource, tokenizer, options); + } + + public static JsonItemReader json(Resource resource, FileReaderOptions options) { + JsonItemReaderBuilder builder = new JsonItemReaderBuilder<>(); + builder.name(resource.getFilename() + "-json-file-reader"); + builder.resource(resource); + builder.saveState(false); + JacksonJsonObjectReader objectReader = new JacksonJsonObjectReader<>(options.getItemType()); + objectReader.setMapper(objectMapper(new ObjectMapper(), options)); + builder.jsonObjectReader(objectReader); + options.getMaxItemCount().ifPresent(builder::maxItemCount); + return builder.build(); + } + + public static FlatFileItemReader jsonLines(Resource resource, FileReaderOptions options) { + if (Map.class.isAssignableFrom(options.getItemType())) { + FlatFileItemReaderBuilder> reader = flatFileReader(resource, options); + reader.lineMapper(new JsonLineMapper()); + reader.fieldSetMapper(new MapFieldSetMapper()); + return reader.build(); + } + FlatFileItemReaderBuilder reader = flatFileReader(resource, options); + ObjectMapper objectMapper = objectMapper(new ObjectMapper(), options); + reader.lineMapper(new ObjectMapperLineMapper<>(objectMapper, options.getItemType())); + return reader.build(); + } + + public static XmlItemReader xml(Resource resource, FileReaderOptions options) { + XmlItemReaderBuilder builder = new XmlItemReaderBuilder<>(); + builder.name(resource.getFilename() + "-xml-file-reader"); + builder.resource(resource); + XmlObjectReader objectReader = new XmlObjectReader<>(options.getItemType()); + objectReader.setMapper(objectMapper(new XmlMapper(), options)); + builder.xmlObjectReader(objectReader); + options.getMaxItemCount().ifPresent(builder::maxItemCount); + return builder.build(); + } + + private static DelimitedLineTokenizer delimitedLineTokenizer(String delimiter, FileReaderOptions options) { + DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); + tokenizer.setDelimiter(delimiter); + tokenizer.setQuoteCharacter(options.getFileOptions().getQuoteCharacter()); + if (!ObjectUtils.isEmpty(options.getIncludedFields())) { + tokenizer.setIncludedFields(includedFields(options)); + } + return tokenizer; + } + + private static int[] includedFields(FileReaderOptions options) { + return options.getIncludedFields().stream().mapToInt(Integer::intValue).toArray(); + } + + private static FlatFileItemReader> flatFileReader(Resource resource, + AbstractLineTokenizer tokenizer, FileReaderOptions options) { + if (ObjectUtils.isEmpty(options.getFields())) { + Assert.isTrue(options.getFileOptions().isHeader(), + String.format("Could not create reader for file '%s': no header or field names specified", + resource.getFilename())); + } else { + tokenizer.setNames(options.getFields().toArray(new String[0])); + } + FlatFileItemReaderBuilder> builder = flatFileReader(resource, options); + builder.fieldSetMapper(new MapFieldSetMapper()); + builder.lineTokenizer(tokenizer); + builder.skippedLinesCallback(new HeaderCallbackHandler(tokenizer, headerIndex(options))); + return builder.build(); + } + + private static FlatFileItemReaderBuilder flatFileReader(Resource resource, FileReaderOptions options) { + FlatFileItemReaderBuilder builder = new FlatFileItemReaderBuilder<>(); + builder.resource(resource); + options.getMaxItemCount().ifPresent(builder::maxItemCount); + builder.encoding(options.getFileOptions().getEncoding()); + builder.recordSeparatorPolicy(recordSeparatorPolicy(options)); + builder.linesToSkip(linesToSkip(options)); + builder.saveState(false); + return builder; + } + + private static RecordSeparatorPolicy recordSeparatorPolicy(FileReaderOptions options) { + String quoteCharacter = String.valueOf(options.getFileOptions().getQuoteCharacter()); + return new DefaultRecordSeparatorPolicy(quoteCharacter, options.getContinuationString()); + } + + private static int headerIndex(FileReaderOptions options) { + if (options.getHeaderLine() != null) { + return options.getHeaderLine(); + } + return linesToSkip(options) - 1; + } + + private static int linesToSkip(FileReaderOptions options) { + if (options.getLinesToSkip() != null) { + return options.getLinesToSkip(); + } + if (options.getFileOptions().isHeader()) { + return 1; + } + return 0; + } + + private static T objectMapper(T objectMapper, FileReaderOptions options) { + objectMapper.configure(DeserializationFeature.USE_LONG_FOR_INTS, true); + SimpleModule module = new SimpleModule(); + options.getDeserializers().forEach(module::addDeserializer); + objectMapper.registerModule(module); + return objectMapper; + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/FileReaderFactory.java b/core/riot-file/src/main/java/com/redis/riot/file/FileReaderFactory.java new file mode 100644 index 000000000..05f6fbf75 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/FileReaderFactory.java @@ -0,0 +1,10 @@ +package com.redis.riot.file; + +import org.springframework.batch.item.ItemReader; +import org.springframework.core.io.Resource; + +public interface FileReaderFactory { + + ItemReader create(Resource resource, FileReaderOptions options); + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/FileReaderOptions.java b/core/riot-file/src/main/java/com/redis/riot/file/FileReaderOptions.java new file mode 100644 index 000000000..1d4ddd864 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/FileReaderOptions.java @@ -0,0 +1,108 @@ +package com.redis.riot.file; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.OptionalInt; +import java.util.Set; + +import com.fasterxml.jackson.databind.JsonDeserializer; + +public class FileReaderOptions { + + public static final String DEFAULT_CONTINUATION_STRING = "\\"; + + private FileOptions fileOptions = new FileOptions(); + private OptionalInt maxItemCount = OptionalInt.empty(); + private Set includedFields; + private String continuationString = DEFAULT_CONTINUATION_STRING; + private List fields; + private Integer headerLine; + private Integer linesToSkip; + private List columnRanges; + @SuppressWarnings("rawtypes") + private final Map deserializers = new LinkedHashMap<>(); + private Class itemType = Map.class; + + public FileOptions getFileOptions() { + return fileOptions; + } + + public void setFileOptions(FileOptions fileOptions) { + this.fileOptions = fileOptions; + } + + public void addDeserializer(Class type, JsonDeserializer deserializer) { + deserializers.put(type, deserializer); + } + + @SuppressWarnings("rawtypes") + public Map getDeserializers() { + return deserializers; + } + + public Class getItemType() { + return itemType; + } + + public void setItemType(Class type) { + this.itemType = type; + } + + public OptionalInt getMaxItemCount() { + return maxItemCount; + } + + public void setMaxItemCount(int count) { + this.maxItemCount = count > 0 ? OptionalInt.of(count) : OptionalInt.empty(); + } + + public Set getIncludedFields() { + return includedFields; + } + + public void setIncludedFields(Set fields) { + this.includedFields = fields; + } + + public String getContinuationString() { + return continuationString; + } + + public void setContinuationString(String string) { + this.continuationString = string; + } + + public List getFields() { + return fields; + } + + public void setFields(List fields) { + this.fields = fields; + } + + public Integer getHeaderLine() { + return headerLine; + } + + public void setHeaderLine(Integer headerLine) { + this.headerLine = headerLine; + } + + public Integer getLinesToSkip() { + return linesToSkip; + } + + public void setLinesToSkip(Integer linesToSkip) { + this.linesToSkip = linesToSkip; + } + + public List getColumnRanges() { + return columnRanges; + } + + public void setColumnRanges(List columnRanges) { + this.columnRanges = columnRanges; + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/FileReaderRegistry.java b/core/riot-file/src/main/java/com/redis/riot/file/FileReaderRegistry.java new file mode 100644 index 000000000..52c3a32a2 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/FileReaderRegistry.java @@ -0,0 +1,15 @@ +package com.redis.riot.file; + +import java.io.IOException; + +import org.springframework.batch.item.ItemReader; +import org.springframework.core.io.Resource; + +public class FileReaderRegistry extends AbstractRegistry { + + public ItemReader reader(String location, FileReaderOptions options) throws IOException { + Resource resource = Files.resource(location, options.getFileOptions().getResourceOptions()); + return factory(resource, options.getFileOptions()).create(resource, options); + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/FileType.java b/core/riot-file/src/main/java/com/redis/riot/file/FileType.java new file mode 100644 index 000000000..9b0145084 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/FileType.java @@ -0,0 +1,63 @@ +package com.redis.riot.file; + +import java.util.Arrays; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; + +import org.springframework.util.Assert; + +public class FileType { + + public static final String EXTENSION_CSV = "csv"; + public static final String EXTENSION_TSV = "tsv"; + public static final String EXTENSION_PSV = "psv"; + public static final String EXTENSION_XML = "xml"; + public static final String EXTENSION_JSON = "json"; + public static final String EXTENSION_JSONL = "jsonl"; + public static final String EXTENSION_FW = "fw"; + + public static final FileType DELIMITED = new FileType("Delimited", EXTENSION_CSV, EXTENSION_TSV, EXTENSION_PSV); + public static final FileType FIXED_WIDTH = new FileType("Fixed-Width", EXTENSION_FW); + public static final FileType JSON = new FileType("JSON", EXTENSION_JSON); + public static final FileType JSONL = new FileType("JSON Lines", EXTENSION_JSONL); + public static final FileType XML = new FileType("XML", EXTENSION_XML); + + private final String name; + private final Set extensions = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + + public FileType(String name, String... extensions) { + Assert.notNull(name, "Name must not be null"); + this.name = name; + this.extensions.addAll(Arrays.asList(extensions)); + } + + public String[] getExtensions() { + return extensions.toArray(new String[0]); + } + + public boolean supportsExtension(String extension) { + return extensions.contains(extension); + } + + public String getName() { + return name; + } + + @Override + public int hashCode() { + return Objects.hash(name); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + FileType other = (FileType) obj; + return Objects.equals(name, other.name); + } +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/FileWriterFactories.java b/core/riot-file/src/main/java/com/redis/riot/file/FileWriterFactories.java new file mode 100644 index 000000000..e4e0652bd --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/FileWriterFactories.java @@ -0,0 +1,124 @@ +package com.redis.riot.file; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.file.FlatFileItemWriter; +import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder; +import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder.DelimitedBuilder; +import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder.FormattedBuilder; +import org.springframework.batch.item.file.transform.LineAggregator; +import org.springframework.batch.item.file.transform.PassThroughFieldExtractor; +import org.springframework.batch.item.json.JacksonJsonObjectMarshaller; +import org.springframework.batch.item.json.builder.JsonFileItemWriterBuilder; +import org.springframework.core.io.WritableResource; +import org.springframework.util.CollectionUtils; + +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.xml.XmlMapper; +import com.redis.riot.file.xml.XmlResourceItemWriterBuilder; + +public abstract class FileWriterFactories { + + public static FlatFileItemWriter> delimited(WritableResource resource, + FileWriterOptions options) { + FlatFileItemWriterBuilder> writer = flatFileWriter(resource, options); + DelimitedBuilder> delimitedBuilder = writer.delimited(); + delimitedBuilder.delimiter(options.getFileOptions().getDelimiter().orElseGet(() -> Files.delimiter(resource))); + delimitedBuilder.fieldExtractor(new PassThroughFieldExtractor<>()); + delimitedBuilder.quoteCharacter(String.valueOf(options.getFileOptions().getQuoteCharacter())); + return flatFileWriter(writer, delimitedBuilder.build(), options); + } + + public static FlatFileItemWriter> formatted(WritableResource resource, + FileWriterOptions options) { + FlatFileItemWriterBuilder> writer = flatFileWriter(resource, options); + FormattedBuilder> formattedBuilder = writer.formatted(); + formattedBuilder.format(options.getFormatterString()); + formattedBuilder.fieldExtractor(new PassThroughFieldExtractor<>()); + return flatFileWriter(writer, formattedBuilder.build(), options); + } + + public static FlatFileItemWriter jsonLines(WritableResource resource, FileWriterOptions options) { + FlatFileItemWriterBuilder builder = flatFileWriter(resource, options); + builder.lineAggregator(new JsonLineAggregator<>(new ObjectMapper())); + return builder.build(); + } + + public static ItemWriter json(WritableResource resource, FileWriterOptions options) { + JsonFileItemWriterBuilder writer = new JsonFileItemWriterBuilder<>(); + writer.name(resource.getFilename()); + writer.resource(resource); + writer.append(options.isAppend()); + writer.encoding(options.getFileOptions().getEncoding()); + writer.forceSync(options.isForceSync()); + writer.lineSeparator(options.getLineSeparator()); + writer.saveState(false); + writer.shouldDeleteIfEmpty(options.isShouldDeleteIfEmpty()); + writer.shouldDeleteIfExists(options.isShouldDeleteIfExists()); + writer.transactional(options.isTransactional()); + writer.jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>(objectMapper(new ObjectMapper()))); + return writer.build(); + } + + private static T objectMapper(T objectMapper) { + objectMapper.setSerializationInclusion(Include.NON_NULL); + objectMapper.setSerializationInclusion(Include.NON_DEFAULT); + return objectMapper; + } + + public static ItemWriter xml(WritableResource resource, FileWriterOptions options) { + XmlResourceItemWriterBuilder writer = new XmlResourceItemWriterBuilder<>(); + writer.name(resource.getFilename()); + writer.append(options.isAppend()); + writer.encoding(options.getFileOptions().getEncoding()); + writer.lineSeparator(options.getLineSeparator()); + writer.rootName(options.getRootName()); + writer.resource(resource); + writer.saveState(false); + XmlMapper mapper = objectMapper(new XmlMapper()); + mapper.setConfig(mapper.getSerializationConfig().withRootName(options.getElementName())); + writer.xmlObjectMarshaller(new JacksonJsonObjectMarshaller<>(mapper)); + return writer.build(); + } + + private static FlatFileItemWriterBuilder flatFileWriter(WritableResource resource, + FileWriterOptions options) { + FlatFileItemWriterBuilder builder = new FlatFileItemWriterBuilder<>(); + builder.name(resource.getFilename()); + builder.resource(resource); + builder.append(options.isAppend()); + builder.encoding(options.getFileOptions().getEncoding()); + builder.forceSync(options.isForceSync()); + builder.lineSeparator(options.getLineSeparator()); + builder.saveState(false); + builder.shouldDeleteIfEmpty(options.isShouldDeleteIfEmpty()); + builder.shouldDeleteIfExists(options.isShouldDeleteIfExists()); + builder.transactional(options.isTransactional()); + return builder; + } + + private static FlatFileItemWriter> flatFileWriter( + FlatFileItemWriterBuilder> writer, LineAggregator> aggregator, + FileWriterOptions options) { + writer.lineAggregator(aggregator); + if (options.getFileOptions().isHeader()) { + Map headerRecord = options.getHeaderSupplier().get(); + if (!CollectionUtils.isEmpty(headerRecord)) { + List fields = new ArrayList<>(headerRecord.keySet()); + Collections.sort(fields); + Map fieldMap = new LinkedHashMap<>(); + fields.forEach(f -> fieldMap.put(f, f)); + String headerLine = aggregator.aggregate(fieldMap); + writer.headerCallback(w -> w.write(headerLine)); + } + } + return writer.build(); + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/FileWriterFactory.java b/core/riot-file/src/main/java/com/redis/riot/file/FileWriterFactory.java new file mode 100644 index 000000000..34c24892a --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/FileWriterFactory.java @@ -0,0 +1,10 @@ +package com.redis.riot.file; + +import org.springframework.batch.item.ItemWriter; +import org.springframework.core.io.WritableResource; + +public interface FileWriterFactory { + + ItemWriter create(WritableResource resource, FileWriterOptions options); + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/FileWriterOptions.java b/core/riot-file/src/main/java/com/redis/riot/file/FileWriterOptions.java new file mode 100644 index 000000000..e9b4ac756 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/FileWriterOptions.java @@ -0,0 +1,123 @@ +package com.redis.riot.file; + +import java.util.Map; +import java.util.function.Supplier; + +import com.google.cloud.spring.core.GcpScope; + +import lombok.ToString; + +@ToString +public class FileWriterOptions { + + public static final String DEFAULT_LINE_SEPARATOR = System.getProperty("line.separator"); + public static final boolean DEFAULT_SHOULD_DELETE_IF_EXISTS = true; + public static final boolean DEFAULT_TRANSACTIONAL = true; + public static final String DEFAULT_ELEMENT_NAME = "record"; + public static final String DEFAULT_ROOT_NAME = "root"; + + private FileOptions fileOptions = new FileOptions(); + private String formatterString; + private boolean append; + private boolean forceSync; + private String lineSeparator = DEFAULT_LINE_SEPARATOR; + private boolean shouldDeleteIfEmpty; + private boolean shouldDeleteIfExists = DEFAULT_SHOULD_DELETE_IF_EXISTS; + private boolean transactional = DEFAULT_TRANSACTIONAL; + private String rootName = DEFAULT_ROOT_NAME; + private String elementName = DEFAULT_ELEMENT_NAME; + private Supplier> headerSupplier = () -> null; + + public FileOptions getFileOptions() { + return fileOptions; + } + + public void setFileOptions(FileOptions fileOptions) { + this.fileOptions = fileOptions; + } + + public Supplier> getHeaderSupplier() { + return headerSupplier; + } + + public void setHeaderSupplier(Supplier> headerSupplier) { + this.headerSupplier = headerSupplier; + } + + public String getRootName() { + return rootName; + } + + public void setRootName(String name) { + this.rootName = name; + } + + public String getElementName() { + return elementName; + } + + public void setElementName(String name) { + this.elementName = name; + } + + public FileWriterOptions() { + fileOptions.getResourceOptions().getGcpOptions().setScope(GcpScope.STORAGE_READ_WRITE); + } + + public boolean isAppend() { + return append; + } + + public void setAppend(boolean append) { + this.append = append; + } + + public boolean isForceSync() { + return forceSync; + } + + public void setForceSync(boolean forceSync) { + this.forceSync = forceSync; + } + + public String getLineSeparator() { + return lineSeparator; + } + + public void setLineSeparator(String separator) { + this.lineSeparator = separator; + } + + public boolean isShouldDeleteIfEmpty() { + return shouldDeleteIfEmpty; + } + + public void setShouldDeleteIfEmpty(boolean delete) { + this.shouldDeleteIfEmpty = delete; + } + + public boolean isShouldDeleteIfExists() { + return shouldDeleteIfExists; + } + + public void setShouldDeleteIfExists(boolean delete) { + this.shouldDeleteIfExists = delete; + } + + public String getFormatterString() { + return formatterString; + } + + public void setFormatterString(String formatterString) { + this.formatterString = formatterString; + } + + public boolean isTransactional() { + return transactional; + } + + public void setTransactional(boolean transactional) { + this.transactional = transactional; + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/FileWriterRegistry.java b/core/riot-file/src/main/java/com/redis/riot/file/FileWriterRegistry.java new file mode 100644 index 000000000..a90d90ddc --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/FileWriterRegistry.java @@ -0,0 +1,15 @@ +package com.redis.riot.file; + +import java.io.IOException; + +import org.springframework.batch.item.ItemWriter; +import org.springframework.core.io.WritableResource; + +public class FileWriterRegistry extends AbstractRegistry { + + public ItemWriter writer(String location, FileWriterOptions options) throws IOException { + WritableResource resource = Files.writableResource(location, options.getFileOptions().getResourceOptions()); + return factory(resource, options.getFileOptions()).create(resource, options); + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/FilenameInputStreamResource.java b/core/riot-file/src/main/java/com/redis/riot/file/FilenameInputStreamResource.java new file mode 100644 index 000000000..c106ffb7f --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/FilenameInputStreamResource.java @@ -0,0 +1,21 @@ +package com.redis.riot.file; + +import java.io.InputStream; + +import org.springframework.core.io.InputStreamResource; + +public class FilenameInputStreamResource extends InputStreamResource { + + private final String filename; + + public FilenameInputStreamResource(InputStream inputStream, String filename, String description) { + super(inputStream, description); + this.filename = filename; + } + + @Override + public String getFilename() { + return filename; + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/Files.java b/core/riot-file/src/main/java/com/redis/riot/file/Files.java new file mode 100644 index 000000000..7fd90476a --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/Files.java @@ -0,0 +1,225 @@ +package com.redis.riot.file; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.DirectoryStream; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Stream; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; +import org.springframework.core.io.FileSystemResource; +import org.springframework.core.io.Resource; +import org.springframework.core.io.WritableResource; +import org.springframework.util.ResourceUtils; +import org.springframework.util.StringUtils; + +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.ServiceOptions; +import com.google.cloud.spring.autoconfigure.storage.GcpStorageAutoConfiguration; +import com.google.cloud.spring.core.UserAgentHeaderProvider; +import com.google.cloud.spring.storage.GoogleStorageResource; +import com.google.cloud.storage.StorageOptions; + +import io.awspring.cloud.s3.InMemoryBufferingS3OutputStreamProvider; +import io.awspring.cloud.s3.Location; +import io.awspring.cloud.s3.PropertiesS3ObjectContentTypeResolver; +import io.awspring.cloud.s3.S3Resource; +import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; + +public abstract class Files { + + public static final Pattern EXTENSION_PATTERN = Pattern.compile("(?i)\\.(?\\w+)(?:\\.(?gz))?$"); + public static final String DELIMITER_PIPE = "|"; + public static final String GOOGLE_STORAGE_PROTOCOL_PREFIX = "gs://"; + public static final String STDIN = "-"; + public static final FileReaderRegistry readerRegistry = defaultReaderRegistry(); + public static final FileWriterRegistry writerRegistry = defaultWriterRegistry(); + + private Files() { + } + + public static Resource resource(String location, ResourceOptions options) throws IOException { + Resource resource = isStdin(location) ? new SystemInResource() : readableResource(location, options); + // Systematically obtain the input stream to validate the resource + InputStream inputStream = resource.getInputStream(); + if (options.isGzipped() || Files.isGzip(resource.getFilename())) { + GZIPInputStream gzipInputStream = new GZIPInputStream(inputStream); + return new FilenameInputStreamResource(gzipInputStream, resource.getFilename(), resource.getDescription()); + } + return resource; + } + + private static Resource readableResource(String location, ResourceOptions options) throws IOException { + if (ResourceUtils.isUrl(location)) { + return new UncustomizedUrlResource(location); + } + return createResource(location, options); + } + + public static WritableResource writableResource(String location, ResourceOptions options) throws IOException { + WritableResource resource = location == null ? new SystemOutResource() : createResource(location, options); + OutputStream outputStream = resource.getOutputStream(); + if (options.isGzipped() || Files.isGzip(resource.getFilename())) { + GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream); + return new OutputStreamResource(gzipOutputStream, resource.getFilename(), resource.getDescription()); + } + return resource; + } + + private static WritableResource createResource(String location, ResourceOptions options) throws IOException { + if (isAwsStorage(location)) { + return s3Resource(location, options.getAwsOptions()); + } + if (isGoogleStorage(location)) { + return googleStorageResource(location, options.getGcpOptions()); + } + return new FileSystemResource(location); + } + + public static boolean isStdin(String file) { + return STDIN.equals(file); + } + + public static boolean isFileSystem(String file) { + return !isAwsStorage(file) && !isGoogleStorage(file) && !ResourceUtils.isUrl(file) && !isStdin(file); + } + + public static boolean isGoogleStorage(String location) { + return StringUtils.hasLength(location) && location.toLowerCase().startsWith(GOOGLE_STORAGE_PROTOCOL_PREFIX); + } + + public static boolean isAwsStorage(String location) { + return StringUtils.hasLength(location) && location.toLowerCase().startsWith(Location.S3_PROTOCOL_PREFIX); + } + + public static GoogleStorageResource googleStorageResource(String location, GcpOptions options) throws IOException { + StorageOptions.Builder builder = StorageOptions.newBuilder().setProjectId(ServiceOptions.getDefaultProjectId()) + .setHeaderProvider(new UserAgentHeaderProvider(GcpStorageAutoConfiguration.class)); + if (options.getKeyFile() != null) { + InputStream inputStream = java.nio.file.Files.newInputStream(options.getKeyFile().toPath()); + builder.setCredentials(GoogleCredentials.fromStream(inputStream).createScoped(options.getScope().getUrl())); + } + if (options.getEncodedKey() != null) { + ByteArrayInputStream stream = new ByteArrayInputStream(Base64.getDecoder().decode(options.getEncodedKey())); + builder.setCredentials(GoogleCredentials.fromStream(stream)); + } + options.getProjectId().ifPresent(builder::setProjectId); + return new GoogleStorageResource(builder.build().getService(), location); + } + + public static S3Resource s3Resource(String location, AwsOptions options) { + S3ClientBuilder clientBuilder = S3Client.builder(); + options.getRegion().ifPresent(clientBuilder::region); + options.getEndpoint().ifPresent(clientBuilder::endpointOverride); + clientBuilder.credentialsProvider(credentialsProvider(options.getCredentials())); + S3Client client = clientBuilder.build(); + InMemoryBufferingS3OutputStreamProvider outputStreamProvider = new InMemoryBufferingS3OutputStreamProvider( + client, new PropertiesS3ObjectContentTypeResolver()); + return S3Resource.create(location, client, outputStreamProvider); + } + + private static AwsCredentialsProvider credentialsProvider(AwsCredentials credentials) { + if (credentials != null && StringUtils.hasText(credentials.getAccessKey()) + && StringUtils.hasText(credentials.getSecretKey())) { + return StaticCredentialsProvider + .create(AwsBasicCredentials.create(credentials.getAccessKey(), credentials.getSecretKey())); + } + return AnonymousCredentialsProvider.create(); + } + + /** + * + * @param file File path that might include a glob pattern + * @return List of file + * @throws IOException + */ + public static Stream expand(String file) throws IOException { + if (isFileSystem(file)) { + return expand(Paths.get(file)).stream().map(Path::toString); + } + return Stream.of(file); + } + + public static List expand(Path path) throws IOException { + if (java.nio.file.Files.exists(path) || path.getParent() == null + || !java.nio.file.Files.exists(path.getParent())) { + return Arrays.asList(path); + } + Path dir = path.getParent(); + String glob = path.getFileName().toString(); + // Path might be glob pattern + try (DirectoryStream stream = java.nio.file.Files.newDirectoryStream(dir, glob)) { + List paths = new ArrayList<>(); + stream.iterator().forEachRemaining(paths::add); + return paths; + } + } + + public static boolean isGzip(String filename) { + return extensionGroup(filename, "gz") != null; + } + + private static String extensionGroup(String file, String group) { + Matcher matcher = EXTENSION_PATTERN.matcher(file); + if (matcher.find()) { + return matcher.group(group); + } + return null; + } + + public static String extension(String filename) { + return extensionGroup(filename, "extension"); + } + + public static FileReaderRegistry defaultReaderRegistry() { + FileReaderRegistry registry = new FileReaderRegistry(); + registry.register(FileType.DELIMITED, FileReaderFactories::delimited); + registry.register(FileType.FIXED_WIDTH, FileReaderFactories::fixedWidth); + registry.register(FileType.JSON, FileReaderFactories::json); + registry.register(FileType.JSONL, FileReaderFactories::jsonLines); + registry.register(FileType.XML, FileReaderFactories::xml); + return registry; + } + + private static FileWriterRegistry defaultWriterRegistry() { + FileWriterRegistry registry = new FileWriterRegistry(); + registry.register(FileType.DELIMITED, FileWriterFactories::delimited); + registry.register(FileType.FIXED_WIDTH, FileWriterFactories::formatted); + registry.register(FileType.JSON, FileWriterFactories::json); + registry.register(FileType.JSONL, FileWriterFactories::jsonLines); + registry.register(FileType.XML, FileWriterFactories::xml); + registry.register(null, FileWriterFactories::jsonLines); + return registry; + } + + public static String delimiter(Resource resource) { + String extension = extension(resource.getFilename()); + if (FileType.EXTENSION_CSV.equalsIgnoreCase(extension)) { + return DelimitedLineTokenizer.DELIMITER_COMMA; + } + if (FileType.EXTENSION_PSV.equalsIgnoreCase(extension)) { + return DELIMITER_PIPE; + } + if (FileType.EXTENSION_TSV.equalsIgnoreCase(extension)) { + return DelimitedLineTokenizer.DELIMITER_TAB; + } + return null; + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/GcpOptions.java b/core/riot-file/src/main/java/com/redis/riot/file/GcpOptions.java new file mode 100644 index 000000000..326670c8e --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/GcpOptions.java @@ -0,0 +1,52 @@ +package com.redis.riot.file; + +import java.io.File; +import java.util.Optional; + +import com.google.cloud.spring.core.GcpScope; + +import lombok.ToString; + +@ToString(exclude = "encodedKey") +public class GcpOptions { + + public static final GcpScope DEFAULT_SCOPE = GcpScope.STORAGE_READ_ONLY; + + private File keyFile; + private Optional projectId = Optional.empty(); + private String encodedKey; + private GcpScope scope = DEFAULT_SCOPE; + + public GcpScope getScope() { + return scope; + } + + public void setScope(GcpScope scope) { + this.scope = scope; + } + + public File getKeyFile() { + return keyFile; + } + + public void setKeyFile(File file) { + this.keyFile = file; + } + + public Optional getProjectId() { + return projectId; + } + + public void setProjectId(String id) { + this.projectId = Optional.ofNullable(id); + } + + public String getEncodedKey() { + return encodedKey; + } + + public void setEncodedKey(String key) { + this.encodedKey = key; + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/HeaderCallbackHandler.java b/core/riot-file/src/main/java/com/redis/riot/file/HeaderCallbackHandler.java new file mode 100644 index 000000000..136f13c55 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/HeaderCallbackHandler.java @@ -0,0 +1,41 @@ +package com.redis.riot.file; + +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.item.file.LineCallbackHandler; +import org.springframework.batch.item.file.transform.AbstractLineTokenizer; +import org.springframework.batch.item.file.transform.FieldSet; + +public class HeaderCallbackHandler implements LineCallbackHandler { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final AbstractLineTokenizer tokenizer; + private final int headerIndex; + + private int lineIndex; + + public HeaderCallbackHandler(AbstractLineTokenizer tokenizer, int headerIndex) { + this.tokenizer = tokenizer; + this.headerIndex = headerIndex; + } + + @Override + public void handleLine(String line) { + if (lineIndex == headerIndex) { + log.info("Found header: {}", line); + FieldSet fieldSet = tokenizer.tokenize(line); + List fields = new ArrayList<>(); + for (int index = 0; index < fieldSet.getFieldCount(); index++) { + fields.add(fieldSet.readString(index)); + } + log.info("Using field names {}", fields); + tokenizer.setNames(fields.toArray(new String[0])); + } + lineIndex++; + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/JsonLineAggregator.java b/core/riot-file/src/main/java/com/redis/riot/file/JsonLineAggregator.java new file mode 100644 index 000000000..5b927c3c6 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/JsonLineAggregator.java @@ -0,0 +1,29 @@ +package com.redis.riot.file; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.item.file.transform.LineAggregator; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class JsonLineAggregator implements LineAggregator { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final ObjectMapper mapper; + + public JsonLineAggregator(ObjectMapper mapper) { + this.mapper = mapper; + } + + @Override + public String aggregate(T item) { + try { + return mapper.writeValueAsString(item); + } catch (JsonProcessingException e) { + log.error("Could not serialize item", e); + return null; + } + } +} \ No newline at end of file diff --git a/core/riot-file/src/main/java/com/redis/riot/file/MapFieldSetMapper.java b/core/riot-file/src/main/java/com/redis/riot/file/MapFieldSetMapper.java new file mode 100644 index 000000000..81124e8a3 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/MapFieldSetMapper.java @@ -0,0 +1,25 @@ +package com.redis.riot.file; + +import java.util.HashMap; +import java.util.Map; + +import org.springframework.batch.item.file.mapping.FieldSetMapper; +import org.springframework.batch.item.file.transform.FieldSet; +import org.springframework.util.StringUtils; + +public class MapFieldSetMapper implements FieldSetMapper> { + + @Override + public Map mapFieldSet(FieldSet fieldSet) { + Map fields = new HashMap<>(); + String[] names = fieldSet.getNames(); + for (int index = 0; index < names.length; index++) { + String value = fieldSet.readString(index); + if (StringUtils.hasLength(value)) { + fields.put(names[index], value); + } + } + return fields; + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/MapToFieldFunction.java b/core/riot-file/src/main/java/com/redis/riot/file/MapToFieldFunction.java new file mode 100644 index 000000000..77c514636 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/MapToFieldFunction.java @@ -0,0 +1,25 @@ +package com.redis.riot.file; + +import java.util.Map; +import java.util.function.Function; + +public class MapToFieldFunction implements Function, Object> { + + private final String key; + + private Object defaultValue = null; + + public MapToFieldFunction(String key) { + this.key = key; + } + + public void setDefaultValue(Object defaultValue) { + this.defaultValue = defaultValue; + } + + @Override + public Object apply(Map t) { + return t.getOrDefault(key, defaultValue); + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/ObjectMapperLineMapper.java b/core/riot-file/src/main/java/com/redis/riot/file/ObjectMapperLineMapper.java new file mode 100644 index 000000000..066cdac08 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/ObjectMapperLineMapper.java @@ -0,0 +1,22 @@ +package com.redis.riot.file; + +import org.springframework.batch.item.file.LineMapper; + +import com.fasterxml.jackson.databind.ObjectMapper; + +public class ObjectMapperLineMapper implements LineMapper { + + private final Class valueType; + private final ObjectMapper mapper; + + public ObjectMapperLineMapper(ObjectMapper mapper, Class valueType) { + this.mapper = mapper; + this.valueType = valueType; + } + + @Override + public T mapLine(String line, int lineNumber) throws Exception { + return mapper.readValue(line, valueType); + } + +} \ No newline at end of file diff --git a/core/riot-file/src/main/java/com/redis/riot/file/OutputStreamResource.java b/core/riot-file/src/main/java/com/redis/riot/file/OutputStreamResource.java new file mode 100644 index 000000000..4e1f69667 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/OutputStreamResource.java @@ -0,0 +1,61 @@ +package com.redis.riot.file; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Objects; + +import org.springframework.core.io.AbstractResource; +import org.springframework.core.io.WritableResource; + +public class OutputStreamResource extends AbstractResource implements WritableResource { + + private final OutputStream outStream; + private final String filename; + private final String desc; + + public OutputStreamResource(OutputStream outStream, String filename, String desc) { + this.outStream = outStream; + this.filename = filename; + this.desc = desc; + } + + @Override + public OutputStream getOutputStream() throws IOException { + return this.outStream; + } + + @Override + public String getDescription() { + return this.desc; + } + + @Override + public String getFilename() { + return filename; + } + + @Override + public InputStream getInputStream() throws IOException { + throw new IOException("Unable to create input stream."); + } + + @Override + public boolean isWritable() { + return true; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Objects.hash(desc, outStream); + return result; + } + + @Override + public boolean equals(Object obj) { + return super.equals(obj); + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/ResourceOptions.java b/core/riot-file/src/main/java/com/redis/riot/file/ResourceOptions.java new file mode 100644 index 000000000..11ba952cd --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/ResourceOptions.java @@ -0,0 +1,33 @@ +package com.redis.riot.file; + +public class ResourceOptions { + + private boolean gzipped; + private AwsOptions awsOptions = new AwsOptions(); + private GcpOptions gcpOptions = new GcpOptions(); + + public AwsOptions getAwsOptions() { + return awsOptions; + } + + public void setAwsOptions(AwsOptions awsOptions) { + this.awsOptions = awsOptions; + } + + public GcpOptions getGcpOptions() { + return gcpOptions; + } + + public void setGcpOptions(GcpOptions gcpOptions) { + this.gcpOptions = gcpOptions; + } + + public boolean isGzipped() { + return gzipped; + } + + public void setGzipped(boolean gzipped) { + this.gzipped = gzipped; + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/SystemInResource.java b/core/riot-file/src/main/java/com/redis/riot/file/SystemInResource.java new file mode 100644 index 000000000..e6ea497e8 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/SystemInResource.java @@ -0,0 +1,12 @@ +package com.redis.riot.file; + +public class SystemInResource extends FilenameInputStreamResource { + + private static final String FILENAME = "stdin"; + private static final String DESCRIPTION = "Standard Input"; + + public SystemInResource() { + super(System.in, FILENAME, DESCRIPTION); + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/SystemOutResource.java b/core/riot-file/src/main/java/com/redis/riot/file/SystemOutResource.java new file mode 100644 index 000000000..8f47bc79a --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/SystemOutResource.java @@ -0,0 +1,12 @@ +package com.redis.riot.file; + +public class SystemOutResource extends OutputStreamResource { + + public static final String FILENAME = "stdout"; + public static final String DESCRIPTION = "Standard Output"; + + public SystemOutResource() { + super(System.out, FILENAME, DESCRIPTION); + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/UncustomizedUrlResource.java b/core/riot-file/src/main/java/com/redis/riot/file/UncustomizedUrlResource.java new file mode 100644 index 000000000..a867983e7 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/UncustomizedUrlResource.java @@ -0,0 +1,30 @@ +package com.redis.riot.file; + +import org.springframework.core.io.UrlResource; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URL; + +public class UncustomizedUrlResource extends UrlResource { + + public UncustomizedUrlResource(String path) throws MalformedURLException { + super(path); + } + + public UncustomizedUrlResource(URI uri) throws MalformedURLException { + super(uri); + } + + public UncustomizedUrlResource(URL url) { + super(url); + } + + @Override + protected void customizeConnection(HttpURLConnection con) throws IOException { + // do nothing + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/UnknownFileTypeException.java b/core/riot-file/src/main/java/com/redis/riot/file/UnknownFileTypeException.java new file mode 100644 index 000000000..8ad930217 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/UnknownFileTypeException.java @@ -0,0 +1,18 @@ +package com.redis.riot.file; + +import java.io.IOException; + +@SuppressWarnings("serial") +public class UnknownFileTypeException extends IOException { + + private final String file; + + public UnknownFileTypeException(String file) { + this.file = file; + } + + public String getFile() { + return file; + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/UnsupportedFileTypeException.java b/core/riot-file/src/main/java/com/redis/riot/file/UnsupportedFileTypeException.java new file mode 100644 index 000000000..3732d7b96 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/UnsupportedFileTypeException.java @@ -0,0 +1,18 @@ +package com.redis.riot.file; + +import java.io.IOException; + +@SuppressWarnings("serial") +public class UnsupportedFileTypeException extends IOException { + + private final FileType fileType; + + public UnsupportedFileTypeException(FileType fileType) { + this.fileType = fileType; + } + + public FileType getFileType() { + return fileType; + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/xml/XmlItemReader.java b/core/riot-file/src/main/java/com/redis/riot/file/xml/XmlItemReader.java new file mode 100644 index 000000000..80629e07a --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/xml/XmlItemReader.java @@ -0,0 +1,108 @@ +package com.redis.riot.file.xml; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.item.ItemStreamReader; +import org.springframework.batch.item.file.ResourceAwareItemReaderItemStream; +import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader; +import org.springframework.core.io.Resource; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +/** + * {@link ItemStreamReader} implementation that reads XML objects from a {@link Resource} having the following format: + * + *
+ * {@code
+ * 
+ *    ...
+ *    ...
+ * 
+ * }
+ * 
+ * + * The implementation is not thread-safe. + * + * @param the type of XML objects to read + * + * @author Julien Ruaux + */ +public class XmlItemReader extends AbstractItemCountingItemStreamItemReader + implements ResourceAwareItemReaderItemStream { + + private final Logger log = LoggerFactory.getLogger(XmlItemReader.class); + + private Resource resource; + + private XmlObjectReader xmlObjectReader; + + private boolean strict = true; + + /** + * Create a new {@link XmlItemReader} instance. + * + * @param resource the input XML resource + * @param xmlObjectReader the XML object reader to use + */ + public XmlItemReader(Resource resource, XmlObjectReader xmlObjectReader) { + Assert.notNull(resource, "The resource must not be null."); + Assert.notNull(xmlObjectReader, "The XML object reader must not be null."); + this.resource = resource; + this.xmlObjectReader = xmlObjectReader; + } + + /** + * Set the {@link XmlObjectReader} to use to read and map XML elements to domain objects. + * + * @param xmlObjectReader the XML object reader to use + */ + public void setXmlObjectReader(XmlObjectReader xmlObjectReader) { + this.xmlObjectReader = xmlObjectReader; + } + + /** + * In strict mode the reader will throw an exception on {@link #open(org.springframework.batch.item.ExecutionContext)} if + * the input resource does not exist. + * + * @param strict true by default + */ + public void setStrict(boolean strict) { + this.strict = strict; + } + + @Override + public void setResource(Resource resource) { + this.resource = resource; + } + + @Nullable + @Override + protected T doRead() throws Exception { + return xmlObjectReader.read(); + } + + @Override + protected void doOpen() throws Exception { + if (!this.resource.exists()) { + if (this.strict) { + throw new IllegalStateException("Input resource must exist (reader is in 'strict' mode)"); + } + log.warn("Input resource does not exist: {}", resource.getDescription()); + return; + } + if (!this.resource.isReadable()) { + if (this.strict) { + throw new IllegalStateException("Input resource must be readable (reader is in 'strict' mode)"); + } + log.warn("Input resource is not readable: {}", resource.getDescription()); + return; + } + this.xmlObjectReader.open(this.resource); + } + + @Override + protected void doClose() throws Exception { + this.xmlObjectReader.close(); + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/xml/XmlItemReaderBuilder.java b/core/riot-file/src/main/java/com/redis/riot/file/xml/XmlItemReaderBuilder.java new file mode 100644 index 000000000..a24206eb8 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/xml/XmlItemReaderBuilder.java @@ -0,0 +1,139 @@ +package com.redis.riot.file.xml; + +import org.springframework.core.io.Resource; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +public class XmlItemReaderBuilder { + + private XmlObjectReader xmlObjectReader; + + private Resource resource; + + private String name; + + private boolean strict = true; + + private boolean saveState = true; + + private int maxItemCount = Integer.MAX_VALUE; + + private int currentItemCount; + + /** + * Set the {@link XmlObjectReader} to use to read and map XML objects to domain + * objects. + * + * @param xmlObjectReader to use + * @return The current instance of the builder. + * @see XmlItemReader#setXmlObjectReader(XmlObjectReader) + */ + public XmlItemReaderBuilder xmlObjectReader(XmlObjectReader xmlObjectReader) { + this.xmlObjectReader = xmlObjectReader; + return this; + } + + /** + * The {@link Resource} to be used as input. + * + * @param resource the input to the reader. + * @return The current instance of the builder. + * @see XmlItemReader#setResource(Resource) + */ + public XmlItemReaderBuilder resource(Resource resource) { + this.resource = resource; + return this; + } + + /** + * The name used to calculate the key within the + * {@link org.springframework.batch.item.ExecutionContext}. Required if + * {@link #saveState(boolean)} is set to true. + * + * @param name name of the reader instance + * @return The current instance of the builder. + * @see org.springframework.batch.item.ItemStreamSupport#setName(String) + */ + public XmlItemReaderBuilder name(String name) { + this.name = name; + + return this; + } + + /** + * Setting this value to true indicates that it is an error if the input does + * not exist and an exception will be thrown. Defaults to true. + * + * @param strict indicates the input resource must exist + * @return The current instance of the builder. + * @see XmlItemReader#setStrict(boolean) + */ + public XmlItemReaderBuilder strict(boolean strict) { + this.strict = strict; + + return this; + } + + /** + * Configure if the state of the + * {@link org.springframework.batch.item.ItemStreamSupport} should be persisted + * within the {@link org.springframework.batch.item.ExecutionContext} for + * restart purposes. + * + * @param saveState defaults to true + * @return The current instance of the builder. + */ + public XmlItemReaderBuilder saveState(boolean saveState) { + this.saveState = saveState; + + return this; + } + + /** + * Configure the max number of items to be read. + * + * @param maxItemCount the max items to be read + * @return The current instance of the builder. + * @see org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader#setMaxItemCount(int) + */ + public XmlItemReaderBuilder maxItemCount(int maxItemCount) { + this.maxItemCount = maxItemCount; + + return this; + } + + /** + * Index for the current item. Used on restarts to indicate where to start from. + * + * @param currentItemCount current index + * @return The current instance of the builder. + * @see org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader#setCurrentItemCount(int) + */ + public XmlItemReaderBuilder currentItemCount(int currentItemCount) { + this.currentItemCount = currentItemCount; + + return this; + } + + /** + * Validate the configuration and build a new {@link XmlItemReader}. + * + * @return a new instance of the {@link XmlItemReader} + */ + public XmlItemReader build() { + Assert.notNull(this.xmlObjectReader, "A XML object reader is required."); + Assert.notNull(this.resource, "A resource is required."); + if (this.saveState) { + Assert.state(StringUtils.hasText(this.name), "A name is required when saveState is set to true."); + } + + XmlItemReader reader = new XmlItemReader<>(this.resource, this.xmlObjectReader); + reader.setName(this.name); + reader.setStrict(this.strict); + reader.setSaveState(this.saveState); + reader.setMaxItemCount(this.maxItemCount); + reader.setCurrentItemCount(this.currentItemCount); + + return reader; + } +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/xml/XmlObjectReader.java b/core/riot-file/src/main/java/com/redis/riot/file/xml/XmlObjectReader.java new file mode 100644 index 000000000..81f6c4410 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/xml/XmlObjectReader.java @@ -0,0 +1,81 @@ +package com.redis.riot.file.xml; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.dataformat.xml.XmlMapper; +import com.redis.riot.file.xml.XmlObjectReader; + +import org.springframework.core.io.Resource; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLStreamReader; +import java.io.InputStream; + +/** + * + * @author Julien Ruaux + * + * @param type of the target object + */ +public class XmlObjectReader { + + private Class itemType; + private XMLStreamReader reader; + private XmlMapper mapper = new XmlMapper(); + private InputStream inputStream; + + /** + * Create a new {@link XmlObjectReader} instance. + * + * @param itemType the target item type + */ + public XmlObjectReader(Class itemType) { + this.itemType = itemType; + } + + /** + * Set the object mapper to use to map Xml objects to domain objects. + * + * @param mapper the object mapper to use + */ + public void setMapper(XmlMapper mapper) { + Assert.notNull(mapper, "The mapper must not be null"); + this.mapper = mapper; + } + + public void open(Resource resource) throws Exception { + Assert.notNull(resource, "The resource must not be null"); + this.inputStream = resource.getInputStream(); + this.reader = XMLInputFactory.newFactory().createXMLStreamReader(this.inputStream); + this.mapper = new XmlMapper(); + if (reader.hasNext()) { + reader.next(); // point to root element + } else { + throw new Exception("XML not in the form ..."); + } + if (reader.hasNext()) { + reader.next(); // point to first element under root + } else { + throw new Exception("XML not in the form ..."); + } + } + + @Nullable + public T read() throws Exception { + if (reader.hasNext()) { + try { + return mapper.readValue(reader, itemType); + } catch (JsonParseException e) { + // reached end of stream, ignore + } + } + return null; + } + + public void close() throws Exception { + this.inputStream.close(); + this.reader.close(); + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/xml/XmlResourceItemWriter.java b/core/riot-file/src/main/java/com/redis/riot/file/xml/XmlResourceItemWriter.java new file mode 100644 index 000000000..3ce7aa2c8 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/xml/XmlResourceItemWriter.java @@ -0,0 +1,115 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.redis.riot.file.xml; + +import java.util.Iterator; + +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.json.JsonObjectMarshaller; +import org.springframework.batch.item.support.AbstractFileItemWriter; +import org.springframework.core.io.Resource; +import org.springframework.core.io.WritableResource; +import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; + +import com.fasterxml.jackson.dataformat.xml.XmlMapper; + +/** + * Item writer that writes data in XML format to an output file. The location of + * the output file is defined by a {@link Resource} and must represent a + * writable file. Items are transformed to XML format using a {@link XmlMapper}. + * Items will be enclosed in a XML element as follows: + * + *
+ * {@code
+ * 
+ *  ...
+ *  ...
+ *  ...
+ * 
+ * }
+ * 
+ * + * The implementation is not thread-safe. + * + */ +public class XmlResourceItemWriter extends AbstractFileItemWriter { + + private JsonObjectMarshaller xmlObjectMarshaller; + + /** + * Create a new {@link XmlResourceItemWriter} instance. + * + * @param resource to write XML data to + * @param rootName XML root element tag name + * @param xmlObjectMarshaller used to marshal object into XML representation + */ + public XmlResourceItemWriter(WritableResource resource, String rootName, + JsonObjectMarshaller xmlObjectMarshaller) { + Assert.notNull(resource, "resource must not be null"); + Assert.notNull(rootName, "root name must not be null"); + Assert.notNull(xmlObjectMarshaller, "xml object writer must not be null"); + setResource(resource); + setRootName(rootName); + setXmlObjectMarshaller(xmlObjectMarshaller); + setExecutionContextName(ClassUtils.getShortName(XmlResourceItemWriter.class)); + } + + public void setRootName(String rootName) { + setHeaderCallback(writer -> writer.write("<" + rootName + ">")); + setFooterCallback(writer -> writer.write(this.lineSeparator + "" + this.lineSeparator)); + } + + /** + * Assert that mandatory properties (xmlObjectMarshaller) are set. + * + * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() + */ + @Override + public void afterPropertiesSet() throws Exception { + if (this.append) { + this.shouldDeleteIfExists = false; + } + } + + /** + * Set the {@link JsonObjectMarshaller} to use to marshal object to XML. + * + * @param objectMarshaller the marshaller to use + */ + public void setXmlObjectMarshaller(JsonObjectMarshaller objectMarshaller) { + this.xmlObjectMarshaller = objectMarshaller; + } + + @Override + public String doWrite(Chunk items) { + StringBuilder lines = new StringBuilder(); + Iterator iterator = items.iterator(); + if (!items.isEmpty() && state.getLinesWritten() > 0) { + lines.append(this.lineSeparator); + } + while (iterator.hasNext()) { + T item = iterator.next(); + lines.append(' ').append(this.xmlObjectMarshaller.marshal(item)); + if (iterator.hasNext()) { + lines.append(this.lineSeparator); + } + } + return lines.toString(); + } + +} diff --git a/core/riot-file/src/main/java/com/redis/riot/file/xml/XmlResourceItemWriterBuilder.java b/core/riot-file/src/main/java/com/redis/riot/file/xml/XmlResourceItemWriterBuilder.java new file mode 100644 index 000000000..ca33aa056 --- /dev/null +++ b/core/riot-file/src/main/java/com/redis/riot/file/xml/XmlResourceItemWriterBuilder.java @@ -0,0 +1,236 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.redis.riot.file.xml; + +import org.springframework.batch.item.file.FlatFileFooterCallback; +import org.springframework.batch.item.file.FlatFileHeaderCallback; +import org.springframework.batch.item.json.JsonObjectMarshaller; +import org.springframework.batch.item.support.AbstractFileItemWriter; +import org.springframework.core.io.Resource; +import org.springframework.core.io.WritableResource; +import org.springframework.util.Assert; + +/** + * Builder for {@link XmlResourceItemWriter}. + * + * @param type of objects to write as XML output. + */ +public class XmlResourceItemWriterBuilder { + + private WritableResource resource; + private String rootName; + private JsonObjectMarshaller xmlObjectMarshaller; + private FlatFileHeaderCallback headerCallback; + private FlatFileFooterCallback footerCallback; + + private String name; + private String encoding = AbstractFileItemWriter.DEFAULT_CHARSET; + private String lineSeparator = AbstractFileItemWriter.DEFAULT_LINE_SEPARATOR; + + private boolean append = false; + private boolean saveState = true; + private boolean shouldDeleteIfExists = true; + private boolean shouldDeleteIfEmpty = false; + + /** + * Configure if the state of the + * {@link org.springframework.batch.item.ItemStreamSupport} should be persisted + * within the {@link org.springframework.batch.item.ExecutionContext} for + * restart purposes. + * + * @param saveState defaults to true + * @return The current instance of the builder. + */ + public XmlResourceItemWriterBuilder saveState(boolean saveState) { + this.saveState = saveState; + + return this; + } + + /** + * The name used to calculate the key within the + * {@link org.springframework.batch.item.ExecutionContext}. Required if + * {@link #saveState(boolean)} is set to true. + * + * @param name name of the reader instance + * @return The current instance of the builder. + * @see org.springframework.batch.item.ItemStreamSupport#setName(String) + */ + public XmlResourceItemWriterBuilder name(String name) { + this.name = name; + + return this; + } + + /** + * String used to separate lines in output. Defaults to the System property + * line.separator. + * + * @param lineSeparator value to use for a line separator + * @return The current instance of the builder. + * @see XmlResourceItemWriter#setLineSeparator(String) + */ + public XmlResourceItemWriterBuilder lineSeparator(String lineSeparator) { + this.lineSeparator = lineSeparator; + + return this; + } + + /** + * Set the {@link JsonObjectMarshaller} to use to marshal objects to XML. + * + * @param xmlObjectMarshaller to use + * @return The current instance of the builder. + * @see XmlResourceItemWriter#setXmlObjectMarshaller(JsonObjectMarshaller) + */ + public XmlResourceItemWriterBuilder xmlObjectMarshaller(JsonObjectMarshaller xmlObjectMarshaller) { + this.xmlObjectMarshaller = xmlObjectMarshaller; + + return this; + } + + /** + * The {@link Resource} to be used as output. + * + * @param resource the output of the writer. + * @return The current instance of the builder. + */ + public XmlResourceItemWriterBuilder resource(WritableResource resource) { + this.resource = resource; + + return this; + } + + public XmlResourceItemWriterBuilder rootName(String rootName) { + this.rootName = rootName; + return this; + } + + /** + * Encoding used for output. + * + * @param encoding encoding type. + * @return The current instance of the builder. + * @see XmlResourceItemWriter#setEncoding(String) + */ + public XmlResourceItemWriterBuilder encoding(String encoding) { + this.encoding = encoding; + + return this; + } + + /** + * If set to true, once the step is complete, if the resource previously + * provided is empty, it will be deleted. + * + * @param shouldDelete defaults to false + * @return The current instance of the builder + * @see XmlResourceItemWriter#setShouldDeleteIfEmpty(boolean) + */ + public XmlResourceItemWriterBuilder shouldDeleteIfEmpty(boolean shouldDelete) { + this.shouldDeleteIfEmpty = shouldDelete; + + return this; + } + + /** + * If set to true, upon the start of the step, if the resource already exists, + * it will be deleted and recreated. + * + * @param shouldDelete defaults to true + * @return The current instance of the builder + * @see XmlResourceItemWriter#setShouldDeleteIfExists(boolean) + */ + public XmlResourceItemWriterBuilder shouldDeleteIfExists(boolean shouldDelete) { + this.shouldDeleteIfExists = shouldDelete; + + return this; + } + + /** + * If set to true and the file exists, the output will be appended to the + * existing file. + * + * @param append defaults to false + * @return The current instance of the builder + * @see XmlResourceItemWriter#setAppendAllowed(boolean) + */ + public XmlResourceItemWriterBuilder append(boolean append) { + this.append = append; + + return this; + } + + /** + * A callback for header processing. + * + * @param callback {@link FlatFileHeaderCallback} implementation + * @return The current instance of the builder + * @see XmlResourceItemWriter#setHeaderCallback(FlatFileHeaderCallback) + */ + public XmlResourceItemWriterBuilder headerCallback(FlatFileHeaderCallback callback) { + this.headerCallback = callback; + + return this; + } + + /** + * A callback for footer processing. + * + * @param callback {@link FlatFileFooterCallback} implementation + * @return The current instance of the builder + * @see XmlResourceItemWriter#setFooterCallback(FlatFileFooterCallback) + */ + public XmlResourceItemWriterBuilder footerCallback(FlatFileFooterCallback callback) { + this.footerCallback = callback; + + return this; + } + + /** + * Validate the configuration and build a new {@link XmlResourceItemWriter}. + * + * @return a new instance of the {@link XmlResourceItemWriter} + */ + public XmlResourceItemWriter build() { + Assert.notNull(this.resource, "A resource is required."); + Assert.notNull(this.rootName, "A root name is required."); + Assert.notNull(this.xmlObjectMarshaller, "An xml object marshaller is required."); + + if (this.saveState) { + Assert.hasText(this.name, "A name is required when saveState is true"); + } + + XmlResourceItemWriter xmlResourceItemWriter = new XmlResourceItemWriter<>(this.resource, this.rootName, + this.xmlObjectMarshaller); + + xmlResourceItemWriter.setName(this.name); + xmlResourceItemWriter.setAppendAllowed(this.append); + xmlResourceItemWriter.setEncoding(this.encoding); + if (this.headerCallback != null) { + xmlResourceItemWriter.setHeaderCallback(this.headerCallback); + } + if (this.footerCallback != null) { + xmlResourceItemWriter.setFooterCallback(this.footerCallback); + } + xmlResourceItemWriter.setLineSeparator(this.lineSeparator); + xmlResourceItemWriter.setSaveState(this.saveState); + xmlResourceItemWriter.setShouldDeleteIfEmpty(this.shouldDeleteIfEmpty); + xmlResourceItemWriter.setShouldDeleteIfExists(this.shouldDeleteIfExists); + return xmlResourceItemWriter; + } +}