Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scala2.13 + Spark v3.3 #199

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .jvmopts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
-XX:+UseCompressedClassPointers
-XX:ReservedCodeCacheSize=256M
-XX:+TieredCompilation
-XX:+CMSClassUnloadingEnabled
-XX:+UseConcMarkSweepGC
-XX:+HeapDumpOnOutOfMemoryError

12 changes: 7 additions & 5 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ RUN curl -sL https://deb.nodesource.com/setup_0.12 | bash - && \
npm install -g bower

# for Apache Spark demos
ENV APACHE_SPARK_VERSION 3.0.3
ENV APACHE_SPARK_VERSION 3.3.1 \
HADOOP_VERSION=3 \
SCALA_VERSION=2.13

RUN apt-get -y update && \
apt-get -y install software-properties-common
Expand All @@ -46,11 +48,11 @@ RUN echo "===> install Java" && \
update-java-alternatives -s java-8-oracle

RUN cd /tmp && \
wget -q http://apache.claz.org/spark/spark-${APACHE_SPARK_VERSION}/spark-${APACHE_SPARK_VERSION}-bin-hadoop2.7.tgz && \
tar xzf spark-${APACHE_SPARK_VERSION}-bin-hadoop2.7.tgz -C /usr/local && \
rm spark-${APACHE_SPARK_VERSION}-bin-hadoop2.7.tgz
wget -q https://archive.apache.org/dist/spark/spark-${APACHE_SPARK_VERSION}/spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}-scala${SCALA_VERSION}.tgz && \
tar xzf spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}-scala${SCALA_VERSION}.tgz -C /usr/local && \
rm spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}-scala${SCALA_VERSION}.tgz

RUN cd /usr/local && ln -s spark-${APACHE_SPARK_VERSION}-bin-hadoop2.7 spark
RUN cd /usr/local && ln -s spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}-scala${SCALA_VERSION} spark

# R support
RUN apt-get update && \
Expand Down
16 changes: 9 additions & 7 deletions Dockerfile.toree-dev
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,27 @@ FROM jupyter/all-spark-notebook
USER root

# Spark dependencies
ENV APACHE_SPARK_VERSION 3.0.3
ENV APACHE_SPARK_VERSION 3.3.1 \
HADOOP_VERSION=3 \
SCALA_VERSION=2.13

RUN apt-get -y update && \
apt-get install -y --no-install-recommends openjdk-8-jdk ca-certificates-java && \
apt-get install -y --no-install-recommends openjdk-11-jdk ca-certificates-java && \
apt-get clean && \
rm -rf /var/lib/apt/lists/* && \
update-ca-certificates -f && \
update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
update-alternatives --set java /usr/lib/jvm/java-11-openjdk-amd64/jre/bin/java

# Installing Spark3
RUN cd /tmp && \
wget -q https://archive.apache.org/dist/spark/spark-${APACHE_SPARK_VERSION}/spark-${APACHE_SPARK_VERSION}-bin-hadoop2.7.tgz && \
tar xzf spark-${APACHE_SPARK_VERSION}-bin-hadoop2.7.tgz -C /usr/local && \
rm spark-${APACHE_SPARK_VERSION}-bin-hadoop2.7.tgz
wget -q https://archive.apache.org/dist/spark/spark-${APACHE_SPARK_VERSION}/spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}-scala${SCALA_VERSION}.tgz && \
tar xzf spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}-scala${SCALA_VERSION}.tgz -C /usr/local && \
rm spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}-scala${SCALA_VERSION}.tgz

# Overwrite symlink
RUN cd /usr/local && \
rm spark && \
ln -s spark-${APACHE_SPARK_VERSION}-bin-hadoop2.7 spark
ln -s spark-${APACHE_SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}-scala${SCALA_VERSION} spark

# Remove other scala kernels
RUN cd /opt/conda/share/jupyter/kernels/ && \
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ IS_SNAPSHOT?=true
SNAPSHOT:=-SNAPSHOT
endif

APACHE_SPARK_VERSION?=3.0.3
SCALA_VERSION?=2.12
APACHE_SPARK_VERSION?=3.3.1
SCALA_VERSION?=2.13
IMAGE?=jupyter/all-spark-notebook:latest
EXAMPLE_IMAGE?=apache/toree-examples
TOREE_DEV_IMAGE?=apache/toree-dev
Expand Down
7 changes: 3 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ ThisBuild / version := Properties.envOrElse("VERSION", "0.0.0-dev") +
(if ((ThisBuild / isSnapshot ).value) "-SNAPSHOT" else "")
ThisBuild / isSnapshot := Properties.envOrElse("IS_SNAPSHOT","true").toBoolean
ThisBuild / organization := "org.apache.toree.kernel"
ThisBuild / crossScalaVersions := Seq("2.12.15") // https://github.com/scala/bug/issues/12475, for Spark 3.2.0
ThisBuild / scalaVersion := (ThisBuild / crossScalaVersions ).value.head
ThisBuild / crossScalaVersions := Seq("2.13.10")
ThisBuild / scalaVersion := "2.13.10"
ThisBuild / Dependencies.sparkVersion := {
val envVar = "APACHE_SPARK_VERSION"
val defaultVersion = "3.0.0"
val defaultVersion = "3.3.1"

Properties.envOrNone(envVar) match {
case None =>
Expand Down Expand Up @@ -65,7 +65,6 @@ ThisBuild / javacOptions ++= Seq(
ThisBuild / javaOptions ++= Seq(
"-Xms1024M", "-Xmx4096M", "-Xss2m", "-XX:MaxPermSize=1024M",
"-XX:ReservedCodeCacheSize=256M", "-XX:+TieredCompilation",
"-XX:+CMSClassUnloadingEnabled",
"-XX:+UseConcMarkSweepGC", "-XX:+HeapDumpOnOutOfMemoryError"
)
// Add additional test option to show time taken per test
Expand Down
6 changes: 5 additions & 1 deletion client/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,9 @@ scalacOptions += "-language:reflectiveCalls"
libraryDependencies ++= Seq(
Dependencies.akkaActor,
Dependencies.akkaSlf4j,
Dependencies.akkaTestkit % "test"
Dependencies.scalaTest % "test",
Dependencies.akkaTestkit % "test",
Dependencies.mockitoSugar % "test",
Dependencies.mockitoScala % "test",
Dependencies.mockitoScalaScalaTest % "test"
)
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ object Utilities extends LogLike {
val header = Json.parse(message.frames(delimiterIndex + 2)).as[Header]
val parentHeader = Json.parse(message.frames(delimiterIndex + 3)).validate[ParentHeader].fold[ParentHeader](
// TODO: Investigate better solution than setting parentHeader to null for {}
(invalid: Seq[(JsPath, Seq[JsonValidationError])]) => null, //HeaderBuilder.empty,
(invalid: collection.Seq[(JsPath, collection.Seq[JsonValidationError])]) => null, //HeaderBuilder.empty,
(valid: ParentHeader) => valid
)
val metadata = Json.parse(message.frames(delimiterIndex + 4)).as[Metadata]
Expand All @@ -78,20 +78,20 @@ object Utilities extends LogLike {
}

implicit def KernelMessageToZMQMessage(kernelMessage : KernelMessage) : ZMQMessage = {
val frames: scala.collection.mutable.ListBuffer[ByteString] = scala.collection.mutable.ListBuffer()
kernelMessage.ids.map((id : Array[Byte]) => frames += ByteString.apply(id) )
frames += "<IDS|MSG>"
frames += kernelMessage.signature
frames += Json.toJson(kernelMessage.header).toString()
frames += Json.toJson(kernelMessage.parentHeader).toString()
frames += Json.toJson(kernelMessage.metadata).toString
frames += kernelMessage.contentString
var frames: Seq[ByteString] = Seq()
kernelMessage.ids.map((id : Array[Byte]) => frames = frames :+ ByteString.apply(id) )
frames = frames :+ "<IDS|MSG>"
frames = frames :+ kernelMessage.signature
frames = frames :+ Json.toJson(kernelMessage.header).toString()
frames = frames :+ Json.toJson(kernelMessage.parentHeader).toString()
frames = frames :+ Json.toJson(kernelMessage.metadata).toString
frames = frames :+ kernelMessage.contentString
ZMQMessage(frames : _*)
}

def parseAndHandle[T](json: String, reads: Reads[T], handler: T => Unit) : Unit = {
Json.parse(json).validate[T](reads).fold(
(invalid: Seq[(JsPath, Seq[JsonValidationError])]) =>
(invalid: collection.Seq[(JsPath, collection.Seq[JsonValidationError])]) =>
logger.error(s"Could not parse JSON, ${json}"),
(content: T) => handler(content)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class HeartbeatClient(
case HeartbeatMessage =>
import scala.concurrent.ExecutionContext.Implicits.global
val id = java.util.UUID.randomUUID().toString
futureMap += (id -> sender)
futureMap += (id -> sender())
logger.info(s"Heartbeat client send: $id")
val future = socket ? ZMQMessage(ByteString(id.getBytes))
future.onComplete {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class IOPubClient(
} else {
logger.warn("Received message with null parent header.")
logger.debug(s"Kernel message is: $kernelMessage")
sender.forward(Failure(new RuntimeException(PARENT_HEADER_NULL_MESSAGE)))
sender().forward(Failure(new RuntimeException(PARENT_HEADER_NULL_MESSAGE)))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.toree.kernel.protocol.v5.SocketType
import org.apache.toree.kernel.protocol.v5.socket._
import org.apache.toree.kernel.protocol.v5.socket.SocketConfig
import com.typesafe.config.ConfigFactory
import org.mockito.Matchers._
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.scalatestplus.mockito.MockitoSugar
import org.scalatest.{FunSpecLike, Matchers}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import org.apache.toree.kernel.protocol.v5._
import org.apache.toree.kernel.protocol.v5.content.ExecuteResult
import org.apache.toree.kernel.protocol.v5.socket._
import org.mockito.Matchers._
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.scalatestplus.mockito.MockitoSugar
import org.scalatest.{FunSpecLike, Matchers}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import akka.zeromq._
import org.apache.toree.kernel.protocol.v5._
import org.apache.toree.kernel.protocol.v5.content.ExecuteRequest
import org.apache.toree.kernel.protocol.v5.socket._
import org.mockito.Matchers._
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.scalatestplus.mockito.MockitoSugar
import org.scalatest.{FunSpecLike, Matchers}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import org.apache.toree.kernel.protocol.v5.client.ActorLoader
import org.apache.toree.kernel.protocol.v5.content.CommContent
import org.scalatestplus.mockito.MockitoSugar
import org.mockito.Mockito._
import org.mockito.Matchers._
import org.scalatest.{BeforeAndAfter, FunSpec, Matchers}
import org.mockito.ArgumentMatchers._
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers

class ClientCommManagerSpec extends FunSpec with Matchers with BeforeAndAfter
class ClientCommManagerSpec extends AnyFunSpec with Matchers with BeforeAndAfterEach
with MockitoSugar
{
private val TestTargetName = "some target"
Expand All @@ -38,7 +40,7 @@ class ClientCommManagerSpec extends FunSpec with Matchers with BeforeAndAfter

private var generatedCommWriter: CommWriter = _

before {
override def beforeEach(): Unit = {
mockActorLoader = mock[ActorLoader]
mockKMBuilder = mock[KMBuilder]
mockCommRegistrar = mock[CommRegistrar]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import org.apache.toree.kernel.protocol.v5._
import org.apache.toree.kernel.protocol.v5.client.ActorLoader
import org.apache.toree.kernel.protocol.v5.content._
import com.typesafe.config.ConfigFactory
import org.mockito.Matchers._
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.scalatestplus.mockito.MockitoSugar
import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funspec.AnyFunSpecLike
import org.scalatest.matchers.should.Matchers
import play.api.libs.json.Json

import scala.concurrent.duration._
Expand All @@ -43,7 +45,7 @@ object ClientCommWriterSpec {
class ClientCommWriterSpec extends TestKit(
ActorSystem("ClientCommWriterSpec",
ConfigFactory.parseString(ClientCommWriterSpec.config))
) with FunSpecLike with Matchers with BeforeAndAfter with MockitoSugar
) with AnyFunSpecLike with Matchers with BeforeAndAfterEach with MockitoSugar
{

private val commId = UUID.randomUUID().toString
Expand Down Expand Up @@ -84,7 +86,7 @@ class ClientCommWriterSpec extends TestKit(
Json.parse(receivedMessage.contentString).as[T]
}

before {
override def beforeEach(): Unit = {
kernelMessageBuilder = spy(KMBuilder())

// Construct path for kernel message relay
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@ import org.apache.toree.kernel.protocol.v5._
import org.apache.toree.kernel.protocol.v5.client.execution.ExecuteRequestTuple
import scala.concurrent.duration._
import org.mockito.Mockito._
import org.mockito.Matchers.{eq => mockEq, _}
import org.mockito.ArgumentMatchers.{eq => mockEq, _}
import org.scalatest.BeforeAndAfterEach
import org.scalatestplus.mockito.MockitoSugar
import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
import org.scalatest.funspec.AnyFunSpecLike
import org.scalatest.matchers.should.Matchers
import org.scalatest.BeforeAndAfter

class SparkKernelClientSpec
extends TestKit(ActorSystem("SparkKernelClientActorSystem"))
with Matchers with MockitoSugar with FunSpecLike with BeforeAndAfter
with Matchers with MockitoSugar with AnyFunSpecLike with BeforeAndAfterEach
{
private val TestTargetName = "some target"

Expand All @@ -41,7 +44,7 @@ class SparkKernelClientSpec
private var executeRequestProbe: TestProbe = _
private var shellClientProbe: TestProbe = _

before {
override def beforeEach(): Unit = {
mockActorLoader = mock[ActorLoader]
mockCommRegistrar = mock[CommRegistrar]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ package org.apache.toree.kernel.protocol.v5.client.execution
import org.apache.toree.kernel.protocol.v5.content.{StreamContent, ExecuteResult}
import org.apache.toree.kernel.protocol.v5.content._
import org.apache.toree.kernel.protocol.v5._
import org.scalatest.{Matchers, FunSpec}
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.concurrent.ScalaFutures

import scala.concurrent.Promise
Expand Down Expand Up @@ -64,7 +65,7 @@ object DeferredExecutionTest {
class ExecuteResultPromise {}
class ExecuteReplyPromise {}

class DeferredExecutionTest extends FunSpec with ScalaFutures with Matchers {
class DeferredExecutionTest extends AnyFunSpec with ScalaFutures with Matchers {
import DeferredExecutionTest._
describe("DeferredExecution") {
describe("onResult( callback )"){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import akka.testkit.{TestProbe, ImplicitSender, TestKit}
import org.apache.toree.communication.ZMQMessage
import org.apache.toree.kernel.protocol.v5.client.ActorLoader
import org.scalatestplus.mockito.MockitoSugar
import org.scalatest.{Matchers, FunSpecLike}
import org.mockito.Matchers._
import org.scalatest.funspec.AnyFunSpecLike
import org.scalatest.matchers.should.Matchers
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._

class HeartbeatClientSpec extends TestKit(ActorSystem("HeartbeatActorSpec"))
with ImplicitSender with FunSpecLike with Matchers with MockitoSugar {
with ImplicitSender with AnyFunSpecLike with Matchers with MockitoSugar {

describe("HeartbeatClientActor") {
val socketFactory = mock[SocketFactory]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ import org.apache.toree.kernel.protocol.v5.client.execution.{DeferredExecution,
import org.apache.toree.kernel.protocol.v5.client.{ActorLoader, Utilities}
import org.apache.toree.kernel.protocol.v5.content.{CommClose, CommMsg, CommOpen, StreamContent}
import com.typesafe.config.ConfigFactory
import org.mockito.Matchers.{eq => mockEq, _}
import org.mockito.ArgumentMatchers.{eq => mockEq, _}
import org.mockito.Mockito._
import org.scalatest.concurrent.{Eventually, ScalaFutures}
import org.scalatestplus.mockito.MockitoSugar
import org.scalatest.time.{Milliseconds, Span}
import org.scalatest.{BeforeAndAfter, FunSpecLike, Matchers}
import org.scalatest.funspec.AnyFunSpecLike
import org.scalatest.matchers.should.Matchers
import org.scalatest.BeforeAndAfterEach
import play.api.libs.json.Json

import scala.concurrent.duration._
Expand All @@ -53,8 +55,8 @@ object IOPubClientSpec {

class IOPubClientSpec extends TestKit(ActorSystem(
"IOPubClientSpecSystem", ConfigFactory.parseString(IOPubClientSpec.config)
)) with ImplicitSender with FunSpecLike with Matchers with MockitoSugar
with ScalaFutures with BeforeAndAfter with Eventually
)) with ImplicitSender with AnyFunSpecLike with Matchers with MockitoSugar
with ScalaFutures with BeforeAndAfterEach with Eventually
{
private val TestTimeout = Timeout(10.seconds)
implicit override val patienceConfig = PatienceConfig(
Expand All @@ -77,7 +79,7 @@ class IOPubClientSpec extends TestKit(ActorSystem(
private val TestTargetName = "some target"
private val TestCommId = UUID.randomUUID().toString

before {
override def beforeEach(): Unit = {
kmBuilder = KMBuilder()
mockCommCallbacks = mock[CommCallbacks]
mockCommRegistrar = mock[CommRegistrar]
Expand All @@ -89,7 +91,7 @@ class IOPubClientSpec extends TestKit(ActorSystem(
mockClientSocketFactory = mock[SocketFactory]

// Stub the return value for the socket factory
when(mockClientSocketFactory.IOPubClient(anyObject(), any[ActorRef]))
when(mockClientSocketFactory.IOPubClient(any, any[ActorRef]))
.thenReturn(clientSocketProbe.ref)

// Construct the object we will test against
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ import org.apache.toree.kernel.protocol.v5._
import org.apache.toree.kernel.protocol.v5.client.ActorLoader
import org.apache.toree.kernel.protocol.v5.content.ExecuteRequest
import org.scalatestplus.mockito.MockitoSugar
import org.scalatest.{Matchers, FunSpecLike}
import org.scalatest.funspec.AnyFunSpecLike
import org.scalatest.matchers.should.Matchers
import org.mockito.Mockito._
import org.mockito.Matchers._
import org.mockito.ArgumentMatchers._
import play.api.libs.json.Json

class ShellClientSpec extends TestKit(ActorSystem("ShellActorSpec"))
with ImplicitSender with FunSpecLike with Matchers with MockitoSugar {
with ImplicitSender with AnyFunSpecLike with Matchers with MockitoSugar {
private val SignatureEnabled = true

describe("ShellClientActor") {
Expand Down
Loading