Skip to content
This repository has been archived by the owner on Nov 7, 2024. It is now read-only.

Commit

Permalink
store: Only close self-pipe when we're done
Browse files Browse the repository at this point in the history
This took a crazy long time to debug but after lots of false
starts I think this is right. Basically what's going on is
we have async tasks that are talking over a `pipe()` inside
our own process.

We must not close the read side of the pipe until the
writer is done.

I believe this is dependent on tokio task scheduling order,
and it's way easier to reproduce when pinned to a single CPU.

Closes: #657
Signed-off-by: Colin Walters <[email protected]>
  • Loading branch information
cgwalters committed Oct 22, 2024
1 parent 993a583 commit 374fa43
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 5 deletions.
18 changes: 14 additions & 4 deletions lib/src/container/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,10 +742,15 @@ impl ImageImporter {
None
};
txn.commit(Some(cancellable))?;
Ok::<_, anyhow::Error>(commit)
// Pass back ownership, see below
let blob = archive.into_inner().into_inner();
Ok::<_, anyhow::Error>((blob, commit))
})
.map_err(|e| e.context(format!("Layer {}", layer.layer.digest())));
let commit = super::unencapsulate::join_fetch(import_task, driver).await?;
let (blob, commit) = super::unencapsulate::join_fetch(import_task, driver).await?;
// We can't close the read side until we've completed the rest of the processing
// to avoid breaking our own pipe-to-self. See https://github.com/ostreedev/ostree-rs-ext/issues/657
drop(blob);
layer.commit = commit;
if let Some(p) = self.layer_progress.as_ref() {
p.send(ImportProgress::OstreeChunkCompleted(layer.layer.clone()))
Expand Down Expand Up @@ -785,9 +790,14 @@ impl ImageImporter {
}
repo.mark_commit_partial(&commit, false)?;
txn.commit(Some(cancellable))?;
Ok::<_, anyhow::Error>(commit)
// Pass back ownership, see below
let blob = archive.into_inner().into_inner();
Ok::<_, anyhow::Error>((blob, commit))
});
let commit = super::unencapsulate::join_fetch(import_task, driver).await?;
let (blob, commit) = super::unencapsulate::join_fetch(import_task, driver).await?;
// We can't close the read side until we've completed the rest of the processing
// to avoid breaking our own pipe-to-self. See https://github.com/ostreedev/ostree-rs-ext/issues/657
drop(blob);
import.ostree_commit_layer.commit = Some(commit);
if let Some(p) = self.layer_progress.as_ref() {
p.send(ImportProgress::OstreeChunkCompleted(
Expand Down
2 changes: 1 addition & 1 deletion lib/src/container/unencapsulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub struct Import {
/// to see if the worker function had an error *and* if the proxy
/// had an error, but if the proxy's error ends in `broken pipe`
/// then it means the real only error is from the worker.
pub(crate) async fn join_fetch<T: std::fmt::Debug>(
pub(crate) async fn join_fetch<T>(
worker: impl Future<Output = Result<T>>,
driver: impl Future<Output = Result<()>>,
) -> Result<T> {
Expand Down

0 comments on commit 374fa43

Please sign in to comment.