Skip to content

Commit

Permalink
Merge pull request #145 from lensesio-dev/bugfix/http-sink-npe-templa…
Browse files Browse the repository at this point in the history
…te-substitution

Gracefully handle null values from templating substitutions with configurable behaviour
  • Loading branch information
davidsloan authored Oct 17, 2024
2 parents 5b910a3 + 5430d32 commit 7aae8d9
Show file tree
Hide file tree
Showing 12 changed files with 421 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class HttpSinkTask extends SinkTask with LazyLogging with JarManifestProvided {

(for {
config <- IO.fromEither(HttpSinkConfig.from(propsAsScala))
template = RawTemplate(config.endpoint, config.content, config.headers)
template = RawTemplate(config.endpoint, config.content, config.headers, config.nullPayloadHandler)
writerManager <- HttpWriterManager.apply(sinkName, config, template, deferred)
_ <- writerManager.start(refUpdateCallback)
} yield {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ case class HttpSinkConfig(
headers: List[(String, String)],
ssl: StoresInfo,
batch: BatchConfig,
nullPayloadHandler: NullPayloadHandler,
errorThreshold: Int,
uploadSyncPeriod: Int,
retries: RetriesConfig,
Expand Down Expand Up @@ -115,8 +116,12 @@ object HttpSinkConfig {
connectConfig,
AuthenticationKeys(BasicAuthenticationUsernameProp, BasicAuthenticationPasswordProp, AuthenticationTypeProp),
)
ssl <- CyclopsToScalaEither.convertToScalaEither(StoresInfo.fromConfig(connectConfig))
batch = BatchConfig.from(connectConfig)
ssl <- CyclopsToScalaEither.convertToScalaEither(StoresInfo.fromConfig(connectConfig))
batch = BatchConfig.from(connectConfig)
nullPayloadHandler <- NullPayloadHandler(
connectConfig.getString(HttpSinkConfigDef.NullPayloadHandler),
connectConfig.getString(HttpSinkConfigDef.CustomNullPayloadHandler),
)
errorThreshold = connectConfig.getInt(HttpSinkConfigDef.ErrorThresholdProp)
uploadSyncPeriod = connectConfig.getInt(HttpSinkConfigDef.UploadSyncPeriodProp)
maxRetries = connectConfig.getInt(HttpSinkConfigDef.RetriesMaxRetriesProp)
Expand Down Expand Up @@ -157,6 +162,7 @@ object HttpSinkConfig {
headers,
ssl,
batch,
nullPayloadHandler,
errorThreshold,
uploadSyncPeriod,
retries,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,18 @@ object HttpSinkConfigDef {
|Tidy the output json.
|""".stripMargin

val CustomNullPayloadHandler: String = "connect.http.null.payload.handler.custom"
val CustomNullPayloadHandlerDoc: String =
"""
|Custom string to use in place of a null template.
|""".stripMargin

val NullPayloadHandler: String = "connect.http.null.payload.handler"
val NullPayloadHandlerDoc: String =
s"""
|Literal to output in templates in place of a null payload. Values are `error` (raises an error), `empty` (empty string, eg ""), `null` (the literal 'null') or `custom` (a string of your choice, as defined by `$CustomNullPayloadHandler`). `Defaults to `error`.
|""".stripMargin

val config: ConfigDef = {
val configDef = new ConfigDef()
.withClientSslSupport()
Expand Down Expand Up @@ -266,6 +278,20 @@ object HttpSinkConfigDef {
Importance.HIGH,
JsonTidyPropDoc,
)
.define(
NullPayloadHandler,
Type.STRING,
"error",
Importance.HIGH,
NullPayloadHandlerDoc,
)
.define(
CustomNullPayloadHandler,
Type.STRING,
"",
Importance.HIGH,
CustomNullPayloadHandlerDoc,
)
ReporterConfig.withErrorRecordReportingSupport(configDef)
ReporterConfig.withSuccessRecordReportingSupport(configDef)
OAuth2Config.append(configDef)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.http.sink.config

import cats.implicits.catsSyntaxEitherId
import io.lenses.streamreactor.connect.http.sink.tpl.substitutions.SubstitutionError
import org.apache.kafka.common.config.ConfigException

/**
* Companion object for NullPayloadHandler.
* Provides factory methods and constants for different null payload handlers.
*/
object NullPayloadHandler {

val NullPayloadHandlerName = "null"
val ErrorPayloadHandlerName = "error"
val EmptyPayloadHandlerName = "empty"
val CustomPayloadHandlerName = "custom"

/**
* Factory method to create a NullPayloadHandler based on the provided name.
*
* @param nullPayloadHandlerName the name of the null payload handler
* @param customSubstitution the custom substitution value (used only for custom handler)
* @return Either a NullPayloadHandler or a Throwable if the handler name is invalid
*/
def apply(nullPayloadHandlerName: String, customSubstitution: String): Either[Throwable, NullPayloadHandler] =
nullPayloadHandlerName.toLowerCase() match {
case NullPayloadHandlerName => NullLiteralNullPayloadHandler.asRight
case ErrorPayloadHandlerName => ErrorNullPayloadHandler.asRight
case EmptyPayloadHandlerName => EmptyStringNullPayloadHandler.asRight
case CustomPayloadHandlerName => CustomNullPayloadHandler(customSubstitution).asRight
case _ => new ConfigException("Invalid null payload handler specified").asLeft
}
}

/**
* Trait representing a handler for null payloads.
*/
trait NullPayloadHandler {

/**
* Handles a null value and returns either a SubstitutionError or a String.
*
* @return Either a SubstitutionError or a String
*/
def handleNullValue: Either[SubstitutionError, String]

}

/**
* NullPayloadHandler implementation that returns a SubstitutionError.
*/
object ErrorNullPayloadHandler extends NullPayloadHandler {

/**
* Returns a SubstitutionError indicating that a null payload was encountered.
*
* @return Left containing a SubstitutionError
*/
override def handleNullValue: Either[SubstitutionError, String] = SubstitutionError(
"Templating substitution returned a null payload, and you have configured this to cause an error.",
).asLeft
}

/**
* NullPayloadHandler implementation that returns the string "null".
*/
object NullLiteralNullPayloadHandler extends NullPayloadHandler {

/**
* Returns the string "null".
*
* @return Right containing the string "null"
*/
override def handleNullValue: Either[SubstitutionError, String] = "null".asRight
}

/**
* NullPayloadHandler implementation that returns an empty string.
*/
object EmptyStringNullPayloadHandler extends NullPayloadHandler {

/**
* Returns an empty string.
*
* @return Right containing an empty string
*/
override def handleNullValue: Either[SubstitutionError, String] = "".asRight
}

/**
* NullPayloadHandler implementation that returns a custom value.
*
* @param customValue the custom value to return
*/
case class CustomNullPayloadHandler(customValue: String) extends NullPayloadHandler {

/**
* Returns the custom value.
*
* @return Right containing the custom value
*/
override def handleNullValue: Either[SubstitutionError, String] = customValue.asRight
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.lenses.streamreactor.connect.http.sink.tpl
import cats.implicits.catsSyntaxEitherId
import cats.implicits.catsSyntaxOptionId
import cats.implicits.toTraverseOps
import io.lenses.streamreactor.connect.http.sink.config.NullPayloadHandler
import io.lenses.streamreactor.connect.http.sink.tpl.JsonTidy.cleanUp
import io.lenses.streamreactor.connect.http.sink.tpl.renderer.RecordRenderer
import io.lenses.streamreactor.connect.http.sink.tpl.substitutions.SubstitutionError
Expand All @@ -26,14 +27,19 @@ import org.apache.kafka.connect.sink.SinkRecord
object RawTemplate {
private val innerTemplatePattern = """\{\{#message}}([\s\S]*?)\{\{/message}}""".r

def apply(endpoint: String, content: String, headers: Seq[(String, String)]): TemplateType =
def apply(
endpoint: String,
content: String,
headers: Seq[(String, String)],
nullPayloadHandler: NullPayloadHandler,
): TemplateType =
innerTemplatePattern.findFirstMatchIn(content) match {
case Some(innerTemplate) =>
val start = content.substring(0, innerTemplate.start)
val end = content.substring(innerTemplate.end)
TemplateWithInnerLoop(endpoint, start, end, innerTemplate.group(1), headers)
TemplateWithInnerLoop(endpoint, start, end, innerTemplate.group(1), headers, nullPayloadHandler)
case None =>
SimpleTemplate(endpoint, content, headers)
SimpleTemplate(endpoint, content, headers, nullPayloadHandler)
}
}

Expand All @@ -48,13 +54,14 @@ trait TemplateType {

// this template type will require individual requests, the messages can't be batched
case class SimpleTemplate(
endpoint: String,
content: String,
headers: Seq[(String, String)],
endpoint: String,
content: String,
headers: Seq[(String, String)],
nullPayloadHandler: NullPayloadHandler,
) extends TemplateType {

override def renderRecords(records: Seq[SinkRecord]): Either[SubstitutionError, Seq[RenderedRecord]] =
RecordRenderer.renderRecords(records, endpoint.some, content, headers)
RecordRenderer.renderRecords(records, endpoint.some, content, headers, nullPayloadHandler)

override def process(records: Seq[RenderedRecord], tidyJson: Boolean): Either[SubstitutionError, ProcessedTemplate] =
records.headOption match {
Expand All @@ -65,11 +72,12 @@ case class SimpleTemplate(
}

case class TemplateWithInnerLoop(
endpoint: String,
prefixContent: String,
suffixContent: String,
innerTemplate: String,
headers: Seq[(String, String)],
endpoint: String,
prefixContent: String,
suffixContent: String,
innerTemplate: String,
headers: Seq[(String, String)],
nullPayloadHandler: NullPayloadHandler,
) extends TemplateType {

override def renderRecords(records: Seq[SinkRecord]): Either[SubstitutionError, Seq[RenderedRecord]] =
Expand All @@ -80,6 +88,7 @@ case class TemplateWithInnerLoop(
Option.when(i == 0)(endpoint),
innerTemplate,
headers,
nullPayloadHandler,
)
}.sequence

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import cats.implicits._
import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.Topic
import io.lenses.streamreactor.connect.cloud.common.model.TopicPartitionOffset
import io.lenses.streamreactor.connect.http.sink.config.NullPayloadHandler
import io.lenses.streamreactor.connect.http.sink.tpl.substitutions.SubstitutionError
import io.lenses.streamreactor.connect.http.sink.tpl.RenderedRecord
import io.lenses.streamreactor.connect.http.sink.tpl.substitutions.SubstitutionType
Expand All @@ -29,50 +30,55 @@ object RecordRenderer {
private val templateRenderer = new TemplateRenderer[SubstitutionType](SubstitutionType)

def renderRecords(
data: Seq[SinkRecord],
endpointTpl: Option[String],
contentTpl: String,
headers: Seq[(String, String)],
data: Seq[SinkRecord],
endpointTpl: Option[String],
contentTpl: String,
headers: Seq[(String, String)],
nullPayloadHandler: NullPayloadHandler,
): Either[SubstitutionError, Seq[RenderedRecord]] =
data.map(renderRecord(_, endpointTpl, contentTpl, headers)).sequence
data.map(renderRecord(_, endpointTpl, contentTpl, headers, nullPayloadHandler)).sequence
def renderRecord(
sinkRecord: SinkRecord,
endpointTpl: Option[String],
contentTpl: String,
headers: Seq[(String, String)],
sinkRecord: SinkRecord,
endpointTpl: Option[String],
contentTpl: String,
headers: Seq[(String, String)],
nullPayloadHandler: NullPayloadHandler,
): Either[SubstitutionError, RenderedRecord] = {
val topicPartitionOffset: TopicPartitionOffset =
Topic(sinkRecord.topic()).withPartition(sinkRecord.kafkaPartition()).withOffset(Offset(sinkRecord.kafkaOffset()))

for {
recordRend: String <- templateRenderer.render(sinkRecord, contentTpl)
headersRend: Seq[(String, String)] <- renderHeaders(sinkRecord, headers)
endpointRend: Option[String] <- renderEndpoint(sinkRecord, endpointTpl)
recordRend: String <- templateRenderer.render(sinkRecord, contentTpl, nullPayloadHandler)
headersRend: Seq[(String, String)] <- renderHeaders(sinkRecord, headers, nullPayloadHandler)
endpointRend: Option[String] <- renderEndpoint(sinkRecord, endpointTpl, nullPayloadHandler)
} yield RenderedRecord(topicPartitionOffset, sinkRecord.timestamp(), recordRend, headersRend, endpointRend)
}

private def renderHeader(
sinkRecord: SinkRecord,
header: (String, String),
sinkRecord: SinkRecord,
header: (String, String),
nullPayloadHandler: NullPayloadHandler,
): Either[SubstitutionError, (String, String)] =
header match {
case (hKey, hVal) =>
for {
k <- templateRenderer.render(sinkRecord, hKey)
v <- templateRenderer.render(sinkRecord, hVal)
k <- templateRenderer.render(sinkRecord, hKey, nullPayloadHandler)
v <- templateRenderer.render(sinkRecord, hVal, nullPayloadHandler)
} yield k -> v
}

private def renderHeaders(
sinkRecord: SinkRecord,
headers: Seq[(String, String)],
sinkRecord: SinkRecord,
headers: Seq[(String, String)],
nullPayloadHandler: NullPayloadHandler,
): Either[SubstitutionError, Seq[(String, String)]] =
headers.map(h => renderHeader(sinkRecord, h)).sequence
headers.map(h => renderHeader(sinkRecord, h, nullPayloadHandler)).sequence

private def renderEndpoint(
sinkRecord: SinkRecord,
endpointTpl: Option[String],
sinkRecord: SinkRecord,
endpointTpl: Option[String],
nullPayloadHandler: NullPayloadHandler,
): Either[SubstitutionError, Option[String]] =
endpointTpl.map(tpl => templateRenderer.render(sinkRecord, tpl)).sequence
endpointTpl.map(tpl => templateRenderer.render(sinkRecord, tpl, nullPayloadHandler)).sequence

}
Loading

0 comments on commit 7aae8d9

Please sign in to comment.