Skip to content

Commit

Permalink
Add logging on flushing (#57)
Browse files Browse the repository at this point in the history
* Add logging on flushing

* Adds the option to log metrics in the sink

Applies code formatting

* Fix the unit tests

* Fix CVE-2024-25638

* Fix compilation error

---------

Co-authored-by: stheppi <[email protected]>
  • Loading branch information
stheppi and stheppi authored Jul 26, 2024
1 parent 5b59c0f commit bb4e450
Show file tree
Hide file tree
Showing 22 changed files with 211 additions and 55 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.common.util;

import java.util.function.Supplier;
import java.util.function.Consumer;

public class TimeLogger {

// Method which wraps any function call and captures the time taken to execute the function in nanoseconds
// And calls a function provided to record the time taken
public static <T> T measureTime(String logInfo, Supplier<T> supplier, Consumer<String> recorder) {
long start = System.nanoTime();
try {
return supplier.get();
} finally {
long end = System.nanoTime();
String metricLog = String.format("Time taken to execute %s: %d ns", logInfo, end - start);
recorder.accept(metricLog);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L),
logMetrics = false,
)

"avro sink" should "write 2 records to avro format in s3" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L),
logMetrics = false,
)

val sink = writerManagerCreator.from(config)
Expand Down Expand Up @@ -173,6 +174,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L),
logMetrics = false,
)

val sink = writerManagerCreator.from(config)
Expand Down Expand Up @@ -240,6 +242,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L),
logMetrics = false,
)

val sink = writerManagerCreator.from(config)
Expand Down Expand Up @@ -313,6 +316,7 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L),
logMetrics = false,
)

val sink = writerManagerCreator.from(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class S3ParquetWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyC
batchDelete = true,
errorPolicy = ErrorPolicy(ErrorPolicyEnum.THROW),
connectorRetryConfig = new RetryConfig(1, 1L),
logMetrics = false,
)

"parquet sink" should "write 2 records to parquet format in s3" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@ trait S3CommonConfigDef
Importance.LOW,
DELETE_MODE_DOC,
)
.define(
LOG_METRICS_CONFIG,
Type.BOOLEAN,
false,
Importance.LOW,
LOG_METRICS_DOC,
)
withConnectorRetryConfig(config)
withErrorPolicyConfig(config)
new KcqlSettings(javaConnectorPrefix).withSettings(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,9 @@ object S3ConfigSettings {
val S3_BUCKET_CONFIG: String = s"$CONNECTOR_PREFIX.location"
val S3_BUCKET_DOC: String =
"Specify the S3 bucket, and optionally, a prefix, where Kafka consumer group offsets will be stored."

val LOG_METRICS_CONFIG: String = s"$CONNECTOR_PREFIX.log.metrics"
val LOG_METRICS_DOC: String =
"If true, the connector will log metrics to the logger. This is useful for debugging and performance tuning."

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package io.lenses.streamreactor.connect.aws.s3.sink.config

import io.lenses.streamreactor.common.config.base.RetryConfig
import io.lenses.streamreactor.common.errors.ErrorPolicy
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings.LOG_METRICS_CONFIG
import io.lenses.streamreactor.connect.aws.s3.config.S3ConfigSettings.SEEK_MAX_INDEX_FILES
import io.lenses.streamreactor.connect.aws.s3.config.S3ConnectionConfig
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
Expand Down Expand Up @@ -55,6 +56,7 @@ object S3SinkConfig extends PropsToConfigConverter[S3SinkConfig] {
offsetSeekerOptions = OffsetSeekerOptions(
s3ConfigDefBuilder.getInt(SEEK_MAX_INDEX_FILES),
)
logMetrics = s3ConfigDefBuilder.getBoolean(LOG_METRICS_CONFIG)
} yield S3SinkConfig(
S3ConnectionConfig(s3ConfigDefBuilder.getParsedValues),
sinkBucketOptions,
Expand All @@ -63,6 +65,7 @@ object S3SinkConfig extends PropsToConfigConverter[S3SinkConfig] {
s3ConfigDefBuilder.batchDelete(),
errorPolicy = s3ConfigDefBuilder.getErrorPolicyOrDefault,
connectorRetryConfig = s3ConfigDefBuilder.getRetryConfig,
logMetrics = logMetrics,
)

}
Expand All @@ -75,4 +78,5 @@ case class S3SinkConfig(
batchDelete: Boolean,
errorPolicy: ErrorPolicy,
connectorRetryConfig: RetryConfig,
logMetrics: Boolean,
) extends CloudSinkConfig[S3ConnectionConfig]
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class S3CommonConfigDefTest extends AnyFlatSpec with Matchers with EitherValues
}
"CommonConfigDef" should "parse original properties" in {
val resultMap = commonConfigDef.config.parse(DefaultProps.asJava).asScala
resultMap should have size 18
resultMap should have size 19
DeprecatedProps.filterNot { case (k, _) => k == KCQL_CONFIG }.foreach {
case (k, _) => resultMap.get(k) should be(None)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class S3ConfigSettingsTest extends AnyFlatSpec with Matchers with LazyLogging {
val configKeys =
S3SinkConfigDef.config.configKeys().keySet().asScala ++ S3SourceConfigDef.config.configKeys().keySet().asScala

configKeys.size shouldBe 47
configKeys.size shouldBe 49
configKeys.foreach {
k => k.toLowerCase should be(k)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,8 @@ object AzureConfigSettings {
val POOL_MAX_CONNECTIONS_DOC = "Max connections in pool. -1: Use default according to underlying client."
val POOL_MAX_CONNECTIONS_DEFAULT: Int = -1

val LOG_METRICS_CONFIG: String = s"$CONNECTOR_PREFIX.log.metrics"
val LOG_METRICS_DOC: String =
"If true, the connector will log metrics to the logger. This is useful for debugging and performance tuning."

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
import io.lenses.streamreactor.connect.cloud.common.sink.config.CloudSinkBucketOptions
import io.lenses.streamreactor.connect.cloud.common.sink.config.OffsetSeekerOptions
import io.lenses.streamreactor.connect.datalake.config.AzureConnectionConfig
import io.lenses.streamreactor.connect.datalake.config.AzureConfigSettings.LOG_METRICS_CONFIG
import io.lenses.streamreactor.connect.datalake.config.AzureConfigSettings.SEEK_MAX_INDEX_FILES

object DatalakeSinkConfig extends PropsToConfigConverter[DatalakeSinkConfig] {
Expand All @@ -51,13 +52,15 @@ object DatalakeSinkConfig extends PropsToConfigConverter[DatalakeSinkConfig] {
offsetSeekerOptions = OffsetSeekerOptions(
s3ConfigDefBuilder.getInt(SEEK_MAX_INDEX_FILES),
)
logMetrics = s3ConfigDefBuilder.getBoolean(LOG_METRICS_CONFIG)
} yield DatalakeSinkConfig(
AzureConnectionConfig(s3ConfigDefBuilder.getParsedValues, authMode),
sinkBucketOptions,
offsetSeekerOptions,
s3ConfigDefBuilder.getCompressionCodec(),
s3ConfigDefBuilder.getErrorPolicyOrDefault,
s3ConfigDefBuilder.getRetryConfig,
logMetrics,
)

}
Expand All @@ -69,4 +72,5 @@ case class DatalakeSinkConfig(
compressionCodec: CompressionCodec,
errorPolicy: ErrorPolicy,
connectorRetryConfig: RetryConfig,
logMetrics: Boolean,
) extends CloudSinkConfig[AzureConnectionConfig]
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ object DatalakeSinkConfigDef
ConfigDef.Width.LONG,
SEEK_MAX_INDEX_FILES,
)
.define(
LOG_METRICS_CONFIG,
Type.BOOLEAN,
false,
Importance.LOW,
LOG_METRICS_DOC,
"Log Metrics",
3,
ConfigDef.Width.LONG,
LOG_METRICS_CONFIG,
)
addLocalStagingAreaToConfigDef(configDef)
addPaddingToConfigDef(configDef)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ trait CloudSinkConfig[CC] extends CloudConfig {
def connectorRetryConfig: RetryConfig

def errorPolicy: ErrorPolicy

def logMetrics: Boolean
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import io.lenses.streamreactor.common.errors.ErrorHandler
import io.lenses.streamreactor.common.errors.RetryErrorPolicy
import io.lenses.streamreactor.common.util.AsciiArtPrinter.printAsciiHeader
import io.lenses.streamreactor.common.util.JarManifest
import io.lenses.streamreactor.connect.cloud.common.config.traits.CloudSinkConfig
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskIdCreator
import io.lenses.streamreactor.connect.cloud.common.config.traits.CloudSinkConfig
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
import io.lenses.streamreactor.connect.cloud.common.formats.writer.NullSinkData
import io.lenses.streamreactor.connect.cloud.common.model.Offset
Expand All @@ -37,6 +37,7 @@ import io.lenses.streamreactor.connect.cloud.common.storage.FileMetadata
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
import io.lenses.streamreactor.connect.cloud.common.utils.MapUtils
import io.lenses.streamreactor.connect.cloud.common.utils.TimestampUtils
import io.lenses.streamreactor.metrics.Metrics
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.{ TopicPartition => KafkaTopicPartition }
import org.apache.kafka.connect.errors.ConnectException
Expand Down Expand Up @@ -67,6 +68,7 @@ abstract class CloudSinkTask[MD <: FileMetadata, C <: CloudSinkConfig[CC], CC <:

private val writerManagerCreator = new WriterManagerCreator[MD, C]()

private var logMetrics = false
private var writerManager: WriterManager[MD] = _
implicit var connectorTaskId: ConnectorTaskId = _

Expand Down Expand Up @@ -130,54 +132,58 @@ abstract class CloudSinkTask[MD <: FileMetadata, C <: CloudSinkConfig[CC], CC <:
}
}

override def put(records: util.Collection[SinkRecord]): Unit = {

val _ = handleTry {
Try {

logger.debug(
s"[${connectorTaskId.show}] put records=${records.size()} stats=${buildLogForRecords(records.asScala)
.toList.sortBy(_._1).map { case (k, v) => s"$k=$v" }.mkString(";")}",
)

// a failure in recommitPending will prevent the processing of further records
handleErrors(writerManager.recommitPending())

records.asScala.foreach {
record =>
val topicPartitionOffset =
Topic(record.topic).withPartition(record.kafkaPartition.intValue).withOffset(Offset(record.kafkaOffset))

val key = Option(record.key()) match {
case Some(k) => ValueToSinkDataConverter(k, Option(record.keySchema()))
case None => NullSinkData(Option(record.keySchema()))
}
val msgDetails = MessageDetail(
key = key,
value = ValueToSinkDataConverter(record.value(), Option(record.valueSchema())),
headers = HeaderToStringConverter(record),
TimestampUtils.parseTime(Option(record.timestamp()).map(_.toLong))(_ =>
logger.debug(
s"Record timestamp is invalid ${record.timestamp()}",
override def put(records: util.Collection[SinkRecord]): Unit =
Metrics.withTimer {
handleTry {
Try {

logger.debug(
s"[${connectorTaskId.show}] put records=${records.size()} stats=${buildLogForRecords(records.asScala)
.toList.sortBy(_._1).map { case (k, v) => s"$k=$v" }.mkString(";")}",
)

// a failure in recommitPending will prevent the processing of further records
handleErrors(writerManager.recommitPending())

records.asScala.foreach {
record =>
val topicPartitionOffset =
Topic(record.topic).withPartition(record.kafkaPartition.intValue).withOffset(Offset(record.kafkaOffset))

val key = Option(record.key()) match {
case Some(k) => ValueToSinkDataConverter(k, Option(record.keySchema()))
case None => NullSinkData(Option(record.keySchema()))
}
val msgDetails = MessageDetail(
key = key,
value = ValueToSinkDataConverter(record.value(), Option(record.valueSchema())),
headers = HeaderToStringConverter(record),
TimestampUtils.parseTime(Option(record.timestamp()).map(_.toLong))(_ =>
logger.debug(
s"Record timestamp is invalid ${record.timestamp()}",
),
),
),
Topic(record.topic()),
record.kafkaPartition(),
Offset(record.kafkaOffset()),
)
handleErrors {
writerManager.write(topicPartitionOffset, msgDetails)
}
}

if (records.isEmpty) {
handleErrors(writerManager.commitAllWritersIfFlushRequired())
Topic(record.topic()),
record.kafkaPartition(),
Offset(record.kafkaOffset()),
)
handleErrors {
writerManager.write(topicPartitionOffset, msgDetails)
}
}

if (records.isEmpty) {
handleErrors(writerManager.commitAllWritersIfFlushRequired())
}
}
}
()
} { e =>
if (logMetrics) {
logger.info(s"[${connectorTaskId.show}] put records=${records.size()} took $e ms")
}
}

}

override def preCommit(
currentOffsets: util.Map[KafkaTopicPartition, OffsetAndMetadata],
): util.Map[KafkaTopicPartition, OffsetAndMetadata] = {
Expand Down Expand Up @@ -271,7 +277,10 @@ abstract class CloudSinkTask[MD <: FileMetadata, C <: CloudSinkConfig[CC], CC <:
_ <- setRetryInterval(config)
writerManager <- Try(writerManagerCreator.from(config)(connectorTaskId, storageInterface)).toEither
_ <- initializeFromConfig(config)
} yield writerManager
} yield {
logMetrics = config.logMetrics
writerManager
}

private def initializeFromConfig(config: C): Either[Throwable, Unit] =
Try(initialize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class WriterManagerCreatorTest extends AnyFunSuite with Matchers with MockitoSug
compressionCodec: CompressionCodec,
connectorRetryConfig: RetryConfig,
errorPolicy: NoopErrorPolicy,
logMetrics: Boolean = false,
) extends CloudSinkConfig[FakeConnectionConfig]

case class FakeFileMetadata(file: String, lastModified: Instant) extends FileMetadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package io.lenses.streamreactor.connect.cloud.common.sink.commit
import com.typesafe.scalalogging.LazyLogging
import com.typesafe.scalalogging.Logger

import scala.util.Try

/**
* The [[CommitPolicy]] is responsible for determining when
* a sink partition (a single file) should be flushed (closed on disk, and moved to be visible).
Expand All @@ -38,16 +36,14 @@ case class CommitPolicy(logger: Logger, conditions: CommitPolicyCondition*) {
* @return true if the partition should be flushed, false otherwise
*/
def shouldFlush(context: CommitContext): Boolean = {

val debugEnabled: Boolean = Try(logger.underlying.isDebugEnabled).getOrElse(false)
val res = conditions.map(_.eval(context, debugEnabled))
val res = conditions.map(_.eval(context, true))
val flush = res.exists {
case ConditionCommitResult(true, _) => true
case _ => false
}

if (debugEnabled) {
logger.debug(context.generateLogLine(flush, res))
if (flush) {
logger.info(context.generateLogLine(flush, res))
}
flush
}
Expand Down
Loading

0 comments on commit bb4e450

Please sign in to comment.