Skip to content

Commit

Permalink
Fix race condition with body parsing
Browse files Browse the repository at this point in the history
This involved a lot of refactoring around response bodies. It removes the Arc that wrapped responses, so that the parsed body can be saved inline. It turns out this Arc wasn't really needed, it just added complexity more than anything.
  • Loading branch information
LucasPickering committed Nov 1, 2024
1 parent e87ac66 commit 5a8ce26
Show file tree
Hide file tree
Showing 19 changed files with 439 additions and 219 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/core/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use rusqlite::{named_params, Connection, DatabaseName, OptionalExtension};
use serde::{de::DeserializeOwned, Serialize};
use std::{
fmt::Debug,
ops::DerefMut,
ops::{Deref, DerefMut},
path::{Path, PathBuf},
sync::{Arc, Mutex},
};
Expand Down Expand Up @@ -361,7 +361,7 @@ impl CollectionDatabase {

":status_code": exchange.response.status.as_u16(),
":response_headers": SqlWrap(&exchange.response.headers),
":response_body": exchange.response.body.bytes(),
":response_body": exchange.response.body.bytes().deref(),
},
)
.context(format!(
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/db/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,13 @@ impl<'a, 'b> TryFrom<&'a Row<'b>> for Exchange {
.get::<_, Option<SqlWrap<Bytes>>>("request_body")?
.map(|wrap| wrap.0),
}),
response: Arc::new(ResponseRecord {
response: ResponseRecord {
status: row.get::<_, SqlWrap<StatusCode>>("status_code")?.0,
headers: row
.get::<_, SqlWrap<HeaderMap>>("response_headers")?
.0,
body: row.get::<_, SqlWrap<Bytes>>("response_body")?.0.into(),
}),
},
})
}
}
Expand Down
8 changes: 4 additions & 4 deletions crates/core/src/db/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use rusqlite::{
};
use rusqlite_migration::{HookResult, Migrations, M};
use serde::{de::DeserializeOwned, Serialize};
use std::{path::PathBuf, sync::Arc};
use std::{ops::Deref, path::PathBuf, sync::Arc};
use tracing::info;

/// Get all DB migrations in history
Expand Down Expand Up @@ -128,7 +128,7 @@ fn migrate_requests_v2_up(transaction: &Transaction) -> HookResult {
end_time: row.get("end_time")?,
// Deserialize from bytes
request: Arc::new(row.get::<_, ByteEncoded<_>>("request")?.0),
response: Arc::new(row.get::<_, ByteEncoded<_>>("response")?.0),
response: row.get::<_, ByteEncoded<_>>("response")?.0,
};
Ok((collection_id, exchange))
}
Expand Down Expand Up @@ -196,7 +196,7 @@ fn migrate_requests_v2_up(transaction: &Transaction) -> HookResult {

":status_code": exchange.response.status.as_u16(),
":response_headers": SqlWrap(&exchange.response.headers),
":response_body": exchange.response.body.bytes(),
":response_body": exchange.response.body.bytes().deref(),
})?;
}

Expand Down Expand Up @@ -430,7 +430,7 @@ mod tests {
":start_time": &exchange.start_time,
":end_time": &exchange.end_time,
":request": &ByteEncoded(&*exchange.request),
":response": &ByteEncoded(&*exchange.response),
":response": &ByteEncoded(&exchange.response),
":status_code": exchange.response.status.as_u16(),
},
)
Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use reqwest::{
Client, RequestBuilder, Response, Url,
};
use serde::{Deserialize, Serialize};
use std::{collections::HashSet, sync::Arc};
use std::collections::HashSet;
use tracing::{info, info_span};

const USER_AGENT: &str = concat!("slumber/", env!("CARGO_PKG_VERSION"));
Expand Down Expand Up @@ -375,7 +375,7 @@ impl RequestTicket {
let exchange = Exchange {
id,
request: self.record,
response: Arc::new(response),
response,
start_time,
end_time,
};
Expand Down Expand Up @@ -1362,7 +1362,7 @@ mod tests {
.to_str()
.unwrap();
assert_eq!(
*exchange.response,
exchange.response,
ResponseRecord {
status: StatusCode::OK,
headers: header_map([
Expand Down
23 changes: 10 additions & 13 deletions crates/core/src/http/content_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! not a value, use [ContentType]. If you want to parse dynamically based on
//! the response's metadata, use [ResponseRecord::parse_body].
use crate::{http::ResponseRecord, util::Mapping};
use crate::util::Mapping;
use anyhow::{anyhow, Context};
use derive_more::{Deref, Display, From};
use mime::{Mime, APPLICATION, JSON};
Expand Down Expand Up @@ -121,15 +121,6 @@ impl ContentType {
ContentType::Json => serde_json::to_string(&values).unwrap(),
}
}

/// Helper for parsing the body of a response. Use
/// [ResponseRecord::parse_body] for external usage.
pub(super) fn parse_response(
response: &ResponseRecord,
) -> anyhow::Result<Box<dyn ResponseContent>> {
let content_type = Self::from_headers(&response.headers)?;
content_type.parse_content(response.body.bytes())
}
}

/// A response content type that we know how to parse. This is defined as a
Expand Down Expand Up @@ -188,7 +179,7 @@ impl ResponseContent for Json {
#[cfg(test)]
mod tests {
use super::*;
use crate::{assert_err, test_util::Factory};
use crate::{assert_err, http::ResponseRecord, test_util::Factory};
use reqwest::header::{
HeaderMap, HeaderValue, InvalidHeaderValue, CONTENT_TYPE,
};
Expand Down Expand Up @@ -261,8 +252,11 @@ mod tests {
body: body.into(),
..ResponseRecord::factory(())
};
let content_type =
ContentType::from_headers(&response.headers).unwrap();
assert_eq!(
ContentType::parse_response(&response)
content_type
.parse_content(response.body.bytes())
.unwrap()
.deref()
// Downcast the result to desired type
Expand Down Expand Up @@ -303,7 +297,10 @@ mod tests {
body: body.into(),
..ResponseRecord::factory(())
};
assert_err!(ContentType::parse_response(&response), expected_error);
let result = ContentType::from_headers(&response.headers).and_then(
|content_type| content_type.parse_content(response.body.bytes()),
);
assert_err!(result, expected_error);
}

/// Create header map with the given value for the content-type header
Expand Down
40 changes: 17 additions & 23 deletions crates/core/src/http/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::{
content_type::{ContentType, ResponseContent},
},
template::Template,
util::ResultTraced,
};
use anyhow::Context;
use bytes::Bytes;
Expand All @@ -25,7 +24,7 @@ use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
fmt::{Debug, Write},
sync::{Arc, OnceLock},
sync::Arc,
};
use thiserror::Error;
use tracing::error;
Expand Down Expand Up @@ -183,8 +182,8 @@ pub struct Exchange {
pub id: RequestId,
/// What we said. Use an Arc so the view can hang onto it.
pub request: Arc<RequestRecord>,
/// What we heard. Use an Arc so the view can hang onto it.
pub response: Arc<ResponseRecord>,
/// What we heard
pub response: ResponseRecord,
/// When was the request sent to the server?
pub start_time: DateTime<Utc>,
/// When did we finish receiving the *entire* response?
Expand Down Expand Up @@ -424,7 +423,7 @@ impl crate::test_util::Factory<(RequestRecord, ResponseRecord)> for Exchange {
Self {
id: request.id,
request: request.into(),
response: response.into(),
response,
start_time: Utc::now(),
end_time: Utc::now(),
}
Expand All @@ -451,21 +450,9 @@ pub struct ResponseRecord {
}

impl ResponseRecord {
/// Attempt to parse the body of this response, and store it in the body
/// struct. If parsing fails, we'll store `None` instead.
pub fn parse_body(&self) {
let body = ContentType::parse_response(self)
.context("Error parsing response body")
.traced()
.ok();
// Store whether we succeeded or not, so we know not to try again
if self.body.parsed.set(body).is_err() {
// This indicates a logic error, because this should only be
// called once, when the response is first received.
// Unfortunately we don't have any helpful context to include
// here. The body could potentially be huge so don't log it.
error!("Response body parsed twice");
}
/// Stored the parsed form of this request's body
pub fn set_parsed_body(&mut self, body: Box<dyn ResponseContent>) {
self.body.parsed = Some(body);
}

/// Get the content type of the response body, according to the
Expand Down Expand Up @@ -509,6 +496,13 @@ impl ResponseRecord {
}
}

pub enum ParseMode {
Immediate,
Background {
callback: Box<dyn 'static + FnOnce(Box<dyn ResponseContent>) + Send>,
},
}

/// HTTP response body. Content is stored as bytes because it may not
/// necessarily be valid UTF-8. Converted to text only as needed.
#[derive(Default, Deserialize)]
Expand All @@ -521,7 +515,7 @@ pub struct ResponseBody {
/// [ResponseRecord::parse_body] to set the parsed body. This uses a lock
/// so it can be parsed and populated in a background thread.
#[serde(skip)]
parsed: OnceLock<Option<Box<dyn ResponseContent>>>,
parsed: Option<Box<dyn ResponseContent>>,
}

impl ResponseBody {
Expand All @@ -533,7 +527,7 @@ impl ResponseBody {
}

/// Raw content bytes
pub fn bytes(&self) -> &[u8] {
pub fn bytes(&self) -> &Bytes {
&self.data
}

Expand All @@ -559,7 +553,7 @@ impl ResponseBody {
///
/// Return `None` if parsing either hasn't happened yet, or failed.
pub fn parsed(&self) -> Option<&dyn ResponseContent> {
self.parsed.get().and_then(Option::as_deref)
self.parsed.as_deref()
}
}

Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ mod tests {
response: ResponseRecord {
body: "not json!".into(),
..ResponseRecord::factory(())
}.into(),
},
..Exchange::factory(RecipeId::from("recipe1"))
}),
"content type not provided",
Expand All @@ -560,7 +560,7 @@ mod tests {
response: ResponseRecord {
body: "not json!".into(),
..ResponseRecord::factory(())
}.into(),
},
..Exchange::factory(RecipeId::from("recipe1"))
}),
"Parsing response: expected ident at line 1 column 2",
Expand All @@ -582,7 +582,7 @@ mod tests {
response: ResponseRecord {
body: "[1, 2]".into(),
..ResponseRecord::factory(())
}.into(),
},
..Exchange::factory(RecipeId::from("recipe1"))
}),
"No results from JSONPath query",
Expand Down
4 changes: 1 addition & 3 deletions crates/core/src/template/render.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,9 +539,7 @@ impl<'a> ChainTemplateSource<'a> {
ChainRequestTrigger::Always => send_request().await?,
};

// We haven't passed the exchange around so we can unwrap the Arc safely
Ok(Arc::try_unwrap(exchange.response)
.expect("Request Arc should have only one reference"))
Ok(exchange.response)
}

/// Extract the specified component bytes from the response. For headers,
Expand Down
1 change: 1 addition & 0 deletions crates/tui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ version = "2.2.0"

[dependencies]
anyhow = {workspace = true}
bytes = {workspace = true}
chrono = {workspace = true}
cli-clipboard = "0.4.0"
crossterm = {workspace = true, features = ["bracketed-paste", "windows", "events", "event-stream"]}
Expand Down
Loading

0 comments on commit 5a8ce26

Please sign in to comment.