Skip to content

Commit

Permalink
Adding parallelization via sbatch
Browse files Browse the repository at this point in the history
  • Loading branch information
broarr committed Aug 27, 2020
1 parent 78841af commit 39f4a05
Showing 1 changed file with 114 additions and 24 deletions.
138 changes: 114 additions & 24 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ use chrono::{DateTime, Utc};
use clap::{App, Arg};
use serde::Deserialize;
use std::fs::{self, File};
use std::io::{self, Read};
use std::io::{self, Read, Write};
use std::path::Path;
use std::process::Command;
use std::process::{Command, Stdio};
use std::time::SystemTime;

#[derive(Debug)]
struct Options {
dry_run: bool,
force: bool,
first_sync: usize,
force: bool,
parallelize: bool,
}

#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -165,7 +166,106 @@ fn tags_after_timestamp(
Ok(tags)
}

fn sync_docker_image(image: &str, directory: &str, options: &Options) -> Result<()> {
fn slurm_sync_command(
directory: &str,
repository: &String,
image: &String,
tag: &str,
options: &Options,
) -> Result<()> {
let sif_path = format!("{}/{}/{}-{}.sif", directory, repository, image, tag);
let docker_uri = format!("docker://{}/{}:{}", repository, image, tag);
let job_name = format!("SingularitySync-{}-{}", repository, image);
let output = format!(
"/gpfs/scratch/%u/SingularitySync-{}-{}-%j.out",
repository, image
);
let force = if options.force { "-F" } else { "" };

let memory = option_env!("SBATCH_MEM_PER_NODE");
let memory = match memory {
Some(m) => m,
None => "8G",
};

let time = option_env!("SBATCH_TIMELIMIT");
let time = match time {
Some(t) => t,
None => "8:00:00",
};

let singularity_cachedir = option_env!("SINGULARITY_CACHEDIR");
let singularity_cachedir = match singularity_cachedir {
Some(x) => x,
None => "${HOME}/scratch/singularity",
};

let singularity_tmpdir = option_env!("SINGULARITY_TMPDIR");
let singularity_tmpdir = match singularity_tmpdir {
Some(x) => x,
None => "${HOME}/scratch/tmp",
};

let script = [
String::from("#!/usr/bin/env bash\n"),
format!("#SBATCH --time={}\n", time),
format!("#SBATCH --mem={}\n", memory),
format!("#SBATCH -J {}\n", job_name),
format!("#SBATCH -o {}\n", output),
format!("export SINGULARITY_CACHEDIR={}\n", singularity_cachedir),
format!("export SINGULARITY_TMPDIR={}\n", singularity_tmpdir),
format!("singularity build {} {} {}", force, sif_path, docker_uri),
].concat();

if options.dry_run {
println!("sbatch <<EOF");
println!("{}", script);
println!("EOF");
return Ok(());
}

let command = Command::new("sbatch").stdin(Stdio::piped()).spawn()?;
command.stdin.context("Could not get handle to stdin")?.write(script.as_bytes())?;

Ok(())
}

fn run_sync_command(
directory: &str,
repository: &String,
image: &String,
tag: &str,
options: &Options,
) -> Result<()> {
if options.parallelize {
slurm_sync_command(directory, repository, image, tag, options)?;
return Ok(());
}

let sif_path = format!("{}/{}/{}-{}.sif", directory, repository, image, tag);
let docker_uri = format!("docker://{}/{}:{}", repository, image, tag);

if options.dry_run {
let force = if options.force { "-F" } else { "" };
let sbatch_cmd = format!("singularity build {} {} {}", force, sif_path, docker_uri);
println!("{}", sbatch_cmd);
return Ok(());
}

let mut command = Command::new("singularity");

command.arg("build");

if options.force {
command.arg("-F");
}

command.arg(sif_path).arg(docker_uri).status()?;

Ok(())
}

fn sync_docker_image(directory: &str, image: &str, options: &Options) -> Result<()> {
let image_split: Vec<&str> = image.rsplit('/').collect();
let image = String::from(image_split[0]);
let repository = String::from(image_split[1]);
Expand All @@ -190,32 +290,15 @@ fn sync_docker_image(image: &str, directory: &str, options: &Options) -> Result<
};

for tag in tags_to_sync {
let sif_path = format!("{}/{}/{}-{}.sif", directory, repository, image, tag);
let docker_uri = format!("docker://{}/{}:{}", repository, image, tag);

if options.dry_run {
let force = if options.force { "-F" } else { "" };
let sbatch_cmd = format!("singularity build {} {} {}", force, sif_path, docker_uri);
println!("{}", sbatch_cmd);
} else {
let mut command = Command::new("singularity");

command.arg("build");

if options.force {
command.arg("-F");
}

command.arg(sif_path).arg(docker_uri).status()?;
}
run_sync_command(directory, &repository, &image, &tag, options)?;
}

Ok(())
}

fn sync_manifest(directory: &str, manifest: &Manifest, options: &Options) -> Result<()> {
for image in &manifest.docker {
sync_docker_image(image, directory, options)?;
sync_docker_image(directory, image, options)?;
}
Ok(())
}
Expand Down Expand Up @@ -257,6 +340,12 @@ fn main() -> Result<()> {
.default_value("5")
.help("The number of tags to pull on first sync"),
)
.arg(
Arg::with_name("parallelize")
.short("p")
.long("parallelize")
.help("Parallelize using Slurm scheduler"),
)
.version("v0.2.0")
.get_matches();

Expand All @@ -266,8 +355,9 @@ fn main() -> Result<()> {
let directory = String::from(matches.value_of("DIR").unwrap());
let options = Options {
dry_run: matches.is_present("dry_run"),
force: matches.is_present("force"),
first_sync: matches.value_of("first_sync").unwrap().parse()?,
force: matches.is_present("force"),
parallelize: matches.is_present("parallelize"),
};
sync_manifest(&directory, &manifest, &options)?;

Expand Down

0 comments on commit 39f4a05

Please sign in to comment.