Skip to content

Commit

Permalink
Stream test results without batched loads into server memory (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
ayazhafiz authored Aug 1, 2023
1 parent 6ad7eec commit 8567a8c
Show file tree
Hide file tree
Showing 9 changed files with 586 additions and 471 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 1.6.2

ABQ 1.6.2 is a patch release fixing an issue that could result in
denial-of-service of an ABQ queue due to large test results.

## 1.6.1

ABQ 1.6.1 is a patch release fixing an issue where ABQ would not continue offloading
Expand Down
119 changes: 55 additions & 64 deletions crates/abq_cli/src/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use abq_utils::{
self,
entity::{Entity, WorkerRunner},
queue::AssociatedTestResults,
results::{ResultsLine, Summary},
results::{OpaqueLazyAssociatedTestResults, ResultsLine, Summary},
runners::{TestResult, TestResultSpec},
workers::{RunId, WorkId},
},
Expand Down Expand Up @@ -52,10 +52,10 @@ pub(crate) async fn report_results(
let reporters = build_reporters(reporter_kinds, stdout_preferences, test_suite_name, ONE);
let mut stdout = stdout_preferences.stdout_stream();

let all_results: Vec<Vec<ResultsLine>> =
let all_results: Vec<ResultsLine> =
wait_for_results(abq, entity, run_id, results_timeout).await?;

process_results(&mut stdout, reporters, all_results.into_iter().flatten())
process_results(&mut stdout, reporters, all_results.into_iter())
}

pub(crate) async fn list_tests(
Expand All @@ -67,12 +67,12 @@ pub(crate) async fn list_tests(
worker: u32,
runner: NonZeroUsize,
) -> anyhow::Result<ExitCode> {
let all_results: Vec<Vec<ResultsLine>> =
let all_results: Vec<ResultsLine> =
wait_for_results(abq, entity, run_id, results_timeout).await?;

print_tests_for_runner(
&mut stdout_preferences.stdout_stream(),
all_results.into_iter().flatten(),
all_results.into_iter(),
WorkerRunner::from((worker, runner.get() as u32)),
);

Expand Down Expand Up @@ -229,7 +229,7 @@ async fn wait_for_results(
entity: Entity,
run_id: RunId,
results_timeout: Duration,
) -> anyhow::Result<Vec<Vec<ResultsLine>>> {
) -> anyhow::Result<Vec<ResultsLine>> {
let queue_addr = abq.server_addr();
let client = abq.client_options_owned().build_async()?;

Expand All @@ -251,7 +251,7 @@ async fn wait_for_results_help(
client: Box<dyn net_async::ConfiguredClient>,
entity: Entity,
run_id: RunId,
) -> anyhow::Result<Vec<Vec<ResultsLine>>> {
) -> anyhow::Result<Vec<ResultsLine>> {
let mut attempt = 1;
loop {
let client = &client;
Expand All @@ -267,50 +267,40 @@ async fn wait_for_results_help(
};
net_protocol::async_write(&mut conn, &request).await?;

let mut results = Vec::with_capacity(1);

// TODO: as this is a hot loop of just fetching results, reporting would be more
// interactive if we wrote results into a channel as they came in, with the
// results processing happening on a separate thread.
loop {
use net_protocol::queue::TestResultsResponse::*;
let response = net_protocol::async_read(&mut conn).await?;
match response {
Results { chunk, final_chunk } => {
let chunk = chunk.decode().map_err(|e| {
anyhow!(
"failed to decode corrupted test results message: {}",
e.to_string()
)
})?;
use net_protocol::queue::TestResultsResponse::*;
let response = net_protocol::async_read(&mut conn).await?;
match response {
StreamingResults => {
let mut stream = net_protocol::async_read_stream(&mut conn).await?;

results.push(chunk);
let results =
OpaqueLazyAssociatedTestResults::read_results_lines(&mut stream).await?;
let results = results.decode()?;

match final_chunk {
true => return Ok(results),
false => continue,
}
}
Pending => {
tracing::debug!(
attempt,
"deferring fetching results do to pending notification"
);
tokio::time::sleep(PENDING_RESULTS_DELAY).await;
attempt += 1;
continue;
}
OutstandingRunners(tags) => {
let active_runners = tags
.into_iter()
.map(|t| t.to_string())
.collect::<Vec<_>>()
.join(", ");

bail!("failed to fetch test results because the following runners are still active: {active_runners}")
}
Error(reason) => bail!("failed to fetch test results because {reason}"),
return Ok(results);
}
Pending => {
tracing::debug!(
attempt,
"deferring fetching results do to pending notification"
);
tokio::time::sleep(PENDING_RESULTS_DELAY).await;
attempt += 1;
continue;
}
OutstandingRunners(tags) => {
let active_runners = tags
.into_iter()
.map(|t| t.to_string())
.collect::<Vec<_>>()
.join(", ");

bail!("failed to fetch test results because the following runners are still active: {active_runners}")
}
Error(reason) => bail!("failed to fetch test results because {reason}"),
}
}
}
Expand Down Expand Up @@ -397,7 +387,7 @@ mod test {
use super::{print_tests_for_runner, process_results, wait_for_results_help};

#[tokio::test]
async fn fetches_chunked_tests() {
async fn fetches_streamed_tests() {
let (server, client) = build_fake_server_client().await;
let server_addr = server.local_addr().unwrap();

Expand Down Expand Up @@ -430,24 +420,25 @@ mod test {
}
));

let chunks = [
queue::TestResultsResponse::Results {
chunk: OpaqueLazyAssociatedTestResults::from_raw_json_lines(vec![
serde_json::value::to_raw_value(results1).unwrap(),
]),
final_chunk: false,
},
queue::TestResultsResponse::Results {
chunk: OpaqueLazyAssociatedTestResults::from_raw_json_lines(vec![
serde_json::value::to_raw_value(results2).unwrap(),
]),
final_chunk: true,
},
];
let results_buffer = OpaqueLazyAssociatedTestResults::into_jsonl_buffer(&[
serde_json::value::to_raw_value(results1).unwrap(),
serde_json::value::to_raw_value(results2).unwrap(),
])
.unwrap();

for chunk in chunks {
net_protocol::async_write(&mut conn, &chunk).await.unwrap();
}
let mut results_buffer_slice = &results_buffer[..];

net_protocol::async_write(&mut conn, &queue::TestResultsResponse::StreamingResults)
.await
.unwrap();

net_protocol::async_write_stream(
&mut conn,
results_buffer.len(),
&mut results_buffer_slice,
)
.await
.unwrap();
}
};

Expand All @@ -457,7 +448,7 @@ mod test {
let ((), results) = tokio::join!(server_task, client_task);

let results = results.unwrap();
let expected = [[results1], [results2]];
let expected = [results1, results2];
assert_eq!(results, expected);
}

Expand Down
69 changes: 45 additions & 24 deletions crates/abq_queue/src/persistence/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

mod fs;
mod in_memory;
#[cfg(test)]
pub(crate) mod test_utils;

pub use fs::FilesystemPersistor;
pub use in_memory::InMemoryPersistor;
Expand All @@ -13,14 +15,29 @@ use abq_utils::{
error::LocatedError,
net_protocol::{
queue::AssociatedTestResults,
results::{OpaqueLazyAssociatedTestResults, ResultsLine, Summary},
results::{ResultsLine, Summary},
workers::RunId,
},
};
use async_trait::async_trait;

type Result<T> = std::result::Result<T, LocatedError>;

/// A type-erased async byte reader over persisted test results.
pub type OpaqueAsyncReader<'a> = dyn tokio::io::AsyncRead + Send + Unpin + 'a;

/// A borrowed stream of raw bytes holding persisted test results, together
/// with its total length. The bytes are interpretable as
/// OpaqueLazyAssociatedTestResults (JSON lines) — see the callers that decode
/// via `read_results_lines`.
pub struct ResultsStream<'a> {
    /// Reader yielding the raw results bytes.
    pub stream: Box<&'a mut OpaqueAsyncReader<'a>>,
    /// Total number of bytes available from `stream`.
    pub len: usize,
}

/// A one-shot callback granting streaming access to a run's persisted test
/// results, so results can be forwarded without buffering them fully in memory.
#[async_trait]
pub trait WithResultsStream {
    /// Consumes the callback and processes the given results stream.
    async fn with_results_stream<'a>(
        self: Box<Self>,
        results_stream: ResultsStream<'a>,
    ) -> Result<()>;
}

#[async_trait]
pub trait PersistResults: Send + Sync {
/// Dumps a summary line.
Expand All @@ -29,8 +46,12 @@ pub trait PersistResults: Send + Sync {
/// Dumps the persisted results to a remote, if any is configured.
async fn dump_to_remote(&self, run_id: &RunId) -> Result<()>;

/// Load a set of test results as [OpaqueLazyAssociatedTestResults].
async fn get_results(&self, run_id: &RunId) -> Result<OpaqueLazyAssociatedTestResults>;
/// Execute a closure with access to a stream of raw bytes interpretable as [OpaqueLazyAssociatedTestResults].
async fn with_results_stream(
&self,
run_id: &RunId,
f: Box<dyn WithResultsStream + Send>,
) -> Result<()>;

fn boxed_clone(&self) -> Box<dyn PersistResults>;
}
Expand Down Expand Up @@ -136,16 +157,21 @@ impl ResultsPersistedCell {
}
}

pub fn eligible_to_retrieve(&self) -> bool {
self.0.processing.load(atomic::ORDERING) == 0
}

/// Attempts to retrieve a set of test results.
/// If there are persistence jobs pending, returns [None].
pub async fn retrieve(
pub async fn retrieve_with_callback(
&self,
persistence: &SharedPersistResults,
) -> Option<Result<OpaqueLazyAssociatedTestResults>> {
if self.0.processing.load(atomic::ORDERING) != 0 {
return None;
}
Some(persistence.0.get_results(&self.0.run_id).await)
callback: Box<dyn WithResultsStream + Send>,
) -> Result<()> {
persistence
.0
.with_results_stream(&self.0.run_id, callback)
.await
}
}

Expand Down Expand Up @@ -197,20 +223,17 @@ mod test {

use crate::persistence::{
remote::{self, fake_error, PersistenceKind},
results::EligibleForRemoteDump,
results::{test_utils::retrieve_results, EligibleForRemoteDump},
};

use super::{fs::FilesystemPersistor, ResultsPersistedCell};

#[tokio::test]
async fn retrieve_is_none_while_pending() {
let tempdir = tempfile::tempdir().unwrap();
let persistence = FilesystemPersistor::new_shared(tempdir.path(), 1, remote::NoopPersister);

async fn not_eligible_to_retrieve_while_there_are_pending_results() {
let cell = ResultsPersistedCell::new(RunId::unique());
cell.0.processing.fetch_add(1, atomic::ORDERING);

assert!(cell.retrieve(&persistence).await.is_none());
assert!(!cell.eligible_to_retrieve());
}

#[tokio::test]
Expand All @@ -232,8 +255,8 @@ mod test {

let cell = ResultsPersistedCell::new(RunId::unique());

let retrieved = cell.retrieve(&persistence).await.unwrap().unwrap();
let results = retrieved.decode().unwrap();
let results = retrieve_results(&cell, &persistence).await.unwrap();
let results = results.decode().unwrap();
assert!(results.is_empty());
}

Expand Down Expand Up @@ -265,12 +288,11 @@ mod test {
// That's okay. But the retrieved must definitely include at least results1.
let retrieve_task = {
async {
loop {
match cell.retrieve(&persistence).await {
None => tokio::time::sleep(Duration::from_micros(1)).await,
Some(results) => break results,
}
while !cell.eligible_to_retrieve() {
tokio::time::sleep(Duration::from_micros(1)).await;
}

retrieve_results(&cell, &persistence).await
}
};
let persist_task = async {
Expand All @@ -283,8 +305,7 @@ mod test {
};
let ((), retrieve_result) = tokio::join!(persist_task, retrieve_task);

let retrieved = retrieve_result.unwrap();
let results = retrieved.decode().unwrap();
let results = retrieve_result.unwrap().decode().unwrap();

use ResultsLine::Results;
match results.len() {
Expand Down
Loading

0 comments on commit 8567a8c

Please sign in to comment.