Skip to content

Commit

Permalink
Merge branch 'master' into andrewstevenson-patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
stheppi authored Apr 24, 2024
2 parents 16930ad + d30c75a commit a003877
Show file tree
Hide file tree
Showing 21 changed files with 314 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,4 +278,11 @@ class AwsS3StorageInterface(connectorTaskId: ConnectorTaskId, s3Client: S3Client
lastModified
.map(lmValue => S3FileMetadata(fileName, lmValue))
.orElse(getMetadata(bucket, fileName).map(oMeta => S3FileMetadata(fileName, oMeta.lastModified)).toOption)

/**
 * Gets the system name for use in log messages.
 *
 * @return the fixed label "S3" identifying this storage backend
 */
override def system(): String = "S3"
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,4 +220,11 @@ class DatalakeStorageInterface(connectorTaskId: ConnectorTaskId, client: DataLak
}.toEither.leftMap(FileDeleteError(_, file))
}.sequence
} yield ()

/**
 * Gets the system name for use in log messages.
 *
 * @return the fixed label "Azure Datalake" identifying this storage backend
 */
override def system(): String = "Azure Datalake"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.cloud.common.config.kcqlprops

import enumeratum.Enum
import enumeratum.EnumEntry
import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnum.KeyNameFormatVersion
import io.lenses.streamreactor.connect.config.kcqlprops.KcqlProperties

/**
 * Enumerates the supported versions of the key name format, selected through the
 * `KeyNameFormatVersion` KCQL property.
 */
sealed trait KeyNamerVersion extends EnumEntry

object KeyNamerVersion extends Enum[KeyNamerVersion] {

  case object V0 extends KeyNamerVersion

  case object V1 extends KeyNamerVersion

  /**
   * Resolves the key namer version from the supplied KCQL properties.
   *
   * @param props   the KCQL properties to read `KeyNameFormatVersion` from
   * @param default the version to use when the property yields no value, or holds an
   *                integer other than 0 or 1 (such values are not rejected, they fall back silently)
   * @return the configured version, or `default`
   */
  def apply(
    props:   KcqlProperties[PropsKeyEntry, PropsKeyEnum.type],
    default: KeyNamerVersion,
  ): KeyNamerVersion = fromProps(props).getOrElse(default)

  /**
   * Maps the integer held by `KeyNameFormatVersion` to a version.
   *
   * @return `Some(V0)` for 0, `Some(V1)` for 1, `None` for anything else or when no integer is available
   */
  private def fromProps(props: KcqlProperties[PropsKeyEntry, PropsKeyEnum.type]): Option[KeyNamerVersion] =
    props.getOptionalInt(KeyNameFormatVersion).collect {
      case 0 => V0
      case 1 => V1
    }

  // A `val` (the documented enumeratum idiom) so the sequence produced by `findValues` is built once
  // rather than on every call to `values`.
  override val values: IndexedSeq[KeyNamerVersion] = findValues
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,6 @@ object PropsKeyEnum extends Enum[PropsKeyEntry] {
case object FlushCount extends PropsKeyEntry("flush.count")

case object FlushInterval extends PropsKeyEntry("flush.interval")

case object KeyNameFormatVersion extends PropsKeyEntry("key.name.format.version")
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object FormatWriter {
outputStream <- Try(new BuildLocalOutputStream(toBufferedOutputStream(path), topicPartition))
writer <- Try(FormatWriter(formatSelection, outputStream))
} yield writer
}.toEither.leftMap(ex => NonFatalCloudSinkError(ex.getMessage, ex))
}.toEither.leftMap(ex => new NonFatalCloudSinkError(ex.getMessage, ex.some))

def apply(
formatInfo: FormatSelection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import io.lenses.streamreactor.connect.cloud.common.utils.MapUtils
import io.lenses.streamreactor.connect.cloud.common.utils.TimestampUtils
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.{ TopicPartition => KafkaTopicPartition }
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.SinkRecord
import org.apache.kafka.connect.sink.SinkTask

Expand Down Expand Up @@ -88,7 +89,7 @@ abstract class CloudSinkTask[MD <: FileMetadata, C <: CloudSinkConfig, CT](
if (error.rollBack()) {
rollback(error.topicPartitions())
}
throw new IllegalStateException(error.message(), error.exception())
throw new ConnectException(error.message(), error.exception().orNull)
case Right(_) =>
}

Expand Down Expand Up @@ -252,7 +253,7 @@ abstract class CloudSinkTask[MD <: FileMetadata, C <: CloudSinkConfig, CT](

def convertPropsToConfig(connectorTaskId: ConnectorTaskId, props: Map[String, String]): Either[Throwable, C]

def createWriterMan(props: Map[String, String]): Either[Throwable, WriterManager[MD]] =
private def createWriterMan(props: Map[String, String]): Either[Throwable, WriterManager[MD]] =
for {
config <- convertPropsToConfig(connectorTaskId, props)
s3Client <- createClient(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
*/
package io.lenses.streamreactor.connect.cloud.common.sink

import cats.implicits.catsSyntaxOptionId
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition

trait SinkError {
def exception(): Throwable
def exception(): Option[Throwable]

def message(): String

Expand All @@ -28,7 +29,7 @@ trait SinkError {
}

// Cannot be retried, must be cleaned up
case class FatalCloudSinkError(message: String, exception: Throwable, topicPartition: TopicPartition)
case class FatalCloudSinkError(message: String, exception: Option[Throwable], topicPartition: TopicPartition)
extends SinkError {

override def rollBack(): Boolean = true
Expand All @@ -39,12 +40,12 @@ case class FatalCloudSinkError(message: String, exception: Throwable, topicParti
case object FatalCloudSinkError {

def apply(message: String, topicPartition: TopicPartition): FatalCloudSinkError =
FatalCloudSinkError(message, new IllegalStateException(message), topicPartition)
FatalCloudSinkError(message, Option.empty, topicPartition)

}

// Can be retried
case class NonFatalCloudSinkError(message: String, exception: Throwable) extends SinkError {
case class NonFatalCloudSinkError(message: String, exception: Option[Throwable]) extends SinkError {

override def rollBack(): Boolean = false

Expand All @@ -53,10 +54,10 @@ case class NonFatalCloudSinkError(message: String, exception: Throwable) extends

case object NonFatalCloudSinkError {
def apply(message: String): NonFatalCloudSinkError =
NonFatalCloudSinkError(message, new IllegalStateException(message))
NonFatalCloudSinkError(message, Option.empty)

def apply(exception: Throwable): NonFatalCloudSinkError =
NonFatalCloudSinkError(exception.getMessage, exception)
NonFatalCloudSinkError(exception.getMessage, exception.some)
}

case object BatchCloudSinkError {
Expand All @@ -76,11 +77,10 @@ case class BatchCloudSinkError(
nonFatal: Set[NonFatalCloudSinkError],
) extends SinkError {

override def exception(): Throwable =
override def exception(): Option[Throwable] =
fatal.++(nonFatal)
.headOption
.map(_.exception)
.getOrElse(new IllegalStateException("No exception found in BatchCloudSinkError"))
.flatMap { ex: SinkError => ex.exception() }

override def message(): String =
"fatal:\n" + fatal.map(_.message).mkString("\n") + "\n\nnonFatal:\n" + nonFatal.map(_.message).mkString(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ object SinkPropsSchema {
FlushCount -> LongPropsSchema,
FlushSize -> LongPropsSchema,
FlushInterval -> IntPropsSchema,
KeyNameFormatVersion -> IntPropsSchema,
)

val schema: KcqlPropsSchema[PropsKeyEntry, PropsKeyEnum.type] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.lenses.streamreactor.connect.cloud.common.sink.naming

import cats.implicits.catsSyntaxEitherId
import cats.implicits.catsSyntaxOptionId
import cats.implicits.toTraverseOps
import io.lenses.streamreactor.connect.cloud.common.config.FormatSelection
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
Expand Down Expand Up @@ -94,7 +95,7 @@ class CloudKeyNamer(
.toFile
createFileAndParents(file)
file
}.toEither.left.map(ex => FatalCloudSinkError(ex.getMessage, ex, topicPartition))
}.toEither.left.map(ex => new FatalCloudSinkError(ex.getMessage, ex.some, topicPartition))

private def buildPartitionPrefix(partitionValues: Map[PartitionField, String]): String =
partitionSelection.partitions.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
*/
package io.lenses.streamreactor.connect.cloud.common.sink.naming

import cats.implicits.toTraverseOps
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartitionOffset

import scala.util.Try

Expand All @@ -34,10 +37,26 @@ object IndexFilenames {
def indexForTopicPartition(topic: String, partition: Int)(implicit connectorTaskId: ConnectorTaskId): String =
f".indexes/${connectorTaskId.name}/$topic/$partition%05d/"

/**
 * Converts an optional index filename into an optional offset for the given topic/partition.
 *
 * @param topicPartition the topic/partition the resulting offset is attached to
 * @param maybeIndex     option of the index filename; `None` yields `Right(None)`
 * @return either the error raised while parsing the filename, or the optional TopicPartitionOffset
 */
def indexToOffset(
  topicPartition: TopicPartition,
  maybeIndex:     Option[String],
): Either[Throwable, Option[TopicPartitionOffset]] =
  maybeIndex.traverse { indexFilename =>
    IndexFilenames
      .offsetFromIndex(indexFilename)
      .map(parsedOffset => topicPartition.withOffset(parsedOffset))
  }

/**
* Parses an index filename and returns an offset
*/
def offsetFromIndex(indexFilename: String): Either[Throwable, Offset] = {
private def offsetFromIndex(indexFilename: String): Either[Throwable, Offset] = {
val lastIndex = indexFilename.lastIndexOf("/")
val (_, last) = indexFilename.splitAt(lastIndex + 1)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import io.lenses.streamreactor.connect.cloud.common.sink.FatalCloudSinkError
import io.lenses.streamreactor.connect.cloud.common.sink.NonFatalCloudSinkError
import io.lenses.streamreactor.connect.cloud.common.sink.SinkError
import io.lenses.streamreactor.connect.cloud.common.sink.naming.IndexFilenames
import io.lenses.streamreactor.connect.cloud.common.storage.FileDeleteError
import io.lenses.streamreactor.connect.cloud.common.storage.FileLoadError
import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
import io.lenses.streamreactor.connect.cloud.common.sink.naming.IndexFilenames.indexToOffset
import io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManagerErrors.corruptStorageState
import io.lenses.streamreactor.connect.cloud.common.sink.seek.IndexManagerErrors.fileDeleteError
import io.lenses.streamreactor.connect.cloud.common.storage._

class IndexManager[SM <: FileMetadata](
maxIndexes: Int,
Expand All @@ -55,9 +55,9 @@ class IndexManager[SM <: FileMetadata](
indexFileLocation.some,
)
.leftMap { e =>
val logLine = s"Couldn't retrieve listing for (${mostRecentIndexFile}})"
val logLine = s"Couldn't retrieve listing for ($mostRecentIndexFile})"
logger.error("[{}] {}", connectorTaskId.show, logLine, e.exception)
new NonFatalCloudSinkError(logLine, e.exception)
new NonFatalCloudSinkError(logLine, e.exception.some)
}
.flatMap {
case None => 0.asRight
Expand Down Expand Up @@ -127,13 +127,13 @@ class IndexManager[SM <: FileMetadata](
}

/**
* Seeks the filesystem to find the latyest offsets for a topic/partition.
* Seeks the filesystem to find the latest offsets for a topic/partition.
*
* @param topicPartition the TopicPartition for which to retrieve the offsets
* @param bucket the configured bucket
* @return either a SinkError or an option to a TopicPartitionOffset with the seek result.
*/
def seek(
def initialSeek(
topicPartition: TopicPartition,
bucket: String,
): Either[SinkError, Option[TopicPartitionOffset]] = {
Expand All @@ -144,7 +144,7 @@ class IndexManager[SM <: FileMetadata](
)
.leftMap { e =>
logger.error("Error retrieving listing", e.exception)
new NonFatalCloudSinkError("Couldn't retrieve listing", e.exception)
new NonFatalCloudSinkError("Couldn't retrieve listing", Option(e.exception))
}
.flatMap {
case None => Option.empty[TopicPartitionOffset].asRight[SinkError]
Expand All @@ -164,48 +164,33 @@ class IndexManager[SM <: FileMetadata](
topicPartition: TopicPartition,
bucket: String,
indexes: Seq[String],
) = {

/**
* Parses the filename of the index file, converting it to a TopicPartitionOffset
*
* @param maybeIndex option of the index filename
* @return either an error, or a TopicPartitionOffset
*/
def indexToOffset(maybeIndex: Option[String]): Either[Throwable, Option[TopicPartitionOffset]] =
maybeIndex match {
case Some(index) =>
for {
offset <- IndexFilenames.offsetFromIndex(index)
} yield Some(topicPartition.withOffset(offset))
case None => Option.empty[TopicPartitionOffset].asRight
}
): Either[NonFatalCloudSinkError, Option[TopicPartitionOffset]] = {
for {
validIndex <- scanIndexes(bucket, indexes)
indexesToDelete = indexes.filterNot(validIndex.contains)
_ <- storageInterface.deleteFiles(bucket, indexesToDelete)
offset <- indexToOffset(topicPartition, validIndex).leftMap(FileNameParseError(_, s"$validIndex"))
} yield {
logger.info("[{}] Seeked offset {} for TP {}", connectorTaskId.show, offset, topicPartition)
offset
}
}.leftMap(e => handleSeekAndCleanErrors(e))

{
for {
validIndex <- scanIndexes(bucket, indexes)
indexesToDelete = indexes.filterNot(validIndex.contains)
_ <- storageInterface.deleteFiles(bucket, indexesToDelete)
offset <- indexToOffset(validIndex)
} yield {
logger.info("[{}] Seeked offset {} for TP {}", connectorTaskId.show, offset, topicPartition)
offset
}
}.leftMap {
def handleSeekAndCleanErrors(uploadError: UploadError): NonFatalCloudSinkError =
uploadError match {
case err: FileLoadError =>
val logLine = s"File load error while seeking: ${err.message()}"
logger.error(s"[{}] {}", connectorTaskId.show, logLine, err.exception)
new NonFatalCloudSinkError(logLine, err.exception)
NonFatalCloudSinkError(corruptStorageState(storageInterface.system()))
case err: FileDeleteError =>
val logLine = s"File delete error while seeking: ${err.message()}"
logger.error(s"[{}] {}", connectorTaskId.show, logLine, err.exception)
new NonFatalCloudSinkError(logLine, err.exception)
case err: Throwable =>
val logLine = s"Error while seeking: ${err.getMessage}"
NonFatalCloudSinkError(fileDeleteError(storageInterface.system()))
case err: FileNameParseError =>
val logLine = s"Error while seeking: ${err.message()}"
logger.error(s"[{}] {}", connectorTaskId.show, logLine, err)
new NonFatalCloudSinkError(logLine, err)
new NonFatalCloudSinkError(logLine, err.exception.some)
}
}

/**
* Given a bucket and a list of files, attempts to load them to establish the most recent valid index
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.cloud.common.sink.seek

/**
 * User-facing error messages for failures encountered while seeking and cleaning index files.
 *
 * Each message is a multi-line string that begins with a line break.
 */
object IndexManagerErrors {

  /**
   * Message for the case where the index state cannot be loaded from storage.
   *
   * @param system display name of the storage system, interpolated into the message
   * @return the message text, directing the operator to the `.indexes` subfolder
   */
  def corruptStorageState(system: String): String =
    s"""
       |The $system storage state is corrupted. The connector state is out of sync
       |with the data. This could happen if the connector has been recreated and the data was deleted.
       |Delete the connector's .indexes subfolder as well and restart the connector.""".stripMargin

  /**
   * Message for the case where old index files could not be deleted.
   *
   * @param system display name of the storage system, interpolated into the message
   * @return the message text
   */
  def fileDeleteError(system: String): String =
    s"""
       |There was an issue deleting old index files from the indexes directory. This could happen if
       |you have not granted the connector role appropriate delete permissions via the $system
       |permissions model.""".stripMargin

}
Loading

0 comments on commit a003877

Please sign in to comment.