diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/RawTemplate.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/RawTemplate.scala index 17b6173266..b0d191a49a 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/RawTemplate.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/RawTemplate.scala @@ -31,8 +31,12 @@ case class RawTemplate( val endpointTemplate = mf.compile(new StringReader(endpoint), "endpoint") val contentTemplate = mf.compile(new StringReader(content), "content") - //val headerTemplates = template.headers.map() - // TODO - Template(endpointTemplate, contentTemplate) + val headerTemplates = headers.zipWithIndex.map { + case ((tplKey, tplVal), idx) => + mf.compile(new StringReader(tplKey), s"headerKey_$idx") -> mf.compile(new StringReader(tplVal), + s"headerVal_$idx", + ) + } + Template(endpointTemplate, contentTemplate, headerTemplates) } } diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/Template.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/Template.scala index f9091f7eb7..98f003beaa 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/Template.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/Template.scala @@ -23,6 +23,7 @@ import java.io.StringWriter case class Template( endpointTemplate: Mustache, contentTemplate: Mustache, + headerTemplates: Seq[(Mustache, Mustache)], ) { def process( sinkRecord: SinkRecord, @@ -30,7 +31,10 @@ case class Template( ProcessedTemplate( executeTemplate(endpointTemplate, sinkRecord), executeTemplate(contentTemplate, sinkRecord), - Seq(), + headerTemplates.map { + case (keyTpl, valTpl) => + executeTemplate(keyTpl, sinkRecord) -> executeTemplate(valTpl, sinkRecord) + }, ) private def executeTemplate( @@ -38,9 +42,11 @@ case class Template( sinkRecord: SinkRecord, ): String = { val stringWriter = new StringWriter() - template.execute(stringWriter, sinkRecord) - stringWriter.flush() - stringWriter.toString + val newWriter = template.execute(stringWriter, sinkRecord) + // really these writers are the same but checking the writer ref + // that is returned helps with testing. + newWriter.flush() + newWriter.toString } } diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/RawTemplateTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/RawTemplateTest.scala new file mode 100644 index 0000000000..894d2f13e7 --- /dev/null +++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/RawTemplateTest.scala @@ -0,0 +1,84 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.templates + +import com.github.mustachejava.Mustache +import org.apache.kafka.connect.sink.SinkRecord +import org.mockito.MockitoSugar.mock +import org.mockito.MockitoSugar.when +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +class RawTemplateTest extends AnyFunSuite with Matchers { + + test("RawTemplate.parse should create a Template with Mustache templates") { + val endpoint = "Endpoint: {{key}}" + val content = "Content: {{value}}" + val headers = Seq(("HeaderKey1", "HeaderValue1"), ("HeaderKey2", "HeaderValue2")) + + val rawTemplate = RawTemplate(endpoint, content, headers) + val template = rawTemplate.parse() + + val sinkRecord = mock[SinkRecord] + when(sinkRecord.key()).thenReturn("KeyData") + when(sinkRecord.value()).thenReturn("ValueData") + + val endpointResult = "Endpoint: KeyData" + val contentResult = "Content: ValueData" + + val headerResults = Seq("HeaderKey1" -> "HeaderValue1", "HeaderKey2" -> "HeaderValue2") + + val processedTemplate = template.process(sinkRecord) + + processedTemplate.endpoint shouldBe endpointResult + processedTemplate.content shouldBe contentResult + processedTemplate.headers shouldBe headerResults + } + + test("RawTemplate.parse should create Mustache templates for headers") { + val endpoint = "Endpoint: {{key}}" + val content = "Content: {{value}}" + val headers = Seq(("HeaderKey1", "HeaderValue1"), ("HeaderKey2", "HeaderValue2")) + + val rawTemplate = RawTemplate(endpoint, content, headers) + val template = rawTemplate.parse() + + template.headerTemplates.foreach { + case (keyTemplate, valueTemplate) => + keyTemplate shouldBe a[Mustache] + valueTemplate shouldBe a[Mustache] + } + } + + test("RawTemplate.parse should correctly execute header Mustache templates") { + val endpoint = "Endpoint: {{key}}" + val content = "Content: {{value}}" + val headers = Seq(("HeaderKey1", "HeaderValue1"), ("HeaderKey2", "HeaderValue2")) + + val rawTemplate = RawTemplate(endpoint, content, headers) + val template = rawTemplate.parse() + + val sinkRecord = mock[SinkRecord] + when(sinkRecord.key()).thenReturn("KeyData") + when(sinkRecord.value()).thenReturn("ValueData") + + val headerResults = Seq("HeaderKey1" -> "HeaderValue1", "HeaderKey2" -> "HeaderValue2") + + val processedTemplate = template.process(sinkRecord) + + processedTemplate.headers shouldBe headerResults + } +} diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/SubstitutionTypeTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/SubstitutionTypeTest.scala new file mode 100644 index 0000000000..cd083716c9 --- /dev/null +++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/SubstitutionTypeTest.scala @@ -0,0 +1,67 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.templates + +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectHeaderBinding +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectKeyBinding +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectOffsetBinding +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectPartitionBinding +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectTopicBinding +import io.lenses.streamreactor.connect.http.sink.tpl.binding.KafkaConnectValueBinding +import org.apache.kafka.connect.errors.ConnectException +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +class SubstitutionTypeTest extends AnyFunSuite with Matchers { + + test("SubstitutionType.Key should create KafkaConnectKeyBinding") { + val keySubstitutionType = SubstitutionType.Key + keySubstitutionType.toBinding(None) shouldBe a[KafkaConnectKeyBinding] + } + + test("SubstitutionType.Value should create KafkaConnectValueBinding") { + val valueSubstitutionType = SubstitutionType.Value + valueSubstitutionType.toBinding(None) shouldBe a[KafkaConnectValueBinding] + } + + test("SubstitutionType.Header should create KafkaConnectHeaderBinding with valid locator") { + val headerSubstitutionType = SubstitutionType.Header + headerSubstitutionType.toBinding(Some("valid_locator")) shouldBe a[KafkaConnectHeaderBinding] + } + + test("SubstitutionType.Header should throw ConnectException with invalid locator") { + val headerSubstitutionType = SubstitutionType.Header + assertThrows[ConnectException] { + headerSubstitutionType.toBinding(None) + } + } + + test("SubstitutionType.Topic should create KafkaConnectTopicBinding") { + val topicSubstitutionType = SubstitutionType.Topic + topicSubstitutionType.toBinding(None) shouldBe a[KafkaConnectTopicBinding] + } + + test("SubstitutionType.Partition should create KafkaConnectPartitionBinding") { + val partitionSubstitutionType = SubstitutionType.Partition + partitionSubstitutionType.toBinding(None) shouldBe a[KafkaConnectPartitionBinding] + } + + test("SubstitutionType.Offset should create KafkaConnectOffsetBinding") { + val offsetSubstitutionType = SubstitutionType.Offset + offsetSubstitutionType.toBinding(None) shouldBe a[KafkaConnectOffsetBinding] + } + +} diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/TemplateTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/TemplateTest.scala new file mode 100644 index 0000000000..160f36ad9d --- /dev/null +++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/TemplateTest.scala @@ -0,0 +1,114 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.templates + +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers +import org.mockito.Mockito._ +import org.mockito.ArgumentMatchers._ +import com.github.mustachejava.Mustache +import org.apache.kafka.connect.sink.SinkRecord + +import java.io.StringWriter + +class TemplateTest extends AnyFunSuite with Matchers { + + test("Template.process should execute endpoint and content templates") { + + val endpointResult = TestStringWriter("endpoint_result") + val endpointTemplate = mock(classOf[Mustache]) + when(endpointTemplate.execute(any[StringWriter](), any[Object]())).thenReturn(endpointResult) + + val contentResult = TestStringWriter("content_result") + val contentTemplate = mock(classOf[Mustache]) + when(contentTemplate.execute(any[StringWriter](), any[Object]())).thenReturn(contentResult) + + val headerTemplates = Seq[(Mustache, Mustache)]() + val template = Template(endpointTemplate, contentTemplate, headerTemplates) + + val sinkRecord = mock(classOf[SinkRecord]) + + val processedTemplate = template.process(sinkRecord) + + processedTemplate.endpoint shouldBe endpointResult.toString + processedTemplate.content shouldBe contentResult.toString + processedTemplate.headers shouldBe Seq() + + when(endpointTemplate.execute(any[StringWriter](), any[Object]())).thenReturn(endpointResult) + when(contentTemplate.execute(any[StringWriter](), any[Object]())).thenReturn(contentResult) + + } + + test("Template.executeTemplate should execute the given Mustache template") { + val expectedResult = TestStringWriter("template_result") + val mustacheTemplate = mock(classOf[Mustache]) + when(mustacheTemplate.execute(any[StringWriter](), any[Object]())).thenReturn(expectedResult) + + val emptyHeaderTemplates = Seq[(Mustache, Mustache)]() + val template = Template(mustacheTemplate, mustacheTemplate, emptyHeaderTemplates) + + val sinkRecord = mock(classOf[SinkRecord]) + + val result = template.process(sinkRecord) + + result.endpoint shouldBe expectedResult.toString + result.content shouldBe expectedResult.toString + result.headers shouldBe Seq() + + verify(mustacheTemplate, times(2)).execute(any[StringWriter](), any[Object]()) + } + + test("Template.executeTemplate should execute header Mustache template") { + + val expectedResult = TestStringWriter("template_result") + + val mustacheTemplate = mock(classOf[Mustache]) + when(mustacheTemplate.execute(any[StringWriter](), any[Object]())).thenReturn(expectedResult) + + val header1Key = mock(classOf[Mustache]) + when(header1Key.execute(any[StringWriter](), any[Object]())).thenReturn(TestStringWriter("header1Key")) + + val header1Value = mock(classOf[Mustache]) + when(header1Value.execute(any[StringWriter](), any[Object]())).thenReturn(TestStringWriter("header1Value")) + + val header2Key = mock(classOf[Mustache]) + when(header2Key.execute(any[StringWriter](), any[Object]())).thenReturn(TestStringWriter("header2Key")) + + val header2Value = mock(classOf[Mustache]) + when(header2Value.execute(any[StringWriter](), any[Object]())).thenReturn(TestStringWriter("header2Value")) + + val headerTemplates = Seq( + header1Key -> header1Value, + header2Key -> header2Value, + ) + val template = Template(mustacheTemplate, mustacheTemplate, headerTemplates) + + val sinkRecord = mock(classOf[SinkRecord]) + + val result = template.process(sinkRecord) + + result.headers shouldBe Seq( + "header1Key" -> "header1Value", + "header2Key" -> "header2Value", + ) + + verify(mustacheTemplate, times(2)).execute(any[StringWriter](), any[Object]()) + verify(header1Key).execute(any[StringWriter](), any[Object]()) + verify(header1Value).execute(any[StringWriter](), any[Object]()) + verify(header2Key).execute(any[StringWriter](), any[Object]()) + verify(header2Value).execute(any[StringWriter](), any[Object]()) + } +} diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/TestStringWriter.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/TestStringWriter.scala new file mode 100644 index 0000000000..3de1ad844e --- /dev/null +++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/tpl/templates/TestStringWriter.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2017-2023 Lenses.io Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lenses.streamreactor.connect.http.sink.tpl.templates + +import java.io.StringWriter + +object TestStringWriter { + + def apply(init: String): StringWriter = + new StringWriter().append(init) +}