Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DB-12042 JMS Input Source #32

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open

DB-12042 JMS Input Source #32

wants to merge 4 commits into from

Conversation

jpanko1
Copy link
Contributor

@jpanko1 jpanko1 commented Jul 7, 2021

Short Description

Support JMS input for spark structured streaming.

Long Description

This code was added to be able to read data from IBM MQ via JMS.

How to test

Bob had a server set up and generating data that the streaming code could read.

@@ -1,6 +1,6 @@
name := "splice-machine-spark-connector"

val spliceVersion = "3.2.0.2001-SNAPSHOT"
val spliceVersion = "3.1.0.2016"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks suspicious. A downgrade?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was changed to get in sync with the version of the Splice DB in the environment where this would be running.

// Added later separately
ExclusionRule(organization = "com.splicemachine", name = "scala_util"),
ExclusionRule(organization = "javax.ws.rs", name = "javax.ws.rs-api"),
ExclusionRule(organization = "org.apache.kafka", name = "kafka_2.11"),
ExclusionRule(organization = "org.scala-lang.modules", name = "scala-parser-combinators_2.11")
)

val excludedDeps = excludedDepsNonSpark ++ Seq(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The following is going to be a bit faster (prepend to a head) and shorter code-wise?

val excludedDeps =
    ExclusionRule(organization = "org.apache.spark") +: excludedDepsNonSpark

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in commit b79cb27

Copy link
Contributor

@jaceklaskowski jaceklaskowski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some more comments. In general, the code looks very old and could benefit from some polishing here and there.

}
}
import org.apache.spark.unsafe.types.UTF8String._
val internalRDD = messageList.map(message => InternalRow(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code looks very old(ish). InternalRow conversion is not needed (as it's in in-process memory anyway). Just convert JmsMessage to whatever tuple you want and simply Seq(...).toDF(...).


val query = stream.writeStream
.outputMode("append")
.format("console")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

memory format would help you with automated testing.

*/
class JmsSourceRdd(sc:SparkContext) extends RDD[Message](sc, Nil){

override def compute(split: Partition, context: TaskContext): Iterator[Message] = ???
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this class ever used given these ????

/**
* Created by exa00015 on 26/12/18.
*/
case class JmsSourceOffset(val id:Long) extends Offset {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a LongOffset in Spark Structured Streaming already.



override def schema: StructType = {
ScalaReflection.schemaFor[JmsMessage].dataType.asInstanceOf[StructType]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What a trick! I think Encoders.product[JmsMessage].schema could work. If so, use it below to create a DataFrame out of JmsMessages.

*/
class JmsDatasourceRelation(override val sqlContext: SQLContext, parameters: Map[String, String]) extends BaseRelation with TableScan with Serializable {

lazy val RECIEVER_TIMEOUT = parameters.getOrElse("reciever.timeout","3000").toLong
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A typo in reciever

case "amq" => new AMQConnectionFactoryProvider().createConnection(parameters)
case "ibmmq" => new IBMMQConnectionFactoryProvider().createConnection(parameters)
case "rmq" => new RMQConnectionFactoryProvider().createConnection(parameters)
case "kafka" => new KafkaConnectionFactoryProvider().createConnection(parameters)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need kafka as it's among the built-in data sources.

@martinrupp martinrupp removed their request for review November 15, 2021 15:02
@martinrupp
Copy link
Member

see you!

@arnaud-lacurie arnaud-lacurie removed their request for review January 19, 2022 09:54
@dgomezferro dgomezferro removed their request for review March 30, 2022 18:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

3 participants