diff --git a/lib/src/tar/write.rs b/lib/src/tar/write.rs index 29e0d82c..df147c90 100644 --- a/lib/src/tar/write.rs +++ b/lib/src/tar/write.rs @@ -259,16 +259,28 @@ async fn filter_tar_async( mut dest: impl AsyncWrite + Send + Unpin, ) -> Result> { let (tx_buf, mut rx_buf) = tokio::io::duplex(8192); + // The source must be moved to the heap so we know it is stable for passing to the worker thread let src = Box::pin(src); - let tar_transformer = tokio::task::spawn_blocking(move || -> Result<_> { - let src = tokio_util::io::SyncIoBridge::new(src); + let tar_transformer = tokio::task::spawn_blocking(move || { + let mut src = tokio_util::io::SyncIoBridge::new(src); let dest = tokio_util::io::SyncIoBridge::new(tx_buf); - filter_tar(src, dest) + let r = filter_tar(&mut src, dest); + // Pass ownership of the input stream back to the caller - see below. + (r, src) }); let copier = tokio::io::copy(&mut rx_buf, &mut dest); let (r, v) = tokio::join!(tar_transformer, copier); let _v: u64 = v?; - r? + let (r, src) = r?; + // Note that the worker thread took temporary ownership of the input stream; we only close + // it at this point, after we're sure we've done all processing of the input. The reason + // for this is that both the skopeo process *or* us could encounter an error (see join_fetch). + // By ensuring we hold the stream open as long as possible, it ensures that we're going to + // see a remote error first, instead of the remote skopeo process seeing us close the pipe + // because we found an error. + drop(src); + // And pass back the result + r } /// Write the contents of a tarball as an ostree commit.