Skip to content

Commit

Permalink
Merge pull request #4 from cgwalters/tracing
Browse files Browse the repository at this point in the history
Add a lot more tracing usage
  • Loading branch information
cgwalters authored Nov 5, 2021
2 parents 5b74e02 + 3ba2f01 commit a2fc98b
Showing 1 changed file with 18 additions and 0 deletions.
18 changes: 18 additions & 0 deletions src/imageproxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::pin::Pin;
use std::process::Stdio;
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncBufRead, AsyncReadExt};
use tracing::instrument;

pub const OCI_TYPE_LAYER_GZIP: &str = "application/vnd.oci.image.layer.v1.tar+gzip";
pub const OCI_TYPE_LAYER_TAR: &str = "application/vnd.oci.image.layer.v1.tar";
Expand Down Expand Up @@ -80,6 +81,7 @@ impl std::fmt::Debug for ImageProxy {
}

/// Opaque identifier for an image
#[derive(Debug, PartialEq, Eq)]
pub struct OpenedImage(u32);

#[allow(unsafe_code)]
Expand Down Expand Up @@ -108,6 +110,7 @@ fn file_from_scm_rights(cmsg: ControlMessageOwned) -> Option<File> {

impl ImageProxy {
/// Create an image proxy that fetches the target image.
#[instrument]
pub async fn new() -> Result<Self> {
let (mysock, theirsock) = new_seqpacket_pair()?;
let mut c = std::process::Command::new("skopeo");
Expand All @@ -117,6 +120,7 @@ impl ImageProxy {
let mut c = tokio::process::Command::from(c);
c.kill_on_drop(true);
let child = c.spawn().context("Failed to spawn skopeo")?;
tracing::debug!("Spawned skopeo pid={:?}", child.id());
let childwait = Box::pin(child.wait_with_output());

let sockfd = Arc::new(Mutex::new(mysock));
Expand All @@ -142,6 +146,7 @@ impl ImageProxy {
sockfd: Arc<Mutex<File>>,
req: Request,
) -> Result<(T, Option<(File, u32)>)> {
tracing::trace!("sending request {}", req.method.as_str());
// TODO: Investigate https://crates.io/crates/uds for SOCK_SEQPACKET tokio
let r = tokio::task::spawn_blocking(move || {
let sockfd = sockfd.lock().unwrap();
Expand Down Expand Up @@ -187,9 +192,11 @@ impl ImageProxy {
Ok((reply, fdret))
})
.await??;
tracing::trace!("completed request");
Ok(r)
}

#[instrument(skip(args))]
async fn impl_request<R: serde::de::DeserializeOwned + Send + 'static, T, I>(
&mut self,
method: &str,
Expand All @@ -212,22 +219,28 @@ impl ImageProxy {
}
}

#[instrument]
async fn finish_pipe(&mut self, pipeid: u32) -> Result<()> {
tracing::debug!("closing pipe");
let (r, fd) = self.impl_request("FinishPipe", [pipeid]).await?;
if fd.is_some() {
return Err(anyhow!("Unexpected fd in finish_pipe reply"));
}
Ok(r)
}

#[instrument]
pub async fn open_image(&mut self, imgref: &str) -> Result<OpenedImage> {
tracing::debug!("opening image");
let (imgid, _) = self
.impl_request::<u32, _, _>("OpenImage", [imgref])
.await?;
Ok(OpenedImage(imgid))
}

#[instrument]
pub async fn close_image(&mut self, img: &OpenedImage) -> Result<()> {
tracing::debug!("closing image");
let (r, _) = self.impl_request("CloseImage", [img.0]).await?;
Ok(r)
}
Expand All @@ -250,6 +263,7 @@ impl ImageProxy {
/// https://github.com/opencontainers/image-spec/blob/main/descriptor.md
/// Note that right now the proxy does verification of the digest:
/// https://github.com/cgwalters/container-image-proxy/issues/1#issuecomment-926712009
#[instrument]
pub async fn get_blob(
&mut self,
img: &OpenedImage,
Expand All @@ -259,6 +273,7 @@ impl ImageProxy {
impl AsyncBufRead + Send + Unpin,
impl Future<Output = Result<()>> + Unpin + '_,
)> {
tracing::debug!("fetching blob");
let args: Vec<serde_json::Value> =
vec![img.0.into(), digest.to_string().into(), size.into()];
let (_bloblen, fd) = self.impl_request::<i64, _, _>("GetBlob", args).await?;
Expand All @@ -269,18 +284,21 @@ impl ImageProxy {
}

/// Close the connection and wait for the child process to exit successfully.
#[instrument]
pub async fn finalize(self) -> Result<()> {
let req = Request::new_bare("Shutdown");
let sendbuf = serde_json::to_vec(&req)?;
// SAFETY: Only panics if a worker thread already panic'd
let sockfd = Arc::try_unwrap(self.sockfd).unwrap().into_inner().unwrap();
nixsocket::send(sockfd.as_raw_fd(), &sendbuf, nixsocket::MsgFlags::empty())?;
drop(sendbuf);
tracing::debug!("sent shutdown request");
let output = self.childwait.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("proxy failed: {}\n{}", output.status, stderr)
}
tracing::debug!("proxy exited successfully");
Ok(())
}
}

0 comments on commit a2fc98b

Please sign in to comment.