Skip to content

Commit

Permalink
62 - JMS Headers Fix + Unit Testing
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsloan committed Feb 14, 2024
1 parent e4e11e6 commit 075c497
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.kafka.connect.sink.SinkRecord
import javax.jms.Message
import javax.jms.Session
import scala.jdk.CollectionConverters.IterableHasAsScala
import scala.util.Try

class JMSHeadersConverterWrapper(headers: Map[String, String], delegate: JMSSinkMessageConverter)
extends JMSSinkMessageConverter {
Expand All @@ -29,7 +30,7 @@ class JMSHeadersConverterWrapper(headers: Map[String, String], delegate: JMSSink
val response = delegate.convert(record, session, setting)
val message = response._2
for (header <- record.headers().asScala) {
message.setStringProperty(header.key(), header.value().toString)
message.setStringProperty(header.key(), Try(header.value().toString).toOption.orNull)
}
message.setStringProperty("JMSXGroupID", record.kafkaPartition().toString)
for ((key, value) <- headers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ object JMSStructMessage {
.put("message_timestamp", Option(message.getJMSTimestamp).getOrElse(null))
.put("correlation_id", Option(message.getJMSCorrelationID).getOrElse(null))
.put("redelivered", Option(message.getJMSRedelivered).getOrElse(null))
.put("reply_to", Option(message.getJMSReplyTo).getOrElse(null))
.put("destination", Option(message.getJMSDestination.toString).getOrElse(null))
.put("reply_to", Option(message.getJMSReplyTo).map(_.toString).orNull)
.put("destination", Option(message.getJMSDestination).map(_.toString).orNull)
.put("message_id", Option(message.getJMSMessageID).getOrElse(null))
.put("mode", Option(message.getJMSDeliveryMode).getOrElse(null))
.put("type", Option(message.getJMSType).getOrElse(null))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.jms.sink.converters

import io.lenses.kcql.FormatType
import io.lenses.streamreactor.connect.jms.config.JMSSetting
import io.lenses.streamreactor.connect.jms.config.SinkConverterConfigWrapper
import io.lenses.streamreactor.connect.jms.config.StorageOptions
import io.lenses.streamreactor.connect.jms.config.TopicDestination
import org.apache.activemq.command.ActiveMQObjectMessage
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaAndValue
import org.apache.kafka.connect.header.ConnectHeaders
import org.apache.kafka.connect.header.Header
import org.apache.kafka.connect.sink.SinkRecord
import org.mockito.ArgumentMatchers.any
import org.mockito.MockitoSugar.mock
import org.mockito.MockitoSugar.when
import org.scalatest.funsuite.AnyFunSuiteLike
import java.lang.{ Iterable => JavaIterable }
import javax.jms.Session
import scala.jdk.CollectionConverters.IterableHasAsJava
import scala.jdk.CollectionConverters.IteratorHasAsScala

class JMSHeadersConverterWrapperTest extends AnyFunSuiteLike {

test("should handle missing values in headers") {

val jmsSession: Session = mockSession
val setting: JMSSetting = createSettings
val headers: JavaIterable[Header] = createHeaders
val sinkRecord = new SinkRecord(
"myTopic",
1,
null,
null,
Schema.STRING_SCHEMA,
"myValue",
1,
null,
null,
headers,
)

JMSHeadersConverterWrapper(Map.empty, new ObjectMessageConverter())
.convert(sinkRecord, jmsSession, setting)

}

private def createSettings = {
val setting: JMSSetting = JMSSetting(
source = "mySource",
target = "myTarget",
fields = Map(),
ignoreField = Set(),
destinationType = TopicDestination,
format = FormatType.JSON,
storageOptions = StorageOptions("JSON", Map.empty),
converter = SinkConverterConfigWrapper(new TextMessageConverter {}),
messageSelector = Option.empty,
subscriptionName = Option.empty,
headers = Map.empty,
)
setting
}

private def createHeaders = {
val headers = new ConnectHeaders()
headers.add("JMSReplyTo", null)
headers.add("Destination", new SchemaAndValue(Schema.STRING_SCHEMA, "topic://myFunTopicName"))

headers.iterator().asScala.iterator.to(Iterable).asJava
}

private def mockSession = {
val objectMessage = new ActiveMQObjectMessage

val jmsSession = mock[Session]
when(jmsSession.createObjectMessage()).thenReturn(objectMessage)
when(jmsSession.createObjectMessage(any[Serializable])).thenReturn(objectMessage)
jmsSession
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2017-2024 Lenses.io Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lenses.streamreactor.connect.jms.source.domain

import org.apache.activemq.command.ActiveMQTextMessage
import org.apache.kafka.connect.data.Struct
import org.scalatest.funsuite.AnyFunSuiteLike
import org.scalatest.matchers.should.Matchers

import javax.jms.Topic

class JMSStructMessageTest extends AnyFunSuiteLike with Matchers {

private val TestTopicName = "MyFunkyTopicName"

test("should handle 'reply to' and 'destination' in headers") {

val target = "myTarget"

val jmsDestination = new Topic {
override def getTopicName: String = TestTopicName
}

val message = new ActiveMQTextMessage()
message.setJMSReplyTo(jmsDestination)
message.setJMSDestination(jmsDestination)
message.setText("Some very important text")

val sourceRecord = JMSStructMessage.getStruct(target, message)

sourceRecord.value() match {
case connectStruct: Struct =>
connectStruct.get("reply_to") should be(s"topic://$TestTopicName")
connectStruct.get("destination") should be(s"topic://$TestTopicName")
}
}
}

0 comments on commit 075c497

Please sign in to comment.