Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding parallelization via sbatch #2

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ cargo clippy

## Future work

* Parallelize builds using [Slurm](https://slurm.schedmd.com/)
* ~~Parallelize builds using [Slurm](https://slurm.schedmd.com/)~~
* Support single Docker images without a manifest
* Support Docker registries other than [Docker Hub](https://hub.docker.com)
* Support native Singularity builds
Expand Down
142 changes: 118 additions & 24 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ use chrono::{DateTime, Utc};
use clap::{App, Arg};
use serde::Deserialize;
use std::fs::{self, File};
use std::io::{self, Read};
use std::io::{self, Read, Write};
use std::path::Path;
use std::process::Command;
use std::process::{Command, Stdio};
use std::time::SystemTime;

#[derive(Debug)]
struct Options {
dry_run: bool,
force: bool,
first_sync: usize,
force: bool,
parallelize: bool,
}

#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -165,7 +166,110 @@ fn tags_after_timestamp(
Ok(tags)
}

fn sync_docker_image(image: &str, directory: &str, options: &Options) -> Result<()> {
fn slurm_sync_command(
directory: &str,
repository: &str,
image: &str,
tag: &str,
options: &Options,
) -> Result<()> {
let sif_path = format!("{}/{}/{}-{}.sif", directory, repository, image, tag);
let docker_uri = format!("docker://{}/{}:{}", repository, image, tag);
let job_name = format!("SingularitySync-{}-{}", repository, image);
let output = format!(
"/gpfs/scratch/%u/SingularitySync-{}-{}-%j.out",
repository, image
);
let force = if options.force { "-F" } else { "" };

let memory = option_env!("SBATCH_MEM_PER_NODE");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be worth making a util that does this pattern since it's used quite a bit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a feeling there's a rust-ism that I'm missing here. Maybe I need to employ the ? operator and some other magic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://dev-notes.eu/2020/05/Set-Default-Values-in-Rust/

Apparently I can do option_env!("FOO").unwrap_or("my value");?

let memory = match memory {
Some(m) => m,
None => "8G",
};

let time = option_env!("SBATCH_TIMELIMIT");
let time = match time {
Some(t) => t,
None => "8:00:00",
};

let singularity_cachedir = option_env!("SINGULARITY_CACHEDIR");
let singularity_cachedir = match singularity_cachedir {
Some(x) => x,
None => "${HOME}/scratch/singularity",
};

let singularity_tmpdir = option_env!("SINGULARITY_TMPDIR");
let singularity_tmpdir = match singularity_tmpdir {
Some(x) => x,
None => "${HOME}/scratch/tmp",
};

let script = [
String::from("#!/usr/bin/env bash\n"),
format!("#SBATCH --time={}\n", time),
format!("#SBATCH --mem={}\n", memory),
format!("#SBATCH -J {}\n", job_name),
format!("#SBATCH -o {}\n", output),
format!("export SINGULARITY_CACHEDIR={}\n", singularity_cachedir),
format!("export SINGULARITY_TMPDIR={}\n", singularity_tmpdir),
format!("singularity build {} {} {}", force, sif_path, docker_uri),
]
.concat();

if options.dry_run {
println!("sbatch <<EOF");
println!("{}", script);
println!("EOF");
return Ok(());
}

let command = Command::new("sbatch").stdin(Stdio::piped()).spawn()?;
command
.stdin
.context("Could not get handle to stdin")?
.write_all(script.as_bytes())?;

Ok(())
}

fn run_sync_command(
directory: &str,
repository: &str,
image: &str,
tag: &str,
options: &Options,
) -> Result<()> {
if options.parallelize {
slurm_sync_command(directory, repository, image, tag, options)?;
return Ok(());
}

let sif_path = format!("{}/{}/{}-{}.sif", directory, repository, image, tag);
let docker_uri = format!("docker://{}/{}:{}", repository, image, tag);

if options.dry_run {
let force = if options.force { "-F" } else { "" };
let sbatch_cmd = format!("singularity build {} {} {}", force, sif_path, docker_uri);
println!("{}", sbatch_cmd);
return Ok(());
}

let mut command = Command::new("singularity");

command.arg("build");

if options.force {
command.arg("-F");
}

command.arg(sif_path).arg(docker_uri).status()?;

Ok(())
}

fn sync_docker_image(directory: &str, image: &str, options: &Options) -> Result<()> {
let image_split: Vec<&str> = image.rsplit('/').collect();
let image = String::from(image_split[0]);
let repository = String::from(image_split[1]);
Expand All @@ -190,32 +294,15 @@ fn sync_docker_image(image: &str, directory: &str, options: &Options) -> Result<
};

for tag in tags_to_sync {
let sif_path = format!("{}/{}/{}-{}.sif", directory, repository, image, tag);
let docker_uri = format!("docker://{}/{}:{}", repository, image, tag);

if options.dry_run {
let force = if options.force { "-F" } else { "" };
let sbatch_cmd = format!("singularity build {} {} {}", force, sif_path, docker_uri);
println!("{}", sbatch_cmd);
} else {
let mut command = Command::new("singularity");

command.arg("build");

if options.force {
command.arg("-F");
}

command.arg(sif_path).arg(docker_uri).status()?;
}
run_sync_command(directory, &repository, &image, &tag, options)?;
}

Ok(())
}

fn sync_manifest(directory: &str, manifest: &Manifest, options: &Options) -> Result<()> {
for image in &manifest.docker {
sync_docker_image(image, directory, options)?;
sync_docker_image(directory, image, options)?;
}
Ok(())
}
Expand Down Expand Up @@ -257,6 +344,12 @@ fn main() -> Result<()> {
.default_value("5")
.help("The number of tags to pull on first sync"),
)
.arg(
Arg::with_name("parallelize")
.short("p")
.long("parallelize")
.help("Parallelize using Slurm scheduler"),
)
.version("v0.2.0")
.get_matches();

Expand All @@ -266,8 +359,9 @@ fn main() -> Result<()> {
let directory = String::from(matches.value_of("DIR").unwrap());
let options = Options {
dry_run: matches.is_present("dry_run"),
force: matches.is_present("force"),
first_sync: matches.value_of("first_sync").unwrap().parse()?,
force: matches.is_present("force"),
parallelize: matches.is_present("parallelize"),
};
sync_manifest(&directory, &manifest, &options)?;

Expand Down