-
Based on the discussion in #7675, I removed … The underlying …

A `Flattener` wraps a stream by putting it in a `Vec`. When polled, the `Flattener` pops the stream out of the `Vec`, polls it for the next element, and then pushes the stream back into the `Vec` (if it produced one). The actual logic in production uses a binary heap to pop streams in sorted order, but this is a minimal version that reproduces the issue. `SimpleWrap` is a passthrough wrapper that just polls the underlying stream and returns the result. I've added it as a baseline implementation that works.

```rust
use futures::{ready, Stream, StreamExt};
use pin_project_lite::pin_project;
use std::task::Poll;

pin_project! {
    struct Flattener<S> {
        data: Vec<S>,
    }
}

pin_project! {
    struct SimpleWrap<S> {
        data: S,
    }
}

impl<S> Stream for Flattener<S>
where
    S: Stream + Unpin,
{
    type Item = S::Item;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let this = self.project();
        // Pop the stream, poll it for the next element,
        // and push it back if it produced one.
        match this.data.pop() {
            Some(mut stream) => match ready!(stream.poll_next_unpin(cx)) {
                Some(element) => {
                    this.data.push(stream);
                    Poll::Ready(Some(element))
                }
                _ => {
                    dbg!("No next element");
                    Poll::Ready(None)
                }
            },
            _ => {
                dbg!("empty vec");
                Poll::Ready(None)
            }
        }
    }
}

impl<S> Stream for SimpleWrap<S>
where
    S: Stream + Unpin,
{
    type Item = S::Item;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let this = self.project();
        // Pass the inner stream's result straight through.
        if let Some(element) = ready!(this.data.poll_next_unpin(cx)) {
            dbg!("Got next element");
            Poll::Ready(Some(element))
        } else {
            Poll::Ready(None)
        }
    }
}
```

Below is the actual test logic. It registers a Parquet file on disk as a table and queries all of its records. It also creates a mock data stream from a `Vec` of numbers and wraps it in a `Flattener`.

```rust
// Runs inside an async test (e.g. a `#[tokio::test]` function).
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use futures::StreamExt;

let file_path = "../../tests/test_data/quote_tick_data.parquet";
let table_name = "data";
let session_ctx = SessionContext::default();
let parquet_options = ParquetReadOptions::<'_> {
    skip_metadata: Some(false),
    ..Default::default()
};
session_ctx
    .register_parquet(table_name, file_path, parquet_options)
    .await
    .unwrap();
let batch_stream = session_ctx
    .sql(&format!("SELECT * FROM {} ORDER BY ts_init", &table_name))
    // .sql(&format!("SELECT * FROM {}", &table_name))
    .await
    .unwrap()
    .execute_stream()
    .await
    .unwrap();

// Mock stream of 0..100000 that is always ready.
let mock_data: Vec<Result<usize, ()>> = (0..100000).map(Ok).collect();
let mock_data = futures::stream::iter(mock_data);

let mut iter = Flattener {
    data: vec![batch_stream],
};
let mut mock_iter = Flattener {
    data: vec![mock_data],
};
// let mut iter = SimpleWrap {
//     data: batch_stream,
// };

// Print every 10,000th mock value.
let step = 10_000;
while let Some(c) = mock_iter.next().await {
    c.map(|val| if val % step == 0 { dbg!(val) } else { 0 })
        .unwrap_or(0);
}

// Print the row count of each record batch.
dbg!("Stream batch");
while let Some(c) = iter.next().await {
    dbg!(c.map(|batch| batch.num_rows()).unwrap_or(0));
}
```

Here are the results for the various combinations:
My guess is that the `Vec` push and pop must be what's messing with the behavior, but it's strange, and it's not clear why it would mess up the underlying stream.
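One way to test that guess without DataFusion is a std-only sketch of the same pop/poll/push logic. It assumes a stand-in `PollNext` trait in place of `futures::Stream` and a hypothetical `TestStream` that can return `Pending` before each element; neither comes from the original code. A stream that is always ready (like `futures::stream::iter`, used for the mock data) comes through intact. A stream that returns `Pending` is lost on the first poll, because `ready!` returns early after the stream has already been popped, so it is dropped instead of pushed back.

```rust
use std::task::{ready, Context, Poll, RawWaker, RawWakerVTable, Waker};

// Stand-in for `futures::Stream::poll_next` (assumption: lets the sketch use std only).
trait PollNext {
    type Item;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical stream yielding 0..end. With `yields_pending`, it returns
// `Pending` before every element, like a stream waiting on I/O.
struct TestStream {
    next: u32,
    end: u32,
    yields_pending: bool,
    pending_now: bool,
}

impl TestStream {
    fn new(end: u32, yields_pending: bool) -> Self {
        TestStream { next: 0, end, yields_pending, pending_now: true }
    }
}

impl PollNext for TestStream {
    type Item = u32;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.yields_pending && self.pending_now {
            self.pending_now = false;
            cx.waker().wake_by_ref(); // ask to be polled again
            return Poll::Pending;
        }
        self.pending_now = true;
        if self.next < self.end {
            self.next += 1;
            Poll::Ready(Some(self.next - 1))
        } else {
            Poll::Ready(None)
        }
    }
}

// Same control flow as the original `Flattener::poll_next`.
struct Flattener<S> {
    data: Vec<S>,
}

impl<S: PollNext> Flattener<S> {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        match self.data.pop() {
            // On `Pending`, `ready!` returns early here and the popped
            // `stream` is dropped without being pushed back.
            Some(mut stream) => match ready!(stream.poll_next(cx)) {
                Some(element) => {
                    self.data.push(stream);
                    Poll::Ready(Some(element))
                }
                None => Poll::Ready(None),
            },
            None => Poll::Ready(None), // "empty vec"
        }
    }
}

// Waker that does nothing; `drain` simply polls again after `Pending`.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

// Poll until `Ready(None)`, collecting every element.
fn drain<S: PollNext>(mut f: Flattener<S>) -> Vec<S::Item> {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    loop {
        match f.poll_next(&mut cx) {
            Poll::Ready(Some(x)) => out.push(x),
            Poll::Ready(None) => return out,
            Poll::Pending => {} // an executor would poll again after the wake
        }
    }
}

fn main() {
    // Never `Pending`: prints [0, 1, 2]
    println!("{:?}", drain(Flattener { data: vec![TestStream::new(3, false)] }));
    // `Pending` on the first poll: the stream is lost, prints []
    println!("{:?}", drain(Flattener { data: vec![TestStream::new(3, true)] }));
}
```

A DataFusion record batch stream may return `Pending` while it waits on work such as file reads or the sort behind `ORDER BY`, which would explain why only `batch_stream` is affected.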
-
I didn't quite follow how `Flattener` is working, but if you want to convert a …
BTW
That will return if the underlying stream is not ready, even if there is something in your buffer, right?
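Following that observation, one possible fix (a sketch, not a confirmed answer from this thread) is to match on the poll result instead of using `ready!`, so the popped stream goes back into the `Vec` on `Pending` as well. To stay dependency-free, it assumes a stand-in `PollNext` trait in place of `futures::Stream` and a hypothetical `TestStream` that returns `Pending` before every element. In the real `Flattener`, the same change would apply around the `stream.poll_next_unpin(cx)` call.

```rust
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Stand-in for `futures::Stream::poll_next` (assumption: lets the sketch use std only).
trait PollNext {
    type Item;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical stream yielding 0..end, returning `Pending` before every element.
struct TestStream {
    next: u32,
    end: u32,
    pending_now: bool,
}

impl PollNext for TestStream {
    type Item = u32;
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.pending_now {
            self.pending_now = false;
            cx.waker().wake_by_ref(); // ask to be polled again
            return Poll::Pending;
        }
        self.pending_now = true;
        if self.next < self.end {
            self.next += 1;
            Poll::Ready(Some(self.next - 1))
        } else {
            Poll::Ready(None)
        }
    }
}

struct Flattener<S> {
    data: Vec<S>,
}

impl<S: PollNext> Flattener<S> {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        let Some(mut stream) = self.data.pop() else {
            return Poll::Ready(None); // "empty vec"
        };
        match stream.poll_next(cx) {
            // Not ready yet: put the stream back so the next poll finds it.
            Poll::Pending => {
                self.data.push(stream);
                Poll::Pending
            }
            Poll::Ready(Some(element)) => {
                self.data.push(stream);
                Poll::Ready(Some(element))
            }
            // As in the original, end when the popped stream ends.
            Poll::Ready(None) => Poll::Ready(None),
        }
    }
}

// Waker that does nothing; `drain` simply polls again after `Pending`.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

// Poll until `Ready(None)`, collecting every element.
fn drain<S: PollNext>(mut f: Flattener<S>) -> Vec<S::Item> {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    loop {
        match f.poll_next(&mut cx) {
            Poll::Ready(Some(x)) => out.push(x),
            Poll::Ready(None) => return out,
            Poll::Pending => {}
        }
    }
}

fn main() {
    let stream = TestStream { next: 0, end: 3, pending_now: true };
    // The stream survives every `Pending`: prints [0, 1, 2]
    println!("{:?}", drain(Flattener { data: vec![stream] }));
}
```

With a `BinaryHeap` in production, the same idea should carry over: whatever is popped needs to be pushed back before returning `Pending`.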