Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/slurm remote support #250

Draft
wants to merge 30 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
60b78d0
Move slurm job template to separate SLURM directory
mhrtmnn Oct 21, 2020
68f49fb
Add parser for new JSON file type, that describes a remote SLURM node
mhrtmnn Oct 21, 2020
f5ea82d
Change CLI such that different SLURM node templates can be selected
mhrtmnn Oct 21, 2020
0ba49cb
Add additional fields to Slurm.Job case class
mhrtmnn Oct 28, 2020
4b01364
Add option for remote execution to Slurm object
mhrtmnn Oct 28, 2020
b35dc81
Add a wrapper that handles local/remote execution of shell commands
mhrtmnn Oct 28, 2020
9683925
Add Pre/Postamble that runs before/after SLURM Job
mhrtmnn Oct 28, 2020
3b390c8
Make writeJobScript() work with remote paths
mhrtmnn Oct 28, 2020
b029b77
Adapt Slurm.apply() to remote execution
mhrtmnn Oct 28, 2020
5e43cfa
Add postamble that pulls generated files from node
mhrtmnn Oct 28, 2020
c54e988
Use explicit identity func instead of custom lambda
mhrtmnn Nov 5, 2020
92415ba
Postamble: add timing and utilization report
mhrtmnn Nov 5, 2020
3e2183f
Fix local SLURM execution
mhrtmnn Nov 5, 2020
619893e
Add optional sbatch CLI options to slurm json
mhrtmnn Nov 5, 2020
16b4464
Add support for user-defined pre/postamble
mhrtmnn Nov 5, 2020
773c47b
Remove unnecessary func parameter
mhrtmnn Nov 5, 2020
0d4c832
Fix: Copy SLURM job script to correct host
mhrtmnn Nov 5, 2020
c5911ef
Finalize SLURM job template for ESA cluster
mhrtmnn Nov 5, 2020
6b01e25
Use Paths.get() instead of Path.of()
mhrtmnn Nov 16, 2020
74436f4
Use unique slurm script names
mhrtmnn Nov 16, 2020
48c5fde
Fix hls postamble
mhrtmnn Nov 16, 2020
6c92d0c
Only pull SLURM artefacts if job was successful
mhrtmnn Nov 17, 2020
27d8af3
HLS task: check slurm return code
mhrtmnn Nov 17, 2020
9d496b3
Handle return value correctly in slurm job
mhrtmnn Nov 17, 2020
3207acd
Fix vivado hang
mhrtmnn Nov 17, 2020
1d81cd6
Reduce verbosity, cleanup
mhrtmnn Nov 17, 2020
ff4a8e9
Cancel SLURM job if tapasco is terminated
mhrtmnn Nov 28, 2020
b7c0a20
Show duration for which SLURM job has been running
mhrtmnn Nov 28, 2020
259e6ee
Refactor Slurm tasks into slurm jobs
mhrtmnn Dec 29, 2020
f84e5cc
Adapt Slurm pre/post-amble to changes in 259e6ee1e
mhrtmnn Dec 29, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion toolflow/scala/src/main/scala/tapasco/Tapasco.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ object Tapasco {
logger.trace("configuring FileAssetManager...")
FileAssetManager(cfg)
logger.trace("SLURM: {}", cfg.slurm)
if (cfg.slurm) Slurm.enabled = cfg.slurm
if (cfg.slurm.isDefined) {
Slurm.set_cfg(cfg.slurm.get match {
case "local" => Slurm.EnabledLocal()
case t => Slurm.EnabledRemote(t)
})
}
FileAssetManager.start()
logger.trace("parallel: {}", cfg.parallel)
cfg.logFile map { logfile: Path => setupLogFileAppender(logfile.toString) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ trait Configuration {

def logFile(p: Option[Path]): Configuration

def slurm: Boolean
def slurm: Option[String]

def slurm(enabled: Boolean): Configuration
def slurm(template: Option[String]): Configuration

def parallel: Boolean

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private case class ConfigurationImpl(
private val _coreDir: Path = BasePathManager.DEFAULT_DIR_CORES,
private val _compositionDir: Path = BasePathManager.DEFAULT_DIR_COMPOSITIONS,
private val _logFile: Option[Path] = None,
slurm: Boolean = false,
slurm: Option[String] = None,
parallel: Boolean = false,
maxThreads: Option[Int] = None,
maxTasks: Option[Int] = None,
Expand Down Expand Up @@ -81,7 +81,7 @@ private case class ConfigurationImpl(

def logFile(op: Option[Path]): Configuration = this.copy(_logFile = op)

def slurm(enabled: Boolean): Configuration = this.copy(slurm = enabled)
def slurm(template: Option[String]): Configuration = this.copy(slurm = template)

def parallel(enabled: Boolean): Configuration = this.copy(parallel = enabled)

Expand All @@ -97,8 +97,10 @@ private case class ConfigurationImpl(

def jobs(js: Seq[Job]): Configuration = this.copy(jobs = js)

// these directories must exist
for ((d, n) <- Seq((archDir, "architectures"),
(platformDir, "platforms")))
require(mustExist(d), "%s directory %s does not exist".format(n, d.toString))
// these directories must exist, unless we execute on remote SLURM node
if (this.slurm.getOrElse(true).equals("local")) {
for ((d, n) <- Seq((archDir, "architectures"),
(platformDir, "platforms")))
require(mustExist(d), "%s directory %s does not exist".format(n, d.toString))
}
}
45 changes: 45 additions & 0 deletions toolflow/scala/src/main/scala/tapasco/base/SlurmRemoteConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
*
* Copyright (c) 2014-2020 Embedded Systems and Applications, TU Darmstadt.
*
* This file is part of TaPaSCo
* (see https://github.com/esa-tu-darmstadt/tapasco).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
/**
* @file SlurmRemoteConfig.scala
* @brief Model: TPC remote Slurm Configuration.
* @authors M. Hartmann, TU Darmstadt
**/

package tapasco.base

import java.nio.file.Path
import tapasco.base.builder.Builds

case class SlurmRemoteConfig(
name: String,
host: String,
workstation: String,
workdir: Path,
installdir: Path,
jobFile: String,
SbatchOptions: String,
PreambleScript: Option[String],
PostambleScript: Option[String]
)

object SlurmRemoteConfig extends Builds[SlurmRemoteConfig]
19 changes: 17 additions & 2 deletions toolflow/scala/src/main/scala/tapasco/base/json/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ package object json {
(JsPath \ "CoreDir").readNullable[Path].map(_ getOrElse BasePathManager.DEFAULT_DIR_CORES) ~
(JsPath \ "CompositionDir").readNullable[Path].map(_ getOrElse BasePathManager.DEFAULT_DIR_COMPOSITIONS) ~
(JsPath \ "LogFile").readNullable[Path] ~
(JsPath \ "Slurm").readNullable[Boolean].map(_ getOrElse false) ~
(JsPath \ "Slurm").readNullable[String] ~
(JsPath \ "Parallel").readNullable[Boolean].map(_ getOrElse false) ~
(JsPath \ "MaxThreads").readNullable[Int] ~
(JsPath \ "MaxTasks").readNullable[Int] ~
Expand All @@ -419,7 +419,7 @@ package object json {
(JsPath \ "CoreDir").write[Path] ~
(JsPath \ "CompositionDir").write[Path] ~
(JsPath \ "LogFile").writeNullable[Path] ~
(JsPath \ "Slurm").write[Boolean] ~
(JsPath \ "Slurm").writeNullable[String] ~
(JsPath \ "Parallel").write[Boolean] ~
(JsPath \ "MaxThreads").writeNullable[Int] ~
(JsPath \ "HlsTimeOut").writeNullable[Int] ~
Expand All @@ -437,6 +437,21 @@ package object json {
}

/* Configuration @} */

/* @{ SlurmRemoteConfig */
implicit val slurmRemoteConfigReads: Reads[SlurmRemoteConfig] = (
(JsPath \ "Name").read[String](minimumLength(length = 1)) ~
(JsPath \ "SlurmHost").read[String](minimumLength(length = 1)) ~
(JsPath \ "WorkstationHost").read[String](minimumLength(length = 1)) ~
(JsPath \ "Workdir").read[Path] ~
(JsPath \ "TapascoInstallDir").read[Path] ~
(JsPath \ "JobFile").read[String](minimumLength(length = 1)) ~
(JsPath \ "SbatchOptions").readNullable[String].map(_.getOrElse("")) ~
(JsPath \ "PreambleScript").readNullable[String] ~
(JsPath \ "PostambleScript").readNullable[String]
) (SlurmRemoteConfig.apply _)
/* SlurmRemoteConfig @} */

}

// vim: foldmarker=@{,@} foldmethod=marker foldlevel=0
57 changes: 50 additions & 7 deletions toolflow/scala/src/main/scala/tapasco/jobs/executors/Compose.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,20 @@ package tapasco.jobs.executors

import java.util.concurrent.Semaphore

import tapasco.activity.composers.Composer
import tapasco.base._
import tapasco.filemgmt._
import tapasco.jobs.{ComposeJob, HighLevelSynthesisJob}
import tapasco.slurm.Slurm.Completed
import tapasco.task._
import tapasco.slurm._

private object Compose extends Executor[ComposeJob] {
private implicit val logger = tapasco.Logging.logger(getClass)
private[this] val _slurm = Slurm.enabled

def execute(job: ComposeJob)
(implicit cfg: Configuration, tsk: Tasks): Boolean = {
val signal = new Semaphore(0)

logger.trace("composition: {}", job.composition)

// first, collect all kernels and trigger HLS if not built yet
Expand Down Expand Up @@ -74,7 +76,18 @@ private object Compose extends Executor[ComposeJob] {
logger.info("all HLS tasks finished successfully, beginning compose run...")
logger.debug("job: {}", job)

val composeTasks = for {
if (!_slurm) nodeExecution(job) else slurmExecution(job)
} else {
logger.error("HLS tasks failed, aborting composition")
false
}
}

private def nodeExecution(job: ComposeJob)
(implicit cfg: Configuration, tsk: Tasks): Boolean = {
val signal = new Semaphore(0)

val composeTasks = for {
p <- job.platforms
a <- job.architectures
t = Target(a, p)
Expand Down Expand Up @@ -104,10 +117,40 @@ private object Compose extends Executor[ComposeJob] {

// successful, if all successful
(composeTasks map (_.result) fold true) (_ && _)
} else {
logger.error("HLS tasks failed, aborting composition")
false
}

private def slurmExecution(job: ComposeJob)
(implicit cfg: Configuration, tsk: Tasks): Boolean = {

val ComposeJob(c, f, i, _, _, _, _, _, _, _) = job
val name = c.composition.map(_.kernel).fold("compose")(_ ++ "-" ++ _)
val outDir = FileAssetManager.TAPASCO_WORK_DIR.resolve("Slurm").resolve("Compose").resolve(name)
// needed for resource-based scheduling
val consumer = new ComposeTask(
composition = c,
designFrequency = f,
implementation = Composer.Implementation(i),
target = Target(job.architectures.head, job.platforms.head),
onComplete = _ => ()
)

// define SLURM job
val sjob = Slurm.Job(
name = name,
log = outDir.resolve("tapasco.log"),
slurmLog = outDir.resolve("slurm-compose.log"),
errorLog = outDir.resolve("slurm-compose.errors.log"),
consumer = consumer,
maxHours = ComposeTask.MAX_COMPOSE_HOURS,
comment = Some(outDir.toString),
job = job,
cfg_file = outDir.resolve("slurm-compose.cfg")
)

// start slurm job and wait for finish
Slurm(sjob)(cfg) match {
case Some(id) => Slurm.waitFor(id) == Completed()
case None => false
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,25 @@
package tapasco.jobs.executors

import java.util.concurrent.Semaphore

import tapasco.Logging
import tapasco.activity.hls.HighLevelSynthesizer
import tapasco.activity.hls.HighLevelSynthesizer.Implementation._
import tapasco.activity.hls.HighLevelSynthesizer._
import tapasco.base._
import tapasco.filemgmt.FileAssetManager
import tapasco.jobs._
import tapasco.slurm.Slurm
import tapasco.slurm.Slurm.Completed
import tapasco.task._

protected object HighLevelSynthesis extends Executor[HighLevelSynthesisJob] {
private implicit final val logger = Logging.logger(getClass)
private[this] val _slurm = Slurm.enabled

def execute(job: HighLevelSynthesisJob)(implicit cfg: Configuration, tsk: Tasks): Boolean =
if (!_slurm) nodeExecution(job) else slurmExecution(job)

def execute(job: HighLevelSynthesisJob)(implicit cfg: Configuration, tsk: Tasks): Boolean = {
def nodeExecution(job: HighLevelSynthesisJob)(implicit cfg: Configuration, tsk: Tasks): Boolean = {
val signal = new Semaphore(0)
val runs: Seq[(Kernel, Target)] = for {
a <- job.architectures.toSeq.sortBy(_.name)
Expand Down Expand Up @@ -94,4 +99,39 @@ protected object HighLevelSynthesis extends Executor[HighLevelSynthesisJob] {
// success, if all tasks were successful
((tasks ++ importTasks) map (_.result) fold true) (_ && _)
}

def slurmExecution(job: HighLevelSynthesisJob)
(implicit cfg: Configuration, tsk: Tasks): Boolean = {

val name = job.kernels.map(_.name).fold("hls")(_++"-"++_)
val outDir = FileAssetManager.TAPASCO_WORK_DIR.resolve("Slurm").resolve("HLS").resolve(name)
// needed for resource-based scheduling
val consumer = new HighLevelSynthesisTask(
job.kernels.head,
Target(job.architectures.head, job.platforms.head),
cfg,
VivadoHLS,
_ => ()
)

// define SLURM job
val sjob = Slurm.Job(
name = name,
log = outDir.resolve("tapasco.log"),
slurmLog = outDir.resolve("slurm-hls.log"),
errorLog = outDir.resolve("hls-slurm.errors.log"),
consumer = consumer,
maxHours = HighLevelSynthesisTask.MAX_SYNTH_HOURS,
job = job,
cfg_file = outDir.resolve("slurm-hls.cfg")
)

// execute sbatch to enqueue job, then wait for it
val r = Slurm(sjob)(cfg) match {
case Some(id) => Slurm.waitFor(id) == Completed()
case None => false
}
FileAssetManager.reset()
r
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ private object GlobalOptions {
def inputFiles: Parser[(String, Path)] =
jobsFile | configFile | logFile

def slurm: Parser[(String, Boolean)] =
longOption("slurm", "Slurm").map((_, true)) ~ ws
def slurm: Parser[(String, String)] =
longOption("slurm", "Slurm") ~ ws ~/ string.opaque("slurm template name") ~ ws

def parallel: Parser[(String, Boolean)] =
longOption("parallel", "Parallel").map((_, true)) ~ ws
Expand Down Expand Up @@ -131,7 +131,7 @@ private object GlobalOptions {
case ("Core", p: Path) => mkConfig(as, Some(c getOrElse Configuration() coreDir p))
case ("Kernel", p: Path) => mkConfig(as, Some(c getOrElse Configuration() kernelDir p))
case ("Platform", p: Path) => mkConfig(as, Some(c getOrElse Configuration() platformDir p))
case ("Slurm", e: Boolean) => mkConfig(as, Some(c getOrElse Configuration() slurm e))
case ("Slurm", t: String) => mkConfig(as, Some(c getOrElse Configuration() slurm Some(t)))
case ("Parallel", e: Boolean) => mkConfig(as, Some(c getOrElse Configuration() parallel e))
case ("JobsFile", p: Path) => mkConfig(as, Some(c getOrElse Configuration() jobs readJobsFile(p)))
case ("LogFile", p: Path) => mkConfig(as, Some(c getOrElse Configuration() logFile Some(p)))
Expand Down
3 changes: 2 additions & 1 deletion toolflow/scala/src/main/scala/tapasco/parser/Usage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ configuration via `tapasco -n config.json`.
Arg("--logFile FILE", "Path to output log file") &
Arg("--configFile FILE", "Path to Json file with Configuration") &
Arg("--jobsFile FILE", "Path to Json file with Jobs array") &
Arg("--slurm", "Activate SLURM cluster execution (requires sbatch)") &
Arg("--slurm TEMPLATE", "Activate SLURM cluster execution." ~
"TEMPLATE describes a remote SLURM node, use 'local' for local execution (requires sbatch).") &
Arg("--parallel", "Execute all jobs in parallel (careful!)") &
Arg("--maxThreads NUM", "Limit internal parallelism of tasks (e.g., Vivado)" ~
"to the given number of threads.") &
Expand Down
Loading