diff --git a/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpConfiguration.scala b/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpConfiguration.scala index 885cb89d1f..fda9bc21ae 100644 --- a/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpConfiguration.scala +++ b/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpConfiguration.scala @@ -18,6 +18,8 @@ trait HttpConfiguration extends LazyLogging { headerTemplates: Seq[(String, String)], topicName: String, converters: Map[String, String], + batchSize: Int, + jsonTidy: Boolean, ): ConnectorConfiguration = { val configMap: Map[String, ConfigValue[_]] = converters.view.mapValues(new ConfigValue[String](_)).toMap ++ Map( @@ -29,7 +31,8 @@ trait HttpConfiguration extends LazyLogging { HttpSinkConfigDef.HttpRequestContentProp -> ConfigValue(contentTemplate), HttpSinkConfigDef.HttpRequestHeadersProp -> ConfigValue(headerTemplates.mkString(",")), HttpSinkConfigDef.AuthenticationTypeProp -> ConfigValue("none"), //NoAuthentication - HttpSinkConfigDef.BatchCountProp -> ConfigValue(1), + HttpSinkConfigDef.BatchCountProp -> ConfigValue(batchSize), + HttpSinkConfigDef.JsonTidyProp -> ConfigValue(jsonTidy), ERROR_REPORTING_ENABLED_PROP -> ConfigValue("false"), SUCCESS_REPORTING_ENABLED_PROP -> ConfigValue("false"), ) diff --git a/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpSinkTest.scala b/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpSinkTest.scala index e4690fd400..21d2957716 100644 --- a/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpSinkTest.scala +++ b/kafka-connect-http/src/fun/scala/io/lenses/streamreactor/connect/test/HttpSinkTest.scala @@ -1,5 +1,6 @@ package io.lenses.streamreactor.connect.test +import cats.implicits._ import cats.effect.IO import cats.effect.kernel.Resource import cats.effect.testing.scalatest.AsyncIOSpec @@ -34,6 +35,8 @@ class HttpSinkTest with LazyLogging with EitherValues with HttpConfiguration { + private val BatchSizeSingleRecord = 1 + private val BatchSizeMultipleRecords = 2 private val stringSerializer = classOf[StringSerializer] private val stringProducer = createProducer[String, String](stringSerializer, stringSerializer) @@ -63,11 +66,12 @@ class HttpSinkTest .withNetwork(network) override val connectorModule: String = "http" - private var randomTestId = UUID.randomUUID().toString - private def topic = "topic" + randomTestId + private var randomTestId: String = _ + private var topic: String = _ before { randomTestId = UUID.randomUUID().toString + topic = "topic" + randomTestId } override def beforeAll(): Unit = { @@ -85,12 +89,14 @@ class HttpSinkTest setUpWiremockResponse() val record = "My First Record" - sendRecordWithProducer(stringProducer, - stringConverters, - randomTestId, - topic, - record, - "My Static Content Template", + sendRecordsWithProducer(stringProducer, + stringConverters, + randomTestId, + topic, + "My Static Content Template", + BatchSizeSingleRecord, + false, + record, ).asserting { requests => requests.size should be(1) @@ -105,12 +111,20 @@ class HttpSinkTest setUpWiremockResponse() val record = "My First Record" - sendRecordWithProducer(stringProducer, stringConverters, randomTestId, topic, record, "{{value}}").asserting { + sendRecordsWithProducer(stringProducer, + stringConverters, + randomTestId, + topic, + record, + BatchSizeSingleRecord, + false, + "{{value}}", + ).asserting { requests => requests.size should be(1) val firstRequest = requests.head + new String(firstRequest.getBody) should be(record) firstRequest.getMethod should be(RequestMethod.POST) - new String(firstRequest.getBody) should be("My First Record") } } @@ -118,13 +132,15 @@ class HttpSinkTest setUpWiremockResponse() - sendRecordWithProducer[String, Order]( + sendRecordsWithProducer[String, Order]( orderProducer, jsonConverters, randomTestId, topic, - Order(1, "myOrder product", 1.3d, 10), "product: {{value.product}}", + BatchSizeSingleRecord, + false, + Order(1, "myOrder product", 1.3d, 10), ).asserting { requests => requests.size should be(1) @@ -138,13 +154,15 @@ class HttpSinkTest setUpWiremockResponse() - sendRecordWithProducer[String, Order]( + sendRecordsWithProducer[String, Order]( orderProducer, stringConverters, randomTestId, topic, - Order(1, "myOrder product", 1.3d, 10), "whole product message: {{value}}", + BatchSizeSingleRecord, + false, + Order(1, "myOrder product", 1.3d, 10), ).asserting { requests => requests.size should be(1) @@ -156,18 +174,45 @@ class HttpSinkTest } } + test("batched dynamic string template containing whole json message should be sent to endpoint") { + + setUpWiremockResponse() + + sendRecordsWithProducer[String, Order]( + orderProducer, + stringConverters, + randomTestId, + topic, + "{\"data\":[{{#message}}{{value}},{{/message}}]}", + BatchSizeMultipleRecords, + true, + Order(1, "myOrder product", 1.3d, 10), + Order(2, "another product", 1.4d, 109), + ).asserting { + requests => + requests.size should be(1) + val firstRequest = requests.head + firstRequest.getMethod should be(RequestMethod.POST) + new String(firstRequest.getBody) should be( + "{\"data\":[{\"id\":1,\"product\":\"myOrder product\",\"price\":1.3,\"qty\":10,\"created\":null},{\"id\":2,\"product\":\"another product\",\"price\":1.4,\"qty\":109,\"created\":null}]}", + ) + } + } + test("dynamic string template containing avro message fields should be sent to endpoint") { setUpWiremockResponse() val order = Order(1, "myOrder product", 1.3d, 10, "March").toRecord - sendRecordWithProducer[String, GenericRecord]( + sendRecordsWithProducer[String, GenericRecord]( avroOrderProducer, avroConverters, randomTestId, topic, - order, "product: {{value.product}}", + BatchSizeSingleRecord, + false, + order, ).asserting { requests => requests.size should be(1) @@ -188,25 +233,30 @@ class HttpSinkTest () } - private def sendRecordWithProducer[K, V]( + private def sendRecordsWithProducer[K, V]( producer: Resource[IO, KafkaProducer[K, V]], converters: Map[String, String], randomTestId: String, topic: String, - record: V, contentTemplate: String, + batchSize: Int, + jsonTidy: Boolean, + record: V*, ): IO[List[LoggedRequest]] = producer.use { producer => - createConnectorResource(randomTestId, topic, contentTemplate, converters).use { + createConnectorResource(randomTestId, topic, contentTemplate, converters, batchSize, jsonTidy).use { _ => - IO { - sendRecord[K, V](topic, producer, record) - eventually { - verify(postRequestedFor(urlEqualTo(s"/$randomTestId"))) - findAll(postRequestedFor(urlEqualTo(s"/$randomTestId"))).asScala.toList + record.map { + rec => IO(sendRecord[K, V](topic, producer, rec)) + }.sequence + .map { _ => + eventually { + verify(postRequestedFor(urlEqualTo(s"/$randomTestId"))) + findAll(postRequestedFor(urlEqualTo(s"/$randomTestId"))).asScala.toList + } } - } + } } private def sendRecord[K, V](topic: String, producer: KafkaProducer[K, V], record: V): Unit = { @@ -219,6 +269,8 @@ class HttpSinkTest topic: String, contentTemplate: String, converters: Map[String, String], + batchSize: Int, + jsonTidy: Boolean, ): Resource[IO, String] = createConnector( sinkConfig( @@ -229,6 +281,8 @@ class HttpSinkTest Seq(), topic, converters, + batchSize, + jsonTidy, ), ) diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala index 629f51a728..0fb2227800 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriter.scala @@ -46,6 +46,7 @@ class HttpWriter( recordsQueueRef: Ref[IO, Queue[RenderedRecord]], commitContextRef: Ref[IO, HttpCommitContext], errorThreshold: Int, + tidyJson: Boolean, errorReporter: ReportingController, successReporter: ReportingController, ) extends LazyLogging { @@ -177,7 +178,7 @@ class HttpWriter( private def flush(records: Seq[RenderedRecord]): IO[ProcessedTemplate] = for { - processed <- IO.fromEither(template.process(records)) + processed <- IO.fromEither(template.process(records, tidyJson)) _ <- reportResult(records, processed, sender.sendHttpRequest(processed)) } yield processed diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala index fd048208ff..86f230f2ae 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterManager.scala @@ -105,6 +105,7 @@ object HttpWriterManager extends StrictLogging { terminate, config.errorThreshold, config.uploadSyncPeriod, + config.tidyJson, config.errorReportingController, config.successReportingController, ) @@ -130,6 +131,7 @@ class HttpWriterManager( deferred: Deferred[IO, Either[Throwable, Unit]], errorThreshold: Int, uploadSyncPeriod: Int, + tidyJson: Boolean, errorReportingController: ReportingController, successReportingController: ReportingController, )( @@ -146,6 +148,7 @@ class HttpWriterManager( Ref.unsafe[IO, Queue[RenderedRecord]](Queue()), Ref.unsafe[IO, HttpCommitContext](HttpCommitContext.default(sinkName)), errorThreshold, + tidyJson, errorReportingController, successReportingController, ) diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala index d8a5cbe4b7..211b381d93 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfig.scala @@ -85,6 +85,7 @@ case class HttpSinkConfig( uploadSyncPeriod: Int, retries: RetriesConfig, timeout: TimeoutConfig, + tidyJson: Boolean, errorReportingController: ReportingController, successReportingController: ReportingController, ) @@ -120,6 +121,7 @@ object HttpSinkConfig { retries = RetriesConfig(maxRetries, maxTimeoutMs, onStatusCodes) connectionTimeoutMs = connectConfig.getInt(HttpSinkConfigDef.ConnectionTimeoutMsProp) timeout = TimeoutConfig(connectionTimeoutMs) + jsonTidy = connectConfig.getBoolean(HttpSinkConfigDef.JsonTidyProp) errorReportingController = createAndStartController(new ErrorReportingController(connectConfig)) successReportingController = createAndStartController(new SuccessReportingController(connectConfig)) } yield HttpSinkConfig( @@ -134,6 +136,7 @@ object HttpSinkConfig { uploadSyncPeriod, retries, timeout, + jsonTidy, errorReportingController, successReportingController, ) diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala index 9cc538f2a8..df23f9d9cc 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/config/HttpSinkConfigDef.scala @@ -140,6 +140,12 @@ object HttpSinkConfigDef { |The password for basic authentication. |""".stripMargin + val JsonTidyProp: String = "connect.http.json.tidy" + val JsonTidyPropDoc: String = + """ + |Tidy the output json. + |""".stripMargin + val config: ConfigDef = { val configDef = new ConfigDef() .withClientSslSupport() @@ -253,6 +259,13 @@ object HttpSinkConfigDef { Importance.HIGH, BasicAuthenticationPasswordDoc, ) + .define( + JsonTidyProp, + Type.BOOLEAN, + false, + Importance.HIGH, + JsonTidyPropDoc, + ) ReporterConfig.withErrorRecordReportingSupport(configDef) ReporterConfig.withSuccessRecordReportingSupport(configDef) OAuth2Config.append(configDef) diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/JsonTidy.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/JsonTidy.scala new file mode 100644 index 0000000000..e1381d3869 --- /dev/null +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/JsonTidy.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl + +import com.typesafe.scalalogging.LazyLogging +import org.json4s._ +import org.json4s.native.JsonMethods._ + +object JsonTidy extends LazyLogging { + implicit val formats: Formats = DefaultFormats + + def cleanUp(jsonString: String): String = + try { + val json = parse(jsonString) + compact(render(json)) + } catch { + case e: Exception => + logger.error("Error formatting json", e) + jsonString + } +} diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTypes.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTypes.scala index 1dc53c92d6..36f3006cf8 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTypes.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTypes.scala @@ -18,6 +18,7 @@ package io.lenses.streamreactor.connect.http.sink.tpl import cats.implicits.catsSyntaxEitherId import cats.implicits.catsSyntaxOptionId import cats.implicits.toTraverseOps +import io.lenses.streamreactor.connect.http.sink.tpl.JsonTidy.cleanUp import io.lenses.streamreactor.connect.http.sink.tpl.renderer.RecordRenderer import io.lenses.streamreactor.connect.http.sink.tpl.substitutions.SubstitutionError import org.apache.kafka.connect.sink.SinkRecord @@ -42,7 +43,7 @@ trait TemplateType { def renderRecords(record: Seq[SinkRecord]): Either[SubstitutionError, Seq[RenderedRecord]] - def process(records: Seq[RenderedRecord]): Either[SubstitutionError, ProcessedTemplate] + def process(records: Seq[RenderedRecord], tidyJson: Boolean): Either[SubstitutionError, ProcessedTemplate] } // this template type will require individual requests, the messages can't be batched @@ -55,7 +56,7 @@ case class SimpleTemplate( override def renderRecords(records: Seq[SinkRecord]): Either[SubstitutionError, Seq[RenderedRecord]] = RecordRenderer.renderRecords(records, endpoint.some, content, headers) - override def process(records: Seq[RenderedRecord]): Either[SubstitutionError, ProcessedTemplate] = + override def process(records: Seq[RenderedRecord], tidyJson: Boolean): Either[SubstitutionError, ProcessedTemplate] = records.headOption match { case Some(RenderedRecord(_, _, recordRendered, headersRendered, Some(endpointRendered))) => ProcessedTemplate(endpointRendered, recordRendered, headersRendered).asRight @@ -82,10 +83,17 @@ case class TemplateWithInnerLoop( ) }.sequence - override def process(records: Seq[RenderedRecord]): Either[SubstitutionError, ProcessedTemplate] = { + override def process( + records: Seq[RenderedRecord], + tidyJson: Boolean, + ): Either[SubstitutionError, ProcessedTemplate] = { + + val replaceWith = records.flatMap(_.recordRendered).mkString("") + val fnContextFix: String => String = content => { + if (tidyJson) cleanUp(content) else content + } + val contentOrError = fnContextFix(prefixContent + replaceWith + suffixContent) - val replaceWith = records.flatMap(_.recordRendered).mkString("") - val contentOrError = prefixContent + replaceWith + suffixContent val maybeProcessedTpl = for { headRecord <- records.headOption ep <- headRecord.endpointRendered diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/renderer/TemplateRenderer.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/renderer/TemplateRenderer.scala index 1fe2baa49c..bbab4755d3 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/renderer/TemplateRenderer.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/renderer/TemplateRenderer.scala @@ -20,10 +20,11 @@ import io.lenses.streamreactor.connect.http.sink.tpl.substitutions.SubstitutionE import io.lenses.streamreactor.connect.http.sink.tpl.substitutions.SubstitutionType import org.apache.kafka.connect.sink.SinkRecord +import java.util.regex.Matcher import scala.util.matching.Regex object TemplateRenderer { - private val templatePattern: Regex = "\\{\\{(.*?)}}".r + private val templatePattern: Regex = "\\{\\{([^{}]*)}}".r // Method to render a single data entry with a template def render(data: SinkRecord, tplText: String): Either[SubstitutionError, String] = @@ -31,12 +32,13 @@ object TemplateRenderer { templatePattern .replaceAllIn( tplText, - matchTag => { - val tag = matchTag.group(1).trim - getValue(tag, data) - .leftMap(throw _) - .merge - }, + matchTag => + Matcher.quoteReplacement { + val tag = matchTag.group(1).trim + getValue(tag, data) + .leftMap(throw _) + .merge + }, ), ) diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/SubstitutionError.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/SubstitutionError.scala index 5c52bbe3c1..e345b07f39 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/SubstitutionError.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/substitutions/SubstitutionError.scala @@ -16,9 +16,16 @@ package io.lenses.streamreactor.connect.http.sink.tpl.substitutions import cats.implicits.catsSyntaxOptionId +import com.typesafe.scalalogging.LazyLogging -object SubstitutionError { - def apply(msg: String): SubstitutionError = SubstitutionError(msg, Option.empty) - def apply(msg: String, throwable: Throwable): SubstitutionError = SubstitutionError(msg, throwable.some) +object SubstitutionError extends LazyLogging { + def apply(msg: String): SubstitutionError = { + logger.error("SubstitutionError Raised: " + msg) + SubstitutionError(msg, Option.empty) + } + def apply(msg: String, throwable: Throwable): SubstitutionError = { + logger.error("SubstitutionError Raised: " + msg, throwable) + SubstitutionError(msg, throwable.some) + } } case class SubstitutionError(msg: String, throwable: Option[Throwable]) extends Throwable diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala index dcd0051fd9..abd3118a24 100644 --- a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala +++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkConfigTest.scala @@ -65,6 +65,7 @@ class HttpSinkConfigTest extends AnyFunSuiteLike with Matchers with EitherValues ), ) httpSinkConfig.timeout should be(TimeoutConfig(HttpSinkConfigDef.ConnectionTimeoutMsDefault)) + httpSinkConfig.tidyJson should be(false) httpSinkConfig.errorReportingController == null shouldBe false httpSinkConfig.successReportingController == null shouldBe false } @@ -90,6 +91,7 @@ class HttpSinkConfigTest extends AnyFunSuiteLike with Matchers with EitherValues HttpSinkConfigDef.BasicAuthenticationPasswordProp -> "pass", ERROR_REPORTING_ENABLED_PROP -> "false", SUCCESS_REPORTING_ENABLED_PROP -> "false", + HttpSinkConfigDef.JsonTidyProp -> "true", ), ).value @@ -114,6 +116,7 @@ class HttpSinkConfigTest extends AnyFunSuiteLike with Matchers with EitherValues ), ) httpSinkConfig.timeout should be(TimeoutConfig(HttpSinkConfigDef.ConnectionTimeoutMsDefault)) + httpSinkConfig.tidyJson should be(true) httpSinkConfig.errorReportingController == null shouldBe false httpSinkConfig.successReportingController == null shouldBe false diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterTest.scala index 46ccedce0a..59870c5de2 100644 --- a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterTest.scala +++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpWriterTest.scala @@ -30,6 +30,7 @@ import io.lenses.streamreactor.connect.http.sink.tpl.TemplateType import io.lenses.streamreactor.connect.reporting.ReportingController.ErrorReportingController import io.lenses.streamreactor.connect.reporting.ReportingController.SuccessReportingController import org.mockito.ArgumentMatchers.any +import org.mockito.ArgumentMatchersSugar.eqTo import org.mockito.MockitoSugar import org.scalatest.funsuite.AsyncFunSuiteLike import org.scalatest.matchers.should.Matchers @@ -58,6 +59,7 @@ class HttpWriterTest extends AsyncIOSpec with AsyncFunSuiteLike with Matchers wi recordsQueueRef, commitContextRef, 5, + false, mock[ErrorReportingController], mock[SuccessReportingController], ) @@ -80,7 +82,10 @@ class HttpWriterTest extends AsyncIOSpec with AsyncFunSuiteLike with Matchers wi when(senderMock.sendHttpRequest(any[ProcessedTemplate])).thenReturn(IO.unit) val templateMock = mock[TemplateType] - when(templateMock.process(any[Seq[RenderedRecord]])).thenReturn(Right(ProcessedTemplate("a", "b", Seq.empty))) + when(templateMock.process(any[Seq[RenderedRecord]], eqTo(false))).thenReturn(Right(ProcessedTemplate("a", + "b", + Seq.empty, + ))) val recordsToAdd = Seq( RenderedRecord(topicPartition.atOffset(100), TIMESTAMP, "record1", Seq.empty, None), @@ -99,6 +104,7 @@ class HttpWriterTest extends AsyncIOSpec with AsyncFunSuiteLike with Matchers wi recordsQueueRef, commitContextRef, 5, + false, mock[ErrorReportingController], mock[SuccessReportingController], ) @@ -132,6 +138,7 @@ class HttpWriterTest extends AsyncIOSpec with AsyncFunSuiteLike with Matchers wi recordsQueueRef, commitContextRef, 5, + false, mock[ErrorReportingController], mock[SuccessReportingController], ) diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/JsonTidyTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/JsonTidyTest.scala new file mode 100644 index 0000000000..0c3bf98122 --- /dev/null +++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/JsonTidyTest.scala @@ -0,0 +1,61 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class JsonTidyTest extends AnyFlatSpec with Matchers { + + "removeTrailingCommas" should "handle empty JSON string" in { + val jsonString = "" + val result = JsonTidy.cleanUp(jsonString) + result shouldEqual jsonString + } + + it should "handle JSON object without trailing commas" in { + val jsonString = """{"key1":"value1","key2":"value2"}""" + val result = JsonTidy.cleanUp(jsonString) + result shouldEqual jsonString + } + + it should "remove trailing commas in JSON object" in { + val jsonString = """{"key1":"value1","key2":"value2",}""" + val expected = """{"key1":"value1","key2":"value2"}""" + val result = JsonTidy.cleanUp(jsonString) + result shouldEqual expected + } + + it should "handle JSON array without trailing commas" in { + val jsonString = """["value1","value2"]""" + val result = JsonTidy.cleanUp(jsonString) + result shouldEqual jsonString + } + + it should "remove trailing commas in JSON array" in { + val jsonString = """["value1","value2",]""" + val expected = """["value1","value2"]""" + val result = JsonTidy.cleanUp(jsonString) + result shouldEqual expected + } + + it should "handle nested JSON structures" in { + val jsonString = """{"key1":["value1","value2",],"key2":{"subkey1":"subvalue1",},}""" + val expected = """{"key1":["value1","value2"],"key2":{"subkey1":"subvalue1"}}""" + val result = JsonTidy.cleanUp(jsonString) + result shouldEqual expected + } +} diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/RawTemplateTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/RawTemplateTest.scala index 693ce219bd..32d75a2acd 100644 --- a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/RawTemplateTest.scala +++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/RawTemplateTest.scala @@ -42,7 +42,7 @@ class RawTemplateTest extends AnyFunSuite with Matchers with EitherValues with L val rendered = template.renderRecords(Seq(sinkRecord)) - val processedTemplate = template.process(rendered.value) + val processedTemplate = template.process(rendered.value, true) processedTemplate.value.endpoint should be("Endpoint: KeyData") processedTemplate.value.content should be("Content: ValueData") processedTemplate.value.headers should be( @@ -73,7 +73,7 @@ class RawTemplateTest extends AnyFunSuite with Matchers with EitherValues with L val rendered = template.renderRecords(Seq(sinkRecord)) - val processedTemplate = template.process(rendered.value) + val processedTemplate = template.process(rendered.value, true) processedTemplate.value.headers shouldBe headerResults } diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTest.scala index 726de6e832..5ec1c778ea 100644 --- a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTest.scala +++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/TemplateTest.scala @@ -70,7 +70,7 @@ class TemplateTest extends AnyFunSuiteLike with Matchers with EitherValues { val rendered = processedTemplate.renderRecords(Seq(record)) - val processed = processedTemplate.process(rendered.value) + val processed = processedTemplate.process(rendered.value, true) processed.value.endpoint should be("http://myExampleGroup.uk.example.com/10/Abcd1234/myTopic") processed.value.content should be( @@ -116,7 +116,7 @@ class TemplateTest extends AnyFunSuiteLike with Matchers with EitherValues { val rendered = processedTemplate.renderRecords(records) - val processed = processedTemplate.process(rendered.value) + val processed = processedTemplate.process(rendered.value, true) processed.value.endpoint should be("http://myExampleGroup.uk.example.com/10/Abcd1234/myTopic") normalized(processed.value.content) should be( @@ -175,7 +175,7 @@ class TemplateTest extends AnyFunSuiteLike with Matchers with EitherValues { val rendered = processedTemplate.renderRecords(records) - val processed = processedTemplate.process(rendered.value) + val processed = processedTemplate.process(rendered.value, true) processed.value.endpoint should be("http://myExampleGroup.uk.example.com/10/Abcd1234/myTopic") normalized(processed.value.content) should be( diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/renderer/TemplateRendererTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/renderer/TemplateRendererTest.scala new file mode 100644 index 0000000000..f9ab98e50f --- /dev/null +++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/renderer/TemplateRendererTest.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2017-2024 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.renderer + +import io.lenses.streamreactor.connect.http.sink.tpl.RawTemplate +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.sink.SinkRecord +import org.scalatest.EitherValues +import org.scalatest.funsuite.AnyFunSuiteLike +import org.scalatest.matchers.should.Matchers + +class TemplateRendererTest extends AnyFunSuiteLike with Matchers with EitherValues { + + test("Should tidy json commas if set") { + + val record1 = new SinkRecord("myTopic", 0, null, null, Schema.STRING_SCHEMA, "\"m1\"", 9) + val record2 = new SinkRecord("myTopic", 0, null, null, Schema.STRING_SCHEMA, "\"m2\"", 10) + val record3 = new SinkRecord("myTopic", 0, null, null, Schema.STRING_SCHEMA, "\"m3\"", 10) + + val records = Seq(record1, record2, record3) + + val processedTemplate = RawTemplate( + endpoint = "http://www.example.com", + content = "{\"data\":[{{#message}}{{value}},{{/message}}]}", + Seq(), + ) + + val rendered = processedTemplate.renderRecords(records) + + val processed = processedTemplate.process(rendered.value, tidyJson = true) + normalized(processed.value.content) should be( + normalized( + """{"data":["m1","m2","m3"]}""".stripMargin, + ), + ) + } + + private def normalized(s: String): String = + s + .replaceAll(">\\s+<", "><") + .replaceAll("(?s)\\s+", " ").trim + +} diff --git a/project/Settings.scala b/project/Settings.scala index 387da35dd5..6c79ad38d3 100644 --- a/project/Settings.scala +++ b/project/Settings.scala @@ -35,8 +35,8 @@ import scala.sys.process.* object Settings extends Dependencies { // keep the SNAPSHOT version numerically higher than the latest release. - val majorVersion = "7.2" - val nextSnapshotVersion = "7.3" + val majorVersion = "8.0" + val nextSnapshotVersion = "8.1" val artifactVersion: String = { val maybeGithubRunId = sys.env.get("github_run_id")