Improvements to the HTTP Sink (#163)
* Improvements to the HTTP Sink

This PR introduces the following enhancements to the HTTP Sink:

1. **Queue Limiting**: We've set a limit on the queue size per topic to reduce the chance of an Out-of-Memory (OOM) error. Previously the queue was unbounded, so when HTTP calls were slow and the sink received records faster than it could clear them, memory usage would grow until the task ran out of memory. (The new queue-size and offer-timeout settings are illustrated in the configuration sketch after this list.)

2. **Offering Timeout**: Offering records to the queue now has a timeout. If records still remain to be offered when the timeout is exceeded, a retriable exception is thrown and, depending on the connector's retry settings, the operation is attempted again. This helps avoid situations where the sink gets stuck on a slow or unresponsive batch.

3. **Duplicate Record Handling**: To prevent the same records from being added to the queue multiple times, we've introduced a `Map[TopicPartition, Offset]` to track the last offset added to the queue for each topic-partition. This ensures that the sink does not attempt to process the same records repeatedly. (A simplified sketch of this check appears below the description.)

4. **Batch Failure Handling**: The changes also address the situation where an HTTP call fails for a specific input but the batch was not removed from the queue, which could have led to the batch being retried indefinitely. The batch is now dequeued before the HTTP call is made, so a failing batch can no longer be retried forever.
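
For illustration, here is a hypothetical connector properties map showing how the two new settings could be tuned. The property keys and their defaults come from `HttpSinkConfigDef` in this PR; the chosen values and the surrounding Scala `Map` are examples only, not part of the change:

```scala
// Hypothetical connector properties, for illustration only.
// The keys and defaults are defined in HttpSinkConfigDef below.
val connectorProps: Map[String, String] = Map(
  // Cap the number of records queued per topic (default: 1000000).
  "connect.http.max.queue.size" -> "50000",
  // How long (in ms) to keep offering records to a full queue before throwing a
  // RetriableException so Kafka Connect retries the operation (default: 120000).
  "connect.http.max.queue.offer.timeout.ms" -> "30000"
)
```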

In the near future, there will be a new PR to further reduce the code complexity around the batching approach and the boilerplate code.
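
Below is a minimal, self-contained sketch of the duplicate check described in point 3, using simplified stand-ins for the connector's `TopicPartition` and `Offset` types; the actual implementation lives in `RecordsQueue.enqueueAll` further down in this diff:

```scala
// Sketch only: simplified stand-ins for the connector's model types.
object DuplicateCheckSketch extends App {
  final case class TopicPartition(topic: String, partition: Int)
  final case class Offset(value: Long)

  // A record is enqueued only if its offset is beyond the last offset already
  // queued for its topic-partition; anything at or below that offset is a duplicate.
  def isNew(lastQueued: Map[TopicPartition, Offset], tp: TopicPartition, offset: Offset): Boolean =
    lastQueued.get(tp).forall(last => offset.value > last.value)

  // Example: offset 5 for ("events", 0) is already tracked, so re-offering it is discarded,
  // while offset 6 is accepted.
  val lastQueued = Map(TopicPartition("events", 0) -> Offset(5L))
  assert(!isNew(lastQueued, TopicPartition("events", 0), Offset(5L)))
  assert(isNew(lastQueued, TopicPartition("events", 0), Offset(6L)))
}
```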

* Fix the unit test

* Rename variable

* Remove the invalid functional tests; a failed batch request is no longer retried.

* Remove unused functions

---------

Co-authored-by: stheppi <[email protected]>
stheppi and stheppi authored Nov 12, 2024
1 parent 9f630fb commit 8652879
Showing 7 changed files with 218 additions and 95 deletions.
@@ -132,45 +132,6 @@ class HttpSinkTest
}
}

/**
* Retries occur by default and the failed HTTP post will be retried twice before succeeding.
*/
test("failing scenario written to error reporter") {

setUpWiremockFailureResponse()

sendRecordsWithProducer(
stringProducer,
stringConverters,
randomTestId,
topic,
"My Static Content Template",
BatchSizeSingleRecord,
false,
1,
2,
"Record number 1",
).asserting {
case (requests, successReporterRecords, failureReporterRecords) =>
requests.size should be(3)
requests.map(_.getBody).map(new String(_)).toSet should contain only "My Static Content Template"
requests.map(_.getMethod).toSet should be(Set(RequestMethod.POST))

failureReporterRecords.size should be(2)
failureReporterRecords.foreach {
rec =>
rec.topic() should be(failureTopicName)
rec.value() should be("My Static Content Template")
}

successReporterRecords.size should be(1)
val successRecord = successReporterRecords.head
successRecord.topic() should be(successTopicName)
successRecord.value() should be("My Static Content Template")

}
}

test("dynamic string template containing message content should be sent to endpoint") {

setUpWiremockResponse()
@@ -311,45 +272,6 @@ class HttpSinkTest
()
}

private def setUpWiremockFailureResponse(): Unit = {
WireMock.configureFor(container.getHost, container.getFirstMappedPort)
WireMock.resetAllScenarios()
WireMock.resetAllRequests()
WireMock.resetToDefault()
WireMock.reset()

val url = s"/$randomTestId"

stubFor(
post(urlEqualTo(url))
.inScenario("failure")
.whenScenarioStateIs("STARTED")
.willSetStateTo("ONE ATTEMPT")
.willReturn(aResponse.withStatus(404).withHeader("Content-Type", "text/plain").withBody("File Not Found")),
)

stubFor(
post(urlEqualTo(url))
.inScenario("failure")
.whenScenarioStateIs("ONE ATTEMPT")
.willSetStateTo("TWO ATTEMPTS")
.willReturn(aResponse.withStatus(404).withHeader("Content-Type", "text/plain").withBody("File Not Found")),
)

stubFor(
post(urlEqualTo(url))
.inScenario("failure")
.whenScenarioStateIs("TWO ATTEMPTS")
.willReturn(aResponse.withHeader("Content-Type", "text/plain")
.withBody("Hello world!")),
)

WireMock.setScenarioState("failure", "STARTED")

()

}

def getBootstrapServers: String = s"PLAINTEXT://kafka:9092"

private def sendRecordsWithProducer[K, V](
@@ -80,19 +80,21 @@ class HttpWriter(
nonEmptyBatchInfo: NonEmptyBatchInfo,
batch: NonEmptySeq[RenderedRecord],
totalQueueSize: Int,
) =
): IO[Unit] =
for {
_ <- IO(
logger.debug(s"[$sinkName] HttpWriter.process, batch of ${batch.length}, queue size: $totalQueueSize"),
)
// remove the batch from the queue before any of the operations
_ <- recordsQueue.dequeue(batch)
_ <- IO.delay(logger.trace(s"[$sinkName] modifyCommitContext for batch of ${nonEmptyBatchInfo.batch.length}"))
_ <- flush(nonEmptyBatchInfo.batch)
updatedCommitContext = updateCommitContextPostCommit(nonEmptyBatchInfo.updatedCommitContext)
_ <- IO.delay(logger.trace(s"[$sinkName] Updating sink context to: $updatedCommitContext"))
_ <- commitContextRef.set(updatedCommitContext)
removedElements <- recordsQueue.dequeue(batch)
_ <- resetErrorsInCommitContext()
} yield removedElements

_ <- resetErrorsInCommitContext()
} yield ()

def preCommit(
initialOffsetAndMetaMap: Map[TopicPartition, OffsetAndMetadata],
@@ -26,6 +26,7 @@ import com.typesafe.scalalogging.LazyLogging
import com.typesafe.scalalogging.StrictLogging
import io.lenses.streamreactor.common.util.EitherUtils.unpackOrThrow
import io.lenses.streamreactor.common.utils.CyclopsToScalaOption.convertToScalaOption
import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.Topic
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
import io.lenses.streamreactor.connect.http.sink.client.HttpRequestSender
@@ -50,6 +51,7 @@ import java.net.http.HttpClient
import java.time.Duration
import scala.collection.immutable.Queue
import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.FiniteDuration

/**
* The `HttpWriterManager` object provides a factory method to create an instance of `HttpWriterManager`.
@@ -123,6 +125,8 @@ object HttpWriterManager extends StrictLogging {
config.tidyJson,
config.errorReportingController,
config.successReportingController,
config.maxQueueSize,
config.maxQueueOfferTimeout,
)
}

@@ -172,6 +176,8 @@ class HttpWriterManager(
tidyJson: Boolean,
errorReportingController: ReportingController[HttpFailureConnectorSpecificRecordData],
successReportingController: ReportingController[HttpSuccessConnectorSpecificRecordData],
maxQueueSize: Int,
maxQueueOfferTimeout: FiniteDuration,
)(
implicit
t: Temporal[IO],
@@ -184,15 +190,21 @@
*/
private def createNewHttpWriter(): IO[HttpWriter] =
for {
batchPolicy <- IO.pure(batchPolicy)
recordsQueueRef <- Ref.of[IO, Queue[RenderedRecord]](Queue.empty)
commitContextRef <- Ref.of[IO, HttpCommitContext](HttpCommitContext.default(sinkName))
offsetsRef <- Ref.of[IO, Map[TopicPartition, Offset]](Map.empty)
} yield new HttpWriter(
sinkName = sinkName,
sender = httpRequestSender,
template = template,
recordsQueue =
new RecordsQueue(recordsQueueRef, commitContextRef, batchPolicy),
new RecordsQueue(recordsQueueRef,
commitContextRef,
batchPolicy,
maxQueueSize,
maxQueueOfferTimeout,
offsetsRef,
),
errorThreshold = errorThreshold,
tidyJson = tidyJson,
errorReporter = errorReportingController,
@@ -22,8 +22,14 @@ import io.lenses.streamreactor.connect.http.sink.RecordsQueueBatcher.takeBatch
import io.lenses.streamreactor.connect.http.sink.commit.BatchPolicy
import io.lenses.streamreactor.connect.http.sink.commit.HttpCommitContext
import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord
import cats.implicits.toFoldableOps
import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartition
import org.apache.kafka.connect.errors.RetriableException

import scala.collection.immutable.Queue
import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.FiniteDuration

/**
* The `RecordsQueue` class manages a queue of `RenderedRecord` objects and handles the logic for
@@ -37,19 +43,85 @@
val recordsQueue: Ref[IO, Queue[RenderedRecord]],
commitContextRef: Ref[IO, HttpCommitContext],
batchPolicy: BatchPolicy,
maxSize: Int,
offerTimeout: FiniteDuration,
offsetMapRef: Ref[IO, Map[TopicPartition, Offset]],
) extends LazyLogging {

/**
* Enqueues a sequence of `RenderedRecord` objects into the queue.
* Enqueues a sequence of `RenderedRecord` objects into the queue, with a maximum size limit.
* If the queue is full, it retries adding the remaining records within the specified timeout.
* If records still remain once the timeout has elapsed, it throws a RetriableException.
* Also, it discards any records for which the offset was already queued.
*
* @param records The records to be enqueued.
* @return An `IO` action that enqueues the records.
* @return An `IO` action that enqueues the records or throws a RetriableException if the queue remains full.
*/
def enqueueAll(records: NonEmptySeq[RenderedRecord]): IO[Unit] =
def enqueueAll(records: NonEmptySeq[RenderedRecord]): IO[Unit] = {

// Filter out records with offsets that have already been processed
def filterDuplicates(records: List[RenderedRecord], offsetMap: Map[TopicPartition, Offset]): List[RenderedRecord] =
records.filter { record =>
val tp = record.topicPartitionOffset.toTopicPartition
offsetMap.get(tp) match {
case Some(lastOffset) if record.topicPartitionOffset.offset.value <= lastOffset.value =>
// Offset already processed, discard this record
false
case _ =>
true
}
}

def attemptEnqueue(remainingRecords: List[RenderedRecord], startTime: Long): IO[Unit] =
if (remainingRecords.isEmpty) {
IO.unit
} else {
for {
currentTime <- IO.realTime.map(_.toMillis)
elapsedTime = currentTime - startTime
_ <- if (elapsedTime >= offerTimeout.toMillis) {
IO.raiseError(new RetriableException("Enqueue timed out and records remain"))
} else {
for {
(recordsToAdd, recordsRemaining) <- recordsQueue.modify { queue =>
val queueSize = queue.size
val spaceAvailable = maxSize - queueSize
val recordsToAdd = remainingRecords.take(spaceAvailable)
val recordsRemaining = remainingRecords.drop(spaceAvailable)
val newQueue = queue.enqueueAll(recordsToAdd)
(newQueue, (recordsToAdd, recordsRemaining))
}
_ <- if (recordsToAdd.nonEmpty) {
// Update the offset map with the offsets of the records that were actually enqueued
offsetMapRef.update { offsetMap =>
recordsToAdd.foldLeft(offsetMap) { (accOffsets, record) =>
val tp = record.topicPartitionOffset.toTopicPartition
val offset = record.topicPartitionOffset.offset
// Only update if the new offset is greater
val updatedOffset: Offset = accOffsets.get(tp) match {
case Some(existingOffset) if existingOffset.value >= offset.value => existingOffset
case _ => offset
}
accOffsets.updated(tp, updatedOffset)
}
}
} else IO.unit
_ <- if (recordsRemaining.nonEmpty) {
IO.sleep(5.millis) *>
attemptEnqueue(recordsRemaining, startTime)
} else IO.unit
} yield ()
}
} yield ()
}

for {
_ <- IO.delay(logger.debug(s"${records.length} records added to $recordsQueue"))
_ <- recordsQueue.getAndUpdate(q => q ++ records.toSeq).void
offsetMap <- offsetMapRef.get
uniqueRecords = filterDuplicates(records.toList, offsetMap)
startTime <- IO.realTime.map(_.toMillis)
_ <- attemptEnqueue(uniqueRecords, startTime)
} yield ()
}

/**
* Takes a batch of records from the queue based on the commit policy.
@@ -78,8 +150,9 @@ class RecordsQueue(
def dequeue(nonEmptyBatch: NonEmptySeq[RenderedRecord]): IO[Unit] =
recordsQueue.access.flatMap {
case (records, updater) =>
val lookup = nonEmptyBatch.toSeq.toSet
for {
newQueue <- IO(records.dropWhile(nonEmptyBatch.toSeq.contains))
newQueue <- IO(records.dropWhile(lookup.contains))
_ <- updater(newQueue)
_ <- IO.delay(logger.debug("Queue before: {}, after: {}", records, newQueue))
} yield ()
@@ -46,6 +46,7 @@ import java.net.MalformedURLException
import java.net.URL
import java.time.Clock
import java.time.Duration
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._
import scala.util.Try

@@ -101,6 +102,8 @@ case class HttpSinkConfig(
tidyJson: Boolean,
errorReportingController: ReportingController[HttpFailureConnectorSpecificRecordData],
successReportingController: ReportingController[HttpSuccessConnectorSpecificRecordData],
maxQueueSize: Int,
maxQueueOfferTimeout: FiniteDuration,
)

object HttpSinkConfig {
@@ -159,6 +162,12 @@ object HttpSinkConfig {
connectConfig,
),
)

maxQueueSize = connectConfig.getInt(HttpSinkConfigDef.MaxQueueSizeProp)
maxQueueOfferTimeout = FiniteDuration(
connectConfig.getLong(HttpSinkConfigDef.MaxQueueOfferTimeoutProp),
scala.concurrent.duration.MILLISECONDS,
)
} yield HttpSinkConfig(
method,
endpoint,
Expand All @@ -175,6 +184,8 @@ object HttpSinkConfig {
jsonTidy,
errorReportingController,
successReportingController,
maxQueueSize,
maxQueueOfferTimeout,
)
}

@@ -158,6 +158,20 @@ object HttpSinkConfigDef {
|Literal to output in templates in place of a null payload. Values are `error` (raises an error), `empty` (empty string, eg ""), `null` (the literal 'null') or `custom` (a string of your choice, as defined by `$CustomNullPayloadHandler`). Defaults to `error`.
|""".stripMargin

val MaxQueueSizeProp: String = "connect.http.max.queue.size"
val MaxQueueSizeDoc: String =
"""
|The maximum number of records to queue per topic before blocking. If the queue limit is reached, the connector will throw a RetriableException and the connector's retry settings will be used.
|""".stripMargin
val MaxQueueSizeDefault = 1000000

val MaxQueueOfferTimeoutProp: String = "connect.http.max.queue.offer.timeout.ms"
val MaxQueueOfferTimeoutDoc: String =
"""
|The maximum time in milliseconds to wait for the queue to accept a record. If the queue does not accept the record within this time, the connector will throw a RetriableException and the connector's retry settings will be used.
|""".stripMargin
val MaxQueueOfferTimeoutDefault = 120000

val config: ConfigDef = {
val configDef = new ConfigDef()
.withClientSslSupport()
@@ -292,6 +306,20 @@ object HttpSinkConfigDef {
Importance.HIGH,
CustomNullPayloadHandlerDoc,
)
.define(
MaxQueueSizeProp,
Type.INT,
MaxQueueSizeDefault,
Importance.HIGH,
MaxQueueSizeDoc,
)
.define(
MaxQueueOfferTimeoutProp,
Type.LONG,
MaxQueueOfferTimeoutDefault,
Importance.HIGH,
MaxQueueOfferTimeoutDoc,
)
ReporterConfig.withErrorRecordReportingSupport(configDef)
ReporterConfig.withSuccessRecordReportingSupport(configDef)
OAuth2Config.append(configDef)