
Commit

Update readmes
jeffxiang committed Aug 7, 2024
1 parent 4a26f95 commit a5a14dc
Showing 4 changed files with 118 additions and 145 deletions.
36 changes: 32 additions & 4 deletions README.md
@@ -1,31 +1,59 @@
# Kafka Tiered Storage
-This edition of Kafka tiered storage provides a Kafka broker independent approach to tiered storage. It consists of two main components:
+Kafka Tiered Storage is a framework that allows [Apache Kafka](https://kafka.apache.org/) brokers to offload finalized log segments to a remote storage system.
+This allows Kafka to maintain a smaller disk footprint and reduce the need for expensive storage on the brokers.
+The framework also provides a Kafka client compatible consumer that can consume from both the broker and the remote storage system.
+
+Pinterest's implementation of Kafka Tiered Storage provides a Kafka broker independent approach to tiered storage. It consists of two main components:
1. [Uploader](ts-segment-uploader): A continuous process that runs on each Kafka broker and uploads finalized log segments to a remote storage system (e.g. Amazon S3, with a unique prefix per Kafka cluster and topic).
2. [Consumer](ts-consumer): A Kafka client compatible consumer that consumes from both tiered storage log segments and the Kafka cluster.

A third module, [ts-common](ts-common), contains common classes and interfaces used by the `ts-consumer` and `ts-segment-uploader` modules, such as Metrics, StorageEndpointProvider, etc.

Feel free to read each module's README for more details.

+## Why Tiered Storage?
+[Apache Kafka](https://kafka.apache.org/) is a distributed event streaming platform that stores partitioned and replicated log segments on disk for
+a configurable retention period. However, as data volumes and/or retention periods grow, the disk footprint of Kafka clusters can become expensive.
+Tiered Storage allows Kafka to offload finalized log segments to a more cost-effective remote storage system, reducing the need for expensive storage on the brokers.
+
+With Tiered Storage, you can:
+1. Maintain a smaller overall broker footprint, reducing operational costs
+2. Retain data for longer periods of time while avoiding horizontal and vertical scaling of Kafka clusters
+3. Reduce CPU, network, and disk I/O utilization on brokers by reading directly from remote storage

## Highlights
- **Kafka Broker Independent**: The tiered storage solution is designed to be Kafka broker independent, meaning it runs as an independent process alongside the Kafka server process. Currently, it only supports ZooKeeper-based Kafka versions. KRaft support is WIP.
- **Fault Tolerant**: Broker restarts, replacements, leadership changes, and other common Kafka operations and issues are handled gracefully.
-- **Skip the broker entirely during consumption**: The consumer can read from both broker and Tiered Storage backend filesystem. When in TIERED_STORAGE_ONLY mode, the consumption loop does not touch the broker itself, allowing for reduction in broker resource utilization.
+- **Skip the broker entirely during consumption**: The consumer can read from both broker and Tiered Storage backend filesystem. When in `TIERED_STORAGE_ONLY` mode, the consumption loop does not touch the broker itself, allowing for reduction in broker resource utilization.
- **Pluggable Storage Backends**: The framework is designed to be backend-agnostic. Currently, only S3 is supported. More backend filesystems will be supported in the near future.
- **S3 Partitioning**: Prefix-entropy (salting) is configurable out-of-the-box to support prefix-partitioned S3 buckets, improving scalability by avoiding request-rate hotspots (see the sketch after this list).
- **Metrics**: Comprehensive metrics are provided out-of-the-box for monitoring and alerting purposes.
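
As a rough illustration of the S3 partitioning highlight above, the sketch below shows one way a topic-partition could be mapped to a salted S3 prefix. This is a minimal sketch assuming a hash-based salt; the class name and key layout are illustrative, not the uploader's actual implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class PrefixEntropySketch {
    // With N bits of entropy there are 2^N possible salt buckets, so object keys
    // fan out across 2^N prefixes instead of piling onto one hot S3 prefix.
    static String saltedPrefix(String clusterId, String topic, int partition, int entropyNumBits) {
        CRC32 crc = new CRC32();
        crc.update((topic + "-" + partition).getBytes(StandardCharsets.UTF_8));
        long salt = crc.getValue() % (1L << entropyNumBits);
        return salt + "/" + clusterId + "/" + topic + "-" + partition + "/";
    }

    public static void main(String[] args) {
        // Prints something like "5/my-cluster/my-topic-0/" (bucket-relative key prefix)
        System.out.println(saltedPrefix("my-cluster", "my-topic", 0, 3));
    }
}
```

Whatever the exact layout, the uploader and the consumer must agree on the number of entropy bits, which is why the consumer exposes a matching `storage.service.endpoint.s3.prefix.entropy.num.bits` setting (see the ts-consumer changes below).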

+# Quick Start
+Detailed quickstart instructions are available [here](docs/quickstart.md).

+# Usage
+Using Kafka Tiered Storage consists of the following high-level steps:
+1. Have a remote storage system ready to accept reads and writes of log segments (e.g. an Amazon S3 bucket)
+2. Configure and start [ts-segment-uploader](ts-segment-uploader) on each Kafka broker
+3. Use [ts-consumer](ts-consumer) to read from either the broker or the remote storage system (a consumption sketch follows this list)
+4. Monitor and manage the tiered storage system using the provided metrics and tools
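
To make step 3 concrete, here is a minimal consumption sketch. `TieredStorageConsumer` is a hypothetical stand-in assumed to mirror the standard `KafkaConsumer` API, per the "Kafka client compatible" description above; consult the ts-consumer README for the actual entry point.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class ConsumeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");  // placeholder
        props.put("group.id", "my-group");               // placeholder
        props.put("tiered.storage.mode", "TIERED_STORAGE_PREFERRED");

        // TieredStorageConsumer is a hypothetical stand-in for the ts-consumer
        // entry point; it is assumed here to behave like a regular KafkaConsumer.
        try (TieredStorageConsumer<byte[], byte[]> consumer = new TieredStorageConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("my-topic"));
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    System.out.printf("partition=%d offset=%d%n", record.partition(), record.offset());
                }
            }
        }
    }
}
```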

+Feel free to read each module's README for more details.

# Architecture
![Architecture](docs/images/architecture.png)

# Current Status
-This project is currently under active development. Some of our planned features and improvements:
+**Kafka Tiered Storage is currently under active development and the APIs may change over time.**
+
+Kafka Tiered Storage currently supports the following remote storage systems:
+- Amazon S3
+
+Some of our planned features and improvements:

- KRaft support
-- More backend filesystems (e.g. HDFS)
+- More storage system support (e.g. HDFS)
- Integration with [PubSub Client](https://github.com/pinterest/psc) (backend-agnostic client library)

Contributions are always welcome!
147 changes: 27 additions & 120 deletions pom.xml
@@ -1,134 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>com.pinterest.kafka.tieredstorage</groupId>
-        <artifactId>kafka-tiered-storage</artifactId>
-        <version>0.0.2-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-
-    <artifactId>ts-common</artifactId>

+    <groupId>com.pinterest.kafka.tieredstorage</groupId>
+    <artifactId>kafka-tiered-storage</artifactId>
+    <version>0.0.2-SNAPSHOT</version>
+    <packaging>pom</packaging>
+    <name>kafka-tiered-storage</name>
+    <description>Kafka Tiered Storage</description>
+    <modules>
+        <module>ts-segment-uploader</module>
+        <module>ts-consumer</module>
+        <module>ts-common</module>
+        <module>ts-examples</module>
+    </modules>
+    <properties>
+        <maven.compiler.source>20</maven.compiler.source>
+        <maven.compiler.target>20</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>

-    <dependencies>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>2.3.1</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.12</artifactId>
-            <version>2.3.1</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-core</artifactId>
-            <version>2.17.1</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-api</artifactId>
-            <version>2.17.1</version>
-        </dependency>
-        <dependency>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-            <version>1.2.17</version>
-        </dependency>
-        <dependency>
-            <groupId>com.google.code.gson</groupId>
-            <artifactId>gson</artifactId>
-            <version>2.8.6</version>
-        </dependency>
-        <dependency>
-            <groupId>software.amazon.awssdk</groupId>
-            <artifactId>s3</artifactId>
-            <version>2.17.273</version>
-        </dependency>
-        <dependency>
-            <groupId>io.dropwizard.metrics</groupId>
-            <artifactId>metrics-core</artifactId>
-            <version>4.1.2</version>
-        </dependency>
-        <dependency>
-            <groupId>io.dropwizard.metrics</groupId>
-            <artifactId>metrics-jvm</artifactId>
-            <version>4.1.2</version>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>29.0-jre</version>
-        </dependency>
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter</artifactId>
-            <version>RELEASE</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>

-    <dependencyManagement>
-        <dependencies>
-            <dependency>
-                <groupId>junit</groupId>
-                <artifactId>junit</artifactId>
-                <version>4.12</version>
-            </dependency>
-        </dependencies>
-    </dependencyManagement>
    <build>
        <plugins>
-            <plugin>
-                <artifactId>maven-dependency-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>copy-dependencies</goal>
-                        </goals>
-                        <configuration>
-                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-source-plugin</artifactId>
-                <version>3.2.1</version>
-                <executions>
-                    <execution>
-                        <id>attach-sources</id>
-                        <goals>
-                            <goal>jar</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-javadoc-plugin</artifactId>
-                <version>2.9.1</version>
-                <executions>
-                    <execution>
-                        <id>attach-javadocs</id>
-                        <goals>
-                            <goal>jar</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <version>3.0.0-M5</version>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.3</version>
                <configuration>
-                    <!--suppress UnresolvedMavenProperty -->
-                    <argLine>${argLine}</argLine>
+                    <source>1.8</source>
+                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

-</project>
+</project>
5 changes: 4 additions & 1 deletion ts-consumer/README.md
@@ -18,11 +18,14 @@ The consumer can be configured to read records from Kafka brokers, S3, or both.
## Configuration
The consumer can be configured the same way as a regular Kafka consumer. In addition, the following properties can be set (a combined configuration example follows the list):
1. `tiered.storage.mode`: The consumption mode. It can be one of the following values: `KAFKA_PREFERRED`, `KAFKA_ONLY`, `TIERED_STORAGE_PREFERRED`, `TIERED_STORAGE_ONLY`.
-2. `kafka.cluster.id`: This is supplied by the user to identify the Kafka cluster, mainly for metrics purposes.
+2. `kafka.cluster.id`: This is supplied by the user to identify the Kafka cluster that the consumer is reading from. This is used to determine the S3 prefix to read from,
+and should match the cluster ID supplied by the segment uploader.
3. `offset.reset.policy`: This can be `EARLIEST`, `LATEST`, or `NONE`. Only `EARLIEST` causes the consumer to fall back to reading from S3 when an offset is out of range in Kafka;
`NONE` causes the consumer to throw an exception if an offset is out of range in Kafka, and `LATEST` causes it to reset offsets to the latest offset in Kafka.
4. `storage.service.endpoint.provider.class`: The fully qualified class name of the class that provides the backing filesystem's endpoints for tiered storage consumption.
+5. `storage.service.endpoint.s3.prefix.entropy.num.bits`: The number of bits of entropy to use for prefixing S3 keys. Make sure to set this to the same value as the segment uploader's equivalent config
+in order to allow the consumer to read from the correct S3 location. More details on prefix entropy can be found in the [ts-segment-uploader README](../ts-segment-uploader/README.md).
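
Putting the properties above together, a configuration sketch might look like the following. All values are placeholders, and the `com.example` endpoint provider class is hypothetical:

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Standard Kafka consumer settings
        props.put("bootstrap.servers", "broker1:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // Tiered storage settings described above
        props.put("tiered.storage.mode", "TIERED_STORAGE_PREFERRED");
        props.put("kafka.cluster.id", "my-cluster");  // must match the segment uploader's cluster ID
        props.put("offset.reset.policy", "EARLIEST"); // EARLIEST enables fallback to S3 on out-of-range offsets
        props.put("storage.service.endpoint.provider.class", "com.example.MyStorageEndpointProvider"); // hypothetical
        props.put("storage.service.endpoint.s3.prefix.entropy.num.bits", "3"); // match the uploader's setting
    }
}
```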

## Usage
The consumer can be used the same way as a regular Kafka consumer. There are API gaps that are actively being addressed.
