Add the S3 sink for consumer offsets topic. #1010

Merged Dec 1, 2023 (2 commits)
@@ -28,8 +28,6 @@ object S3ConfigSettings {
val CUSTOM_ENDPOINT: String = s"$CONNECTOR_PREFIX.custom.endpoint"
val ENABLE_VIRTUAL_HOST_BUCKETS: String = s"$CONNECTOR_PREFIX.vhost.bucket"

val PROFILES: String = s"$CONNECTOR_PREFIX.config.profiles"

val KCQL_CONFIG = s"$CONNECTOR_PREFIX.$KCQL_PROP_SUFFIX"
val KCQL_DOC =
"Contains the Kafka Connect Query Language describing the flow from Apache Kafka topics to Apache Hive tables."
@@ -92,4 +90,8 @@ object S3ConfigSettings {
val SOURCE_ORDERING_TYPE_DOC: String = "AlphaNumeric (the default)"
val SOURCE_ORDERING_TYPE_DEFAULT: String = "AlphaNumeric"

// used by the consumer groups sink
val S3_BUCKET_CONFIG: String = s"$CONNECTOR_PREFIX.location"
val S3_BUCKET_DOC: String =
"Specify the S3 bucket, and optionally, a prefix, where Kafka consumer group offsets will be stored."
}
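
For illustration, here is a minimal sketch of connector properties that point the consumer groups sink at a bucket and prefix. Everything here is hypothetical except S3_BUCKET_CONFIG itself, and the "bucket:prefix" separator is an assumption for illustration.

import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings
import java.util

val exampleProps: util.Map[String, String] = new util.HashMap[String, String]()
exampleProps.put("connector.class", "io.lenses.streamreactor.connect.aws.s3.sink.S3ConsumerGroupsSinkConnector")
exampleProps.put("topics", "__consumer_offsets")
// Hypothetical bucket and prefix; the "bucket:prefix" form is an assumption.
exampleProps.put(S3ConfigSettings.S3_BUCKET_CONFIG, "my-offsets-bucket:offsets")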
@@ -0,0 +1,52 @@
/*
* Copyright 2017-2023 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.aws.s3.sink

import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.common.utils.JarManifest
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings
import io.lenses.streamreactor.connect.aws.s3.sink.config.S3ConsumerGroupsSinkConfigDef
import io.lenses.streamreactor.connect.cloud.common.config.TaskDistributor
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.Task
import org.apache.kafka.connect.sink.SinkConnector

import java.util

/**
* A connector which stores the latest Kafka consumer group offsets from the "__consumer_offsets" topic in S3.
*/
class S3ConsumerGroupsSinkConnector extends SinkConnector with LazyLogging {

private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation)
private val props: util.Map[String, String] = new util.HashMap[String, String]()

override def version(): String = manifest.version()

override def taskClass(): Class[_ <: Task] = classOf[S3ConsumerGroupsSinkTask]

override def config(): ConfigDef = S3ConsumerGroupsSinkConfigDef.config

override def start(props: util.Map[String, String]): Unit = {
logger.info(s"Creating S3 consumer groups sink connector")
this.props.putAll(props)
}

override def stop(): Unit = ()

override def taskConfigs(maxTasks: Int): util.List[util.Map[String, String]] =
new TaskDistributor(S3ConfigSettings.CONNECTOR_PREFIX).distributeTasks(props, maxTasks)
}
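
As a usage note, a short sketch of how Connect might drive the lifecycle above, reusing the hypothetical exampleProps map from the earlier sketch. The comment on TaskDistributor is an inference from its name, not confirmed behaviour.

val connector = new S3ConsumerGroupsSinkConnector()
connector.start(exampleProps)            // the connector caches the supplied props
val perTask = connector.taskConfigs(2)   // TaskDistributor presumably yields one config map per task
require(connector.taskClass() == classOf[S3ConsumerGroupsSinkTask])
connector.stop()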
@@ -0,0 +1,107 @@
/*
* Copyright 2017-2023 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.aws.s3.sink

import cats.implicits.toShow
import io.lenses.streamreactor.common.errors.ErrorHandler
import io.lenses.streamreactor.common.utils.AsciiArtPrinter.printAsciiHeader
import io.lenses.streamreactor.common.utils.JarManifest
import io.lenses.streamreactor.connect.aws.s3.auth.AwsS3ClientCreator
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings.CONNECTOR_PREFIX
import io.lenses.streamreactor.connect.aws.s3.sink.config.S3ConsumerGroupsSinkConfig
import io.lenses.streamreactor.connect.aws.s3.storage.AwsS3Uploader
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskIdCreator
import io.lenses.streamreactor.connect.cloud.common.consumers.ConsumerGroupsWriter
import io.lenses.streamreactor.connect.cloud.common.utils.MapUtils
import org.apache.kafka.common.{ TopicPartition => KafkaTopicPartition }
import org.apache.kafka.connect.sink.SinkRecord
import org.apache.kafka.connect.sink.SinkTask

import java.util
import scala.jdk.CollectionConverters.CollectionHasAsScala
import scala.jdk.CollectionConverters.MapHasAsJava
import scala.jdk.CollectionConverters.MapHasAsScala

/**
 * A Kafka connector that persists the most recent Kafka consumer group offsets from the `__consumer_offsets` topic to S3 storage.
 * The connector follows an eventually consistent model: it captures and stores the latest offset for each partition within each consumer group.
 * The offsets are organized in S3 objects using the key structure `bucket/prefix.../consumerGroup/topic/partition`, with the offset value represented as a long.
 *
 * Note that the connector does not track consumer groups that drop topic subscriptions or unsubscribe from topics, so it does not automatically remove stale keys.
 * Writes are also eventually consistent: if Connect replays messages, an offset may be written multiple times.
 * Since the S3 key is unique per group-topic-partition, however, the last write always holds the latest offset.
 */
class S3ConsumerGroupsSinkTask extends SinkTask with ErrorHandler {

private val manifest = JarManifest(getClass.getProtectionDomain.getCodeSource.getLocation)

private var connectorTaskId: ConnectorTaskId = _
private var writerManager: ConsumerGroupsWriter = _

override def version(): String = manifest.version()

override def start(fallbackProps: util.Map[String, String]): Unit = {

printAsciiHeader(manifest, "/aws-s3-cg-sink-ascii.txt")

logger.debug(s"[{}] S3ConsumerGroupSinkTask.start", fallbackProps.get("name"))

val contextProps = Option(context).flatMap(c => Option(c.configs())).map(_.asScala.toMap).getOrElse(Map.empty)
val props = MapUtils.mergeProps(contextProps, fallbackProps.asScala.toMap).asJava
(for {
taskId <- new ConnectorTaskIdCreator(CONNECTOR_PREFIX).fromProps(fallbackProps)
config <- S3ConsumerGroupsSinkConfig.fromProps(props)
s3Client <- AwsS3ClientCreator.make(config.config)
uploader = new AwsS3Uploader(s3Client, taskId)
} yield new ConsumerGroupsWriter(config.location, uploader, taskId) -> taskId) match {
case Left(value) => throw value
case Right((writer, taskId)) =>
writerManager = writer
connectorTaskId = taskId
}
}

override def put(records: util.Collection[SinkRecord]): Unit =
writerManager.write(records.asScala.toList) match {
case Left(ex) =>
logger.error(s"[{}] Failed to write records to S3",
Option(connectorTaskId).map(_.show).getOrElse("Unnamed"),
ex,
)
throw ex
case Right(_) => ()
}

override def close(partitions: util.Collection[KafkaTopicPartition]): Unit = {
logger.debug(
"[{}] S3ConsumerGroupsSinkTask.close with {} partitions",
Option(connectorTaskId).map(_.show).getOrElse("Unnamed"),
partitions.size(),
)

Option(writerManager).foreach(_.close())
}

override def stop(): Unit = {
logger.debug("[{}] Stop", Option(connectorTaskId).map(_.show).getOrElse("Unnamed"))

Option(writerManager).foreach(_.close())
writerManager = null
}

}
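
To make the key layout from the scaladoc concrete, here is a hedged sketch of how a group-topic-partition offset could map to an S3 object key. composeKey and all values are illustrative assumptions, not the writer's actual implementation.

// Hypothetical illustration of the documented scheme:
// bucket/prefix.../consumerGroup/topic/partition -> offset (long)
def composeKey(prefix: Option[String], group: String, topic: String, partition: Int): String =
  (prefix.toList :+ group :+ topic :+ partition.toString).mkString("/")

composeKey(Some("offsets"), "billing-cg", "payments", 3)
// "offsets/billing-cg/payments/3": because this key is unique per
// group-topic-partition, a replayed write simply overwrites the object,
// so the last write always holds the latest offset.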
@@ -0,0 +1,54 @@
/*
* Copyright 2017-2023 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.aws.s3.sink.config

import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings._
import io.lenses.streamreactor.connect.aws.s3.config._
import io.lenses.streamreactor.connect.cloud.common.config.PropertiesHelper
import io.lenses.streamreactor.connect.cloud.common.consumers.CloudObjectKey

import java.util

case class S3ConsumerGroupsSinkConfig(
location: CloudObjectKey,
config: S3Config,
)

object S3ConsumerGroupsSinkConfig extends PropertiesHelper {
def fromProps(
props: util.Map[String, String],
): Either[Throwable, S3ConsumerGroupsSinkConfig] =
S3ConsumerGroupsSinkConfig(S3ConsumerGroupsSinkConfigDef(props))

def apply(
s3ConfigDefBuilder: S3ConsumerGroupsSinkConfigDef,
): Either[Throwable, S3ConsumerGroupsSinkConfig] =
S3ConsumerGroupsSinkConfig.from(
s3ConfigDefBuilder.getParsedValues,
)

def from(props: Map[String, _]): Either[Throwable, S3ConsumerGroupsSinkConfig] =
for {
bucketAndPrefix <- getStringEither(props, S3_BUCKET_CONFIG)
bucket <- CloudObjectKey.from(bucketAndPrefix)
_ <- AuthMode.withNameInsensitiveEither(getString(props, AUTH_MODE).getOrElse(AuthMode.Default.toString))
} yield {
S3ConsumerGroupsSinkConfig(
bucket,
S3Config(props),
)
}
}
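
A minimal sketch of exercising fromProps with the hypothetical exampleProps from earlier; the result is an Either, so a configuration failure can be inspected as a value.

S3ConsumerGroupsSinkConfig.fromProps(exampleProps) match {
  case Right(cfg) => println(s"Consumer group offsets will land in ${cfg.location}")
  case Left(err)  => println(s"Invalid configuration: ${err.getMessage}")
}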
@@ -0,0 +1,123 @@
/*
* Copyright 2017-2023 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.aws.s3.sink.config

import io.lenses.streamreactor.common.config.base.traits.BaseConfig
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings._
import io.lenses.streamreactor.connect.aws.s3.config._
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef.Importance
import org.apache.kafka.common.config.ConfigDef.Type

import java.util
import scala.jdk.CollectionConverters._

object S3ConsumerGroupsSinkConfigDef {

val config: ConfigDef = new ConfigDef()
.define(S3_BUCKET_CONFIG,
Type.STRING,
Importance.HIGH,
S3_BUCKET_DOC,
"S3",
1,
ConfigDef.Width.LONG,
S3_BUCKET_CONFIG,
)
.define(
AWS_REGION,
Type.STRING,
"",
Importance.HIGH,
"AWS region",
)
.define(
AWS_ACCESS_KEY,
Type.PASSWORD,
"",
Importance.HIGH,
"AWS access key",
)
.define(
AWS_SECRET_KEY,
Type.PASSWORD,
"",
Importance.HIGH,
"AWS password key",
)
.define(
AUTH_MODE,
Type.STRING,
AuthMode.Default.toString,
Importance.HIGH,
"Authenticate mode, 'credentials' or 'default'",
)
.define(
CUSTOM_ENDPOINT,
Type.STRING,
"",
Importance.LOW,
"Custom S3-compatible endpoint (usually for testing)",
)
.define(
ENABLE_VIRTUAL_HOST_BUCKETS,
Type.BOOLEAN,
false,
Importance.LOW,
"Enable virtual host buckets",
).define(
HTTP_NBR_OF_RETRIES,
Type.INT,
HTTP_NBR_OF_RETIRES_DEFAULT,
Importance.MEDIUM,
HTTP_NBR_OF_RETRIES_DOC,
"Error",
2,
ConfigDef.Width.LONG,
HTTP_NBR_OF_RETRIES,
)
.define(
HTTP_ERROR_RETRY_INTERVAL,
Type.LONG,
HTTP_ERROR_RETRY_INTERVAL_DEFAULT,
Importance.MEDIUM,
HTTP_ERROR_RETRY_INTERVAL_DOC,
"Error",
3,
ConfigDef.Width.LONG,
HTTP_ERROR_RETRY_INTERVAL,
)
.define(HTTP_SOCKET_TIMEOUT, Type.LONG, HTTP_SOCKET_TIMEOUT_DEFAULT, Importance.LOW, HTTP_SOCKET_TIMEOUT_DOC)
.define(HTTP_CONNECTION_TIMEOUT,
Type.INT,
HTTP_CONNECTION_TIMEOUT_DEFAULT,
Importance.LOW,
HTTP_CONNECTION_TIMEOUT_DOC,
)
.define(
POOL_MAX_CONNECTIONS,
Type.INT,
POOL_MAX_CONNECTIONS_DEFAULT,
Importance.LOW,
POOL_MAX_CONNECTIONS_DOC,
)
}

case class S3ConsumerGroupsSinkConfigDef(props: util.Map[String, String])
extends BaseConfig(S3ConfigSettings.CONNECTOR_PREFIX, S3ConsumerGroupsSinkConfigDef.config, props) {
def getParsedValues: Map[String, _] = values().asScala.toMap

}
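
Finally, a hedged sketch of the ConfigDef in action: parsing the hypothetical exampleProps from the first sketch yields typed values, with unset keys falling back to the defaults declared above.

val parsed: Map[String, _] = S3ConsumerGroupsSinkConfigDef(exampleProps).getParsedValues
// Keys not supplied resolve to their defaults, e.g. AUTH_MODE to "default"
// and ENABLE_VIRTUAL_HOST_BUCKETS to false, per the definitions above.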