From 859cbea2cc11da7fde28e9b4276ea3e09110166a Mon Sep 17 00:00:00 2001 From: Tobias Frischholz Date: Tue, 11 Jun 2019 17:12:57 +0200 Subject: [PATCH 1/2] Introduced the possibility to use the ivy dependency downloader --- .../IvyDependencyDownloader.scala | 162 +++++++++--------- .../toree/boot/CommandLineOptions.scala | 8 +- .../boot/layer/ComponentInitialization.scala | 105 ++++++------ 3 files changed, 145 insertions(+), 130 deletions(-) diff --git a/kernel-api/src/main/scala/org/apache/toree/dependencies/IvyDependencyDownloader.scala b/kernel-api/src/main/scala/org/apache/toree/dependencies/IvyDependencyDownloader.scala index e5939afc9..b7eab125a 100644 --- a/kernel-api/src/main/scala/org/apache/toree/dependencies/IvyDependencyDownloader.scala +++ b/kernel-api/src/main/scala/org/apache/toree/dependencies/IvyDependencyDownloader.scala @@ -33,24 +33,28 @@ import org.apache.ivy.util.{DefaultMessageLogger, Message} import org.springframework.core.io.support._ /** - * Represents a dependency downloader for jars that uses Ivy underneath. - */ + * Represents a dependency downloader for jars that uses Ivy underneath. + */ class IvyDependencyDownloader( - val repositoryUrl: String, - val baseDirectory: String -) extends DependencyDownloader { - private val ivySettings = new IvySettings() - private val resolver = new IBiblioResolver + val baseDirectory: String, + val ivySettingsFile: String + ) extends DependencyDownloader { - resolver.setUsepoms(true) - resolver.setM2compatible(true) - resolver.setName("central") - - // Add our resolver as the main resolver (IBiblio goes to Maven Central) - ivySettings.addResolver(resolver) + private val ivySettings = new IvySettings() - // Mark our resolver as the default one to use - ivySettings.setDefaultResolver(resolver.getName) + private val settingsFile = new File(ivySettingsFile) + + if (settingsFile.exists()) { + ivySettings.load(settingsFile) + } else { + // Use default maven resolver, when ivy settings file doesn't exist. + val resolver = new IBiblioResolver + resolver.setUsepoms(true) + resolver.setM2compatible(true) + resolver.setName("central") + ivySettings.addResolver(resolver) + ivySettings.setDefaultResolver(resolver.getName) + } // Set the destination ivySettings.setBaseDir(new File(baseDirectory)) @@ -58,45 +62,23 @@ class IvyDependencyDownloader( ivySettings.setDefaultRepositoryCacheBasedir(baseDirectory) //creates an Ivy instance with settings - val ivy = Ivy.newInstance(ivySettings) - - private def getBaseDependencies: Iterable[DependencyDescriptor] = { - val xmlModuleDescriptor = XmlModuleDescriptorParser.getInstance() - val getDependencies = (url: URL) => xmlModuleDescriptor.parseDescriptor( - new IvySettings(), url, false - ).getDependencies - - // Find all of the *ivy.xml files on the classpath. - val ivyFiles = new PathMatchingResourcePatternResolver().getResources( - "classpath*:**/*ivy.xml" - ) - val classpathURLs = ivyFiles.map(_.getURI.toURL) - - // Get all of the dependencies from the *ivy.xml files - val dependencies = classpathURLs.map(getDependencies).flatMap(_.toSeq) - - // Remove duplicates based on artifact name - val distinctDependencies = - dependencies.groupBy(_.getDependencyId.getName).map(_._2.head) - - distinctDependencies - } + private val ivy = Ivy.newInstance(ivySettings) override def retrieve( - groupId: String, - artifactId: String, - version: String, - transitive: Boolean = true, - excludeBaseDependencies: Boolean, - ignoreResolutionErrors: Boolean, - extraRepositories: Seq[(URL, Option[Credentials])] = Nil, - verbose: Boolean, - trace: Boolean, - configuration: Option[String] = None, - artifactType: Option[String] = None, - artifactClassifier: Option[String] = None, - excludes: Set[(String,String)] = Set.empty - ): Seq[URI] = { + groupId: String, + artifactId: String, + version: String, + transitive: Boolean = true, + excludeBaseDependencies: Boolean, + ignoreResolutionErrors: Boolean, + extraRepositories: Seq[(URL, Option[Credentials])] = Nil, + verbose: Boolean, + trace: Boolean, + configuration: Option[String] = None, + artifactType: Option[String] = None, + artifactClassifier: Option[String] = None, + excludes: Set[(String, String)] = Set.empty + ): Seq[URI] = { // Start building the ivy.xml file val ivyFile = File.createTempFile("ivy-custom", ".xml") ivyFile.deleteOnExit() @@ -153,8 +135,8 @@ class IvyDependencyDownloader( excludes.foreach(x => { - val moduleId = new ModuleId(x._1,x._2); - val artifactId = new ArtifactId(moduleId,"*","*","*"); + val moduleId = new ModuleId(x._1, x._2); + val artifactId = new ArtifactId(moduleId, "*", "*", "*"); val exclusion = new DefaultExcludeRule(artifactId, new RegexpPatternMatcher(), null); md.addExcludeRule(exclusion); }) @@ -196,11 +178,33 @@ class IvyDependencyDownloader( artifactURLs.map(_.toURI) } + private def getBaseDependencies: Iterable[DependencyDescriptor] = { + val xmlModuleDescriptor = XmlModuleDescriptorParser.getInstance() + val getDependencies = (url: URL) => xmlModuleDescriptor.parseDescriptor( + new IvySettings(), url, false + ).getDependencies + + // Find all of the *ivy.xml files on the classpath. + val ivyFiles = new PathMatchingResourcePatternResolver().getResources( + "classpath*:**/*ivy.xml" + ) + val classpathURLs = ivyFiles.map(_.getURI.toURL) + + // Get all of the dependencies from the *ivy.xml files + val dependencies = classpathURLs.map(getDependencies).flatMap(_.toSeq) + + // Remove duplicates based on artifact name + val distinctDependencies = + dependencies.groupBy(_.getDependencyId.getName).map(_._2.head) + + distinctDependencies + } + /** - * Uses our printstream in Ivy's LoggingEngine - * - * @param printStream the print stream to use - */ + * Uses our printstream in Ivy's LoggingEngine + * + * @param printStream the print stream to use + */ override def setPrintStream(printStream: PrintStream): Unit = { ivy.getLoggerEngine.setDefaultLogger( new DefaultMessageLogger(Message.MSG_INFO) { @@ -215,40 +219,40 @@ class IvyDependencyDownloader( } /** - * Adds the specified resolver url as an additional search option. - * - * @param url The url of the repository - */ + * Adds the specified resolver url as an additional search option. + * + * @param url The url of the repository + */ override def addMavenRepository(url: URL, credentials: Option[Credentials]): Unit = ??? /** - * Remove the specified resolver url from the search options. - * - * @param url The url of the repository - */ + * Remove the specified resolver url from the search options. + * + * @param url The url of the repository + */ override def removeMavenRepository(url: URL): Unit = ??? /** - * Returns a list of all repositories used by the downloader. - * - * @return The list of repositories as URIs - */ + * Returns a list of all repositories used by the downloader. + * + * @return The list of repositories as URIs + */ override def getRepositories: Seq[URI] = Seq( DependencyDownloader.DefaultMavenRepository.toURI ) /** - * Sets the directory where all downloaded jars will be stored. - * - * @param directory The directory to use - * @return True if successfully set directory, otherwise false - */ + * Sets the directory where all downloaded jars will be stored. + * + * @param directory The directory to use + * @return True if successfully set directory, otherwise false + */ override def setDownloadDirectory(directory: File): Boolean = false /** - * Returns the current directory where dependencies will be downloaded. - * - * @return The directory as a string - */ + * Returns the current directory where dependencies will be downloaded. + * + * @return The directory as a string + */ override def getDownloadDirectory: String = baseDirectory } diff --git a/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala b/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala index 59c04371b..8b342e990 100644 --- a/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala +++ b/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala @@ -20,8 +20,8 @@ package org.apache.toree.boot import java.io.{File, OutputStream} import com.typesafe.config.{Config, ConfigFactory} -import joptsimple.{OptionParser, OptionSpec} import joptsimple.util.RegexMatcher._ +import joptsimple.{OptionParser, OptionSpec} import scala.collection.JavaConverters._ @@ -88,6 +88,11 @@ class CommandLineOptions(args: Seq[String]) { "directory where user added jars are stored (MUST EXIST)" ).withRequiredArg().ofType(classOf[String]) + private val _ivy_settings = parser.accepts( + "ivy-settings", + "location of ivy settings xml file" + ).withRequiredArg().ofType(classOf[String]) + private val _default_interpreter = parser.accepts("default-interpreter", "default interpreter for the kernel") .withRequiredArg().ofType(classOf[String]) @@ -171,6 +176,7 @@ class CommandLineOptions(args: Seq[String]) { "max_interpreter_threads" -> get(_max_interpreter_threads), "alternate_sigint" -> get(_alternate_sigint), "jar_dir" -> get(_jar_dir), + "ivy_settings" -> get(_ivy_settings), "default_interpreter" -> get(_default_interpreter), // deprecated in favor of spark-context-initialization-mode none // "nosparkcontext" -> (if (has(_nosparkcontext)) Some(true) else Some(false)), diff --git a/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala b/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala index 46f079664..9b0c797a4 100644 --- a/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala +++ b/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala @@ -18,62 +18,62 @@ package org.apache.toree.boot.layer import java.io.File -import java.net.URL import java.nio.file.{Files, Paths} import java.util.concurrent.ConcurrentHashMap + import akka.actor.ActorRef import com.typesafe.config.Config import org.apache.spark.SparkConf import org.apache.toree.comm.{CommManager, CommRegistrar, CommStorage, KernelCommManager} -import org.apache.toree.dependencies.{CoursierDependencyDownloader, Credentials, DependencyDownloader} +import org.apache.toree.dependencies.{CoursierDependencyDownloader, DependencyDownloader, IvyDependencyDownloader} import org.apache.toree.interpreter._ import org.apache.toree.kernel.api.Kernel import org.apache.toree.kernel.protocol.v5.KMBuilder import org.apache.toree.kernel.protocol.v5.kernel.ActorLoader import org.apache.toree.magic.MagicManager -import org.apache.toree.plugins.PluginManager -import org.apache.toree.utils.{LogLike, FileUtils} +import org.apache.toree.plugins.{AllInterpretersReady, PluginManager} +import org.apache.toree.utils.{FileUtils, LogLike} + import scala.collection.JavaConverters._ -import org.apache.toree.plugins.AllInterpretersReady /** - * Represents the component initialization. All component-related pieces of the - * kernel (non-actors) should be created here. Limited items should be exposed. - */ + * Represents the component initialization. All component-related pieces of the + * kernel (non-actors) should be created here. Limited items should be exposed. + */ trait ComponentInitialization { /** - * Initializes and registers all components (not needed by bare init). - * - * @param config The config used for initialization - * @param actorLoader The actor loader to use for some initialization - */ + * Initializes and registers all components (not needed by bare init). + * + * @param config The config used for initialization + * @param actorLoader The actor loader to use for some initialization + */ def initializeComponents( - config: Config, actorLoader: ActorLoader - ): (CommStorage, CommRegistrar, CommManager, Interpreter, + config: Config, actorLoader: ActorLoader + ): (CommStorage, CommRegistrar, CommManager, Interpreter, Kernel, DependencyDownloader, MagicManager, PluginManager, collection.mutable.Map[String, ActorRef]) } /** - * Represents the standard implementation of ComponentInitialization. - */ + * Represents the standard implementation of ComponentInitialization. + */ trait StandardComponentInitialization extends ComponentInitialization { this: LogLike => /** - * Initializes and registers all components (not needed by bare init). - * - * @param config The config used for initialization - * @param actorLoader The actor loader to use for some initialization - */ + * Initializes and registers all components (not needed by bare init). + * + * @param config The config used for initialization + * @param actorLoader The actor loader to use for some initialization + */ def initializeComponents( - config: Config, actorLoader: ActorLoader - ) = { + config: Config, actorLoader: ActorLoader + ) = { val (commStorage, commRegistrar, commManager) = initializeCommObjects(actorLoader) - val interpreterManager = InterpreterManager(config) - interpreterManager.interpreters foreach(println) + val interpreterManager = InterpreterManager(config) + interpreterManager.interpreters foreach (println) val dependencyDownloader = initializeDependencyDownloader(config) val pluginManager = createPluginManager(config, interpreterManager, dependencyDownloader) @@ -85,7 +85,7 @@ trait StandardComponentInitialization extends ComponentInitialization { initializeSparkContext(config, kernel) interpreterManager.initializeInterpreters(kernel) - + pluginManager.fireEvent(AllInterpretersReady) val responseMap = initializeResponseMap() @@ -110,25 +110,30 @@ trait StandardComponentInitialization extends ComponentInitialization { (commStorage, commRegistrar, commManager) } - def initializeSparkContext(config:Config, kernel:Kernel) = { - if(config.getString("spark_context_initialization_mode") == "eager") { + def initializeSparkContext(config: Config, kernel: Kernel) = { + if (config.getString("spark_context_initialization_mode") == "eager") { kernel.sparkSession } } private def initializeDependencyDownloader(config: Config) = { val depsDir = { - if(config.hasPath("deps_dir") && Files.exists(Paths.get(config.getString("deps_dir")))) { + if (config.hasPath("deps_dir") && Files.exists(Paths.get(config.getString("deps_dir")))) { config.getString("deps_dir") } else { FileUtils.createManagedTempDirectory("toree_add_deps").getAbsolutePath } } - val dependencyDownloader = new CoursierDependencyDownloader - dependencyDownloader.setDownloadDirectory( - new File(depsDir) - ) + val dependencyDownloader = if (config.hasPath("ivy_settings")) { + new IvyDependencyDownloader(depsDir, config.getString("ivy_settings")) + } else { + val dependencyDownloader = new CoursierDependencyDownloader + dependencyDownloader.setDownloadDirectory( + new File(depsDir) + ) + dependencyDownloader + } if (config.hasPath("default_repositories")) { val repository = config.getStringList("default_repositories").asScala.toList @@ -138,7 +143,7 @@ trait StandardComponentInitialization extends ComponentInitialization { } else Nil dependencyDownloader.resolveRepositoriesAndCredentials(repository, credentials) - .foreach{case (u, c) => dependencyDownloader.addMavenRepository(u, c)} + .foreach { case (u, c) => dependencyDownloader.addMavenRepository(u, c) } } dependencyDownloader @@ -148,17 +153,17 @@ trait StandardComponentInitialization extends ComponentInitialization { new ConcurrentHashMap[String, ActorRef]().asScala private def initializeKernel( - config: Config, - actorLoader: ActorLoader, - interpreterManager: InterpreterManager, - commManager: CommManager, - pluginManager: PluginManager - ) = { + config: Config, + actorLoader: ActorLoader, + interpreterManager: InterpreterManager, + commManager: CommManager, + pluginManager: PluginManager + ) = { //kernel has a dependency on ScalaInterpreter to get the ClassServerURI for the SparkConf //we need to pre-start the ScalaInterpreter -// val scalaInterpreter = interpreterManager.interpreters("Scala") -// scalaInterpreter.start() + // val scalaInterpreter = interpreterManager.interpreters("Scala") + // scalaInterpreter.start() val kernel = new Kernel( config, @@ -166,7 +171,7 @@ trait StandardComponentInitialization extends ComponentInitialization { interpreterManager, commManager, pluginManager - ){ + ) { override protected[toree] def createSparkConf(conf: SparkConf) = { val theConf = super.createSparkConf(conf) @@ -185,9 +190,9 @@ trait StandardComponentInitialization extends ComponentInitialization { } private def createPluginManager( - config: Config, interpreterManager: InterpreterManager, - dependencyDownloader: DependencyDownloader - ) = { + config: Config, interpreterManager: InterpreterManager, + dependencyDownloader: DependencyDownloader + ) = { logger.debug("Constructing plugin manager") val pluginManager = new PluginManager() @@ -202,9 +207,9 @@ trait StandardComponentInitialization extends ComponentInitialization { } private def initializePlugins( - config: Config, - pluginManager: PluginManager - ) = { + config: Config, + pluginManager: PluginManager + ) = { val magicUrlArray = config.getStringList("magic_urls").asScala .map(s => new java.net.URL(s)).toArray From a533cb4d6939e4627187861d546dc2c972b40984 Mon Sep 17 00:00:00 2001 From: Tobias Frischholz Date: Thu, 13 Jun 2019 11:13:02 +0200 Subject: [PATCH 2/2] * Added possibility to use a user directory for storing jars * Introduced the possibility to use the ivy dependency downloader --- .../scala/org/apache/toree/boot/CommandLineOptions.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala b/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala index 8b342e990..dcc4b561f 100644 --- a/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala +++ b/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala @@ -88,6 +88,11 @@ class CommandLineOptions(args: Seq[String]) { "directory where user added jars are stored (MUST EXIST)" ).withRequiredArg().ofType(classOf[String]) + private val _deps_dir = parser.accepts( + "deps-dir", + "directory where user dependencies are stored (MUST EXIST)" + ).withRequiredArg().ofType(classOf[String]) + private val _ivy_settings = parser.accepts( "ivy-settings", "location of ivy settings xml file" @@ -176,6 +181,7 @@ class CommandLineOptions(args: Seq[String]) { "max_interpreter_threads" -> get(_max_interpreter_threads), "alternate_sigint" -> get(_alternate_sigint), "jar_dir" -> get(_jar_dir), + "deps_dir" -> get(_deps_dir), "ivy_settings" -> get(_ivy_settings), "default_interpreter" -> get(_default_interpreter), // deprecated in favor of spark-context-initialization-mode none