Merge pull request #1 from lensesio/fixes

Fixes

stheppi authored Nov 5, 2023
2 parents a225496 + d6e3a29 commit 62f5ab1
Showing 15 changed files with 398 additions and 125 deletions.
87 changes: 87 additions & 0 deletions .github/workflows/maven.yaml
@@ -0,0 +1,87 @@
name: CI/CD

on:
  push:
    branches:
      - main

  create:
    tags:
      - 'v*'

jobs:
  build:
    name: Build
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Set up JDK
        uses: actions/setup-java@v3
        with:
          java-version: '11'
          distribution: 'temurin'

      - name: Extract tag name
        id: extract_tag
        run: echo ::set-output name=TAG_NAME::${GITHUB_REF#refs/tags/}

      - name: Update Maven version
        run: mvn versions:set -DnewVersion=${{ steps.extract_tag.outputs.TAG_NAME }}

      - name: Check License
        run: mvn license:check

      - name: Build
        run: mvn clean package -B

  release:
    name: Create Release
    needs: build
    if: startsWith(github.ref, 'refs/tags/v')

    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Set up JDK
        uses: actions/setup-java@v3
        with:
          java-version: '11' # Or the desired Java version
          distribution: 'temurin'

      - name: Extract tag name
        id: extract_tag
        run: echo ::set-output name=TAG_NAME::${GITHUB_REF#refs/tags/}

      - name: Update Maven version
        run: mvn versions:set -DnewVersion=${{ steps.extract_tag.outputs.TAG_NAME }}

      - name: Build Jar
        run: mvn -B package --file pom.xml -DskipTests

      - name: Create Release
        id: create_release
        uses: actions/create-release@v1
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        with:
          tag_name: ${{ github.ref }}
          release_name: Release ${{ github.ref }}
          draft: false
          prerelease: false

      - name: Upload JAR
        uses: actions/upload-release-asset@v1
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        with:
          upload_url: ${{ steps.create_release.outputs.upload_url }}
          asset_path: ./target/restore-group-offsets-s3-${{ steps.extract_tag.outputs.TAG_NAME }}.tar.gz
          asset_name: restore-group-offsets-s3-${{ steps.extract_tag.outputs.TAG_NAME }}.tar.gz
          asset_content_type: application/gzip
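
Note: the `::set-output` workflow command used above has been deprecated by GitHub Actions in favor of the `$GITHUB_OUTPUT` environment file (the `actions/create-release` and `actions/upload-release-asset` actions are likewise archived upstream, though they still work). A sketch of the same step using the current mechanism, keeping the step and output names from the workflow:

```yaml
- name: Extract tag name
  id: extract_tag
  # Writes TAG_NAME=<tag> to the step's output file instead of using
  # the deprecated ::set-output command.
  run: echo "TAG_NAME=${GITHUB_REF#refs/tags/}" >> "$GITHUB_OUTPUT"
```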
55 changes: 40 additions & 15 deletions README.md
@@ -1,4 +1,4 @@
# restore-consumer-groups-offset
# Kafka backup: Restore Kafka Consumer Offsets from S3

An application that reads the consumer group offsets stored in S3 and applies them to the target Kafka cluster.
The S3 sink stores the information under the following path: `bucket[/prefix]/group/topic/partition`, with the content
@@ -8,19 +8,18 @@ being the offset (an 8-byte array).
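
For instance, an object at the (hypothetical) key `my-bucket/backups/group1/topic-a/0` would hold the committed offset for partition 0 of `topic-a`. A minimal sketch of decoding such a payload, assuming the 8 bytes are a big-endian `long` (ByteBuffer's default byte order; the sink's actual encoding is not confirmed by this diff):

```java
import java.nio.ByteBuffer;

class OffsetDecoder {
  // Decodes the 8-byte S3 object payload into a consumer offset.
  // Assumes big-endian encoding, ByteBuffer's default order.
  static long decode(byte[] payload) {
    return ByteBuffer.wrap(payload).getLong();
  }
}
```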

The application requires a configuration file in HOCON format, which supports the following options:

| Configuration Option | Description |
|---------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `Kafka configuration` | All Kafka settings should be prefixed with `kafka.`. For example: `kafka.bootstrap.servers = "localhost:9092"`. |
| `S3 location` | - `aws.bucket`: The name of the S3 bucket where consumer group offsets are stored. |
| | - `aws.prefix` (Optional): The prefix of objects within the bucket. |
| `groups` | An optional, comma-separated list of consumer groups to restore. If not specified, all groups stored in S3 will be restored. For example: `groups = group1, group2`. |
| `AWS connection mode` | - `aws.mode`: Set to `credentials` to use provided credentials or `default` for AWS default credentials provider chain. |
| `AWS Access Key` | - `aws.access.key`: AWS access key ID (only when `aws.mode` is set to `credentials`). |
| `AWS Secret Key` | - `aws.secret.key`: AWS secret access key (only when `aws.mode` is `credentials`). |
| `AWS Region` | - `aws.region`: AWS region (only when `aws.mode` is `credentials`). |
| `AWS HTTP Retries` | - `aws.http.retries`: How many times a failed request is attempted. Default is 5 |
| `AWS HTTP Retry interval` | - `aws.http.retry.inteval`: The time in milliseconds to wait before an HTTP operation is retried. Default is 50. |

| Configuration Option | Description |
|-----------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `Kafka connection settings` | All Kafka settings should be prefixed with `kafka.`. For example: `kafka.bootstrap.servers = "localhost:9092"`. |
| `S3 location` | - `aws.bucket`: The name of the S3 bucket where consumer group offsets are stored. |
| | - `aws.prefix` (Optional): The prefix of objects within the bucket. |
| `groups` | An optional, comma-separated list of consumer groups to restore. If not specified, all groups stored in S3 will be restored. For example: `groups = group1, group2`. |
| `AWS connection mode` | - `aws.mode`: Set to `credentials` to use provided credentials or `default` for AWS default credentials provider chain. |
| `AWS Access Key` | - `aws.access.key`: AWS access key ID (only when `aws.mode` is set to `credentials`). |
| `AWS Secret Key` | - `aws.secret.key`: AWS secret access key (only when `aws.mode` is `credentials`). |
| `AWS Region` | - `aws.region`: AWS region (only when `aws.mode` is `credentials`). |
| `AWS HTTP Retries` | - `aws.http.retries`: How many times a failed request is attempted. Default is 5 |
| `AWS HTTP Retry interval` | - `aws.http.retry.inteval`: The time in milliseconds to wait before an HTTP operation is retried. Default is 50. |

#### Examples

@@ -29,7 +28,6 @@
```hocon
kafka {
  bootstrap.servers = "localhost:9092"
  security.protocol = "SSL"
  # Add other Kafka settings here
}
@@ -69,7 +67,34 @@ aws.region = "your-aws-region"

It requires at least Java 8 to run.

To run the application, use the following command:

```bash
java -jar restore-consumer-groups-offset.jar --config <path-to-config-file> [--preview]
```

To build the application, run:

```bash
mvn clean install
```

This creates a tar.gz file in the `target` directory. When unpacked, it contains the application jar and its
dependencies:

```properties
--bin
|--- restore.sh
--lib
|--- restore-consumer-groups-offset-1.0.0.jar
|--- ...
```
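
For example, to unpack the archive (a sketch; the exact file name depends on the project version — the release workflow above names its asset `restore-group-offsets-s3-<tag>.tar.gz`, so verify the name locally):

```bash
tar -xzf target/restore-consumer-groups-offset-<version>.tar.gz
```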

Using the shell script to run the application is recommended, as it sets the classpath and the Java options:

```bash
./restore.sh --config <path-to-config-file> [--preview]
```

To format the code, run:

31 changes: 21 additions & 10 deletions pom.xml
@@ -37,7 +37,10 @@
  <aws.version>2.21.13</aws.version>
  <hocon.version>1.4.2</hocon.version>
  <kafka.version>3.5.0</kafka.version>
  <log.version>2.20.0</log.version>
  <logback.version>1.2.6</logback.version>
  <mockito.version>3.12.4</mockito.version>
  <junit.version>5.10.0</junit.version>
  <headerFile>${project.basedir}/checkstyle/java.header</headerFile>
</properties>

@@ -67,12 +70,28 @@
    <artifactId>config</artifactId>
    <version>${hocon.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>${log.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>${log.version}</version>
  </dependency>
  <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>${logback.version}</version>
  </dependency>
  <dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>RELEASE</version>
    <version>${junit.version}</version>
    <scope>test</scope>
  </dependency>

  <dependency>
    <groupId>org.mockito</groupId>
    <artifactId>mockito-core</artifactId>
@@ -119,14 +138,6 @@
  <artifactId>maven-jar-plugin</artifactId>
  <version>3.3.0</version>
  <configuration>
    <!-- <archive>
      <manifest>
        <addClasspath>true</addClasspath>
        <classpathPrefix>lib</classpathPrefix>
        <mainClass>io.lenses.App</mainClass>
      </manifest>
    </archive>-->

    <archive>
      <manifest>
        <mainClass>io.lenses.App</mainClass>
@@ -158,7 +169,7 @@
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <version>2.19.1</version>
  <version>3.1.2</version>
  <configuration>
    <forkCount>1</forkCount>
    <reuseForks>false</reuseForks>
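
With this change both Log4j 2 and Logback appear on the classpath; SLF4J binds to whichever provider it discovers first. If Logback wins, a minimal `logback.xml` on the classpath controls the output — a hypothetical example, not part of this commit:

```xml
<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <!-- time, level, logger, message -->
      <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>
  <root level="INFO">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>
```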
15 changes: 9 additions & 6 deletions src/main/java/io/lenses/App.java
@@ -24,6 +24,8 @@
import java.nio.file.Files;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;

/**
@@ -44,8 +46,10 @@
 * </ul>
 */
public class App {
  private static final Logger logger = LoggerFactory.getLogger(App.class);

  public static void main(String[] args) {
    Ascii.display("/ascii.txt", System.out::println);
    Ascii.display("/ascii.txt", logger::info);
    final Either<Arguments.Errors, Arguments> either = Arguments.from(args);
    if (either.isLeft()) {
      final Arguments.Errors error = either.getLeft();
@@ -69,22 +73,21 @@ public static void main(String[] args) {
            ? new PreviewAdminClientKafkaOperations()
            : AdminClientKafkaOperations.create(configuration.getKafkaProperties())) {
      if (!kafkaOperations.checkConnection(10, TimeUnit.SECONDS)) {
        System.err.println("Failed to connect to Kafka cluster.");
        logger.error("Failed to connect to Kafka cluster.");
      } else {
        final S3Config s3Config = configuration.getS3Config();
        try (S3Client s3Client = S3ClientBuilderHelper.build(s3Config)) {
          final AwsGroupOffsetsReader s3Operations = new S3AwsGroupOffsetsReader(s3Client);
          final List<GroupOffsets> offsets =
              s3Operations.read(configuration.getSource(), configuration.getGroups());
          GroupOffsets.consoleOutput(offsets);
          System.out.println("Restoring Groups offsets");
          logger.info("Restoring Groups offsets");
          kafkaOperations.restoreGroupOffsets(offsets, 1, TimeUnit.MINUTES);
          System.out.println("Finished restoring Groups offsets");
          logger.info("Finished restoring Groups offsets");
        }
      }
    }
  } catch (Exception e) {
    System.err.println("An error occurred. " + e);
    logger.error("An error occurred. ", e);
    System.exit(1);
  }
}
2 changes: 1 addition & 1 deletion src/main/java/io/lenses/Arguments.java
@@ -37,7 +37,7 @@ public static Either<Errors, Arguments> from(String[] args) {
for (int i = 0; i < args.length; i++) {
  if (args[i].equals("--config") && i + 1 < args.length) {
    configFilePath = args[i + 1];
    i++; // Skip the next argument, which is the file path
    i++;
  } else if (args[i].equals("--preview")) {
    isPreview = true;
  }
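
A usage sketch of the parser, limited to the `Either` accessors visible in `App.java` above (`isLeft`/`getLeft`; any further accessors would be assumptions):

```java
// Hypothetical invocation of the argument parser.
Either<Arguments.Errors, Arguments> parsed =
    Arguments.from(new String[] {"--config", "app.conf", "--preview"});
if (parsed.isLeft()) {
  System.err.println("Invalid arguments: " + parsed.getLeft());
}
```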
5 changes: 4 additions & 1 deletion src/main/java/io/lenses/Configuration.java
@@ -72,7 +72,10 @@ public static Configuration from(InputStream inputStream) {
if (!sourceConfig.hasPath("bucket"))
  throw new IllegalArgumentException("S3 bucket is required");
final String bucket = sourceConfig.getString("bucket");
final Optional<String> prefix = Optional.ofNullable(sourceConfig.getString("prefix"));
final Optional<String> prefix =
    sourceConfig.hasPath("prefix")
        ? Optional.ofNullable(sourceConfig.getString("prefix"))
        : Optional.empty();
final S3Location source = new S3Location(bucket, prefix);

// groups are optional; when defined, it's a comma-separated list
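
The `hasPath`-then-`get` pattern could be factored into a small helper for optional HOCON keys — a sketch, not part of this change; `Config` is Typesafe Config's type:

```java
import com.typesafe.config.Config;
import java.util.Optional;

class Configs {
  // Mirrors the hasPath-then-getString pattern used for "prefix" above.
  static Optional<String> optionalString(Config config, String path) {
    return config.hasPath(path) ? Optional.of(config.getString(path)) : Optional.empty();
  }
}
```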
15 changes: 1 addition & 14 deletions src/main/java/io/lenses/kafka/AdminClientKafkaOperations.java
@@ -55,20 +55,7 @@ public void restoreGroupOffsets(List<GroupOffsets> offsets, long timeout, TimeUnit
offsets.stream()
    .map(
        offset -> {
          offset
              .getOffsets()
              .forEach(
                  (topicPartition, offsetAndMetadata) -> {
                    System.out.println(
                        "Restoring Group:"
                            + offset.getGroup()
                            + " Topic:"
                            + topicPartition.topic()
                            + " Partition:"
                            + topicPartition.partition()
                            + " Offset:"
                            + offsetAndMetadata.offset());
                  });
          print(offset);
          AlterConsumerGroupOffsetsResult result =
              admin.alterConsumerGroupOffsets(offset.getGroup(), offset.getOffsets());
          return new Tuple2<>(offset, result);
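
`Admin.alterConsumerGroupOffsets` is asynchronous and returns a result whose futures must be awaited. A sketch of blocking on one group's restore within a timeout (the `Tuple2` bookkeeping from the method above is elided):

```java
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;

class OffsetRestorer {
  // Submits one group's offsets and blocks until the brokers
  // acknowledge them or the timeout elapses; throws on failure.
  static void restore(Admin admin, GroupOffsets offsets, long timeout, TimeUnit unit)
      throws Exception {
    AlterConsumerGroupOffsetsResult result =
        admin.alterConsumerGroupOffsets(offsets.getGroup(), offsets.getOffsets());
    result.all().get(timeout, unit);
  }
}
```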
40 changes: 22 additions & 18 deletions src/main/java/io/lenses/kafka/GroupOffsets.java
@@ -10,6 +10,7 @@
*/
package io.lenses.kafka;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -22,7 +23,6 @@ public class GroupOffsets {
public GroupOffsets(String group, Map<TopicPartition, OffsetAndMetadata> offsets) {
  if (group == null) throw new IllegalArgumentException("Group cannot be null");
  if (offsets == null) throw new IllegalArgumentException("Offsets cannot be null");
  if (offsets.isEmpty()) throw new IllegalArgumentException("Offsets cannot be empty");
  this.group = group;
  this.offsets = offsets;
}
@@ -35,22 +35,26 @@ public Map<TopicPartition, OffsetAndMetadata> getOffsets() {
  return offsets;
}

public static void consoleOutput(List<GroupOffsets> offsets) {
  offsets.forEach(
      offset -> {
        System.out.println("Restoring Group:" + offset.getGroup());
        offset
            .getOffsets()
            .forEach(
                (topicPartition, offsetAndMetadata) -> {
                  System.out.println(
                      "Topic:"
                          + topicPartition.topic()
                          + " Partition:"
                          + topicPartition.partition()
                          + " Offset:"
                          + offsetAndMetadata.offset());
                });
      });

public List<Map.Entry<TopicPartition, OffsetAndMetadata>> getSortedOffset() {
  List<Map.Entry<TopicPartition, OffsetAndMetadata>> sortedOffsets =
      new java.util.ArrayList<>(offsets.entrySet());
  sortedOffsets.sort(new CustomComparator());
  return sortedOffsets;
}

private static class CustomComparator
    implements Comparator<Map.Entry<TopicPartition, OffsetAndMetadata>> {
  @Override
  public int compare(
      Map.Entry<TopicPartition, OffsetAndMetadata> left,
      Map.Entry<TopicPartition, OffsetAndMetadata> right) {
    int topicComparison = left.getKey().topic().compareTo(right.getKey().topic());
    if (topicComparison != 0) {
      return topicComparison;
    } else {
      return Integer.compare(left.getKey().partition(), right.getKey().partition());
    }
  }
}
}
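
The comparator orders entries by topic name, then partition. An equivalent, more compact form via `Comparator.comparing` — a sketch; the ordering should be identical:

```java
import java.util.Comparator;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class Comparators {
  // Same ordering as CustomComparator: topic name first, then partition.
  static final Comparator<Map.Entry<TopicPartition, OffsetAndMetadata>> BY_TOPIC_THEN_PARTITION =
      Comparator.comparing(
              (Map.Entry<TopicPartition, OffsetAndMetadata> e) -> e.getKey().topic())
          .thenComparingInt(e -> e.getKey().partition());
}
```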