From b717279e5da4082222b8a3a6591b9f4f69d4eb58 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Fri, 11 Aug 2023 00:43:32 +0800 Subject: [PATCH] [TOREE-544] Remove Scala 2.11 code (#210) --- build.sbt | 5 +- .../dependencies/ClassLoaderHelper.scala | 29 -- .../scala/ScalaInterpreterSpecific.scala | 456 ----------------- .../scala/ScalaInterpreterSpec.scala | 479 ------------------ 4 files changed, 2 insertions(+), 967 deletions(-) delete mode 100644 plugins/src/test/scala-2.11/org/apache/toree/plugins/dependencies/ClassLoaderHelper.scala delete mode 100644 scala-interpreter/src/main/scala-2.11/org/apache/toree/kernel/interpreter/scala/ScalaInterpreterSpecific.scala delete mode 100644 scala-interpreter/src/test/scala-2.11/scala/ScalaInterpreterSpec.scala diff --git a/build.sbt b/build.sbt index 37066dec9..384557fd9 100644 --- a/build.sbt +++ b/build.sbt @@ -44,9 +44,8 @@ ThisBuild / scalacOptions ++= Seq( "-unchecked", "-feature", "-Xfatal-warnings", - "-language:reflectiveCalls" -// "-target:jvm-1.6", -// "-Xlint" // Scala 2.11.x only + "-language:reflectiveCalls", + "-target:jvm-1.8" ) // Java-based options for compilation (all tasks) // NOTE: Providing a blank flag causes failures, only uncomment with options diff --git a/plugins/src/test/scala-2.11/org/apache/toree/plugins/dependencies/ClassLoaderHelper.scala b/plugins/src/test/scala-2.11/org/apache/toree/plugins/dependencies/ClassLoaderHelper.scala deleted file mode 100644 index 20e185659..000000000 --- a/plugins/src/test/scala-2.11/org/apache/toree/plugins/dependencies/ClassLoaderHelper.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ -package org.apache.toree.plugins.dependencies - -import java.net.URL -import java.lang.{ClassLoader => JClassLoader} - -import scala.reflect.internal.util.ScalaClassLoader - -object ClassLoaderHelper { - - def URLClassLoader(urls: Seq[URL], parent: JClassLoader): ScalaClassLoader.URLClassLoader = { - new scala.reflect.internal.util.ScalaClassLoader.URLClassLoader(urls, parent) - } -} diff --git a/scala-interpreter/src/main/scala-2.11/org/apache/toree/kernel/interpreter/scala/ScalaInterpreterSpecific.scala b/scala-interpreter/src/main/scala-2.11/org/apache/toree/kernel/interpreter/scala/ScalaInterpreterSpecific.scala deleted file mode 100644 index c941935b3..000000000 --- a/scala-interpreter/src/main/scala-2.11/org/apache/toree/kernel/interpreter/scala/ScalaInterpreterSpecific.scala +++ /dev/null @@ -1,456 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ - -package org.apache.toree.kernel.interpreter.scala - -import java.io._ -import java.net.URL - -import org.apache.toree.global.StreamState -import org.apache.toree.interpreter.imports.printers.{WrapperConsole, WrapperSystem} -import org.apache.toree.interpreter.{ExecuteError, Interpreter} -import scala.tools.nsc.interpreter._ -import scala.concurrent.Future -import scala.tools.nsc.{Global, Settings, util} -import scala.util.Try - -trait ScalaInterpreterSpecific extends SettingsProducerLike { this: ScalaInterpreter => - private val ExecutionExceptionName = "lastException" - - private[toree] var iMain: IMain = _ - private var completer: PresentationCompilerCompleter = _ - private val exceptionHack = new ExceptionHack() - - def _runtimeClassloader = { - _thisClassloader - } - - protected def newIMain(settings: Settings, out: JPrintWriter): IMain = { - val s = new IMain(settings, out) - s.initializeSynchronous() - s - } - - protected def convertAnnotationsToModifiers( - annotationInfos: List[Global#AnnotationInfo] - ) = annotationInfos map { - case a if a.toString == "transient" => "@transient" - case a => - logger.debug(s"Ignoring unknown annotation: $a") - "" - } filterNot { - _.isEmpty - } - - protected def convertScopeToModifiers(scopeSymbol: Global#Symbol) = { - (if (scopeSymbol.isImplicit) "implicit" else "") :: - Nil - } - - protected def buildModifierList(termNameString: String) = { - import scala.language.existentials - val termSymbol = iMain.symbolOfTerm(termNameString) - - - convertAnnotationsToModifiers( - if (termSymbol.hasAccessorFlag) termSymbol.accessed.annotations - else termSymbol.annotations - ) ++ convertScopeToModifiers(termSymbol) - } - - - protected def refreshDefinitions(): Unit = { - iMain.definedTerms.foreach(termName => { - val termNameString = termName.toString - val termTypeString = iMain.typeOfTerm(termNameString).toLongString - iMain.valueOfTerm(termNameString) match { - case Some(termValue) => - val modifiers = buildModifierList(termNameString) - logger.debug(s"Rebinding of $termNameString as " + - s"${modifiers.mkString(" ")} $termTypeString") - Try(iMain.beSilentDuring { - iMain.bind( - termNameString, termTypeString, termValue, modifiers - ) - }) - case None => - logger.debug(s"Ignoring rebinding of $termNameString") - } - }) - } - - protected def reinitializeSymbols(): Unit = { - val global = iMain.global - import global._ - new Run // Initializes something needed for Scala classes - } - - /** - * Adds jars to the runtime and compile time classpaths. Does not work with - * directories or expanding star in a path. - * @param jars The list of jar locations - */ - override def addJars(jars: URL*): Unit = { - iMain.addUrlsToClassPath(jars:_*) - // the Scala interpreter will invalidate definitions for any package defined in - // the new Jars. This can easily include org.* and make the kernel inaccessible - // because it is bound using the previous package definition. To avoid problems, - // it is necessary to refresh variable definitions to use the new packages and - // to rebind the global definitions. - refreshDefinitions() - bindVariables() - } - - /** - * Binds a variable in the interpreter to a value. - * @param variableName The name to expose the value in the interpreter - * @param typeName The type of the variable, must be the fully qualified class name - * @param value The value of the variable binding - * @param modifiers Any annotation, scoping modifiers, etc on the variable - */ - override def bind( - variableName: String, - typeName: String, - value: Any, - modifiers: List[String] - ): Unit = { - logger.warn(s"Binding $modifiers $variableName $typeName $value") - require(iMain != null) - val sIMain = iMain - - val bindRep = new sIMain.ReadEvalPrint() - iMain.interpret(s"import $typeName") - bindRep.compile(""" - |object %s { - | var value: %s = _ - | def set(x: Any) = value = x.asInstanceOf[%s] - |} - """.stripMargin.format(bindRep.evalName, typeName, typeName) - ) - bindRep.callEither("set", value) match { - case Left(ex) => - logger.error("Set failed in bind(%s, %s, %s)".format(variableName, typeName, value)) - logger.error(util.stackTraceString(ex)) - IR.Error - - case Right(_) => - val line = "%sval %s = %s.value".format(modifiers map (_ + " ") mkString, variableName, bindRep.evalPath) - logger.debug("Interpreting: " + line) - iMain.interpret(line) - } - - } - - - /** - * Executes body and will not print anything to the console during the execution - * @param body The function to execute - * @tparam T The return type of body - * @return The return value of body - */ - override def doQuietly[T](body: => T): T = { - require(iMain != null) - iMain.beQuietDuring[T](body) - } - - - /** - * Stops the interpreter, removing any previous internal state. - * @return A reference to the interpreter - */ - override def stop(): Interpreter = { - logger.info("Shutting down interpreter") - - // Shut down the task manager (kills current execution - if (taskManager != null) taskManager.stop() - taskManager = null - - // Erase our completer - completer = null - - // Close the entire interpreter (loses all state) - if (iMain != null) iMain.close() - iMain = null - - this - } - - /** - * Returns the name of the variable created from the last execution. - * @return Some String name if a variable was created, otherwise None - */ - override def lastExecutionVariableName: Option[String] = { - require(iMain != null) - - // TODO: Get this API method changed back to public in Apache Spark - val lastRequestMethod = classOf[IMain].getDeclaredMethod("lastRequest") - lastRequestMethod.setAccessible(true) - - val mostRecentVariableName = iMain.mostRecentVar - - iMain.allDefinedNames.map(_.toString).find(_ == mostRecentVariableName) - } - - /** - * Mask the Console and System objects with our wrapper implementations - * and dump the Console methods into the public namespace (similar to - * the Predef approach). - * @param in The new input stream - * @param out The new output stream - * @param err The new error stream - */ - override def updatePrintStreams( - in: InputStream, - out: OutputStream, - err: OutputStream - ): Unit = { - val inReader = new BufferedReader(new InputStreamReader(in)) - val outPrinter = new PrintStream(out) - val errPrinter = new PrintStream(err) - - iMain.beQuietDuring { - iMain.bind( - "Console", classOf[WrapperConsole].getName, - new WrapperConsole(inReader, outPrinter, errPrinter), - List("""@transient""") - ) - iMain.bind( - "System", classOf[WrapperSystem].getName, - new WrapperSystem(in, out, err), - List("""@transient""") - ) - iMain.interpret("import Console._") - } - } - - /** - * Retrieves the contents of the variable with the provided name from the - * interpreter. - * @param variableName The name of the variable whose contents to read - * @return An option containing the variable contents or None if the - * variable does not exist - */ - override def read(variableName: String): Option[AnyRef] = { - require(iMain != null) - - try { - iMain.eval(variableName) match { - case null => None - case str: String if str.isEmpty => None - case res => Some(res) - } - } catch { - // if any error returns None - case e: Throwable => { - logger.debug(s"Error reading variable name: ${variableName}", e) - clearLastException() - None - } - } - } - - /** - * Starts the interpreter, initializing any internal state. - * @return A reference to the interpreter - */ - override def start(): Interpreter = { - require(iMain == null && taskManager == null) - - taskManager = newTaskManager() - - logger.debug("Initializing task manager") - taskManager.start() - - iMain = newIMain(settings, new JPrintWriter(lastResultOut, true)) - - //logger.debug("Initializing interpreter") - //iMain.initializeSynchronous() - - logger.debug("Initializing completer") - completer = new PresentationCompilerCompleter(iMain) - - iMain.beQuietDuring { - //logger.info("Rerouting Console and System related input and output") - //updatePrintStreams(System.in, multiOutputStream, multiOutputStream) - - // ADD IMPORTS generates too many classes, client is responsible for adding import - logger.debug("Adding org.apache.spark.SparkContext._ to imports") - iMain.interpret("import org.apache.spark.SparkContext._") - - logger.debug("Adding the hack for the exception handling retrieval.") - iMain.bind("_exceptionHack", classOf[ExceptionHack].getName, exceptionHack, List("@transient")) - } - - this - } - - /** - * Attempts to perform code completion via the command. - * @param code The current cell to complete - * @param pos The cursor position - * @return The cursor position and list of possible completions - */ - override def completion(code: String, pos: Int): (Int, List[String]) = { - - require(completer != null) - - logger.debug(s"Attempting code completion for ${code}") - val result = completer.complete(code, pos) - - (result.cursor, result.candidates) - } - - /** - * Attempts to perform completeness checking for a statement by seeing if we can parse it - * using the scala parser. - * - * @param code The current cell to complete - * @return tuple of (completeStatus, indent) - */ - override def isComplete(code: String): (String, String) = { - val result = iMain.beSilentDuring { - val parse = iMain.parse - parse(code) match { - case t: parse.Error => ("invalid", "") - case t: parse.Success => - val lines = code.split("\n", -1) - val numLines = lines.length - // for multiline code blocks, require an empty line before executing - // to mimic the behavior of ipython - if (numLines > 1 && lines.last.matches("\\s*\\S.*")) { - ("incomplete", startingWhiteSpace(lines.last)) - } else { - ("complete", "") - } - case t: parse.Incomplete => - val lines = code.split("\n", -1) - // For now lets just grab the indent of the current line, if none default to 2 spaces. - ("incomplete", startingWhiteSpace(lines.last)) - } - } - lastResultOut.reset() - result - } - - private def startingWhiteSpace(line: String): String = { - val indent = "^\\s+".r.findFirstIn(line).getOrElse("") - // increase the indent if the line ends with => or { - if (line.matches(".*(?:(?:\\{)|(?:=>))\\s*")) { - indent + " " - } else { - indent - } - } - - override def newSettings(args: List[String]): Settings = { - val s = new Settings() - - val dir = ScalaInterpreter.ensureTemporaryFolder() - - s.processArguments(args ++ - List( - "-Yrepl-class-based", - "-Yrepl-outdir", s"$dir" - // useful for debugging compiler classpath or package issues - // "-uniqid", "-explaintypes", "-usejavacp", "-Ylog-classpath" - ), processAll = true) - s - } - - protected def interpretAddTask(code: String, silent: Boolean): Future[IR.Result] = { - if (iMain == null) throw new IllegalArgumentException("Interpreter not started yet!") - - taskManager.add { - // Add a task using the given state of our streams - StreamState.withStreams { - if (silent) { - iMain.beSilentDuring { - iMain.interpret(code) - } - } else { - iMain.interpret(code) - } - } - } - } - - private def retrieveLastException: Throwable = { - iMain.beSilentDuring { - iMain.interpret("_exceptionHack.lastException = lastException") - } - exceptionHack.lastException - } - - private def clearLastException(): Unit = { - iMain.directBind( - ExecutionExceptionName, - classOf[Throwable].getName, - null - ) - exceptionHack.lastException = null - } - - protected def interpretConstructExecuteError(output: String) = { - Option(retrieveLastException) match { - // Runtime error - case Some(e) => - val ex = e.asInstanceOf[Throwable] - clearLastException() - - // The scala REPL does a pretty good job of returning us a stack trace that is free from all the bits that the - // interpreter uses before it. - // - // The REPL emits its message as something like this, so trim off the first and last element - // - // java.lang.ArithmeticException: / by zero - // at failure(:17) - // at call_failure(:19) - // ... 40 elided - - val formattedException = output.split("\n") - - ExecuteError( - ex.getClass.getName, - ex.getLocalizedMessage, - formattedException.toList - ) - // Compile time error, need to check internal reporter - case _ => - if (iMain.reporter.hasErrors) - // TODO: This wrapper is not needed when just getting compile - // error that we are not parsing... maybe have it be purely - // output and have the error check this? - ExecuteError( - "Compile Error", output, List() - ) - else - // May as capture the output here. Could be useful - ExecuteError("Unknown Error", output, List()) - } - } -} - -/** - * Due to a bug in the scala interpreter under scala 2.11 (SI-8935) with IMain.valueOfTerm we can hack around it by - * binding an instance of ExceptionHack into iMain and interpret the "_exceptionHack.lastException = lastException". - * This makes it possible to extract the exception. - * - * TODO: Revisit this once Scala 2.12 is released. - */ -class ExceptionHack { - var lastException: Throwable = _ -} diff --git a/scala-interpreter/src/test/scala-2.11/scala/ScalaInterpreterSpec.scala b/scala-interpreter/src/test/scala-2.11/scala/ScalaInterpreterSpec.scala deleted file mode 100644 index 9d024f1bb..000000000 --- a/scala-interpreter/src/test/scala-2.11/scala/ScalaInterpreterSpec.scala +++ /dev/null @@ -1,479 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License - */ - -package org.apache.toree.kernel.interpreter.scala - -import java.io.{InputStream, OutputStream} -import java.net.{URL, URLClassLoader} -import org.apache.spark.SparkContext -import org.apache.spark.sql.SparkSession -import org.apache.toree.interpreter.Results.Result -import org.apache.toree.interpreter._ -import org.apache.toree.kernel.api.KernelLike -import org.apache.toree.utils.TaskManager -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{BeforeAndAfter, FunSpec, Matchers} -import scala.concurrent.Future -import scala.tools.nsc.Settings -import scala.tools.nsc.interpreter.{IMain, IR, JPrintWriter} -import scala.tools.nsc.util.ClassPath - -class ScalaInterpreterSpec extends FunSpec - with Matchers with MockitoSugar with BeforeAndAfter -{ - private var interpreter: ScalaInterpreter = _ - private var interpreterNoPrintStreams: ScalaInterpreter = _ - private var mockSparkIMain: IMain = _ - private var mockTaskManager: TaskManager = _ - private var mockSettings: Settings = _ - private var mockKernel: KernelLike = _ - private var mockSparkSession: SparkSession = _ - private var mockSparkContext: SparkContext = _ - - trait StubbedUpdatePrintStreams extends Interpreter { - override def updatePrintStreams( - in: InputStream, - out: OutputStream, - err: OutputStream - ): Unit = {} - } - - trait SingleLineInterpretLineRec extends StubbedStartInterpreter { - protected def interpretRec(lines: List[String], silent: Boolean, results: (Result, Either[ExecuteOutput, ExecuteFailure])): (Result, Either[ExecuteOutput, ExecuteFailure]) = - interpretBlock(lines.mkString("\n")) - } - - trait StubbedInterpretAddTask extends StubbedStartInterpreter { - override protected def interpretAddTask(code: String, silent: Boolean) = - mock[Future[IR.Result]] - } - - trait StubbedInterpretMapToCustomResult extends StubbedStartInterpreter { - override protected def interpretMapToCustomResult(future: Future[IR.Result]) = - mock[Future[Results.Result with Product with Serializable]] - } - - trait StubbedInterpretMapToResultAndOutput extends StubbedStartInterpreter { - override protected def interpretMapToResultAndOutput(future: Future[Results.Result]) = - mock[Future[(Results.Result, Either[Map[String, String], ExecuteError])]] - } - - trait StubbedInterpretMapToResultAndExecuteInfo extends StubbedStartInterpreter { - protected def interpretMapToResultAndExecuteInfo(future: Future[(Results.Result, String)]) = - mock[Future[( - Results.Result with Product with Serializable, - Either[ExecuteOutput, ExecuteFailure] with Product with Serializable - )]] - } - - trait StubbedInterpretConstructExecuteError extends StubbedStartInterpreter { - protected def interpretConstructExecuteError(value: Option[AnyRef], output: String) = - mock[ExecuteError] - } - - class StubbedStartInterpreter - extends ScalaInterpreter - { - - override protected def newIMain(settings: Settings, out: JPrintWriter): IMain = mockSparkIMain - override def newTaskManager(): TaskManager = mockTaskManager - override def newSettings(args: List[String]): Settings = mockSettings - - override protected def kernel: KernelLike = mockKernel - - // mocking out these - override protected def reinitializeSymbols(): Unit = {} - override protected def refreshDefinitions(): Unit = {} - - // Stubbed out (not testing this) - } - - before { - mockSparkIMain = mock[IMain] - - mockTaskManager = mock[TaskManager] - - val mockSettingsClasspath = mock[Settings#PathSetting] - doNothing().when(mockSettingsClasspath).value_=(any[Settings#PathSetting#T]) - - mockSettings = mock[Settings] - doReturn(mockSettingsClasspath).when(mockSettings).classpath - doNothing().when(mockSettings).embeddedDefaults(any[ClassLoader]) - - mockKernel = mock[KernelLike] - mockSparkSession = mock[SparkSession] - mockSparkContext = mock[SparkContext] - doReturn(mockSparkSession).when(mockKernel).sparkSession - doReturn(mockSparkContext).when(mockKernel).sparkContext - - interpreter = new StubbedStartInterpreter - - interpreterNoPrintStreams = - new StubbedStartInterpreter with StubbedUpdatePrintStreams - } - - after { - mockSparkIMain = null - mockTaskManager = null - mockSettings = null - mockKernel = null - mockSparkSession = null - mockSparkContext = null - interpreter = null - } - - describe("ScalaInterpreter") { - describe("#addJars") { - // Mocked test ignored. - ignore("should add each jar URL to the runtime classloader") { - // Needed to access runtimeClassloader method -// import scala.language.reflectiveCalls - - // Create a new interpreter exposing the internal runtime classloader - val itInterpreter = new StubbedStartInterpreter { - // Expose the runtime classloader - - - def runtimeClassloader = _runtimeClassloader - - } - - val url = new URL("file://expected") - itInterpreter.start() - itInterpreter.addJars(url) - -// itInterpreter.runtimeClassloader - val cl = itInterpreter.runtimeClassloader -// cl.getURLs should contain (url) - itInterpreter.stop() - } - - it("should add each jar URL to the interpreter classpath") { - val url = new URL("file://expected") - interpreter.start() - interpreter.addJars(url) - } - } - - describe("#buildClasspath") { - it("should return classpath based on classloader hierarchy") { - // Needed to access runtimeClassloader method -// import scala.language.reflectiveCalls - - // Create a new interpreter exposing the internal runtime classloader - val itInterpreter = new StubbedStartInterpreter - - val parentUrls = Array( - new URL("file:/some/dir/a.jar"), - new URL("file:/some/dir/b.jar"), - new URL("file:/some/dir/c.jar") - ) - - val theParentClassloader = new URLClassLoader(parentUrls, null) - - val urls = Array( - new URL("file:/some/dir/1.jar"), - new URL("file:/some/dir/2.jar"), - new URL("file:/some/dir/3.jar") - ) - - val theClassloader = new URLClassLoader(urls, theParentClassloader) - - val expected = ClassPath.join((parentUrls ++ urls).map(_.toString) :_*) - - itInterpreter.buildClasspath(theClassloader) should be(expected) - } - } - - describe("#interrupt") { - it("should fail a require if the interpreter is not started") { - intercept[IllegalArgumentException] { - interpreter.interrupt() - } - } - - it("should call restart() on the task manager and cancelAllJobs on SparkContext") { - interpreterNoPrintStreams.start() - - // cancelAllJobs still leaves the task running - doReturn(true).when(mockTaskManager).isExecutingTask - - interpreterNoPrintStreams.interrupt() - - // restart is called - verify(mockSparkContext).cancelAllJobs() - verify(mockTaskManager).restart() - } - - it("should only call cancelAllJobs and not restart if task execution ends") { - interpreterNoPrintStreams.start() - - interpreterNoPrintStreams.interrupt() - - // Spark jobs are cancelled - verify(mockSparkContext).cancelAllJobs() - // The task manager is not executing, so it is not restarted - verify(mockTaskManager, atLeastOnce).isExecutingTask - verifyNoMoreInteractions(mockSparkContext) - } - } - - // TODO: Provide testing for the helper functions that return various - // mapped futures -- this was too difficult for me to figure out - // in a short amount of time - describe("#interpret") { - it("should fail if not started") { - intercept[IllegalArgumentException] { - interpreter.interpret("val x = 3") - } - } - - it("should add a new task to the task manager") { - var taskManagerAddCalled = false - val itInterpreter = - new StubbedStartInterpreter - with SingleLineInterpretLineRec - with StubbedUpdatePrintStreams - //with StubbedInterpretAddTask - with StubbedInterpretMapToCustomResult - with StubbedInterpretMapToResultAndOutput - with StubbedInterpretMapToResultAndExecuteInfo - with StubbedInterpretConstructExecuteError - with TaskManagerProducerLike - { - // Must override this way since cannot figure out the signature - // to verify this as a mock - override def newTaskManager(): TaskManager = new TaskManager { - override def add[T](taskFunction: => T): Future[T] = { - taskManagerAddCalled = true - mock[TaskManager].add(taskFunction) - } - } - } - - itInterpreter.start() - - itInterpreter.interpret("val x = 3") - - taskManagerAddCalled should be (true) - } - } - - describe("#start") { - it("should initialize the task manager") { - interpreterNoPrintStreams.start() - - verify(mockTaskManager).start() - } - - // TODO: Figure out how to trigger sparkIMain.beQuietDuring { ... } - /*it("should add an import for SparkContext._") { - interpreterNoPrintStreams.start() - - verify(mockSparkIMain).addImports("org.apache.spark.SparkContext._") - }*/ - } - - describe("#stop") { - describe("when interpreter already started") { - it("should stop the task manager") { - interpreterNoPrintStreams.start() - interpreterNoPrintStreams.stop() - - verify(mockTaskManager).stop() - } - - it("should stop the SparkIMain") { - interpreterNoPrintStreams.start() - interpreterNoPrintStreams.stop() - - verify(mockSparkIMain).close() - } - } - } - - describe("#updatePrintStreams") { - // TODO: Figure out how to trigger sparkIMain.beQuietDuring { ... } - } - -// describe("#classServerUri") { -// it("should fail a require if the interpreter is not started") { -// intercept[IllegalArgumentException] { -// interpreter.classServerURI -// } -// } - -// TODO: Find better way to test this -// it("should invoke the underlying SparkIMain implementation") { - // Using hack to access private class -// val securityManagerClass = -// java.lang.Class.forName("org.apache.spark.SecurityManager") -// val httpServerClass = -// java.lang.Class.forName("org.apache.spark.HttpServer") -// val httpServerConstructor = httpServerClass.getDeclaredConstructor( -// classOf[SparkConf], classOf[File], securityManagerClass, classOf[Int], -// classOf[String]) -// val httpServer = httpServerConstructor.newInstance( -// null, null, null, 0: java.lang.Integer, "") -// -// // Return the server instance (cannot mock a private class) -// // NOTE: Can mock the class through reflection, but cannot verify -// // a method was called on it since treated as type Any -// //val mockHttpServer = org.mockito.Mockito.mock(httpServerClass) -// doAnswer(new Answer[String] { -// override def answer(invocation: InvocationOnMock): String = { -// val exceptionClass = -// java.lang.Class.forName("org.apache.spark.ServerStateException") -// val exception = exceptionClass -// .getConstructor(classOf[String]) -// .newInstance("") -// .asInstanceOf[Exception] -// throw exception -// } -// } -// ).when(mockSparkIMain) - -// interpreterNoPrintStreams.start() - - // Not going to dig so deeply that we actually start a web server for - // this to work... just throwing this specific exception proves that - // we have called the uri method of the server -// try { -// interpreterNoPrintStreams.classServerURI -// fail() -// } catch { -// // Have to catch this way because... of course... the exception is -// // also private -// case ex: Throwable => -// ex.getClass.getName should be ("org.apache.spark.ServerStateException") -// } -// } -// } - - describe("#read") { - it("should fail a require if the interpreter is not started") { - intercept[IllegalArgumentException] { - interpreter.read("someVariable") - } - } - - it("should execute the underlying eval method") { - interpreter.start() - interpreter.read("someVariable") - - verify(mockSparkIMain).eval(anyString()) - } - } - - describe("#doQuietly") { - it("should fail a require if the interpreter is not started") { - intercept[IllegalArgumentException] { - interpreter.doQuietly {} - } - } - - // TODO: Figure out how to verify sparkIMain.beQuietDuring { ... } - /*it("should invoke the underlying SparkIMain implementation") { - interpreterNoPrintStreams.start() - interpreterNoPrintStreams.doQuietly {} - - verify(mockSparkIMain).beQuietDuring(any[IR.Result]) - }*/ - } - - describe("#bind") { - it("should fail a require if the interpreter is not started") { - intercept[IllegalArgumentException] { - interpreter.bind("", "", null, null) - } - } - - // TODO: Re-enable tests since we've commented this one out. -// it("should invoke the underlying SparkIMain implementation") { -// interpreterNoPrintStreams.start() -// interpreterNoPrintStreams.bind("", "", null, null) -// -// verify(mockSparkIMain).bind( -// anyString(), anyString(), any[Any], any[List[String]]) -// } - } - - describe("#prepareResult") { - it("should truncate result of res result") { - interpreter.start() - doReturn(38).when(mockSparkIMain).eval("i") - doReturn("ABC").when(mockSparkIMain).eval("s") - doReturn("abc").when(mockSparkIMain).eval("res4") - - // Results that match ==> Result, Definitions, Text - // val i: Int = 38 ==> i: Int = 38 - interpreter.prepareResult("i: Int = 38") should be((Some("38"), Some("i = 38\n"), None)) - interpreter.prepareResult("i: Int = 38",true) should be((Some("i: Int = 38\n"), Some("i: Int = 38\n"), None)) - // val s = "ABC" ==> s: String = ABC - interpreter.prepareResult("s: String = ABC") should be((Some("ABC"), Some("s = ABC\n"), None)) - interpreter.prepareResult("s: String = ABC",true) should be((Some("s: String = ABC\n"), Some("s: String = ABC\n"), None)) - // resN results are suppressed - interpreter.prepareResult("res4: String = abc") should be((Some("abc"), None, None)) - interpreter.prepareResult("res4: String = abc",true) should be((Some("String = abc\n"), None, None)) - // missing variables are None, unmatched lines are returned in text - interpreter.prepareResult("res123") should be((None, None, Some("res123\n"))) - interpreter.prepareResult("res123: Int = 38") should be((None, None, Some("res123: Int = 38\n"))) - - interpreter.stop() - } - - it("should properly handle higher order functions") { - interpreter.start() - doReturn("myFunction: (x: Int, foo: Int => Int)Int").when(mockSparkIMain).eval("myFunction") - - // Results that match - interpreter.prepareResult("myFunction: (x: Int, foo: Int => Int)Int") should be( - (None, - Some("myFunction: (x: Int, foo: Int => Int)Int\n"), - None)) - - - interpreter.stop() - - } - - it("should truncate res results that have tuple values") { - //val t: (String, Int) = ("hello",1) ==> t: (String, Int) = (hello,1) - interpreter.start() - doReturn("(hello, 1)").when(mockSparkIMain).eval("res0") - - interpreter.prepareResult("res0: (String, Int) = (hello,1)") should be((Some("(hello,1)"), None, None)) - - interpreter.stop() - } - - it("should truncate res results that have parameterized types") { - interpreter.start() - doReturn(scala.Tuple2).when(mockSparkIMain).eval("res0") - - interpreter.prepareResult( - "res0: Class[_ <: (String, Int)] = class scala.Tuple2", noTruncate = true - ) should be((Some(scala.Tuple2), None, None)) - - interpreter.stop() - } - - } - } -}