From 3b2c0987adca5591010699d5f1ebc34bd3544e8f Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Tue, 22 Oct 2024 18:43:05 -0400 Subject: [PATCH] store: Only close self-pipe when we're done This took a crazy long time to debug but after lots of false starts I think this is right. Basically what's going on is we have async tasks that are talking over a `pipe()` inside our own process. We must not close the read side of the pipe until the writer is done. I believe this is dependent on tokio task scheduling order, and it's way easier to reproduce when pinned to a single CPU. Closes: https://github.com/ostreedev/ostree-rs-ext/issues/657 Signed-off-by: Colin Walters --- lib/src/container/store.rs | 24 ++++++++++++++++-------- lib/src/container/unencapsulate.rs | 2 +- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/lib/src/container/store.rs b/lib/src/container/store.rs index a275c0af..d1f49e0e 100644 --- a/lib/src/container/store.rs +++ b/lib/src/container/store.rs @@ -730,8 +730,8 @@ impl ImageImporter { crate::tokio_util::spawn_blocking_cancellable_flatten(move |cancellable| { let txn = repo.auto_transaction(Some(cancellable))?; let mut importer = crate::tar::Importer::new_for_object_set(&repo); - let blob = tokio_util::io::SyncIoBridge::new(blob); - let mut archive = tar::Archive::new(blob); + let mut blob = tokio_util::io::SyncIoBridge::new(blob); + let mut archive = tar::Archive::new(&mut blob); importer.import_objects(&mut archive, Some(cancellable))?; let commit = if write_refs { let commit = importer.finish_import_object_set()?; @@ -742,10 +742,14 @@ impl ImageImporter { None }; txn.commit(Some(cancellable))?; - Ok::<_, anyhow::Error>(commit) + // Pass back ownership, see below + Ok::<_, anyhow::Error>((blob.into_inner(), commit)) }) .map_err(|e| e.context(format!("Layer {}", layer.layer.digest()))); - let commit = super::unencapsulate::join_fetch(import_task, driver).await?; + let (blob, commit) = super::unencapsulate::join_fetch(import_task, driver).await?; + // We can't close the read side until we've completed the rest of the processing + // to avoid breaking our own pipe-to-self. See https://github.com/ostreedev/ostree-rs-ext/issues/657 + drop(blob); layer.commit = commit; if let Some(p) = self.layer_progress.as_ref() { p.send(ImportProgress::OstreeChunkCompleted(layer.layer.clone())) @@ -775,8 +779,8 @@ impl ImageImporter { crate::tokio_util::spawn_blocking_cancellable_flatten(move |cancellable| { let txn = repo.auto_transaction(Some(cancellable))?; let mut importer = crate::tar::Importer::new_for_commit(&repo, remote); - let blob = tokio_util::io::SyncIoBridge::new(blob); - let mut archive = tar::Archive::new(blob); + let mut blob = tokio_util::io::SyncIoBridge::new(blob); + let mut archive = tar::Archive::new(&mut blob); importer.import_commit(&mut archive, Some(cancellable))?; let commit = importer.finish_import_commit(); if write_refs { @@ -785,9 +789,13 @@ impl ImageImporter { } repo.mark_commit_partial(&commit, false)?; txn.commit(Some(cancellable))?; - Ok::<_, anyhow::Error>(commit) + // Pass back ownership, see below + Ok::<_, anyhow::Error>((blob.into_inner(), commit)) }); - let commit = super::unencapsulate::join_fetch(import_task, driver).await?; + let (blob, commit) = super::unencapsulate::join_fetch(import_task, driver).await?; + // We can't close the read side until we've completed the rest of the processing + // to avoid breaking our own pipe-to-self. See https://github.com/ostreedev/ostree-rs-ext/issues/657 + drop(blob); import.ostree_commit_layer.commit = Some(commit); if let Some(p) = self.layer_progress.as_ref() { p.send(ImportProgress::OstreeChunkCompleted( diff --git a/lib/src/container/unencapsulate.rs b/lib/src/container/unencapsulate.rs index 45bfd989..4756da4b 100644 --- a/lib/src/container/unencapsulate.rs +++ b/lib/src/container/unencapsulate.rs @@ -162,7 +162,7 @@ pub struct Import { /// to see if the worker function had an error *and* if the proxy /// had an error, but if the proxy's error ends in `broken pipe` /// then it means the real only error is from the worker. -pub(crate) async fn join_fetch( +pub(crate) async fn join_fetch( worker: impl Future>, driver: impl Future>, ) -> Result {