diff --git a/README.md b/README.md
index bfa7e23..2865c10 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,9 @@
 # Kafka Tiered Storage
-This edition of Kafka tiered storage provides a Kafka broker independent approach to tiered storage. It consists of two main components:
+Kafka Tiered Storage is a framework that allows [Apache Kafka](https://kafka.apache.org/) brokers to offload finalized log segments to a remote storage system.
+This allows Kafka to maintain a smaller disk footprint and reduce the need for expensive storage on the brokers.
+The framework also provides a Kafka client compatible consumer that can consume from both the broker and the remote storage system.
+
+Pinterest's implementation of Kafka Tiered Storage provides a Kafka broker independent approach to tiered storage. It consists of two main components:
 1. [Uploader](ts-segment-uploader): A continuous process that runs on each Kafka broker and uploads finalized log segments to a remote storage system (e.g. Amazon S3, with unique prefix per Kafka cluster and topic).
 2. [Consumer](ts-consumer): A Kafka client compatible consumer that consumes from both tiered storage log segments and Kafka cluster.
@@ -7,10 +11,20 @@ A third module [ts-common](ts-common) contains common classes and interfaces tha
 Feel free to read into each module's README for more details.
 
+## Why Tiered Storage?
+[Apache Kafka](https://kafka.apache.org/) is a distributed event streaming platform that stores partitioned and replicated log segments on disk for
+a configurable retention period. However, as data volume and/or retention periods grow, the disk footprint of Kafka clusters can become expensive.
+Tiered Storage allows Kafka to offload finalized log segments to a more cost-effective remote storage system, reducing the need for expensive storage on the brokers.
+
+With Tiered Storage, you can:
+1. Maintain a smaller overall broker footprint, reducing operational costs
+2. Retain data for longer periods of time while avoiding horizontal and vertical scaling of Kafka clusters
+3. Reduce CPU, network, and disk I/O utilization on brokers by reading directly from remote storage
+
 ## Highlights
 - **Kafka Broker Independent**: The tiered storage solution is designed to be Kafka broker independent, meaning it runs as an independent process alongside the Kafka server process. Currently, it only supports ZooKeeper-based Kafka versions. KRaft support is WIP.
 - **Fault Tolerant**: Broker restarts, replacements, leadership changes, and other common Kafka operations / issues are handled gracefully.
-- **Skip the broker entirely during consumption**: The consumer can read from both broker and Tiered Storage backend filesystem. When in TIERED_STORAGE_ONLY mode, the consumption loop does not touch the broker itself, allowing for reduction in broker resource utilization.
+- **Skip the broker entirely during consumption**: The consumer can read from both the broker and the Tiered Storage backend filesystem. When in `TIERED_STORAGE_ONLY` mode, the consumption loop does not touch the broker at all, reducing broker resource utilization.
 - **Pluggable Storage Backends**: The framework is designed to be backend-agnostic. Currently, only S3 is supported. More backend filesystems will be supported in the near future.
 - **S3 Partitioning**: Prefix-entropy (salting) is configurable out-of-the-box to allow for prefix-partitioned S3 buckets, allowing for better scalability by avoiding request rate hotspots.
 - **Metrics**: Comprehensive metrics are provided out-of-the-box for monitoring and alerting purposes.
@@ -18,14 +32,28 @@ Feel free to read into each module's README for more details.
 # Quick Start
 Detailed quickstart instructions are available [here](docs/quickstart.md).
 
+# Usage
+Using Kafka Tiered Storage consists of the following high-level steps:
+1. Have a remote storage system ready to accept reads and writes of log segments (e.g. an Amazon S3 bucket)
+2. Configure and start [ts-segment-uploader](ts-segment-uploader) on each Kafka broker
+3. Use [ts-consumer](ts-consumer) to read from either the broker or the remote storage system (see the sketch below)
+4. Monitor and manage the tiered storage system using the provided metrics and tools
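+
+As an illustration of step 3, here is a minimal consumption sketch. It is a sketch only: the constructor and generic signature shown for `TieredStorageConsumer` are assumptions based on the consumer being Kafka client compatible, and the broker address, group, and topic are placeholders.
+```java
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+import com.pinterest.kafka.tieredstorage.consumer.TieredStorageConsumer;
+
+public class TieredConsumeSketch {
+    public static void main(String[] args) {
+        Properties props = new Properties();
+        props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder broker address
+        props.setProperty("group.id", "my-group");              // placeholder group
+        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        props.setProperty("tiered.storage.mode", "TIERED_STORAGE_PREFERRED"); // one of the four consumption modes
+
+        // Used like a regular Kafka consumer: subscribe, then poll.
+        TieredStorageConsumer<byte[], byte[]> consumer = new TieredStorageConsumer<>(props);
+        consumer.subscribe(Collections.singleton("my-topic"));
+        consumer.poll(Duration.ofMillis(100)).forEach(record -> System.out.println(record.offset()));
+    }
+}
+```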
+
+Feel free to read into each module's README for more details.
+
 # Architecture
 ![Architecture](docs/images/architecture.png)
 
 # Current Status
-This project is currently under active development. Some of our planned features and improvements:
+**Kafka Tiered Storage is currently under active development and the APIs may change over time.**
+
+Kafka Tiered Storage currently supports the following remote storage systems:
+- Amazon S3
+
+Some of our planned features and improvements:
 - KRaft support
-- More backend filesystems (e.g. HDFS)
+- More storage system support (e.g. HDFS)
 - Integration with [PubSub Client](https://github.com/pinterest/psc) (backend-agnostic client library)
 
 Contributions are always welcome!
diff --git a/pom.xml b/pom.xml
index a70d883..0200ea2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,134 +1,41 @@
-<project>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
-    <parent>
-        <groupId>com.pinterest.kafka.tieredstorage</groupId>
-        <artifactId>kafka-tiered-storage</artifactId>
-        <version>0.0.2-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
-    </parent>
-
-    <artifactId>ts-common</artifactId>
-
+    <groupId>com.pinterest.kafka.tieredstorage</groupId>
+    <artifactId>kafka-tiered-storage</artifactId>
+    <version>0.0.2-SNAPSHOT</version>
+    <packaging>pom</packaging>
+    <name>kafka-tiered-storage</name>
+    <description>Kafka Tiered Storage</description>
+
+    <modules>
+        <module>ts-segment-uploader</module>
+        <module>ts-consumer</module>
+        <module>ts-common</module>
+        <module>ts-examples</module>
+    </modules>
 
     <properties>
-        <maven.compiler.source>20</maven.compiler.source>
-        <maven.compiler.target>20</maven.compiler.target>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     </properties>
 
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>2.3.1</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.12</artifactId>
-            <version>2.3.1</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-core</artifactId>
-            <version>2.17.1</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-api</artifactId>
-            <version>2.17.1</version>
-        </dependency>
-        <dependency>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-            <version>1.2.17</version>
-        </dependency>
-        <dependency>
-            <groupId>com.google.code.gson</groupId>
-            <artifactId>gson</artifactId>
-            <version>2.8.6</version>
-        </dependency>
-        <dependency>
-            <groupId>software.amazon.awssdk</groupId>
-            <artifactId>s3</artifactId>
-            <version>2.17.273</version>
-        </dependency>
-        <dependency>
-            <groupId>io.dropwizard.metrics</groupId>
-            <artifactId>metrics-core</artifactId>
-            <version>4.1.2</version>
-        </dependency>
-        <dependency>
-            <groupId>io.dropwizard.metrics</groupId>
-            <artifactId>metrics-jvm</artifactId>
-            <version>4.1.2</version>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>29.0-jre</version>
-        </dependency>
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter</artifactId>
-            <version>RELEASE</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+        </dependency>
+    </dependencies>
 
     <build>
         <plugins>
-            <plugin>
-                <artifactId>maven-dependency-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>copy-dependencies</goal>
-                        </goals>
-                        <configuration>
-                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-source-plugin</artifactId>
-                <version>3.2.1</version>
-                <executions>
-                    <execution>
-                        <id>attach-sources</id>
-                        <goals>
-                            <goal>jar</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-javadoc-plugin</artifactId>
-                <version>2.9.1</version>
-                <executions>
-                    <execution>
-                        <id>attach-javadocs</id>
-                        <goals>
-                            <goal>jar</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
             <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <version>3.0.0-M5</version>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.3</version>
                 <configuration>
-                    <argLine>${argLine}</argLine>
+                    <source>1.8</source>
+                    <target>1.8</target>
                 </configuration>
             </plugin>
         </plugins>
     </build>
-
-</project>
\ No newline at end of file
+</project>
diff --git a/ts-consumer/README.md b/ts-consumer/README.md
index fe40018..a7e7da6 100644
--- a/ts-consumer/README.md
+++ b/ts-consumer/README.md
@@ -18,11 +18,14 @@ The consumer can be configured to read records from Kafka brokers, S3, or both.
 ## Configuration
 The consumer can be configured the same way as a regular Kafka consumer. In addition, the following properties can be set:
 1. `tiered.storage.mode`: The consumption mode. It can be one of the following values: `KAFKA_PREFERRED`, `KAFKA_ONLY`, `TIERED_STORAGE_PREFERRED`, `TIERED_STORAGE_ONLY`.
-2. `kafka.cluster.id`: This is supplied by the user to identify the Kafka cluster, mainly for metrics purposes.
+2. `kafka.cluster.id`: This is supplied by the user to identify the Kafka cluster that the consumer is reading from. It is used to determine the S3 prefix to read from,
+and should match the cluster ID supplied by the segment uploader.
 3. `offset.reset.policy`: This can be `EARLIEST`, `LATEST`, or `NONE`. Only setting this to `EARLIEST` will result in the consumer falling back to reading from S3 if an offset
 is out of range in Kafka. Setting it to `NONE` will result in the consumer throwing an exception if an offset is out of range in Kafka. Setting it to `LATEST` will result in
 the consumer resetting offsets to latest in Kafka if an offset is out of range in Kafka.
 4. `storage.service.endpoint.provider.class`: The fully qualified class name of the class that provides the backing filesystem's endpoints for tiered storage consumption.
+5. `storage.service.endpoint.s3.prefix.entropy.num.bits`: The number of bits of entropy to use for prefixing S3 keys. Make sure to set this to the same value as the segment uploader's equivalent config
+in order to allow the consumer to read from the correct S3 location. More details on prefix entropy can be found in the [ts-segment-uploader README](../ts-segment-uploader/README.md).
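+
+As an example, a tiered storage consumer might be configured with the properties above as follows. This is a configuration fragment with placeholder values; `com.example.MyS3EndpointProvider` stands in for your own endpoint provider implementation.
+```java
+import java.util.Properties;
+
+Properties props = new Properties();
+// Standard Kafka consumer configs still apply.
+props.setProperty("bootstrap.servers", "broker1:9092");
+props.setProperty("group.id", "my-group");
+// Tiered storage properties (items 1-5 above):
+props.setProperty("tiered.storage.mode", "TIERED_STORAGE_PREFERRED");
+props.setProperty("kafka.cluster.id", "my-cluster"); // must match the uploader's cluster ID
+props.setProperty("offset.reset.policy", "EARLIEST"); // EARLIEST enables fallback to S3 on out-of-range offsets
+props.setProperty("storage.service.endpoint.provider.class", "com.example.MyS3EndpointProvider");
+props.setProperty("storage.service.endpoint.s3.prefix.entropy.num.bits", "3"); // must match the uploader's entropy config
+```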
 
 ## Usage
 The consumer can be used the same way as a regular Kafka consumer. There are API gaps that are actively being addressed.
diff --git a/ts-segment-uploader/README.md b/ts-segment-uploader/README.md
index 3431f39..20d5969 100644
--- a/ts-segment-uploader/README.md
+++ b/ts-segment-uploader/README.md
@@ -2,40 +2,75 @@
 ## Overview
 This module contains the uploader code that is used to upload Kafka log segments to the backing tiered storage filesystem.
-It is designed to be a long-running, standalone, and independent process that runs on each Kafka broker.
-It uploads finalized log segments to the backing filesystem.
-Only S3 is supported as the backing filesystem at the moment.
+It is designed to be a long-running and independent process that runs on each Kafka broker in order to upload
+finalized (closed) log segments to a remote storage system. These log segments can later be read by a
+[TieredStorageConsumer](../ts-consumer) even if they have already been deleted from local storage on the broker, provided
+that the retention period of the segments on the remote storage system is longer than that of the Kafka topic itself.
 
 ## Architecture
 ![Uploader Architecture](../docs/images/uploader.png)
 
+The uploader process runs alongside the Kafka server process on every broker, and is responsible for uploading log segments
+to the configured remote storage system.
+
+The key components of the uploader are:
+1. [KafkaSegmentUploader](src/main/java/com/pinterest/kafka/tieredstorage/uploader/KafkaSegmentUploader.java): The entrypoint class
+2. [DirectoryTreeWatcher](src/main/java/com/pinterest/kafka/tieredstorage/uploader/DirectoryTreeWatcher.java): Watches the Kafka log directory for log rotations (closing and opening of new log segments)
+3. [KafkaLeadershipWatcher](src/main/java/com/pinterest/kafka/tieredstorage/uploader/KafkaLeadershipWatcher.java): Monitors leadership changes in the Kafka cluster and updates the watched filepaths as necessary
+4. [FileUploader](src/main/java/com/pinterest/kafka/tieredstorage/uploader/S3FileUploader.java): Uploads the log segments to the remote storage system
+5. [KafkaEnvironmentProvider](src/main/java/com/pinterest/kafka/tieredstorage/uploader/KafkaEnvironmentProvider.java): Provides information about the Kafka environment for the uploader
+
+If a broker is currently the leader for a topic-partition, the uploader will watch the log directory for that topic-partition.
+When a previously-active log segment is closed, the uploader will upload the closed segment to the remote storage system. Typically,
+a log segment is closed when it reaches a time- or size-based threshold, configurable via Kafka broker properties.
+
+Each upload consists of 3 parts: the segment file, the index file, and the time index file. Once these files are successfully
+uploaded, an `offset.wm` file will also be uploaded for that topic-partition, which contains the offset of the last uploaded log segment.
+This is used to resume uploads from the last uploaded offset in case of a failure or restart.
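+
+The following is an illustrative, JDK-only sketch of the rotation-detection idea (not the project's actual implementation, which also handles recursive watches, leadership changes, and retries). A new `.log` file appearing in a partition directory signals that the previous active segment was closed and can be uploaded. The directory path is a placeholder.
+```java
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+
+public class SegmentRollWatcherSketch {
+    public static void main(String[] args) throws Exception {
+        Path partitionDir = Paths.get("/mnt/kafka/data/my-topic-0"); // placeholder log directory
+        WatchService watcher = FileSystems.getDefault().newWatchService();
+        partitionDir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE);
+        while (true) {
+            WatchKey key = watcher.take(); // blocks until a filesystem event arrives
+            for (WatchEvent<?> event : key.pollEvents()) {
+                Path created = (Path) event.context();
+                // A newly created .log file means Kafka rolled the segment: the previous
+                // segment (.log/.index/.timeindex) is now final and safe to upload.
+                if (created.toString().endsWith(".log")) {
+                    System.out.println("Segment rolled; upload the predecessor of " + created);
+                }
+            }
+            key.reset();
+        }
+    }
+}
+```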
+
 ## Usage
 The segment uploader entrypoint class is `KafkaSegmentUploader`. At a minimum, running the segment uploader requires:
-1. `-DkafkaEnvironmentProviderClass`: This system property should be provided upon running the segment uploader. It should
-provide the FQDN of the class that provides the Kafka environment, which should be an implementation of `KafkaEnvironmentProvider`.
-2. Config directory: this should be provided as the first argument to the segment uploader. It should point to a
-directory which contains a `.properties` file that contains the configurations for the segment uploader. The file that
+1. **KafkaEnvironmentProvider**: This system property should be provided when running the segment uploader (i.e. `-DkafkaEnvironmentProviderClass`). It should
+be the fully qualified class name of an implementation of [KafkaEnvironmentProvider](src/main/java/com/pinterest/kafka/tieredstorage/uploader/KafkaEnvironmentProvider.java).
+2. **Uploader Configurations**: The directory in which uploader configurations live should be provided as the first argument to the segment uploader main class.
+This directory should contain a `.properties` file with the configurations for the segment uploader. The file that
 the uploader chooses to use in this directory is determined by the Kafka cluster ID that is provided by `clusterId()` method for the
 `KafkaEnvironmentProvider` implementation. Therefore, the properties file should be named as `<clusterId>.properties`.
+3. **Storage Endpoint Configurations**: The segment uploader requires a [StorageServiceEndpointProvider](../ts-common/src/main/java/com/pinterest/kafka/tieredstorage/common/discovery/StorageServiceEndpointProvider.java)
+to be specified in the properties file. This provider's implementation dictates where each topic's log segments should be uploaded.
 
-## Configuration
-The segment uploader configurations are passed via the aforementioned properties file. Available configurations
-are listed in `SegmentUploaderConfiguration` class.
+For a more detailed guide on how to run the segment uploader, refer to the [Quickstart Guide](../docs/quickstart.md).
+
+## S3 Prefix Entropy
+The segment uploader supports prefix entropy (salting) for S3 uploads. This is useful for avoiding request rate hotspots on S3. To enable
+prefix entropy, set the `ts.segment.uploader.s3.prefix.entropy.bits` configuration in the properties file. This configuration specifies
+the number of bits of entropy to inject into the prefix of keys uploaded to S3.
 
-## Architecture Overview
-The segment uploader is designed to be a long-running process that uploads Kafka log segments to the backing tiered storage filesystem.
-It is designed to be run as a standalone process on every Kafka broker, and only uploads log segments for a topic partition
-if it is the leader for that partition.
+The bits are generated via an MD5 (128-bit) hash of the cluster ID, topic name, and partition ID of the log segment being uploaded. The hash is then
+truncated to the leftmost X bits, where X is the number specified in the configuration.
 
-The segment uploader relies on Zookeeper to keep track of leadership changes.
+Here is a simple example. Let's assume that with 0 bits of entropy (i.e. no entropy at all), a log segment object uploaded to S3
+would be identified by the URI:
+```
+s3://my-bucket/my-prefix/my-cluster/my-topic-0/00000000000000000000.log
+```
 
-The segment uploader places a watch on the Kafka log directory and its subdirectories for filesystem changes.
-When a new log segment is created, the uploader will upload the previous log segment that was just closed to the backing filesystem.
+Now suppose that with 3 bits of entropy, the leftmost 3 bits of the MD5 hash generated from the cluster, topic, and partition combination
+are `111`. The object would then be uploaded to:
+```
+s3://my-bucket/my-prefix/111/my-cluster/my-topic-0/00000000000000000000.log
+```
 
-Each upload will consist of 3 parts: the segment file, the index file, and the time index file. Once these files are successfully
-uploaded, an `offset.wm` file will also be uploaded for that topic partition which contains the offset of the last uploaded log segment.
-This is used to resume uploads from the last uploaded offset in case of a failure or restart.
+With prefix entropy, one can pre-partition the S3 bucket using the prefix bits to avoid request rate hotspots on S3.
+
+Because the hash is generated from the cluster, topic, and partition ID combination, all log segments (and their associated `.index`, `.timeindex`, and `offset.wm` files)
+for a given topic-partition on a given cluster will be uploaded to the same prefix in S3. This way, the [TieredStorageConsumer](../ts-consumer/src/main/java/com/pinterest/kafka/tieredstorage/consumer/TieredStorageConsumer.java)
+can deterministically reconstruct the entire key during consumption. For this reason, **it is important to ensure that the `ts.segment.uploader.s3.prefix.entropy.bits`
+configuration is consistent across all brokers in the Kafka cluster, and consistent in the consumer's configuration as well**.
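+
+To make the derivation concrete, here is an illustrative sketch of computing an entropy prefix. The exact hash input format (shown here as a `cluster-topic-partition` string) is an assumption for demonstration purposes; the authoritative derivation lives in the uploader source.
+```java
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+
+public class PrefixEntropySketch {
+    static String entropyPrefix(String clusterId, String topic, int partition, int numBits) throws Exception {
+        // MD5 (128-bit) hash of the cluster ID, topic name, and partition ID.
+        // NOTE: the separator and ordering here are assumptions for illustration.
+        byte[] hash = MessageDigest.getInstance("MD5")
+                .digest((clusterId + "-" + topic + "-" + partition).getBytes(StandardCharsets.UTF_8));
+        StringBuilder bits = new StringBuilder();
+        for (int i = 0; i < numBits; i++) {
+            // Take the leftmost numBits bits of the hash, most significant bit first.
+            bits.append((hash[i / 8] >> (7 - (i % 8))) & 1);
+        }
+        return bits.toString();
+    }
+
+    public static void main(String[] args) throws Exception {
+        String prefix = entropyPrefix("my-cluster", "my-topic", 0, 3);
+        // The computed prefix may differ from "111" since the real input format may differ.
+        System.out.println("s3://my-bucket/my-prefix/" + prefix + "/my-cluster/my-topic-0/00000000000000000000.log");
+    }
+}
+```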
+
+## Configuration
+The segment uploader configurations are passed via the aforementioned properties file. Available configurations
+are listed in the [SegmentUploaderConfiguration](src/main/java/com/pinterest/kafka/tieredstorage/uploader/SegmentUploaderConfiguration.java) class.
 
 ## Build
 To build with maven: