From a22549613eb965dfa3e3c0adcac73d3b6036bee5 Mon Sep 17 00:00:00 2001 From: stheppi Date: Fri, 3 Nov 2023 23:14:55 +0000 Subject: [PATCH] First commit for the App code --- README.md | 84 +++- checkstyle/checkstyle.xml | 382 ++++++++++++++++++ checkstyle/java.header | 16 + checkstyle/suppressions.xml | 27 ++ pom.xml | 199 +++++++++ restore.sh | 42 ++ src/main/assembly/zip.xml | 37 ++ src/main/java/io/lenses/App.java | 95 +++++ src/main/java/io/lenses/Arguments.java | 73 ++++ src/main/java/io/lenses/Configuration.java | 92 +++++ .../kafka/AdminClientKafkaOperations.java | 106 +++++ .../java/io/lenses/kafka/GroupOffsets.java | 56 +++ .../java/io/lenses/kafka/KafkaOperations.java | 21 + .../PreviewAdminClientKafkaOperations.java | 45 +++ .../io/lenses/s3/AwsGroupOffsetsReader.java | 25 ++ src/main/java/io/lenses/s3/AwsMode.java | 16 + .../io/lenses/s3/S3AwsGroupOffsetsReader.java | 104 +++++ .../io/lenses/s3/S3ClientBuilderHelper.java | 74 ++++ src/main/java/io/lenses/s3/S3Config.java | 115 ++++++ src/main/java/io/lenses/s3/S3Location.java | 42 ++ src/main/java/io/lenses/utils/Ascii.java | 35 ++ src/main/java/io/lenses/utils/Either.java | 65 +++ src/main/java/io/lenses/utils/Tuple2.java | 29 ++ src/main/java/io/lenses/utils/Utils.java | 30 ++ src/main/resources/ascii.txt | 14 + src/main/resources/license-header.txt | 8 + src/test/java/io/lenses/ArgumentsTest.java | 91 +++++ .../java/io/lenses/ConfigurationTest.java | 187 +++++++++ .../kafka/AdminClientKafkaOperationsTest.java | 105 +++++ src/test/java/io/lenses/utils/AsciiTest.java | 47 +++ src/test/java/io/lenses/utils/UtilsTest.java | 32 ++ src/test/resources/application.conf | 0 src/test/resources/ascii.txt | 1 + 33 files changed, 2294 insertions(+), 1 deletion(-) create mode 100644 checkstyle/checkstyle.xml create mode 100644 checkstyle/java.header create mode 100644 checkstyle/suppressions.xml create mode 100644 pom.xml create mode 100644 restore.sh create mode 100644 src/main/assembly/zip.xml create mode 100644 src/main/java/io/lenses/App.java create mode 100644 src/main/java/io/lenses/Arguments.java create mode 100644 src/main/java/io/lenses/Configuration.java create mode 100644 src/main/java/io/lenses/kafka/AdminClientKafkaOperations.java create mode 100644 src/main/java/io/lenses/kafka/GroupOffsets.java create mode 100644 src/main/java/io/lenses/kafka/KafkaOperations.java create mode 100644 src/main/java/io/lenses/kafka/PreviewAdminClientKafkaOperations.java create mode 100644 src/main/java/io/lenses/s3/AwsGroupOffsetsReader.java create mode 100644 src/main/java/io/lenses/s3/AwsMode.java create mode 100644 src/main/java/io/lenses/s3/S3AwsGroupOffsetsReader.java create mode 100644 src/main/java/io/lenses/s3/S3ClientBuilderHelper.java create mode 100644 src/main/java/io/lenses/s3/S3Config.java create mode 100644 src/main/java/io/lenses/s3/S3Location.java create mode 100644 src/main/java/io/lenses/utils/Ascii.java create mode 100644 src/main/java/io/lenses/utils/Either.java create mode 100644 src/main/java/io/lenses/utils/Tuple2.java create mode 100644 src/main/java/io/lenses/utils/Utils.java create mode 100644 src/main/resources/ascii.txt create mode 100644 src/main/resources/license-header.txt create mode 100644 src/test/java/io/lenses/ArgumentsTest.java create mode 100644 src/test/java/io/lenses/ConfigurationTest.java create mode 100644 src/test/java/io/lenses/kafka/AdminClientKafkaOperationsTest.java create mode 100644 src/test/java/io/lenses/utils/AsciiTest.java create mode 100644 src/test/java/io/lenses/utils/UtilsTest.java create 
mode 100644 src/test/resources/application.conf create mode 100644 src/test/resources/ascii.txt diff --git a/README.md b/README.md index 8872458..aa1b03f 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,84 @@ # restore-consumer-groups-offset -An application allowing to read the consumer groups offsets stored in S3 and apply them to the target Kafka cluster + +An application allowing to read the consumer groups offsets stored in S3 and apply them to the target Kafka cluster. +The S3 sink stores the information under the following path: `bucket[/prefix]/group/topic/partition`, with the content +being the offset (8 bytes array). + +### Configuration + +The application requires the configuration file. The configuration file in HOCON format supports the following options: + +| Configuration Option | Description | +|---------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `Kafka configuration` | All Kafka settings should be prefixed with `kafka.`. For example: `kafka.bootstrap.servers = "localhost:9092"`. | +| `S3 location` | - `aws.bucket`: The name of the S3 bucket where consumer group offsets are stored. | +| | - `aws.prefix` (Optional): The prefix of objects within the bucket. | +| `groups` | An optional, comma-separated list of consumer groups to restore. If not specified, all groups stored in S3 will be restored. For example: `groups = group1, group2`. | +| `AWS connection mode` | - `aws.mode`: Set to `credentials` to use provided credentials or `default` for AWS default credentials provider chain. | +| `AWS Access Key` | - `aws.access.key`: AWS access key ID (only when `aws.mode` is set to `credentials`). | +| `AWS Secret Key` | - `aws.secret.key`: AWS secret access key (only when `aws.mode` is `credentials`). | +| `AWS Region` | - `aws.region`: AWS region (only when `aws.mode` is `credentials`). | +| `AWS HTTP Retries` | - `aws.http.retries`: How many times a failed request is attempted. Default is 5 | +| `AWS HTTP Retry interval` | - `aws.http.retry.inteval`: The time in milliseconds to wait before an HTTP operation is retried. Default is 50. | + + +#### Examples + +##### Example 1 + +```hocon +kafka { + bootstrap.servers = "localhost:9092" + security.protocol = "SSL" + # Add other Kafka settings here +} + +aws { + bucket = "your-s3-bucket" + prefix = "optional-prefix" +} + +# Optional: Specify the consumer groups to restore +groups = "group1,group2" + +aws { + mode = "credentials" # or "default" + access.key = "your-access-key" + secret.key = "your-secret-key" + region = "your-aws-region" +} +``` + +##### Example 2 + +```hocon +kafka.bootstrap.servers = "localhost:9092" + +aws.bucket = "your-s3-bucket" +aws.prefix = "optional-prefix" + +groups = "group1,group2" + +aws.mode = "credentials" # or "default" +aws.access.key = "your-access-key" +aws.secret.key = "your-secret-key" +aws.region = "your-aws-region" +``` + +## Running the application + +It requires at least Java 8 to run. 
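The sketch below shows one way to launch it, using the `restore.sh` wrapper that the build packages into the distribution archive (the archive and configuration file names are illustrative; adjust them to your build output):

```bash
# Unpack the distribution built by `mvn package` (exact archive name may differ)
tar -xzf restore-group-offsets-s3-1.0.0.tar.gz
cd restore-group-offsets-s3-1.0.0

# Preview mode: prints the offsets read from S3 without applying them to Kafka
bin/restore.sh --preview application.conf

# Restore mode: applies the offsets to the target Kafka cluster
bin/restore.sh application.conf
```

Under the hood the script simply invokes `java -cp "lib/*" io.lenses.App --config <config-file> [--preview]`.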
+ + + +To format the code run: + +```bash + mvn com.coveo:fmt-maven-plugin:format +``` + +To add license header, run: + +```bash +mvn license:format +``` \ No newline at end of file diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml new file mode 100644 index 0000000..d4952ca --- /dev/null +++ b/checkstyle/checkstyle.xml @@ -0,0 +1,382 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/checkstyle/java.header b/checkstyle/java.header new file mode 100644 index 0000000..45804ac --- /dev/null +++ b/checkstyle/java.header @@ -0,0 +1,16 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ \ No newline at end of file diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml new file mode 100644 index 0000000..225c233 --- /dev/null +++ b/checkstyle/suppressions.xml @@ -0,0 +1,27 @@ + + + + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..c6fe882 --- /dev/null +++ b/pom.xml @@ -0,0 +1,199 @@ + + + + 4.0.0 + + io.lenses + restore-group-offsets-s3 + 1.0.0 + jar + + + 8 + 8 + UTF-8 + 1.8 + 2.21.13 + 1.4.2 + 3.5.0 + 3.12.4 + ${project.basedir}/checkstyle/java.header + + + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + software.amazon.awssdk + s3 + ${aws.version} + + + software.amazon.awssdk + sts + ${aws.version} + + + software.amazon.awssdk + apache-client + ${aws.version} + + + com.typesafe + config + ${hocon.version} + + + org.junit.jupiter + junit-jupiter + RELEASE + test + + + org.mockito + mockito-core + ${mockito.version} + test + + + + + + + com.coveo + fmt-maven-plugin + 2.9 + + src/main/java + src/test/java + false + .*\.java + false + false + + + + + + check + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.11.0 + true + + ${java.version} + ${java.version} + + + + org.apache.maven.plugins + maven-jar-plugin + 3.3.0 + + + + + + io.lenses.App + true + true + + + + + + org.apache.maven.plugins + maven-assembly-plugin + 3.6.0 + + + package + + single + + + false + + src/main/assembly/zip.xml + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.19.1 + + 1 + false + + + + + com.mycila + license-maven-plugin + 4.2 + + + +
${project.basedir}/src/main/resources/license-header.txt
+ + **/*.java + + + **/README + ${project.basedir}/src/test/resources/** + ${project.basedir}/src/main/resources/** + **/generated/**/*.java + *.pom + .gitignore + ${project.basedir}/.github/** + ${project.basedir}/.idea/** + checkstyle/** + +
+
+ + JAVADOC_STYLE + +
+
+
+
+
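As a quick orientation to the build configured above: the pom wires source formatting, license headers, and checkstyle into the build, and assembles a distributable archive at package time. A sketch of a typical local workflow (artifact names follow the `artifactId`/`version` declared above and may differ):

```bash
# Re-format sources and (re)apply license headers before committing
mvn com.coveo:fmt-maven-plugin:format license:format

# Run the tests and build the jar plus the tar.gz distribution under target/
mvn clean package
```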
diff --git a/restore.sh b/restore.sh new file mode 100644 index 0000000..6b8b9b5 --- /dev/null +++ b/restore.sh @@ -0,0 +1,42 @@ +#!/bin/bash + +# Set the base directory where this script is located +BASEDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + + + +# Initialize optional arguments +PREVIEW=false +CONFIG_FILE="" + +# Create a classpath variable to include all JARs in the lib folder +CLASSPATH="${BASEDIR}/../lib/*" + +# Parse command line arguments +while [[ $# -gt 0 ]]; do + key="$1" + case $key in + --preview) + PREVIEW=true + ;; + *) + CONFIG_FILE="$1" + ;; + esac + shift +done + +# Check if a configuration file is provided +if [ -z "$CONFIG_FILE" ]; then + echo "Error: Please specify the application configuration file." + exit 1 +fi + +# Add optional logic for handling the --preview flag +if [ "$PREVIEW" = true ]; then + echo "Running the application in preview mode with configuration file: $CONFIG_FILE" + java -cp "$CLASSPATH" io.lenses.App --config "$CONFIG_FILE" --preview +else + echo "Running the application with configuration file: $CONFIG_FILE" + java -cp "$CLASSPATH" io.lenses.App --config "$CONFIG_FILE" +fi diff --git a/src/main/assembly/zip.xml b/src/main/assembly/zip.xml new file mode 100644 index 0000000..c63358f --- /dev/null +++ b/src/main/assembly/zip.xml @@ -0,0 +1,37 @@ + + assembly + true + + + tar.gz + + + + ${project.build.scriptSourceDirectory} + + + startup.* + + + + ${project.basedir}/conf + + + + + ${project.basedir}/restore.sh + bin + + + ${project.build.directory}/${project.artifactId}-${project.version}.jar + lib + + + + + lib + + + \ No newline at end of file diff --git a/src/main/java/io/lenses/App.java b/src/main/java/io/lenses/App.java new file mode 100644 index 0000000..83e485c --- /dev/null +++ b/src/main/java/io/lenses/App.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package io.lenses; + +import io.lenses.kafka.AdminClientKafkaOperations; +import io.lenses.kafka.GroupOffsets; +import io.lenses.kafka.KafkaOperations; +import io.lenses.kafka.PreviewAdminClientKafkaOperations; +import io.lenses.s3.AwsGroupOffsetsReader; +import io.lenses.s3.S3AwsGroupOffsetsReader; +import io.lenses.s3.S3ClientBuilderHelper; +import io.lenses.s3.S3Config; +import io.lenses.utils.Ascii; +import io.lenses.utils.Either; +import java.io.InputStream; +import java.nio.file.Files; +import java.util.List; +import java.util.concurrent.TimeUnit; +import software.amazon.awssdk.services.s3.S3Client; + +/** + * The application will read the group offsets stored in S3. The S3 object key is structured as + * $bucket/$prefix/${group}/${topic}/${partition} and the content represnets the long bytes of the + * offset. The application will then restore the group offsets to the Kafka cluster. + * + *

The application will receive the configuration file as arguments containing: + * + *

    + *
  • source=the AWS bucket and, optionally, prefix from which the group offsets are read + *
  • all the Kafka properties are prefixed with kafka + *
  • groups=an optional comma-separated list of consumer groups to consider + *
  • aws.mode=credentials or default (the AWS default credentials provider chain) + *
  • aws.region=the target AWS region + *
  • aws.access.key=when using credentials mode + *
  • aws.secret.key=when using credentials mode + *
+ */ +public class App { + public static void main(String[] args) { + Ascii.display("/ascii.txt", System.out::println); + final Either either = Arguments.from(args); + if (either.isLeft()) { + final Arguments.Errors error = either.getLeft(); + System.err.println(error.getMessage()); + switch (error) { + case MISSING_CONFIG_FILE: + printUsage(); + break; + case CONFIG_FILE_DOES_NOT_EXIST: + break; + } + System.exit(1); + } + + final Arguments arguments = either.getRight(); + + try (InputStream inputStream = Files.newInputStream(arguments.getConfigFile().toPath())) { + final Configuration configuration = Configuration.from(inputStream); + try (KafkaOperations kafkaOperations = + arguments.isPreview() + ? new PreviewAdminClientKafkaOperations() + : AdminClientKafkaOperations.create(configuration.getKafkaProperties())) { + if (!kafkaOperations.checkConnection(10, TimeUnit.SECONDS)) { + System.err.println("Failed to connect to Kafka cluster."); + } else { + final S3Config s3Config = configuration.getS3Config(); + try (S3Client s3Client = S3ClientBuilderHelper.build(s3Config)) { + final AwsGroupOffsetsReader s3Operations = new S3AwsGroupOffsetsReader(s3Client); + final List offsets = + s3Operations.read(configuration.getSource(), configuration.getGroups()); + GroupOffsets.consoleOutput(offsets); + System.out.println("Restoring Groups offsets"); + kafkaOperations.restoreGroupOffsets(offsets, 1, TimeUnit.MINUTES); + System.out.println("Finished restoring Groups offsets"); + } + } + } + } catch (Exception e) { + System.err.println("An error occurred. " + e); + System.exit(1); + } + } + + private static void printUsage() { + System.out.println("Usage: --config [--preview]"); + } +} diff --git a/src/main/java/io/lenses/Arguments.java b/src/main/java/io/lenses/Arguments.java new file mode 100644 index 0000000..85d4147 --- /dev/null +++ b/src/main/java/io/lenses/Arguments.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. 
+ */ +package io.lenses; + +import io.lenses.utils.Either; +import java.io.File; + +public class Arguments { + private final File configFile; + private final boolean preview; + + public Arguments(File configFile, boolean preview) { + this.configFile = configFile; + this.preview = preview; + } + + public File getConfigFile() { + return configFile; + } + + public boolean isPreview() { + return preview; + } + + public static Either from(String[] args) { + String configFilePath = null; + boolean isPreview = false; + + for (int i = 0; i < args.length; i++) { + if (args[i].equals("--config") && i + 1 < args.length) { + configFilePath = args[i + 1]; + i++; // Skip the next argument, which is the file path + } else if (args[i].equals("--preview")) { + isPreview = true; + } + } + + if (configFilePath == null) { + return Either.left(Errors.MISSING_CONFIG_FILE); + } + + File configFile = new File(configFilePath); + + if (!configFile.exists()) { + return Either.left(Errors.CONFIG_FILE_DOES_NOT_EXIST); + } + + return Either.right(new Arguments(configFile, isPreview)); + } + + public static enum Errors { + MISSING_CONFIG_FILE("Error: Missing --config argument."), + CONFIG_FILE_DOES_NOT_EXIST("Config file does not exist."); + + private final String message; + + Errors(String message) { + this.message = message; + } + + public String getMessage() { + return message; + } + } +} diff --git a/src/main/java/io/lenses/Configuration.java b/src/main/java/io/lenses/Configuration.java new file mode 100644 index 0000000..08d04a4 --- /dev/null +++ b/src/main/java/io/lenses/Configuration.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. 
+ */ +package io.lenses; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import io.lenses.s3.S3Config; +import io.lenses.s3.S3Location; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.Optional; + +public class Configuration { + private final S3Location source; + private final Optional groups; + + private final HashMap kafkaProperties; + + private final S3Config s3Config; + + public Configuration( + S3Location source, + Optional groups, + S3Config s3Config, + HashMap kafkaProperties) { + if (source == null) throw new IllegalArgumentException("S3 source cannot be null"); + if (s3Config == null) throw new IllegalArgumentException("S3 config cannot be null"); + if (kafkaProperties == null) + throw new IllegalArgumentException("Kafka properties cannot be null"); + this.source = source; + this.groups = groups; + this.kafkaProperties = kafkaProperties; + this.s3Config = s3Config; + } + + public S3Location getSource() { + return source; + } + + public Optional getGroups() { + return groups; + } + + public HashMap getKafkaProperties() { + return kafkaProperties; + } + + public static Configuration from(InputStream inputStream) { + // read the input stream as HOCON and return the configuration + final Config config = ConfigFactory.parseReader(new InputStreamReader(inputStream)); + // read all kafka properties + final HashMap kafkaProperties = new HashMap<>(); + if (!config.hasPath("kafka")) + throw new IllegalArgumentException("Kafka properties are required"); + config + .getConfig("kafka") + .entrySet() + .forEach(e -> kafkaProperties.put(e.getKey(), e.getValue().unwrapped().toString())); + + // read the source + if (!config.hasPath("aws")) throw new IllegalArgumentException("S3 source is required"); + final Config sourceConfig = config.getConfig("aws"); + if (!sourceConfig.hasPath("bucket")) + throw new IllegalArgumentException("S3 bucket is required"); + final String bucket = sourceConfig.getString("bucket"); + final Optional prefix = Optional.ofNullable(sourceConfig.getString("prefix")); + final S3Location source = new S3Location(bucket, prefix); + + // groups are optional, when define it's a comma separated list + final Optional groups = + config.hasPath("groups") + ? Optional.of(config.getString("groups").split(",")) + : Optional.empty(); + + // read AwsMode + final S3Config s3Config = S3Config.from(config); + return new Configuration(source, groups, s3Config, kafkaProperties); + } + + public S3Config getS3Config() { + return s3Config; + } +} diff --git a/src/main/java/io/lenses/kafka/AdminClientKafkaOperations.java b/src/main/java/io/lenses/kafka/AdminClientKafkaOperations.java new file mode 100644 index 0000000..756050f --- /dev/null +++ b/src/main/java/io/lenses/kafka/AdminClientKafkaOperations.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License + * for the specific language governing permissions and limitations under the License. + */ +package io.lenses.kafka; + +import io.lenses.utils.Tuple2; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult; +import org.apache.kafka.clients.admin.DescribeClusterResult; + +/** A class which uses the AdminClient to store the consumer groups offsets. */ +public class AdminClientKafkaOperations implements KafkaOperations { + private final Admin admin; + + public AdminClientKafkaOperations(Admin adminClient) { + if (adminClient == null) throw new IllegalArgumentException("AdminClient cannot be null"); + this.admin = adminClient; + } + + /** + * Checks the connection to the Kafka cluster + * + * @return true if the connection is successful, false otherwise + */ + @Override + public boolean checkConnection(long timeout, TimeUnit unit) { + try { + DescribeClusterResult result = admin.describeCluster(); + result.clusterId().get(timeout, unit); + return true; + } catch (Exception e) { + return false; + } + } + + @Override + public void restoreGroupOffsets(List offsets, long timeout, TimeUnit unit) { + // traverse the list of GroupOffsets and call the admin client to restore the offsets + // capture the future and wait for it to complete + // if the future fails, throw an exception + List> results = + offsets.stream() + .map( + offset -> { + offset + .getOffsets() + .forEach( + (topicPartition, offsetAndMetadata) -> { + System.out.println( + "Restoring Group:" + + offset.getGroup() + + " Topic:" + + topicPartition.topic() + + " Partition:" + + topicPartition.partition() + + " Offset:" + + offsetAndMetadata.offset()); + }); + AlterConsumerGroupOffsetsResult result = + admin.alterConsumerGroupOffsets(offset.getGroup(), offset.getOffsets()); + return new Tuple2<>(offset, result); + }) + .collect(Collectors.toList()); + + results.forEach( + result -> { + try { + System.out.println("Awaiting result for group:" + result._1().getGroup()); + result._2().all().get(timeout, unit); + } catch (Exception e) { + throw new RuntimeException( + "Failed to restore group offsets for group:" + result._1().getGroup(), e); + } + }); + } + + @Override + public void close() throws Exception { + admin.close(); + } + + public static AdminClientKafkaOperations create(Map properties) { + final Properties props = new Properties(); + props.putAll(properties); + return create(props); + } + + public static AdminClientKafkaOperations create(Properties properties) { + if (properties == null) throw new IllegalArgumentException("Properties cannot be null"); + AdminClient adminClient = AdminClient.create(properties); + return new AdminClientKafkaOperations(adminClient); + } +} diff --git a/src/main/java/io/lenses/kafka/GroupOffsets.java b/src/main/java/io/lenses/kafka/GroupOffsets.java new file mode 100644 index 0000000..48a7268 --- /dev/null +++ b/src/main/java/io/lenses/kafka/GroupOffsets.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package io.lenses.kafka; + +import java.util.List; +import java.util.Map; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; + +public class GroupOffsets { + private final String group; + private final Map offsets; + + public GroupOffsets(String group, Map offsets) { + if (group == null) throw new IllegalArgumentException("Group cannot be null"); + if (offsets == null) throw new IllegalArgumentException("Offsets cannot be null"); + if (offsets.isEmpty()) throw new IllegalArgumentException("Offsets cannot be empty"); + this.group = group; + this.offsets = offsets; + } + + public String getGroup() { + return group; + } + + public Map getOffsets() { + return offsets; + } + + public static void consoleOutput(List offsets) { + offsets.forEach( + offset -> { + System.out.println("Restoring Group:" + offset.getGroup()); + offset + .getOffsets() + .forEach( + (topicPartition, offsetAndMetadata) -> { + System.out.println( + "Topic:" + + topicPartition.topic() + + " Partition:" + + topicPartition.partition() + + " Offset:" + + offsetAndMetadata.offset()); + }); + }); + } +} diff --git a/src/main/java/io/lenses/kafka/KafkaOperations.java b/src/main/java/io/lenses/kafka/KafkaOperations.java new file mode 100644 index 0000000..3b8999d --- /dev/null +++ b/src/main/java/io/lenses/kafka/KafkaOperations.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package io.lenses.kafka; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +public interface KafkaOperations extends AutoCloseable { + + boolean checkConnection(long timeout, TimeUnit unit); + + void restoreGroupOffsets(List offsets, long timeout, TimeUnit unit); +} diff --git a/src/main/java/io/lenses/kafka/PreviewAdminClientKafkaOperations.java b/src/main/java/io/lenses/kafka/PreviewAdminClientKafkaOperations.java new file mode 100644 index 0000000..f6d8cae --- /dev/null +++ b/src/main/java/io/lenses/kafka/PreviewAdminClientKafkaOperations.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. 
See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package io.lenses.kafka; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** A class which only previews the changes made but does not actually make them. */ +public class PreviewAdminClientKafkaOperations implements KafkaOperations { + @Override + public boolean checkConnection(long timeout, TimeUnit unit) { + return true; + } + + @Override + public void restoreGroupOffsets(List offsets, long timeout, TimeUnit unit) { + offsets.forEach( + offset -> { + System.out.println("Restoring Group:" + offset.getGroup()); + offset + .getOffsets() + .forEach( + (topicPartition, offsetAndMetadata) -> { + System.out.println( + "Topic:" + + topicPartition.topic() + + " Partition:" + + topicPartition.partition() + + " Offset:" + + offsetAndMetadata.offset()); + }); + }); + } + + @Override + public void close() throws Exception {} +} diff --git a/src/main/java/io/lenses/s3/AwsGroupOffsetsReader.java b/src/main/java/io/lenses/s3/AwsGroupOffsetsReader.java new file mode 100644 index 0000000..09cbd3f --- /dev/null +++ b/src/main/java/io/lenses/s3/AwsGroupOffsetsReader.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package io.lenses.s3; + +import io.lenses.kafka.GroupOffsets; +import java.util.List; +import java.util.Optional; + +/** + * The S3 sink stores consumer groups offsets using this key: + * bucket/prefix/${group}/${topic}/${partition} + * + *

This interface is used to read the group offsets from S3. + */ +public interface AwsGroupOffsetsReader { + List read(S3Location source, Optional groups); +} diff --git a/src/main/java/io/lenses/s3/AwsMode.java b/src/main/java/io/lenses/s3/AwsMode.java new file mode 100644 index 0000000..2aeb5bc --- /dev/null +++ b/src/main/java/io/lenses/s3/AwsMode.java @@ -0,0 +1,16 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package io.lenses.s3; + +public enum AwsMode { + CREDENTIALS, + DEFAULT +} diff --git a/src/main/java/io/lenses/s3/S3AwsGroupOffsetsReader.java b/src/main/java/io/lenses/s3/S3AwsGroupOffsetsReader.java new file mode 100644 index 0000000..fd365a1 --- /dev/null +++ b/src/main/java/io/lenses/s3/S3AwsGroupOffsetsReader.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package io.lenses.s3; + +import io.lenses.kafka.GroupOffsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable; + +/** + * Implementation of {@link AwsGroupOffsetsReader} that reads the offsets from S3. + * + *

The S3 sink stores consumer groups offsets using a key like: + * bucket/prefix/${group}/${topic}/${partition}. The content is the 8 bytes long of the offset. The + * implementation starts from the bucket and prefix, and then it will list all the groups, topics + * and partitions and read the offsets. + */ +public class S3AwsGroupOffsetsReader implements AwsGroupOffsetsReader { + private final S3Client s3Client; + + public S3AwsGroupOffsetsReader(S3Client s3Client) { + this.s3Client = s3Client; + } + + @Override + public List read(S3Location source, Optional groups) { + System.out.println( + "Reading Consumer Group offsets from bucket:" + + source.getBucket() + + " prefix:" + + source.getPrefix().orElse("")); + ListObjectsV2Request.Builder requestBuilder = + ListObjectsV2Request.builder().bucket(source.getBucket()); + if (source.getPrefix().isPresent()) { + requestBuilder.prefix(source.getPrefix().get()); + } + ListObjectsV2Request request = requestBuilder.build(); + ListObjectsV2Iterable iterable = s3Client.listObjectsV2Paginator(request); + final Iterator iterator = iterable.iterator(); + final Map offsetsMap = new HashMap<>(); + while (iterator.hasNext()) { + final ListObjectsV2Response response = iterator.next(); + for (S3Object s3Object : response.contents()) { + String key = s3Object.key(); + System.out.println("Reading offsets for key:" + key); + final ResponseBytes objResponse = + s3Client.getObjectAsBytes( + GetObjectRequest.builder().bucket(source.getBucket()).key(key).build()); + final long offset = objResponse.asByteBuffer().getLong(); + final String[] parts = key.split("/"); + final String group = parts[parts.length - 3]; + final String topic = parts[parts.length - 2]; + final int partition = Integer.parseInt(parts[parts.length - 1]); + final TopicPartition topicPartition = new TopicPartition(topic, partition); + final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset); + if (groups.isPresent()) { + final String[] groupsArray = groups.get(); + + if (Arrays.asList(groupsArray).contains(group)) { + offsetsMap + .computeIfAbsent(group, k -> new GroupOffsets(group, new HashMap<>())) + .getOffsets() + .put(topicPartition, offsetAndMetadata); + } + } else { + offsetsMap + .computeIfAbsent(group, k -> new GroupOffsets(group, new HashMap<>())) + .getOffsets() + .put(topicPartition, offsetAndMetadata); + } + } + } + final List groupsOffsets = new ArrayList<>(offsetsMap.values()); + groupsOffsets.sort(Comparator.comparing(GroupOffsets::getGroup)); + System.out.println( + "Finished reading Consumer Groups offsets S3 data. Found " + + groupsOffsets.size() + + " groups."); + return groupsOffsets; + } +} diff --git a/src/main/java/io/lenses/s3/S3ClientBuilderHelper.java b/src/main/java/io/lenses/s3/S3ClientBuilderHelper.java new file mode 100644 index 0000000..7737666 --- /dev/null +++ b/src/main/java/io/lenses/s3/S3ClientBuilderHelper.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package io.lenses.s3; + +import java.time.Duration; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.S3Configuration; + +public class S3ClientBuilderHelper { + public static S3Client build(S3Config config) { + // build the S3 client based on the configuration + final RetryPolicy retryPolicy = + RetryPolicy.builder() + .numRetries(config.getAwsHttpRetries()) + .backoffStrategy( + FixedDelayBackoffStrategy.create( + Duration.ofMillis(config.getAwsHttpRetryInterval()))) + .build(); + + final ClientOverrideConfiguration overrideConfig = + ClientOverrideConfiguration.builder().retryPolicy(retryPolicy).build(); + + final S3Configuration s3Config = + S3Configuration.builder() + .pathStyleAccessEnabled(config.isEnableVirtualHostBuckets()) + .build(); + + final SdkHttpClient httpClient = ApacheHttpClient.builder().build(); + + final AwsCredentialsProvider credsProv = credentialsProvider(config); + final S3ClientBuilder builder = + S3Client.builder() + .overrideConfiguration(overrideConfig) + .serviceConfiguration(s3Config) + .credentialsProvider(credsProv) + .httpClient(httpClient); + if (config.getAwsRegion().isPresent()) { + builder.region(Region.of(config.getAwsRegion().get())); + } + return builder.build(); + } + + private static AwsCredentialsProvider credentialsProvider(S3Config config) { + switch (config.getAwsMode()) { + case CREDENTIALS: + return StaticCredentialsProvider.create( + AwsBasicCredentials.create( + config.getAwsAccessKey().get(), config.getAwsSecretKey().get())); + case DEFAULT: + return DefaultCredentialsProvider.create(); + default: + throw new IllegalArgumentException("Unsupported AWS mode: " + config.getAwsMode()); + } + } +} diff --git a/src/main/java/io/lenses/s3/S3Config.java b/src/main/java/io/lenses/s3/S3Config.java new file mode 100644 index 0000000..5527a8c --- /dev/null +++ b/src/main/java/io/lenses/s3/S3Config.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package io.lenses.s3; + +import com.typesafe.config.Config; +import java.util.Optional; + +public class S3Config { + private final AwsMode awsMode; + private final Optional awsRegion; + + private final Optional awsAccessKey; + private final Optional awsSecretKey; + + private final int awsHttpRetries; + private final long awsHttpRetryInterval; + + private final boolean enableVirtualHostBuckets; + + public S3Config( + AwsMode awsMode, + Optional awsRegion, + Optional awsAccessKey, + Optional awsSecretKey, + int awsHttpRetries, + long awsHttpRetryInterval, + boolean enableVirtualHostBuckets) { + if (awsMode == null) throw new IllegalArgumentException("AWS mode cannot be null"); + if (awsRegion == null) throw new IllegalArgumentException("AWS region cannot be null"); + if (awsMode == AwsMode.CREDENTIALS + && (!awsAccessKey.isPresent() || !awsSecretKey.isPresent())) { + throw new IllegalArgumentException("AWS credentials mode requires access and secret keys"); + } + this.awsMode = awsMode; + this.awsRegion = awsRegion; + this.awsAccessKey = awsAccessKey; + this.awsSecretKey = awsSecretKey; + this.awsHttpRetries = awsHttpRetries; + this.awsHttpRetryInterval = awsHttpRetryInterval; + this.enableVirtualHostBuckets = enableVirtualHostBuckets; + } + + public AwsMode getAwsMode() { + return awsMode; + } + + public Optional getAwsRegion() { + return awsRegion; + } + + public Optional getAwsAccessKey() { + return awsAccessKey; + } + + public Optional getAwsSecretKey() { + return awsSecretKey; + } + + public int getAwsHttpRetries() { + return awsHttpRetries; + } + + public long getAwsHttpRetryInterval() { + return awsHttpRetryInterval; + } + + public static S3Config from(Config config) { + final AwsMode awsMode = AwsMode.valueOf(config.getString("aws.mode").toUpperCase()); + final String awsRegion = config.getString("aws.region"); + // if credentials mode, read the access and secret keys or throw an exception if they are + // missing + if (awsMode == AwsMode.CREDENTIALS + && (!config.hasPath("aws.access.key") || !config.hasPath("aws.secret.key"))) { + throw new IllegalArgumentException("AWS credentials mode requires access and secret keys"); + } + final Optional awsAccessKey = + awsMode == AwsMode.CREDENTIALS + ? Optional.of(config.getString("aws.access.key")) + : Optional.empty(); + final Optional awsSecretKey = + awsMode == AwsMode.CREDENTIALS + ? Optional.of(config.getString("aws.secret.key")) + : Optional.empty(); + + // read the http retries and interval if not default to 5 and 50L + final int awsHttpRetries = + config.hasPath("aws.http.retries") ? config.getInt("aws.http.retries") : 5; + final long awsHttpRetryInterval = + config.hasPath("aws.http.retry.interval") ? 
config.getLong("aws.http.retry.interval") : 50L; + + final boolean enableVirtualHostBuckets = + config.hasPath("aws.enable.virtual.host.buckets") + && config.getBoolean("aws.enable.virtual.host.buckets"); + return new S3Config( + awsMode, + Optional.of(awsRegion), + awsAccessKey, + awsSecretKey, + awsHttpRetries, + awsHttpRetryInterval, + enableVirtualHostBuckets); + } + + public boolean isEnableVirtualHostBuckets() { + return enableVirtualHostBuckets; + } +} diff --git a/src/main/java/io/lenses/s3/S3Location.java b/src/main/java/io/lenses/s3/S3Location.java new file mode 100644 index 0000000..f27d410 --- /dev/null +++ b/src/main/java/io/lenses/s3/S3Location.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package io.lenses.s3; + +import java.util.Optional; + +public class S3Location { + private final String bucket; + // + private final Optional prefix; + + public S3Location(String bucket, Optional prefix) { + if (bucket == null) throw new IllegalArgumentException("S3 bucket cannot be null"); + if (bucket.trim().isEmpty()) throw new IllegalArgumentException("S3 bucket cannot be empty"); + this.bucket = bucket; + this.prefix = prefix; + } + + public String getBucket() { + return bucket; + } + + public Optional getPrefix() { + return prefix; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) return false; + if (!(obj instanceof S3Location)) return false; + S3Location other = (S3Location) obj; + return bucket.equals(other.bucket) && prefix.equals(other.prefix); + } +} diff --git a/src/main/java/io/lenses/utils/Ascii.java b/src/main/java/io/lenses/utils/Ascii.java new file mode 100644 index 0000000..5a30193 --- /dev/null +++ b/src/main/java/io/lenses/utils/Ascii.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. 
+ */ +package io.lenses.utils; + +import static io.lenses.utils.Utils.readAll; + +import io.lenses.App; +import java.io.IOException; +import java.io.InputStream; +import java.util.function.Consumer; + +public class Ascii { + // java function one argument returns void + + public static void display(String resourceName, Consumer func) { + if (resourceName == null || resourceName.isEmpty()) { + return; + } + try (InputStream inputStream = App.class.getResourceAsStream(resourceName)) { + if (inputStream != null) { + func.accept(readAll(inputStream)); + } + } catch (IOException e) { + System.err.println("Failed to read the ascii.txt resource."); + } + } +} diff --git a/src/main/java/io/lenses/utils/Either.java b/src/main/java/io/lenses/utils/Either.java new file mode 100644 index 0000000..bfcf457 --- /dev/null +++ b/src/main/java/io/lenses/utils/Either.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package io.lenses.utils; + +import java.util.function.Consumer; + +public class Either { + private final A left; + private final B right; + private final boolean isLeft; + + private Either(A left, B right, boolean isLeft) { + this.left = left; + this.right = right; + this.isLeft = isLeft; + } + + public static Either left(A left) { + return new Either<>(left, null, true); + } + + public static Either right(B right) { + return new Either<>(null, right, false); + } + + public boolean isLeft() { + return isLeft; + } + + public boolean isRight() { + return !isLeft; + } + + public void ifRightOrElse(Consumer rightConsumer, Consumer leftConsumer) { + if (isLeft) { + leftConsumer.accept(left); + } else { + rightConsumer.accept(right); + } + } + + public A getLeft() { + return left; + } + + public B getRight() { + return right; + } + + public C fold(Fold fold) { + return fold.fold(left, right); + } + + public interface Fold { + C fold(A left, B right); + } +} diff --git a/src/main/java/io/lenses/utils/Tuple2.java b/src/main/java/io/lenses/utils/Tuple2.java new file mode 100644 index 0000000..0716794 --- /dev/null +++ b/src/main/java/io/lenses/utils/Tuple2.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License + * for the specific language governing permissions and limitations under the License. + */ +package io.lenses.utils; + +public class Tuple2 { + private final T _1; + private final R _2; + + public Tuple2(T _1, R _2) { + this._1 = _1; + this._2 = _2; + } + + public T _1() { + return _1; + } + + public R _2() { + return _2; + } +} diff --git a/src/main/java/io/lenses/utils/Utils.java b/src/main/java/io/lenses/utils/Utils.java new file mode 100644 index 0000000..ea67808 --- /dev/null +++ b/src/main/java/io/lenses/utils/Utils.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package io.lenses.utils; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +public class Utils { + public static String readAll(InputStream inputStream) throws IOException { + ByteArrayOutputStream result = new ByteArrayOutputStream(); + byte[] buffer = new byte[1024]; + int length; + + while ((length = inputStream.read(buffer)) != -1) { + result.write(buffer, 0, length); + } + + return result.toString(StandardCharsets.UTF_8.name()); + } +} diff --git a/src/main/resources/ascii.txt b/src/main/resources/ascii.txt new file mode 100644 index 0000000..b2793e9 --- /dev/null +++ b/src/main/resources/ascii.txt @@ -0,0 +1,14 @@ + _ _ _ _ +| | | | _ __ __| | __ _ | |_ ___ +| |_| || '_ \/ _` |/ _` || _|/ -_) + \___/ | .__/\__,_|\__,_| \__|\___| + |_| + ___ ___ + / __| ___ _ _ ___ _ _ _ __ ___ _ _ / __| _ _ ___ _ _ _ __ ___ +| (__ / _ \| ' \ (_-<| || || ' \ / -_)| '_| | (_ || '_|/ _ \| || || '_ \(_-< + \___|\___/|_||_|/__/ \_,_||_|_|_|\___||_| \___||_| \___/ \_,_|| .__//__/ + |_| + __ __ _ by Lenses.io + ___ / _| / _| ___ ___ | |_ +/ _ \| _|| _|(_- { + assertEquals( + arguments.getConfigFile().getAbsolutePath(), configFile.getAbsolutePath()); + assertTrue(arguments.isPreview()); + }, + errors -> fail("Should not return errors")); + } + + @Test + void returnsMissingConfigFileError() { + Arguments.from(new String[] {"--preview"}) + .ifRightOrElse( + arguments -> fail("Should not return arguments"), + errors -> assertEquals(errors, Arguments.Errors.MISSING_CONFIG_FILE)); + } + + @Test + void returnsConfigFileDoesNotExistError() { + Arguments.from(new String[] {"--config", "non-existent-file.txt"}) + .ifRightOrElse( + arguments -> fail("Should not return arguments"), + errors -> assertEquals(errors, Arguments.Errors.CONFIG_FILE_DOES_NOT_EXIST)); + } + + @Test + void returnsConfigFileDoesNotExistErrorWhenConfigFileIsNotSpecified() { + Arguments.from(new String[] {}) + .ifRightOrElse( + arguments -> fail("Should not return arguments"), + errors -> assertEquals(errors, Arguments.Errors.MISSING_CONFIG_FILE)); + } + + @Test + void returnsFalseForPreviewWhenItsNotSpecified() throws 
IOException { + // create a temp file which is deleted on process stop + File configFile = new File("config.txt"); + configFile.deleteOnExit(); + configFile.createNewFile(); + Arguments.from(new String[] {"--config", configFile.getAbsolutePath()}) + .ifRightOrElse( + arguments -> assertFalse(arguments.isPreview()), + errors -> fail("Should not return errors")); + } + + @Test + void returnsArgumentsWhenPreviewIsTheFirstOne() throws IOException { + // create a temp file which is deleted on process stop + File configFile = new File("config.txt"); + configFile.deleteOnExit(); + configFile.createNewFile(); + Arguments.from(new String[] {"--preview", "--config", configFile.getAbsolutePath()}) + .ifRightOrElse( + arguments -> { + assertEquals( + arguments.getConfigFile().getAbsolutePath(), configFile.getAbsolutePath()); + assertTrue(arguments.isPreview()); + }, + errors -> fail("Should not return errors")); + } +} diff --git a/src/test/java/io/lenses/ConfigurationTest.java b/src/test/java/io/lenses/ConfigurationTest.java new file mode 100644 index 0000000..22c919b --- /dev/null +++ b/src/test/java/io/lenses/ConfigurationTest.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package io.lenses; + +import static org.junit.jupiter.api.Assertions.*; + +import io.lenses.s3.AwsMode; +import java.io.ByteArrayInputStream; +import org.junit.jupiter.api.Test; + +class ConfigurationTest { + + // code to test + /* public static Configuration from(InputStream inputStream) { + //read the input stream as HOCON and return the configuration + final Config config = ConfigFactory.parseReader(new InputStreamReader(inputStream)); + //read all kafka properties + final HashMap kafkaProperties = new HashMap<>(); + if (!config.hasPath("kafka")) throw new IllegalArgumentException("Kafka properties are required"); + config.getConfig("kafka").entrySet().forEach(e -> kafkaProperties.put(e.getKey(), e.getValue().unwrapped().toString())); + + //read the source + if (!config.hasPath("aws")) throw new IllegalArgumentException("S3 source is required"); + final Config sourceConfig = config.getConfig("aws"); + if (!sourceConfig.hasPath("bucket")) throw new IllegalArgumentException("S3 bucket is required"); + final String bucket = sourceConfig.getString("bucket"); + final Optional prefix = Optional.ofNullable(sourceConfig.getString("prefix")); + final S3Location source = new S3Location(bucket, prefix); + + final Optional groups = Optional.of(config.getString("groups").split(",")); + + //read AwsMode + final AwsMode awsMode = AwsMode.valueOf(config.getString("aws.mode").toUpperCase()); + final String awsRegion = config.getString("aws.region"); + //if credentials mode, read the access and secret keys + final Optional awsAccessKey = awsMode == AwsMode.CREDENTIALS ? 
Optional.of(config.getString("aws.access.key")) : Optional.empty(); + final Optional<String> awsSecretKey = awsMode == AwsMode.CREDENTIALS ? Optional.of(config.getString("aws.secret.key")) : Optional.empty(); + + return new Configuration(source, groups, awsMode, awsRegion, awsAccessKey, awsSecretKey, kafkaProperties); + } */ + @Test + void createAnInstanceOfConfigurationFromTheHoconConfiguration() { + // create the HOCON configuration as a string, using flattened keys + final String hocon = + "kafka.bootstrap.servers=\"localhost:9092\"\n" + + "kafka.security.protocol=PLAINTEXT\n" + + "kafka.sasl.mechanism=PLAIN\n" + + "kafka.sasl.jaas.config=\"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";\"\n" + + "aws.bucket=io.lenses\n" + + "aws.prefix=group-offsets\n" + + "groups=\"group1,group2\"\n" + + "aws.mode=credentials\n" + + "aws.region=eu-west-1\n" + + "aws.access.key=access-key\n" + + "aws.secret.key=secret-key\n"; + + // print the HOCON for debugging + System.out.println(hocon); + final Configuration configuration = + Configuration.from(new ByteArrayInputStream(hocon.getBytes())); + assertNotNull(configuration); + assertEquals("io.lenses", configuration.getSource().getBucket()); + assertEquals("group-offsets", configuration.getSource().getPrefix().get()); + assertEquals("group1", configuration.getGroups().get()[0]); + assertEquals("group2", configuration.getGroups().get()[1]); + assertEquals(AwsMode.CREDENTIALS, configuration.getS3Config().getAwsMode()); + assertEquals("eu-west-1", configuration.getS3Config().getAwsRegion()); + assertEquals("access-key", configuration.getS3Config().getAwsAccessKey().get()); + assertEquals("secret-key", configuration.getS3Config().getAwsSecretKey().get()); + assertEquals("localhost:9092", configuration.getKafkaProperties().get("bootstrap.servers")); + assertEquals("PLAINTEXT", configuration.getKafkaProperties().get("security.protocol")); + assertEquals("PLAIN", configuration.getKafkaProperties().get("sasl.mechanism")); + assertEquals( + "org.apache.kafka.common.security.plain.PlainLoginModule required username=admin password=admin-secret;", + configuration.getKafkaProperties().get("sasl.jaas.config")); + } + + @Test + void ignoreSecretAndAccessKeysIfTheModeIsDefault() { + final String hocon = + "kafka.bootstrap.servers=\"localhost:9092\"\n" + + "kafka.security.protocol=PLAINTEXT\n" + + "kafka.sasl.mechanism=PLAIN\n" + + "kafka.sasl.jaas.config=\"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";\"\n" + + "aws.bucket=io.lenses\n" + + "aws.prefix=group-offsets\n" + + "groups=\"group1,group2\"\n" + + "aws.mode=default\n" + + "aws.region=eu-west-1\n" + + "aws.access.key=access-key\n" + + "aws.secret.key=secret-key\n"; + + final Configuration configuration = + Configuration.from(new ByteArrayInputStream(hocon.getBytes())); + assertNotNull(configuration); + assertEquals("io.lenses", configuration.getSource().getBucket()); + assertEquals("group-offsets", configuration.getSource().getPrefix().get()); + assertEquals("group1", configuration.getGroups().get()[0]); + assertEquals("group2", configuration.getGroups().get()[1]); + assertEquals(AwsMode.DEFAULT, configuration.getS3Config().getAwsMode()); + assertEquals("eu-west-1", configuration.getS3Config().getAwsRegion()); + assertFalse(configuration.getS3Config().getAwsAccessKey().isPresent()); + assertFalse(configuration.getS3Config().getAwsSecretKey().isPresent()); + assertEquals("localhost:9092",
configuration.getKafkaProperties().get("bootstrap.servers")); + assertEquals("PLAINTEXT", configuration.getKafkaProperties().get("security.protocol")); + assertEquals("PLAIN", configuration.getKafkaProperties().get("sasl.mechanism")); + assertEquals( + "org.apache.kafka.common.security.plain.PlainLoginModule required username=admin password=admin-secret;", + configuration.getKafkaProperties().get("sasl.jaas.config")); + } + + @Test + void hasEmptyGroupsWhenTheGroupSettingIsNotSpecified() { + final String hocon = + "kafka.bootstrap.servers=\"localhost:9092\"\n" + + "kafka.security.protocol=PLAINTEXT\n" + + "kafka.sasl.mechanism=PLAIN\n" + + "kafka.sasl.jaas.config=\"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";\"\n" + + "aws.bucket=io.lenses\n" + + "aws.prefix=group-offsets\n" + + "aws.mode=default\n" + + "aws.region=eu-west-1\n" + + "aws.access.key=access-key\n" + + "aws.secret.key=secret-key\n"; + + final Configuration configuration = + Configuration.from(new ByteArrayInputStream(hocon.getBytes())); + assertNotNull(configuration); + assertEquals("io.lenses", configuration.getSource().getBucket()); + assertEquals("group-offsets", configuration.getSource().getPrefix().get()); + assertFalse(configuration.getGroups().isPresent()); + assertEquals(AwsMode.DEFAULT, configuration.getS3Config().getAwsMode()); + assertEquals("eu-west-1", configuration.getS3Config().getAwsRegion()); + assertFalse(configuration.getS3Config().getAwsAccessKey().isPresent()); + assertFalse(configuration.getS3Config().getAwsSecretKey().isPresent()); + assertEquals("localhost:9092", configuration.getKafkaProperties().get("bootstrap.servers")); + assertEquals("PLAINTEXT", configuration.getKafkaProperties().get("security.protocol")); + assertEquals("PLAIN", configuration.getKafkaProperties().get("sasl.mechanism")); + assertEquals( + "org.apache.kafka.common.security.plain.PlainLoginModule required username=admin password=admin-secret;", + configuration.getKafkaProperties().get("sasl.jaas.config")); + } + + @Test + void throwsAnExceptionWhenCredentialModeIsSetButSecretKeyIsNot() { + final String hocon = + "kafka.bootstrap.servers=\"localhost:9092\"\n" + + "kafka.security.protocol=PLAINTEXT\n" + + "kafka.sasl.mechanism=PLAIN\n" + + "kafka.sasl.jaas.config=\"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";\"\n" + + "aws.bucket=io.lenses\n" + + "aws.prefix=group-offsets\n" + + "aws.mode=credentials\n" + + "aws.region=eu-west-1\n" + + "aws.access.key=access-key\n"; + + assertThrows( + IllegalArgumentException.class, + () -> Configuration.from(new ByteArrayInputStream(hocon.getBytes()))); + } + + @Test + void throwsAnExceptionWhenCredentialModeIsSetButAccessKeyIsNot() { + final String hocon = + "kafka.bootstrap.servers=\"localhost:9092\"\n" + + "kafka.security.protocol=PLAINTEXT\n" + + "kafka.sasl.mechanism=PLAIN\n" + + "kafka.sasl.jaas.config=\"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";\"\n" + + "aws.bucket=io.lenses\n" + + "aws.prefix=group-offsets\n" + + "aws.mode=credentials\n" + + "aws.region=eu-west-1\n" + + "aws.secret.key=secret-key\n"; + + assertThrows( + IllegalArgumentException.class, + () -> Configuration.from(new ByteArrayInputStream(hocon.getBytes()))); + } +} diff --git a/src/test/java/io/lenses/kafka/AdminClientKafkaOperationsTest.java b/src/test/java/io/lenses/kafka/AdminClientKafkaOperationsTest.java new file mode 100644 
index 0000000..ef300cd --- /dev/null +++ b/src/test/java/io/lenses/kafka/AdminClientKafkaOperationsTest.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package io.lenses.kafka; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult; +import org.apache.kafka.clients.admin.DescribeClusterResult; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.junit.jupiter.api.Test; + +class AdminClientKafkaOperationsTest { + @Test + void returnsTrueOnCheckConnection() { + Admin admin = mock(Admin.class); + String clusterId = "clusterId"; + + final DescribeClusterResult result = mock(DescribeClusterResult.class); + when(result.clusterId()).thenReturn(KafkaFuture.completedFuture(clusterId)); + when(admin.describeCluster()).thenReturn(result); + assertTrue(new AdminClientKafkaOperations(admin).checkConnection(1, TimeUnit.SECONDS)); + } + + @Test + void returnsFalseOnCheckConnectionWhenTimeoutIsInvolved() { + Admin admin = mock(Admin.class); + + final DescribeClusterResult result = mock(DescribeClusterResult.class); + KafkaFutureImpl<String> future = new KafkaFutureImpl<>(); + when(result.clusterId()).thenReturn(future); + when(admin.describeCluster()).thenReturn(result); + assertFalse(new AdminClientKafkaOperations(admin).checkConnection(1, TimeUnit.SECONDS)); + } + + @Test + void appliesTheGroupOffsets() { + Admin admin = mock(Admin.class); + + AdminClientKafkaOperations ops = new AdminClientKafkaOperations(admin); + ArrayList<GroupOffsets> offsets = new ArrayList<>(); + offsets.add( + new GroupOffsets( + "group", + Collections.singletonMap( + new TopicPartition("topic", 0), new OffsetAndMetadata(0L, "metadata")))); + + offsets.add( + new GroupOffsets( + "group2", + Collections.singletonMap( + new TopicPartition("topic2", 0), new OffsetAndMetadata(0L, "metadata")))); + + AlterConsumerGroupOffsetsResult mock1 = mock(AlterConsumerGroupOffsetsResult.class); + when(mock1.all()).thenReturn(KafkaFuture.completedFuture(null)); + + AlterConsumerGroupOffsetsResult mock2 = mock(AlterConsumerGroupOffsetsResult.class); + when(mock2.all()).thenReturn(KafkaFuture.completedFuture(null)); + + when(admin.alterConsumerGroupOffsets( + "group", + Collections.singletonMap( + new
TopicPartition("topic", 0), new OffsetAndMetadata(0L, "metadata")))) + .thenReturn(mock1); + + when(admin.alterConsumerGroupOffsets( + "group2", + Collections.singletonMap( + new TopicPartition("topic2", 0), new OffsetAndMetadata(0L, "metadata")))) + .thenReturn(mock2); + ops.restoreGroupOffsets(offsets, 1, TimeUnit.SECONDS); + + // check that each call was made exactly once + verify(admin, times(1)) + .alterConsumerGroupOffsets( + "group", + Collections.singletonMap( + new TopicPartition("topic", 0), new OffsetAndMetadata(0L, "metadata"))); + + verify(admin, times(1)) + .alterConsumerGroupOffsets( + "group2", + Collections.singletonMap( + new TopicPartition("topic2", 0), new OffsetAndMetadata(0L, "metadata"))); + } +} diff --git a/src/test/java/io/lenses/utils/AsciiTest.java b/src/test/java/io/lenses/utils/AsciiTest.java new file mode 100644 index 0000000..c29efcd --- /dev/null +++ b/src/test/java/io/lenses/utils/AsciiTest.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package io.lenses.utils; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Test; + +class AsciiTest { + @Test + void handlesNullResourceName() { + AtomicReference<String> captured = new AtomicReference<>(null); + Ascii.display(null, captured::set); + assertNull(captured.get()); + } + + @Test + void handlesEmptyResourceName() { + AtomicReference<String> captured = new AtomicReference<>(null); + Ascii.display("", captured::set); + assertNull(captured.get()); + } + + @Test + void handlesNonExistentResourceName() { + AtomicReference<String> captured = new AtomicReference<>(null); + Ascii.display("non-existent-resource-name", captured::set); + assertNull(captured.get()); + } + + @Test + void returnsTheResourceContent() { + AtomicReference<String> captured = new AtomicReference<>(null); + Ascii.display("/ascii.txt", captured::set); + assertEquals(captured.get(), "test result"); + } +} diff --git a/src/test/java/io/lenses/utils/UtilsTest.java b/src/test/java/io/lenses/utils/UtilsTest.java new file mode 100644 index 0000000..174063a --- /dev/null +++ b/src/test/java/io/lenses/utils/UtilsTest.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License.
You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package io.lenses.utils; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import org.junit.jupiter.api.Test; + +class UtilsTest { + + @Test + void returnsTheContentOfTheInputStream() { + final String content = "lore ipsum"; + final ByteArrayInputStream inputStream = new ByteArrayInputStream(content.getBytes()); + try { + assertEquals(Utils.readAll(inputStream), content); + } catch (IOException e) { + fail("Should not throw exception"); + } + } +} diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf new file mode 100644 index 0000000..e69de29 diff --git a/src/test/resources/ascii.txt b/src/test/resources/ascii.txt new file mode 100644 index 0000000..c226e99 --- /dev/null +++ b/src/test/resources/ascii.txt @@ -0,0 +1 @@ +test result \ No newline at end of file
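For readers working from the tests above, the sketch below shows one way the APIs they exercise (Arguments.from, Configuration.from, AdminClientKafkaOperations) could be combined into a restore flow. It is an illustrative example only, written against the behaviour the tests describe; it is not part of this commit, the class and package names are made up for the example, the application's real entry point is not shown in this section, and the group/topic/offset values are placeholders.

// Illustrative sketch, not part of the patch: wires together the APIs exercised by the tests above.
package io.lenses.examples;

import io.lenses.Arguments;
import io.lenses.Configuration;
import io.lenses.kafka.AdminClientKafkaOperations;
import io.lenses.kafka.GroupOffsets;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class RestoreSketch {
  public static void main(String[] args) {
    Arguments.from(args)
        .ifRightOrElse(
            arguments -> {
              try (InputStream in = new FileInputStream(arguments.getConfigFile())) {
                final Configuration configuration = Configuration.from(in);

                // Feed the parsed kafka.* settings to the AdminClient.
                final Properties props = new Properties();
                props.putAll(configuration.getKafkaProperties());

                try (Admin admin = Admin.create(props)) {
                  final AdminClientKafkaOperations ops = new AdminClientKafkaOperations(admin);
                  if (!ops.checkConnection(30, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("Could not connect to the Kafka cluster");
                  }

                  // In the application the offsets come from the S3 location configured under
                  // aws.bucket/aws.prefix; a single hard-coded entry stands in for them here.
                  final ArrayList<GroupOffsets> offsets = new ArrayList<>();
                  offsets.add(
                      new GroupOffsets(
                          "group1",
                          Collections.singletonMap(
                              new TopicPartition("topic", 0),
                              new OffsetAndMetadata(42L, "metadata"))));
                  ops.restoreGroupOffsets(offsets, 30, TimeUnit.SECONDS);
                }
              } catch (IOException e) {
                throw new UncheckedIOException(e);
              }
            },
            errors -> System.err.println("Invalid arguments: " + errors));
  }
}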