Skip to content

Commit

Permalink
Limit repo clone concurrency (#144)
Browse files Browse the repository at this point in the history
  • Loading branch information
j178 authored Dec 10, 2024
1 parent 7c6ae1b commit d74fcee
Showing 1 changed file with 37 additions and 24 deletions.
61 changes: 37 additions & 24 deletions src/hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::sync::Mutex;

use anyhow::Result;
use clap::ValueEnum;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use itertools::zip_eq;
use thiserror::Error;
Expand Down Expand Up @@ -162,40 +162,53 @@ impl Project {
store: &Store,
reporter: Option<&dyn HookInitReporter>,
) -> Result<(), Error> {
let mut tasks = FuturesUnordered::new();
let remote_repos = Rc::new(Mutex::new(HashMap::new()));
let mut seen = HashSet::new();
for repo in &self.config.repos {
if let config::Repo::Remote(repo) = repo {
if !seen.insert(repo) {
continue;
}
tasks.push(async move {
let progress = reporter
.map(|reporter| (reporter, reporter.on_clone_start(&format!("{repo}"))));

let path = store.prepare_remote_repo(repo, &[]).await;
// Prepare remote repos in parallel.
let remotes_iter = self.config.repos.iter().filter_map(|repo| match repo {
// Deduplicate remote repos.
config::Repo::Remote(repo) if seen.insert(repo) => Some(repo),
_ => None,
});
let mut tasks = futures::stream::iter(remotes_iter)
.map(|repo_config| {
let remote_repos = remote_repos.clone();
async move {
let progress = reporter.map(|reporter| {
(reporter, reporter.on_clone_start(&format!("{repo_config}")))
});

let path = store
.prepare_remote_repo(repo_config, &[])
.await
.map_err(Box::new)?;

if let Some((reporter, progress)) = progress {
reporter.on_clone_complete(progress);
}

(repo, path)
});
}
}
let repo = Rc::new(Repo::remote(
repo_config.repo.as_str(),
&repo_config.rev,
&path.to_string_lossy(),
)?);
remote_repos
.lock()
.unwrap()
.insert(repo_config, repo.clone());

Ok::<(), Error>(())
}
})
.buffer_unordered(5);

let mut remote_repos = HashMap::new();
while let Some((repo_config, repo_path)) = tasks.next().await {
let repo_path = repo_path.map_err(Box::new)?;
let repo = Rc::new(Repo::remote(
repo_config.repo.as_str(),
&repo_config.rev,
&repo_path.to_string_lossy(),
)?);
remote_repos.insert(repo_config, repo.clone());
while let Some(result) = tasks.next().await {
result?;
}

let mut repos = Vec::with_capacity(self.config.repos.len());
let remote_repos = remote_repos.lock().unwrap();
for repo in &self.config.repos {
match repo {
config::Repo::Remote(repo) => {
Expand Down

0 comments on commit d74fcee

Please sign in to comment.