Add pagination to query Execution #141

Open
wants to merge 14 commits into base: main
255 changes: 234 additions & 21 deletions src/app/app_execution.rs
@@ -21,22 +21,46 @@ use crate::app::state::tabs::sql::Query;
use crate::app::AppEvent;
use crate::execution::ExecutionContext;
use color_eyre::eyre::Result;
use datafusion::arrow::array::RecordBatch;
use datafusion::execution::context::SessionContext;
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream};
use futures::StreamExt;
use log::{error, info};
use std::fmt::Debug;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::Mutex;

/// Handles executing queries for the TUI application, formatting results
/// and sending them to the UI.
#[derive(Debug)]
pub(crate) struct AppExecution {
inner: Arc<ExecutionContext>,
results: Arc<Mutex<Option<PaginatingRecordBatchStream>>>,
}

impl AppExecution {
/// Create a new instance of [`AppExecution`].
pub fn new(inner: Arc<ExecutionContext>) -> Self {
Self { inner }
Self {
inner,
results: Arc::new(Mutex::new(None)),
}
}

pub fn session_ctx(&self) -> &SessionContext {
self.inner.session_ctx()
}

pub fn results(&self) -> Arc<Mutex<Option<PaginatingRecordBatchStream>>> {
Arc::clone(&self.results)
}

async fn set_results(&self, results: PaginatingRecordBatchStream) {
let mut r = self.results.lock().await;
*r = Some(results);
}

/// Run the sequence of SQL queries, sending the results as [`AppEvent::QueryResult`] via the sender.
@@ -61,26 +85,30 @@ impl AppExecution
if i == statement_count - 1 {
info!("Executing last query and display results");
match self.inner.execute_sql(sql).await {
Ok(mut stream) => {
let mut batches = Vec::new();
while let Some(maybe_batch) = stream.next().await {
match maybe_batch {
Ok(batch) => {
batches.push(batch);
}
Err(e) => {
let elapsed = start.elapsed();
query.set_error(Some(e.to_string()));
query.set_execution_time(elapsed);
break;
}
}
}
let elapsed = start.elapsed();
let rows: usize = batches.iter().map(|r| r.num_rows()).sum();
query.set_results(Some(batches));
query.set_num_rows(Some(rows));
query.set_execution_time(elapsed);
Ok(stream) => {
let mut paginating_stream = PaginatingRecordBatchStream::new(stream);
paginating_stream.next_batch().await?;
self.set_results(paginating_stream).await;

// let mut batches = Vec::new();
// while let Some(maybe_batch) = stream.next().await {
// match maybe_batch {
// Ok(batch) => {
// batches.push(batch);
// }
// Err(e) => {
// let elapsed = start.elapsed();
// query.set_error(Some(e.to_string()));
// query.set_execution_time(elapsed);
// break;
// }
// }
// }
// let elapsed = start.elapsed();
// let rows: usize = batches.iter().map(|r| r.num_rows()).sum();
// query.set_results(Some(batches));
// query.set_num_rows(Some(rows));
// query.set_execution_time(elapsed);
}
Err(e) => {
error!("Error creating dataframe: {:?}", e);
@@ -108,3 +136,188 @@ impl AppExecution {
Ok(())
}
}

/// A stream of [`RecordBatch`]es that can be paginated for display in the TUI.
Contributor:
One thing that isn't entirely clear to me is whether an entire RecordBatch is shown at a time or just a slice (like only rows 100-200).

This might be an important distinction in terms of what "current batch" means, and whether you want this structure to paginate based on batch index or logical row number.

I think either could work -- but if you paginate on batch, you'll have to implement logic somewhere else to translate that into a logical row number for display.
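
For illustration, a minimal sketch (hypothetical helper, not part of this PR) of translating a batch index into the logical row number of that batch's first row:

```rust
use datafusion::arrow::array::RecordBatch;

/// Logical row number of the first row in `batches[batch_idx]`,
/// assuming `batches` holds everything buffered so far.
fn first_row_of_batch(batches: &[RecordBatch], batch_idx: usize) -> usize {
    batches[..batch_idx].iter().map(|b| b.num_rows()).sum()
}
```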

Collaborator Author:
I think that could depend on the UI that we expose. I have two options in mind:

  1. Incrementing a page completely replaces the displayed records with the next page of records.
  2. Incrementing a page adds the records to the existing view (I think getting the UI correct for this may not be straightforward, as I believe the naive expectation could be that the next page is triggered while scrolling and reaching the bottom of the table, like how it works in DBeaver. But happy to hear other views on this.)

One thing to note is that the Table widget we use automatically provides some scrolling capabilities (it tracks the selected row automatically), so if we continue to use that, which I think is beneficial at our stage, we at least don't have to do everything from scratch.

One potentially terrible idea I had, that would at least enable a very simple v1 (at the cost of worse query performance), is to default the batch size for the TUI to some relatively small amount (say 200 rows), and then all rows for that batch would be added to the Table. Next page just gets the next batch and replaces the Table records (i.e. option 1). We wouldn't need to track rows or figure out how to stitch records together across record batch boundaries. I believe this is the simplest approach and would at least make the app usable for larger queries; then we could add a todo for something more user friendly that doesn't impact query performance.
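
A rough sketch of that v1 idea, assuming the TUI builds its own SessionContext (the 200-row figure and helper name are illustrative, not something this PR sets):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

/// Build a SessionContext whose batch size is small enough that one
/// RecordBatch maps roughly onto one "page" in the Table widget.
fn small_batch_ctx() -> SessionContext {
    let config = SessionConfig::new().with_batch_size(200);
    SessionContext::new_with_config(config)
}
```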

Contributor:
> Incrementing a page completely replaces the displayed records with the next page of records

This is how I (naively) as a user would expect things to behave.

Contributor:
> One potentially terrible idea I had, that would at least enable a very simple v1 (at the cost of worse query performance), is to default the batch size for the TUI to some relatively small amount (say 200 rows), and then all rows for that batch would be added to the Table. Next page just gets the next batch and replaces the Table records (i.e. option 1). We wouldn't need to track rows or figure out how to stitch records together across record batch boundaries. I believe this is the simplest approach and would at least make the app usable for larger queries; then we could add a todo for something more user friendly that doesn't impact query performance.

This seems reasonable to me.

I also think all the APIs to stitch rows together are in arrow-rs (like concat_batches and slice), so we could also make some sort of adapter stream (another wrapper!) that takes the incoming stream and reformats it into smaller record batches.
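
A minimal sketch of that re-chunking step, using arrow's concat_batches and RecordBatch::slice (the helper name and page size are hypothetical):

```rust
use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::compute::concat_batches;
use datafusion::arrow::error::ArrowError;

/// Flatten whatever batch sizes the stream produced into uniform pages
/// of `page_size` rows each.
fn rechunk(batches: &[RecordBatch], page_size: usize) -> Result<Vec<RecordBatch>, ArrowError> {
    if batches.is_empty() {
        return Ok(Vec::new());
    }
    let schema = batches[0].schema();
    // Concatenate everything buffered so far into one batch, then slice it
    // back apart at page boundaries.
    let combined = concat_batches(&schema, batches)?;
    let mut pages = Vec::new();
    let mut offset = 0;
    while offset < combined.num_rows() {
        let len = page_size.min(combined.num_rows() - offset);
        pages.push(combined.slice(offset, len));
        offset += len;
    }
    Ok(pages)
}
```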

pub struct PaginatingRecordBatchStream {
// currently executing stream
inner: SendableRecordBatchStream,
Collaborator Author:
Because SendableRecordBatchStream is not Sync, this struct doesn't work well with our AppEvent, which requires Send and Sync. I think we can restructure so that the batches / current idx are managed separately from the batch stream; then they can be sent as an AppEvent, state can be updated in the existing pattern (i.e. in our handlers), and we don't need to lock during rendering.
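
A minimal sketch of that split (the PaginationState name is hypothetical; the non-Sync stream would be driven by a separate execution task that pushes batches into this state):

```rust
use datafusion::arrow::array::RecordBatch;

/// Send + Sync pagination state that can travel through AppEvent; the
/// SendableRecordBatchStream stays with the task that polls it.
#[derive(Debug, Default)]
pub struct PaginationState {
    batches: Vec<RecordBatch>,
    current_batch: Option<usize>,
}

impl PaginationState {
    /// Buffer a newly polled batch and point the view at it.
    pub fn push_batch(&mut self, batch: RecordBatch) {
        self.batches.push(batch);
        self.current_batch = Some(self.batches.len() - 1);
    }

    /// Batch currently being displayed, if any.
    pub fn current_batch(&self) -> Option<&RecordBatch> {
        self.current_batch.and_then(|idx| self.batches.get(idx))
    }
}
```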

// any batches that have been buffered so far
batches: Vec<RecordBatch>,
// current batch being shown
current_batch: Option<usize>,
}

impl PaginatingRecordBatchStream {
Collaborator Author:
I'm not sure yet if these are the final API signatures - I will know once I plug this into the display code.

pub fn new(inner: Pin<Box<dyn RecordBatchStream + Send>>) -> Self {
Self {
inner,
batches: Vec::new(),
current_batch: None,
}
}

/// Return the batch at the current index
pub fn current_batch(&self) -> Option<&RecordBatch> {
if let Some(idx) = self.current_batch {
self.batches.get(idx)
} else {
None
}
}

/// Return the next batch
/// TBD on logic for handling the end
pub async fn next_batch(&mut self) -> Result<Option<&RecordBatch>> {
if let Some(b) = self.inner.next().await {
match b {
Ok(batch) => {
self.batches.push(batch);
self.current_batch = Some(self.batches.len() - 1);
Ok(self.current_batch())
}
Err(e) => Err(e.into()),
}
} else {
Ok(None)
}
}

/// Return the previous batch
/// TBD on logic for handling the beginning
pub fn previous_batch(&mut self) -> Option<&RecordBatch> {
if let Some(idx) = self.current_batch {
if idx > 0 {
self.current_batch = Some(idx - 1);
}
}
self.current_batch()
}
}

impl Debug for PaginatingRecordBatchStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PaginatingRecordBatchStream")
.field("batches", &self.batches)
.field("current_batch", &self.current_batch)
.finish()
}
}

#[cfg(test)]
mod tests {
use super::PaginatingRecordBatchStream;
use datafusion::{
arrow::array::{ArrayRef, Int32Array, RecordBatch},
common::Result,
physical_plan::stream::RecordBatchStreamAdapter,
};
use std::sync::Arc;

#[tokio::test]
async fn test_paginating_record_batch_stream() {
let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
let b: ArrayRef = Arc::new(Int32Array::from(vec![1, 1]));

let record_batch1 = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
let record_batch2 = RecordBatch::try_from_iter(vec![("b", b)]).unwrap();

let schema = record_batch1.schema();
let batches: Vec<Result<RecordBatch>> =
vec![Ok(record_batch1.clone()), Ok(record_batch2.clone())];
let stream = futures::stream::iter(batches);
let sendable_stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));

let mut paginating_stream = PaginatingRecordBatchStream::new(sendable_stream);

assert_eq!(paginating_stream.current_batch(), None);
assert_eq!(
paginating_stream.next_batch().await.unwrap(),
Some(&record_batch1)
);
assert_eq!(
paginating_stream.next_batch().await.unwrap(),
Some(&record_batch2)
);
assert_eq!(paginating_stream.next_batch().await.unwrap(), None);
}

#[tokio::test]
async fn test_paginating_record_batch_stream_previous() {
let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
let b: ArrayRef = Arc::new(Int32Array::from(vec![1, 1]));

let record_batch1 = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
let record_batch2 = RecordBatch::try_from_iter(vec![("b", b)]).unwrap();

let schema = record_batch1.schema();
let batches: Vec<Result<RecordBatch>> =
vec![Ok(record_batch1.clone()), Ok(record_batch2.clone())];
let stream = futures::stream::iter(batches);
let sendable_stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));

let mut paginating_stream = PaginatingRecordBatchStream::new(sendable_stream);

assert_eq!(paginating_stream.current_batch(), None);
assert_eq!(
paginating_stream.next_batch().await.unwrap(),
Some(&record_batch1)
);
assert_eq!(
paginating_stream.next_batch().await.unwrap(),
Some(&record_batch2)
);
assert_eq!(paginating_stream.next_batch().await.unwrap(), None);
assert_eq!(paginating_stream.current_batch(), Some(&record_batch2));
assert_eq!(paginating_stream.previous_batch(), Some(&record_batch1));
assert_eq!(paginating_stream.previous_batch(), Some(&record_batch1));
}

#[tokio::test]
async fn test_paginating_record_batch_stream_one_error() {
let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
let record_batch1 = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();

let schema = record_batch1.schema();
let batches: Vec<Result<RecordBatch>> = vec![Err(
datafusion::error::DataFusionError::Execution("Error creating dataframe".to_string()),
)];
let stream = futures::stream::iter(batches);
let sendable_stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));

let mut paginating_stream = PaginatingRecordBatchStream::new(sendable_stream);

assert_eq!(paginating_stream.current_batch(), None);

let res = paginating_stream.next_batch().await;
assert!(res.is_err());
}

#[tokio::test]
async fn test_paginating_record_batch_stream_successful_then_error() {
let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));

let record_batch1 = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();

let schema = record_batch1.schema();
let batches: Vec<Result<RecordBatch>> = vec![
Ok(record_batch1.clone()),
Err(datafusion::error::DataFusionError::Execution(
"Error creating dataframe".to_string(),
)),
];
let stream = futures::stream::iter(batches);
let sendable_stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));

let mut paginating_stream = PaginatingRecordBatchStream::new(sendable_stream);

assert_eq!(paginating_stream.current_batch(), None);
assert_eq!(
paginating_stream.next_batch().await.unwrap(),
Some(&record_batch1)
);
let res = paginating_stream.next_batch().await;
assert!(res.is_err());
assert_eq!(paginating_stream.next_batch().await.unwrap(), None);
assert_eq!(paginating_stream.current_batch(), Some(&record_batch1));
}
}