diff --git a/toolflow/scala/src/main/scala/tapasco/Tapasco.scala b/toolflow/scala/src/main/scala/tapasco/Tapasco.scala index 29c57ce5..33c548e7 100644 --- a/toolflow/scala/src/main/scala/tapasco/Tapasco.scala +++ b/toolflow/scala/src/main/scala/tapasco/Tapasco.scala @@ -87,7 +87,12 @@ object Tapasco { logger.trace("configuring FileAssetManager...") FileAssetManager(cfg) logger.trace("SLURM: {}", cfg.slurm) - if (cfg.slurm) Slurm.enabled = cfg.slurm + if (cfg.slurm.isDefined) { + Slurm.set_cfg(cfg.slurm.get match { + case "local" => Slurm.EnabledLocal() + case t => Slurm.EnabledRemote(t) + }) + } FileAssetManager.start() logger.trace("parallel: {}", cfg.parallel) cfg.logFile map { logfile: Path => setupLogFileAppender(logfile.toString) } diff --git a/toolflow/scala/src/main/scala/tapasco/base/Configuration.scala b/toolflow/scala/src/main/scala/tapasco/base/Configuration.scala index e05cb57b..11fe8214 100644 --- a/toolflow/scala/src/main/scala/tapasco/base/Configuration.scala +++ b/toolflow/scala/src/main/scala/tapasco/base/Configuration.scala @@ -65,9 +65,9 @@ trait Configuration { def logFile(p: Option[Path]): Configuration - def slurm: Boolean + def slurm: Option[String] - def slurm(enabled: Boolean): Configuration + def slurm(template: Option[String]): Configuration def parallel: Boolean diff --git a/toolflow/scala/src/main/scala/tapasco/base/ConfigurationImpl.scala b/toolflow/scala/src/main/scala/tapasco/base/ConfigurationImpl.scala index 8d91b2b1..2d1da73b 100644 --- a/toolflow/scala/src/main/scala/tapasco/base/ConfigurationImpl.scala +++ b/toolflow/scala/src/main/scala/tapasco/base/ConfigurationImpl.scala @@ -46,7 +46,7 @@ private case class ConfigurationImpl( private val _coreDir: Path = BasePathManager.DEFAULT_DIR_CORES, private val _compositionDir: Path = BasePathManager.DEFAULT_DIR_COMPOSITIONS, private val _logFile: Option[Path] = None, - slurm: Boolean = false, + slurm: Option[String] = None, parallel: Boolean = false, maxThreads: Option[Int] = None, maxTasks: Option[Int] = None, @@ -81,7 +81,7 @@ private case class ConfigurationImpl( def logFile(op: Option[Path]): Configuration = this.copy(_logFile = op) - def slurm(enabled: Boolean): Configuration = this.copy(slurm = enabled) + def slurm(template: Option[String]): Configuration = this.copy(slurm = template) def parallel(enabled: Boolean): Configuration = this.copy(parallel = enabled) @@ -97,8 +97,10 @@ private case class ConfigurationImpl( def jobs(js: Seq[Job]): Configuration = this.copy(jobs = js) - // these directories must exist - for ((d, n) <- Seq((archDir, "architectures"), - (platformDir, "platforms"))) - require(mustExist(d), "%s directory %s does not exist".format(n, d.toString)) + // these directories must exist, unless we execute on remote SLURM node + if (this.slurm.getOrElse("local").equals("local")) { + for ((d, n) <- Seq((archDir, "architectures"), + (platformDir, "platforms"))) + require(mustExist(d), "%s directory %s does not exist".format(n, d.toString)) + } } diff --git a/toolflow/scala/src/main/scala/tapasco/base/SlurmRemoteConfig.scala b/toolflow/scala/src/main/scala/tapasco/base/SlurmRemoteConfig.scala new file mode 100644 index 00000000..05d5400c --- /dev/null +++ b/toolflow/scala/src/main/scala/tapasco/base/SlurmRemoteConfig.scala @@ -0,0 +1,45 @@ +/* + * + * Copyright (c) 2014-2020 Embedded Systems and Applications, TU Darmstadt. + * + * This file is part of TaPaSCo + * (see https://github.com/esa-tu-darmstadt/tapasco).
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see . + * + */ +/** + * @file SlurmRemoteConfig.scala + * @brief Model: TPC remote Slurm Configuration. + * @authors M. Hartmann, TU Darmstadt + **/ + +package tapasco.base + +import java.nio.file.Path +import tapasco.base.builder.Builds + +case class SlurmRemoteConfig( + name: String, + host: String, + workstation: String, + workdir: Path, + installdir: Path, + jobFile: String, + SbatchOptions: String, + PreambleScript: Option[String], + PostambleScript: Option[String] + ) + +object SlurmRemoteConfig extends Builds[SlurmRemoteConfig] diff --git a/toolflow/scala/src/main/scala/tapasco/base/json/package.scala b/toolflow/scala/src/main/scala/tapasco/base/json/package.scala index 2d5cf570..3dac4fc4 100644 --- a/toolflow/scala/src/main/scala/tapasco/base/json/package.scala +++ b/toolflow/scala/src/main/scala/tapasco/base/json/package.scala @@ -402,7 +402,7 @@ package object json { (JsPath \ "CoreDir").readNullable[Path].map(_ getOrElse BasePathManager.DEFAULT_DIR_CORES) ~ (JsPath \ "CompositionDir").readNullable[Path].map(_ getOrElse BasePathManager.DEFAULT_DIR_COMPOSITIONS) ~ (JsPath \ "LogFile").readNullable[Path] ~ - (JsPath \ "Slurm").readNullable[Boolean].map(_ getOrElse false) ~ + (JsPath \ "Slurm").readNullable[String] ~ (JsPath \ "Parallel").readNullable[Boolean].map(_ getOrElse false) ~ (JsPath \ "MaxThreads").readNullable[Int] ~ (JsPath \ "MaxTasks").readNullable[Int] ~ @@ -419,7 +419,7 @@ package object json { (JsPath \ "CoreDir").write[Path] ~ (JsPath \ "CompositionDir").write[Path] ~ (JsPath \ "LogFile").writeNullable[Path] ~ - (JsPath \ "Slurm").write[Boolean] ~ + (JsPath \ "Slurm").writeNullable[String] ~ (JsPath \ "Parallel").write[Boolean] ~ (JsPath \ "MaxThreads").writeNullable[Int] ~ (JsPath \ "HlsTimeOut").writeNullable[Int] ~ @@ -437,6 +437,21 @@ package object json { } /* Configuration @} */ + + /* @{ SlurmRemoteConfig */ + implicit val slurmRemoteConfigReads: Reads[SlurmRemoteConfig] = ( + (JsPath \ "Name").read[String](minimumLength(length = 1)) ~ + (JsPath \ "SlurmHost").read[String](minimumLength(length = 1)) ~ + (JsPath \ "WorkstationHost").read[String](minimumLength(length = 1)) ~ + (JsPath \ "Workdir").read[Path] ~ + (JsPath \ "TapascoInstallDir").read[Path] ~ + (JsPath \ "JobFile").read[String](minimumLength(length = 1)) ~ + (JsPath \ "SbatchOptions").readNullable[String].map(_.getOrElse("")) ~ + (JsPath \ "PreambleScript").readNullable[String] ~ + (JsPath \ "PostambleScript").readNullable[String] + ) (SlurmRemoteConfig.apply _) + /* SlurmRemoteConfig @} */ + } // vim: foldmarker=@{,@} foldmethod=marker foldlevel=0 diff --git a/toolflow/scala/src/main/scala/tapasco/jobs/executors/Compose.scala b/toolflow/scala/src/main/scala/tapasco/jobs/executors/Compose.scala index 50ff9005..db42774d 100644 --- a/toolflow/scala/src/main/scala/tapasco/jobs/executors/Compose.scala +++ 
b/toolflow/scala/src/main/scala/tapasco/jobs/executors/Compose.scala @@ -28,18 +28,20 @@ package tapasco.jobs.executors import java.util.concurrent.Semaphore +import tapasco.activity.composers.Composer import tapasco.base._ import tapasco.filemgmt._ import tapasco.jobs.{ComposeJob, HighLevelSynthesisJob} +import tapasco.slurm.Slurm.Completed import tapasco.task._ +import tapasco.slurm._ private object Compose extends Executor[ComposeJob] { private implicit val logger = tapasco.Logging.logger(getClass) + private[this] val _slurm = Slurm.enabled def execute(job: ComposeJob) (implicit cfg: Configuration, tsk: Tasks): Boolean = { - val signal = new Semaphore(0) - logger.trace("composition: {}", job.composition) // first, collect all kernels and trigger HLS if not built yet @@ -74,7 +76,18 @@ private object Compose extends Executor[ComposeJob] { logger.info("all HLS tasks finished successfully, beginning compose run...") logger.debug("job: {}", job) - val composeTasks = for { + if (!_slurm) nodeExecution(job) else slurmExecution(job) + } else { + logger.error("HLS tasks failed, aborting composition") + false + } + } + + private def nodeExecution(job: ComposeJob) + (implicit cfg: Configuration, tsk: Tasks): Boolean = { + val signal = new Semaphore(0) + + val composeTasks = for { p <- job.platforms a <- job.architectures t = Target(a, p) @@ -104,10 +117,40 @@ private object Compose extends Executor[ComposeJob] { // successful, if all successful (composeTasks map (_.result) fold true) (_ && _) - } else { - logger.error("HLS tasks failed, aborting composition") - false + } + + private def slurmExecution(job: ComposeJob) + (implicit cfg: Configuration, tsk: Tasks): Boolean = { + + val ComposeJob(c, f, i, _, _, _, _, _, _, _) = job + val name = c.composition.map(_.kernel).fold("compose")(_ ++ "-" ++ _) + val outDir = FileAssetManager.TAPASCO_WORK_DIR.resolve("Slurm").resolve("Compose").resolve(name) + // needed for resource-based scheduling + val consumer = new ComposeTask( + composition = c, + designFrequency = f, + implementation = Composer.Implementation(i), + target = Target(job.architectures.head, job.platforms.head), + onComplete = _ => () + ) + + // define SLURM job + val sjob = Slurm.Job( + name = name, + log = outDir.resolve("tapasco.log"), + slurmLog = outDir.resolve("slurm-compose.log"), + errorLog = outDir.resolve("slurm-compose.errors.log"), + consumer = consumer, + maxHours = ComposeTask.MAX_COMPOSE_HOURS, + comment = Some(outDir.toString), + job = job, + cfg_file = outDir.resolve("slurm-compose.cfg") + ) + + // start slurm job and wait for finish + Slurm(sjob)(cfg) match { + case Some(id) => Slurm.waitFor(id) == Completed() + case None => false } } } - diff --git a/toolflow/scala/src/main/scala/tapasco/jobs/executors/HighLevelSynthesis.scala b/toolflow/scala/src/main/scala/tapasco/jobs/executors/HighLevelSynthesis.scala index 80fbe765..7f2d04f4 100644 --- a/toolflow/scala/src/main/scala/tapasco/jobs/executors/HighLevelSynthesis.scala +++ b/toolflow/scala/src/main/scala/tapasco/jobs/executors/HighLevelSynthesis.scala @@ -22,7 +22,6 @@ package tapasco.jobs.executors import java.util.concurrent.Semaphore - import tapasco.Logging import tapasco.activity.hls.HighLevelSynthesizer import tapasco.activity.hls.HighLevelSynthesizer.Implementation._ @@ -30,12 +29,18 @@ import tapasco.activity.hls.HighLevelSynthesizer._ import tapasco.base._ import tapasco.filemgmt.FileAssetManager import tapasco.jobs._ +import tapasco.slurm.Slurm +import tapasco.slurm.Slurm.Completed import tapasco.task._ 
protected object HighLevelSynthesis extends Executor[HighLevelSynthesisJob] { private implicit final val logger = Logging.logger(getClass) + private[this] val _slurm = Slurm.enabled + + def execute(job: HighLevelSynthesisJob)(implicit cfg: Configuration, tsk: Tasks): Boolean = + if (!_slurm) nodeExecution(job) else slurmExecution(job) - def execute(job: HighLevelSynthesisJob)(implicit cfg: Configuration, tsk: Tasks): Boolean = { + def nodeExecution(job: HighLevelSynthesisJob)(implicit cfg: Configuration, tsk: Tasks): Boolean = { val signal = new Semaphore(0) val runs: Seq[(Kernel, Target)] = for { a <- job.architectures.toSeq.sortBy(_.name) @@ -94,4 +99,39 @@ protected object HighLevelSynthesis extends Executor[HighLevelSynthesisJob] { // success, if all tasks were successful ((tasks ++ importTasks) map (_.result) fold true) (_ && _) } + + def slurmExecution(job: HighLevelSynthesisJob) + (implicit cfg: Configuration, tsk: Tasks): Boolean = { + + val name = job.kernels.map(_.name).fold("hls")(_++"-"++_) + val outDir = FileAssetManager.TAPASCO_WORK_DIR.resolve("Slurm").resolve("HLS").resolve(name) + // needed for resource-based scheduling + val consumer = new HighLevelSynthesisTask( + job.kernels.head, + Target(job.architectures.head, job.platforms.head), + cfg, + VivadoHLS, + _ => () + ) + + // define SLURM job + val sjob = Slurm.Job( + name = name, + log = outDir.resolve("tapasco.log"), + slurmLog = outDir.resolve("slurm-hls.log"), + errorLog = outDir.resolve("hls-slurm.errors.log"), + consumer = consumer, + maxHours = HighLevelSynthesisTask.MAX_SYNTH_HOURS, + job = job, + cfg_file = outDir.resolve("slurm-hls.cfg") + ) + + // execute sbatch to enqueue job, then wait for it + val r = Slurm(sjob)(cfg) match { + case Some(id) => Slurm.waitFor(id) == Completed() + case None => false + } + FileAssetManager.reset() + r + } } diff --git a/toolflow/scala/src/main/scala/tapasco/parser/GlobalOptions.scala b/toolflow/scala/src/main/scala/tapasco/parser/GlobalOptions.scala index c31fa1a7..7572d503 100644 --- a/toolflow/scala/src/main/scala/tapasco/parser/GlobalOptions.scala +++ b/toolflow/scala/src/main/scala/tapasco/parser/GlobalOptions.scala @@ -96,8 +96,8 @@ private object GlobalOptions { def inputFiles: Parser[(String, Path)] = jobsFile | configFile | logFile - def slurm: Parser[(String, Boolean)] = - longOption("slurm", "Slurm").map((_, true)) ~ ws + def slurm: Parser[(String, String)] = + longOption("slurm", "Slurm") ~ ws ~/ string.opaque("slurm template name") ~ ws def parallel: Parser[(String, Boolean)] = longOption("parallel", "Parallel").map((_, true)) ~ ws @@ -131,7 +131,7 @@ private object GlobalOptions { case ("Core", p: Path) => mkConfig(as, Some(c getOrElse Configuration() coreDir p)) case ("Kernel", p: Path) => mkConfig(as, Some(c getOrElse Configuration() kernelDir p)) case ("Platform", p: Path) => mkConfig(as, Some(c getOrElse Configuration() platformDir p)) - case ("Slurm", e: Boolean) => mkConfig(as, Some(c getOrElse Configuration() slurm e)) + case ("Slurm", t: String) => mkConfig(as, Some(c getOrElse Configuration() slurm Some(t))) case ("Parallel", e: Boolean) => mkConfig(as, Some(c getOrElse Configuration() parallel e)) case ("JobsFile", p: Path) => mkConfig(as, Some(c getOrElse Configuration() jobs readJobsFile(p))) case ("LogFile", p: Path) => mkConfig(as, Some(c getOrElse Configuration() logFile Some(p))) diff --git a/toolflow/scala/src/main/scala/tapasco/parser/Usage.scala b/toolflow/scala/src/main/scala/tapasco/parser/Usage.scala index 3cd54c2d..390f9624 100644 --- 
a/toolflow/scala/src/main/scala/tapasco/parser/Usage.scala +++ b/toolflow/scala/src/main/scala/tapasco/parser/Usage.scala @@ -88,7 +88,8 @@ configuration via `tapasco -n config.json`. Arg("--logFile FILE", "Path to output log file") & Arg("--configFile FILE", "Path to Json file with Configuration") & Arg("--jobsFile FILE", "Path to Json file with Jobs array") & - Arg("--slurm", "Activate SLURM cluster execution (requires sbatch)") & + Arg("--slurm TEMPLATE", "Activate SLURM cluster execution." ~ + "TEMPLATE describes a remote SLURM node, use 'local' for local execution (requires sbatch).") & Arg("--parallel", "Execute all jobs in parallel (careful!)") & Arg("--maxThreads NUM", "Limit internal parallelism of tasks (e.g., Vivado)" ~ "to the given number of threads.") & diff --git a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala index afd0049d..6958fc24 100644 --- a/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala +++ b/toolflow/scala/src/main/scala/tapasco/slurm/Slurm.scala @@ -24,13 +24,20 @@ package tapasco.slurm import java.nio.file._ import java.nio.file.attribute.PosixFilePermission._ +import tapasco.Common import tapasco.Logging._ +import tapasco.activity.composers.Composer +import tapasco.base.{Configuration, SlurmRemoteConfig, Target} import tapasco.filemgmt._ import tapasco.task.ResourceConsumer import tapasco.util.{Publisher, Template} +import scala.concurrent.duration.Duration import scala.collection.JavaConverters._ +import scala.sys.ShutdownHookThread import scala.sys.process._ +import tapasco.base.json._ +import tapasco.jobs.{ComposeJob, HighLevelSynthesisJob} /** * Primitive interface to SLURM scheduler: @@ -46,11 +53,14 @@ final object Slurm extends Publisher { /** Name of the job. */ name: String, - /** File name of the stdout logfile. */ - slurmLog: String, + /** File name of the tapasco logfile. */ + log: Path, - /** File name of the stderr logfile. */ - errorLog: String, + /** File name of the stdout slurm logfile. */ + slurmLog: Path, + + /** File name of the stderr slurm logfile. */ + errorLog: Path, /** Consumer to schedule. */ consumer: ResourceConsumer, @@ -58,11 +68,14 @@ final object Slurm extends Publisher { /** Time limit (in hours). */ maxHours: Int, - /** Sequence of commands to execute (bash). */ - commands: Seq[String], - /** Optional comment. */ - comment: Option[String] = None + comment: Option[String] = None, + + /** The job to execute */ + job: tapasco.jobs.Job, + + /** Filename of the tapasco configuration file */ + cfg_file: Path ) /** Exception class for negative SLURM responses. */ @@ -79,18 +92,20 @@ final object Slurm extends Publisher { } /** Template file for job script. */ - final val slurmTemplate = FileAssetManager.TAPASCO_HOME.resolve("common").resolve("slurm.job.template") + final val SLURM_TEMPLATE_DIR = Common.commonDir.resolve("SLURM"); /** Default output directory for SLURM-related outputs. */ final val slurmOutput = FileAssetManager.TAPASCO_HOME.resolve("slurm") /** Regular expression: Positive ACK from `sbatch`. */ final val slurmSubmissionAck = """[Ss]ubmitted batch job (\d+)""".r - /** Polling interval for `squeue`. */ + /** Polling interval for `sacct`. */ final val slurmDelay = 15000 // 15 secs /** Set of POSIX permissions for SLURM job scripts. */ final val slurmScriptPermissions = Set(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE, GROUP_READ, OTHERS_READ).asJava /** Wait interval between retries. 
*/ final val slurmRetryDelay = 10000 // 10 secs + /** Stores a closure for every slurm job id, which is called once that job finishes. */ + var postambles: Map[Int, Int => Boolean => Unit] = Map() /** Returns true if SLURM is available on host running iTPC. */ lazy val available: Boolean = "which sbatch".! == 0 @@ -101,17 +116,32 @@ final object Slurm extends Publisher { } /** Enables or disables SLURM, returns new value for enabled. */ - def enabled_=(en: Boolean): Boolean = if (en && available) { - Slurm.synchronized { - _enabled = en + def set_cfg(cfg: SlurmConfig): Boolean = cfg match { + case Disabled() => false + case EnabledLocal() => if (available) { + Slurm.synchronized { + _enabled = true + } + publish(Events.SlurmModeEnabled(true)) + true + } else { + logger.warn("SLURM local mode was selected, but could be not activated (sbatch not found)") + false } - publish(Events.SlurmModeEnabled(en)) - enabled - } else { - if (en) { - logger.warn("SLURM mode was selected, but could be not activated (sbatch not found)") + case EnabledRemote(template_name) => { + val template_path = SLURM_TEMPLATE_DIR.resolve(template_name + ".json") + if (template_path.toFile.exists()) { + Slurm.synchronized { + _enabled = true + slurm_remote_cfg = SlurmRemoteConfig.from(template_path).toOption + } + publish(Events.SlurmModeEnabled(true)) + true + } else { + logger.warn("SLURM mode was selected, but the specified template was not found") + false + } } - false } /** Helper function: Sets correct file permissions on job scripts. */ @@ -122,26 +152,33 @@ final object Slurm extends Publisher { * * @param job Job to execute. * @param file File to write script to. + * @param upd_wd Function that converts local workdir file paths to valid paths on a remote SLURM node. * @return True, iff successful. 
**/ - def writeJobScript(job: Job, file: Path): Boolean = (catchDefault[Boolean](false, Seq(classOf[java.io.IOException]), - prefix = "could not write %s: ".format(file.toString)) _) { + def writeJobScript(job: Job, file: Path, upd_wd: Path => Path): Boolean = + (catchDefault[Boolean](false, Seq(classOf[java.io.IOException]), prefix = "could not write %s: ".format(file.toString)) _) { // fill in template needles val jobScript = new Template jobScript("JOB_NAME") = job.name - jobScript("SLURM_LOG") = job.slurmLog - jobScript("ERROR_LOG") = job.errorLog + jobScript("SLURM_LOG") = upd_wd(job.slurmLog).toString + jobScript("ERROR_LOG") = upd_wd(job.errorLog).toString jobScript("MEM_PER_CPU") = (job.consumer.memory / 1024).toString jobScript("CPUS") = (job.consumer.cpus).toString jobScript("TIMELIMIT") = "%02d:00:00".format(job.maxHours) - jobScript("TAPASCO_HOME") = FileAssetManager.TAPASCO_HOME.toString - jobScript("COMMANDS") = job.commands mkString "\n" + jobScript("TAPASCO_HOME") = upd_wd(FileAssetManager.TAPASCO_WORK_DIR).toString + jobScript("COMMANDS") = "tapasco --configFile %s".format(upd_wd(job.cfg_file).toString) jobScript("COMMENT") = job.comment getOrElse "" + if (slurm_remote_cfg.isDefined) + jobScript("WORKSTATION") = slurm_remote_cfg.get.workstation // create parent directory Files.createDirectories(file.getParent()) // write file val fw = new java.io.FileWriter(file.toString) - fw.append(jobScript.interpolateFile(Slurm.slurmTemplate.toString)) + val template_name = slurm_remote_cfg match { + case Some(c) => c.jobFile + case None => "default.job.template" + } + fw.append(jobScript.interpolateFile(SLURM_TEMPLATE_DIR.resolve(template_name).toString)) fw.flush() fw.close() // set executable permissions @@ -149,45 +186,230 @@ final object Slurm extends Publisher { true } + /** + * Preamble is run before the SLURM job is started. + * Copy required files from host to SLURM workstation. + * @param slurm_job Job to execute. + * @param update_paths Function that converts local workdir file paths to valid paths on a remote SLURM node. + **/ + def slurm_preamble(slurm_job: Job, update_paths: Path => Path)(implicit cfg: Configuration): Unit = { + val local_files = Seq(slurm_job.cfg_file) ++ (slurm_job.job match { + case j@ComposeJob(c, _, _, _, _, _, _, _, _, _) => { + val cores = for { + p <- j.platforms + a <- j.architectures + tgt = Target(a, p) + } yield { + val tgt_cores = c.composition.map(ce => FileAssetManager.entities.core(ce.kernel, tgt)) + tgt_cores.map(_.get.zipPath) ++ tgt_cores.map(_.get.descPath) + } + cores flatten + } + case HighLevelSynthesisJob(_, _, _, k, _) => { + val kernels = FileAssetManager.entities.kernels.filter( kernel => k.get.contains(kernel.name) ).toSeq + kernels.map(_.descPath.getParent) + } + case _ => Seq() + }) + val remote_files = local_files map update_paths + file_transfer(local_files.zip(remote_files).toMap, tx = true) + + // run preamble script, if specified + slurm_remote_cfg.get.PreambleScript map ("sh %s".format(_).!) + } + + /** + * Postamble is run after the SLURM job is finished. + * Copy generated artefacts back from the SLURM node. + * @param slurm_job Job to execute. + * @param slurm_success Indicates if the SLURM job finished successfully. + * @param update_paths Function that converts local workdir file paths to valid paths on a remote SLURM node. 
+ **/ + def slurm_postamble(slurm_job: Job, cfg: Configuration, slurm_success: Boolean, update_paths: Path => Path): Unit = { + val loc_files = Seq(slurm_job.log, slurm_job.slurmLog, slurm_job.errorLog) ++ (slurm_job.job match { + case j@ComposeJob(c, f, _, _, _, feat, _, _, _, _) if slurm_success => { + val compose_out = for { + p <- j.platforms + a <- j.architectures + tgt = Target(a, p) + } yield { + val out_dir = cfg.outputDir(c,tgt,f,feat.getOrElse(Seq())) + val bit_name = Composer.mkProjectName(c, tgt, f) + val fnames = Seq(bit_name + ".bit", "timing.txt", "utilization.txt") + fnames.map(out_dir.resolve) + } + compose_out flatten + } + case j@HighLevelSynthesisJob(_, _,_, _, _) if slurm_success => { + val cores = for { + p <- j.platforms + a <- j.architectures + k <- j.kernels + tgt = Target(a, p) + } yield { + val core_dir = cfg.outputDir(k, tgt).resolve("ipcore") + Seq(core_dir.resolve("%s.zip".format(k.name)), core_dir.resolve("core.json")) + } + cores flatten + } + case _ => Seq() + }) + val remote_files = loc_files map update_paths + file_transfer(remote_files.zip(loc_files).toMap, tx=false) + + // run postamble script, if specified + slurm_remote_cfg.get.PostambleScript map ("sh %s".format(_).!) + } + + /** + * Copy a set of files either from a host to a remote SLURM node or vice versa, depending on the @param tx + * @param tfer A map from SRC to DST file paths + * @param tx indicates the direction of transfer. If value is true (false), the direction is push (pull). + **/ + def file_transfer(tfer: Map[Path, Path], tx: Boolean, host: Option[String] = None): Boolean = { + for ((from, to) <- tfer) { + val target_host = host.getOrElse(slurm_remote_cfg.get.workstation) + logger.info("Copying %s to %s on %s".format(from, to, target_host)) + + // parent directory may not exist + val mkdir = "mkdir -p %s".format(to.getParent) + if (tx) exec_cmd(mkdir, hostname = Some(target_host)) else mkdir.! + + val cpy_cmd = if (tx) { + "scp -r %s %s:%s".format(from, target_host, to) + } else { + "scp -r %s:%s %s".format(target_host, from, to) + } + logger.debug("Copy Command: " + cpy_cmd) + if (cpy_cmd.! != 0) throw new Exception("Could not copy file %s to %s!".format(from, to)) + } + true + } + /** * Schedules a job on SLURM. * - * @param script Job script file to schedule via `sbatch`. + * @param slurm_job Job script to schedule via `sbatch`. * @return Either a positive integer (SLURM id), or an Exception. **/ - def apply(script: Path, retries: Int = SLURM_RETRIES): Option[Int] = - catchAllDefault[Option[Int]](None, "Slurm scheduling failed: ") { - val cmd = "sbatch %s".format(script.toAbsolutePath().normalize().toString) + def apply(slurm_job: Job)(implicit cfg: Configuration): Option[Int] = { + val local_base = slurm_job.cfg_file.getParent + val jobFile = local_base.resolve("%s.slurm".format(slurm_job.name)) // SLURM job script + + /** replace a prefix of a Path by a different prefix. 
Used to convert local file paths to paths that are valid on SLURM node */ + def prefix_subst(old_pre: Path, new_pre: Path): (Path => Path) = { + f => { + val postfix = f.toString.stripPrefix(old_pre.toString).stripPrefix("/") + new_pre.resolve(postfix) + } + } + val (wd_to_rmt, tpsc_to_rmt) = if (slurm_remote_cfg.isDefined) + (prefix_subst(cfg.kernelDir.getParent, slurm_remote_cfg.get.workdir), + prefix_subst(cfg.platformDir.getParent.getParent.getParent, slurm_remote_cfg.get.installdir)) + else (identity[Path] _, identity[Path] _) + + /** Create non-slurm cfg, with updated paths such that they match the folder structure on SLURM node */ + val newCfg = cfg + .descPath(wd_to_rmt(cfg.descPath)) + .compositionDir(wd_to_rmt(cfg.compositionDir)) + .coreDir(wd_to_rmt(cfg.coreDir)) + .kernelDir(wd_to_rmt(cfg.kernelDir)) + .platformDir(tpsc_to_rmt(cfg.platformDir)) + .archDir(tpsc_to_rmt(cfg.archDir)) + .jobs(Seq(slurm_job.job)) + .slurm(None) + .logFile(Some(wd_to_rmt(slurm_job.log))) + + logger.info("starting " + slurm_job.name + " job on SLURM ({})", slurm_job.cfg_file) + catchAllDefault[Option[Int]](None, "error during SLURM job execution (%s): ".format(jobFile)) { + Files.createDirectories(local_base) // create base directory + + Slurm.writeJobScript(slurm_job, jobFile, wd_to_rmt) // write job script + Configuration.to(newCfg, slurm_job.cfg_file) // write Configuration to file + + /** preamble: copy required files to SLURM node */ + if (slurm_remote_cfg.isDefined) { + // copy all required files to workstation + slurm_preamble(slurm_job, wd_to_rmt) + + // copy slurm job file to slurm login node + file_transfer(Map(jobFile -> Paths.get("~/%s.slurm".format(slurm_job.name))), + tx = true, host=Some(slurm_remote_cfg.get.host)) + } + + val cmd = "sbatch " ++ (slurm_remote_cfg match { + case Some(c) => "%s ~/%s.slurm".format(c.SbatchOptions, slurm_job.name) + case None => jobFile.toAbsolutePath().normalize().toString + }) logger.debug("running slurm batch job: '%s'".format(cmd)) - val res = cmd.!! - val id = slurmSubmissionAck.findFirstMatchIn(res) map (_ group (1) toInt) - if (id.isEmpty) { - if (retries > 0) { - // wait for 10 secs + random up to 5 secs to avoid congestion - Thread.sleep(slurmRetryDelay + scala.util.Random.nextInt() % (slurmRetryDelay / 2)) - apply(script, retries - 1) - } else { - throw new SlurmException(script.toString, res) + + var id: Option[Int] = None + var retries = SLURM_RETRIES + while (id.isEmpty) { + val res = exec_cmd(cmd) + id = slurmSubmissionAck.findFirstMatchIn(res) map (_ group (1) toInt) + if (id.isEmpty) { + if (retries > 0) { + // wait for 10 secs + random up to 5 secs to avoid congestion + Thread.sleep(slurmRetryDelay + scala.util.Random.nextInt() % (slurmRetryDelay / 2)) + retries -= 1 + } else { + throw new SlurmException(jobFile.toString, res) + } } - } else { - logger.debug("received SLURM id: {}", id) - id } + logger.debug("received SLURM id: {}", id) + + /** define postamble that shall be run once job is finished */ + if (slurm_remote_cfg.isDefined) { + postambles += (id.get -> {slurm_id:Int => slurm_success:Boolean => + logger.info("Running postamble for SLURM id: {}", slurm_id) + slurm_postamble(slurm_job, cfg, slurm_success, wd_to_rmt) + }) + } + id } + } - /** Check via `squeue` if the SLURM job is still running. */ - def isRunning(id: Int): Boolean = catchAllDefault[Boolean](true, "Slurm `squeue` failed: ") { - val squeue = "squeue -h".!! 
- logger.trace("squeue output: {}", squeue) - !"%d".format(id).r.findFirstIn(squeue).isEmpty + /** Check via `sacct` if the SLURM job is still running. */ + def getSlurmStatus(id: Int): SlurmStatus = catchAllDefault[SlurmStatus](Unknown(), "Slurm `sacct` failed: ") { + val sacct = exec_cmd("sacct -pn") + val pattern = """%d\|([^|]*)\|[^|]*\|[^|]*\|[^|]*\|([A-Z]*)( [^|]*)?\|[^|]*\|""".format(id).r + pattern.findFirstIn(sacct) match { + case None => + logger.warn("Job ID %d not listed in sacct".format(id)) + Slurm.Unknown() + case Some(m) => m match { + case pattern(name, status, cancelledBy) => status match { + case "RUNNING" => Slurm.Running () + case "COMPLETED" => Slurm.Completed () + case "CANCELLED" => Slurm.Cancelled (cancelledBy) + case _ => + logger.warn ("Job %s (ID=%d) has status %s".format (name, id, status) ) + Slurm.Unknown () + } + } + } } - /** Wait until the given SLURM job disappears from `squeue` output. */ - def waitFor(id: Int): Unit = { - while (isRunning(id)) { - logger.trace("SLURM job #%d is still running, sleeping for %d secs ...".format(id, slurmDelay / 1000)) + /** Wait until the given SLURM job is not listed as RUNNING anymore in `sacct` output. */ + def waitFor(id: Int): SlurmStatus = { + val hook = ShutdownHookThread(Slurm.cancel(id)) + val start = System.currentTimeMillis() + var status: SlurmStatus = Slurm.Running() + while (status == Running()) { // can be cancelled by SIGINT + val dur = Duration(System.currentTimeMillis() - start, "millis") + logger.info("SLURM job #%d is running since %dh %02dm %02ds" + .format(id, dur.toHours, dur.toMinutes % 60, dur.toSeconds % 60)) Thread.sleep(slurmDelay) + status = getSlurmStatus(id) } + hook.remove() + + // callback that pulls generated files from remote node + if (slurm_remote_cfg.isDefined) + postambles(id)(id)(status == Slurm.Completed()) + status } /** Returns a list of all SLURM job ids which are registered under the @@ -196,7 +418,7 @@ final object Slurm extends Publisher { Seq() } else { catchAllDefault(Seq[Int](), "could not get squeue output: ") { - val lines = "squeue -u %s".format(sys.env("USER")).!! + val lines = exec_cmd("squeue -u %s".format(sys.env("USER"))) val ids = ("""\n\s*(\d+)""".r.unanchored.findAllMatchIn(lines) map (m => m.group(1).toInt)).toSeq logger.debug("running SLURM jobs: {}", ids mkString " ") ids @@ -205,7 +427,7 @@ final object Slurm extends Publisher { /** Cancels the SLURM job with the given ID. */ def cancel(id: Int): Unit = catchAllDefault((), "canceling SLURM job %d failed: ".format(id)) { - "scancel %d".format(id).!! + exec_cmd("scancel %d".format(id)) } /** Cancels all currently running SLURM jobs. */ @@ -215,10 +437,35 @@ final object Slurm extends Publisher { val cmd = "scancel %s" format (ids mkString " ") logger.info("canceling SLURM jobs: {}", ids mkString ", ") logger.debug("command: '{}'", cmd) - cmd.! + exec_cmd(cmd) } } + /** Execute a SLURM command, either locally or on a remote host */ + def exec_cmd(c: String, hostname: Option[String] = None): String = { + val cmd = if (slurm_remote_cfg.isEmpty) c else { + val host = hostname.getOrElse(slurm_remote_cfg.get.host) + "ssh %s %s".format(host, c) + } + + logger.debug("Executing command: %s".format(cmd)) + cmd.!! + } + /** Use SLURM? 
*/ private var _enabled = false + + /** Host for executing SLURM */ + private var slurm_remote_cfg: Option[SlurmRemoteConfig] = None + + sealed trait SlurmStatus + final case class Completed() extends SlurmStatus + final case class Cancelled(by: String) extends SlurmStatus + final case class Running() extends SlurmStatus + final case class Unknown() extends SlurmStatus + + sealed trait SlurmConfig + final case class EnabledLocal() extends SlurmConfig + final case class EnabledRemote(template_name: String) extends SlurmConfig + final case class Disabled() extends SlurmConfig } diff --git a/toolflow/scala/src/main/scala/tapasco/task/ComposeTask.scala b/toolflow/scala/src/main/scala/tapasco/task/ComposeTask.scala index ee5703af..d07fab1f 100644 --- a/toolflow/scala/src/main/scala/tapasco/task/ComposeTask.scala +++ b/toolflow/scala/src/main/scala/tapasco/task/ComposeTask.scala @@ -26,10 +26,7 @@ import java.nio.file._ import tapasco.Logging._ import tapasco.activity.composers._ import tapasco.base._ -import tapasco.base.json._ import tapasco.dse.Heuristics -import tapasco.jobs._ -import tapasco.slurm._ import tapasco.util._ import scala.util.Properties.{lineSeparator => NL} @@ -52,20 +49,16 @@ class ComposeTask(composition: Composition, val onComplete: Boolean => Unit) (implicit cfg: Configuration) extends Task with LogTracking { private[this] implicit val _logger = tapasco.Logging.logger(getClass) - private[this] val _slurm = Slurm.enabled private[this] var _composerResult: Option[Composer.Result] = None private[this] val _outDir = cfg.outputDir(composition, target, designFrequency, features getOrElse Seq()) private[this] val _logFile = logFile getOrElse _outDir.resolve("tapasco.log").toString - private[this] val _errorLogFile = Paths.get(_logFile).resolveSibling("slurm-compose.errors.log") import LogFormatter._ def composerResult: Option[Composer.Result] = _composerResult /** @inheritdoc**/ - def job: Boolean = if (!_slurm) nodeExecution else slurmExecution - - private def nodeExecution: Boolean = { + def job: Boolean = { val appender = LogFileTracker.setupLogFileAppender(_logFile.toString) val composer = Composer(implementation)(cfg) _logger.debug("launching compose run for {}@{} [current thread: {}], logfile {}", @@ -107,47 +100,6 @@ class ComposeTask(composition: Composition, result } - private def slurmExecution: Boolean = { - val l = Paths.get(_logFile).toAbsolutePath().normalize() - val cfgFile = l.resolveSibling("slurm-compose.cfg") // Configuration Json - val jobFile = l.resolveSibling("slurm-compose.slurm") // SLURM job script - val slgFile = l.resolveSibling("slurm-compose.log") // SLURM job stdout log - val cmpsJob = ComposeJob( - composition, designFrequency, implementation.toString, Some(Seq(target.ad.name)), Some(Seq(target.pd.name)), - features, debugMode - ) - // define SLURM job - val job = Slurm.Job( - name = l.getParent.getParent.getFileName.resolve(l.getParent.getFileName).toString, - slurmLog = slgFile.toString, - errorLog = _errorLogFile.toString, - consumer = this, - maxHours = ComposeTask.MAX_COMPOSE_HOURS, - commands = Seq("tapasco --configFile %s".format(cfgFile.toString)), - comment = Some(_outDir.toString) - ) - // generate non-SLURM config with single job - val newCfg = cfg - .logFile(Some(l)) - .slurm(false) - .jobs(Seq(cmpsJob)) - - _logger.info("launching Compose job on SLURM ({})", cfgFile) - - catchAllDefault(false, "error during SLURM job execution (%s): ".format(jobFile)) { - Files.createDirectories(jobFile.getParent()) // create base directory - 
Slurm.writeJobScript(job, jobFile) // write job script - Configuration.to(newCfg, cfgFile) // write Configuration to file - Slurm(jobFile) foreach (Slurm.waitFor(_)) // execute and wait - _composerResult = if (debugMode.isEmpty) { - ComposeTask.parseResultInLog(l.toString) - } else { - ComposeTask.makeDebugResult(debugMode.get) - } - (_composerResult map (_.result) getOrElse false) == ComposeResult.Success - } - } - private def elementdesc = "%s [F=%2.2f]".format(logformat(composition), designFrequency.toDouble) /** @inheritdoc*/ @@ -180,7 +132,7 @@ object ComposeTask { import scala.io._ - private final val MAX_COMPOSE_HOURS = 23 + final val MAX_COMPOSE_HOURS = 23 private final val RE_RESULT = """compose run .*result: ([^,]+)""".r.unanchored private final val RE_LOG = """compose run .*result: \S+.*logfile: '([^']+)'""".r.unanchored private final val RE_TIMING = """compose run .*result: \S+.*timing report: '([^']+)'""".r.unanchored diff --git a/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala b/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala index 99edeb23..3d89ae0c 100644 --- a/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala +++ b/toolflow/scala/src/main/scala/tapasco/task/HighLevelSynthesisTask.scala @@ -37,10 +37,8 @@ class HighLevelSynthesisTask(val k: Kernel, val t: Target, val cfg: Configuratio val onComplete: Boolean => Unit) extends Task with LogTracking { private[this] implicit val logger = tapasco.Logging.logger(getClass) private[this] var result: Option[HighLevelSynthesizer.Result] = None - private[this] val slurm = Slurm.enabled private[this] val r = HighLevelSynthesizer(hls) private[this] val l = r.logFile(k, t)(cfg).resolveSibling("hls.log") - private[this] val e = l.resolveSibling("hls-slurm.errors.log") def synthesizer: HighLevelSynthesizer = r @@ -49,42 +47,12 @@ class HighLevelSynthesisTask(val k: Kernel, val t: Target, val cfg: Configuratio def description: String = "High-Level-Synthesis for '%s' with target %s @ %s".format(k.name, t.pd.name, t.ad.name) - def job: Boolean = if (!slurm) { + def job: Boolean = { val appender = LogFileTracker.setupLogFileAppender(l.toString) logger.trace("current thread name: {}", Thread.currentThread.getName()) result = Some(r.synthesize(k, t)(cfg)) LogFileTracker.stopLogFileAppender(appender) result map (_.toBoolean) getOrElse false - } else { - val cfgFile = l.resolveSibling("slurm-hls.cfg") // Configuration Json - val jobFile = l.resolveSibling("hls.slurm") // SLURM job script - val slurmLog = l.resolveSibling("slurm-hls.log") // raw log file (stdout w/colors) - val hlsJob = HighLevelSynthesisJob(hls.toString, Some(Seq(t.ad.name)), Some(Seq(t.pd.name)), Some(Seq(k.name))) - // define SLURM job - val job = Slurm.Job( - name = "hls-%s-%s-%s".format(t.ad.name, t.pd.name, k.name), - slurmLog = slurmLog.toString, - errorLog = e.toString, - consumer = this, - maxHours = HighLevelSynthesisTask.MAX_SYNTH_HOURS, - commands = Seq("tapasco --configFile %s".format(cfgFile.toString, k.name.toString)) - ) - // generate non-SLURM config with single job - val newCfg = cfg - .logFile(Some(l)) - .jobs(Seq(hlsJob)) - .slurm(false) - - logger.info("starting HLS job on SLURM ({})", cfgFile) - - catchAllDefault(false, "error during SLURM job execution (%s): ".format(jobFile)) { - Files.createDirectories(l.getParent()) // create base directory - Slurm.writeJobScript(job, jobFile) // write job script - Configuration.to(newCfg, cfgFile) // write Configuration to file - val r = (Slurm(jobFile) 
map (Slurm.waitFor(_))).nonEmpty // execute sbatch to enqueue job, then wait for it - FileAssetManager.reset() - r - } } def logFiles: Set[String] = Set(l.toString) @@ -100,6 +68,6 @@ class HighLevelSynthesisTask(val k: Kernel, val t: Target, val cfg: Configuratio ) } -private object HighLevelSynthesisTask { +object HighLevelSynthesisTask { final val MAX_SYNTH_HOURS = 8 } diff --git a/toolflow/vivado/common/SLURM/ESA.json b/toolflow/vivado/common/SLURM/ESA.json new file mode 100644 index 00000000..1245260b --- /dev/null +++ b/toolflow/vivado/common/SLURM/ESA.json @@ -0,0 +1,9 @@ +{ + "Name" : "ESA Cluster", + "SlurmHost" : "slurm", + "WorkstationHost" : "balin", + "Workdir" : "/scratch/SLURM/tapasco_workdir", + "TapascoInstallDir" : "/scratch/SLURM/tapasco", + "JobFile" : "slurm_ESA.job.template" +} + diff --git a/toolflow/vivado/common/slurm.job.template b/toolflow/vivado/common/SLURM/default.job.template similarity index 96% rename from toolflow/vivado/common/slurm.job.template rename to toolflow/vivado/common/SLURM/default.job.template index 8fd31317..c9b25b97 100644 --- a/toolflow/vivado/common/slurm.job.template +++ b/toolflow/vivado/common/SLURM/default.job.template @@ -26,7 +26,7 @@ #SBATCH -t @@TIMELIMIT@@ #SBATCH --comment="@@COMMENT@@" -source @@TAPASCO_HOME@@/setup.sh +source @@TAPASCO_HOME@@/tapasco-setup.sh # user commands begin here echo "SLURM job #$SLURM_JOB_ID started at $(date)" diff --git a/toolflow/vivado/common/SLURM/slurm_ESA.job.template b/toolflow/vivado/common/SLURM/slurm_ESA.job.template new file mode 100644 index 00000000..d51ad687 --- /dev/null +++ b/toolflow/vivado/common/SLURM/slurm_ESA.job.template @@ -0,0 +1,48 @@ +#!/bin/bash -x +# Copyright (c) 2014-2020 Embedded Systems and Applications, TU Darmstadt. +# +# This file is part of TaPaSCo +# (see https://github.com/esa-tu-darmstadt/tapasco). +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . +# + +#SBATCH -J "@@JOB_NAME@@" +#SBATCH -o "@@SLURM_LOG@@" +#SBATCH -e "@@ERROR_LOG@@" +#SBATCH --mem-per-cpu=@@MEM_PER_CPU@@ +#SBATCH -n @@CPUS@@ +#SBATCH -t @@TIMELIMIT@@ +#SBATCH --comment="@@COMMENT@@" + +# Setup env +source @@TAPASCO_HOME@@/tapasco-setup.sh +export PATH="/opt/cad/xilinx/vivado/Vivado/2019.2/bin/:$PATH" +# Vivado will hang otherwise, since it has no write permissions to real $HOME +export HOME=@@TAPASCO_HOME@@/../ + +# user commands begin here +echo "SLURM job #$SLURM_JOB_ID started at $(date)" +rsync -a /net/@@WORKSTATION@@/SLURM/tapasco_workdir/ @@TAPASCO_HOME@@ +@@COMMANDS@@ + +retVal=$? +if [ $retVal -ne 0 ]; then + echo "SLURM job #$SLURM_JOB_ID failed at $(date)" +else + echo "SLURM job #$SLURM_JOB_ID finished at $(date)" +fi + +rsync -a @@TAPASCO_HOME@@/ /net/@@WORKSTATION@@/SLURM/tapasco_workdir +exit $retVal
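
Usage note (illustrative sketch, not part of the patch): with this change, --slurm takes an argument instead of acting as a boolean flag. Following GlobalOptions.scala and Slurm.set_cfg above, the value "local" keeps the previous behaviour (submission via sbatch on the local host), while any other value NAME is resolved against toolflow/vivado/common/SLURM/NAME.json, e.g. the ESA template added above. The invocations below are hypothetical examples; kernel and platform names are placeholders:

    # local SLURM execution, as before (requires sbatch on this host)
    tapasco --slurm local hls arraysum -p vc709

    # remote SLURM execution via the ESA.json template (files shipped to the configured hosts via ssh/scp)
    tapasco --slurm ESA compose [arraysum x 1] @ 200 MHz -p vc709

Since the JSON reader for the "Slurm" key now expects a string, configuration files that previously set "Slurm": true would need to use "Slurm": "local" instead.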