Skip to content

Commit

Permalink
code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
parmesant committed Aug 29, 2024
1 parent 03c4e06 commit 625e557
Showing 1 changed file with 23 additions and 24 deletions.
47 changes: 23 additions & 24 deletions server/src/query/stream_schema_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ impl TableProvider for StandardTableProvider {
let mut memory_exec = None;
let mut cache_exec = None;
let mut hot_tier_exec = None;
let mut remote_table = None;
let mut listing_exec = None;
let object_store = state
.runtime_env()
.object_store_registry
Expand Down Expand Up @@ -354,10 +354,10 @@ impl TableProvider for StandardTableProvider {
// Is query timerange is overlapping with older data.
// if true, then get listing table time filters and execution plan separately
if is_overlapping_query(&merged_snapshot.manifest_list, &time_filters) {
let (listing_time_fiters, new_time_filters) =
return_listing_time_filters(&merged_snapshot.manifest_list, time_filters);
let listing_time_fiters =
return_listing_time_filters(&merged_snapshot.manifest_list, &mut time_filters);

let listing_tables = if let Some(listing_time_filter) = listing_time_fiters {
listing_exec = if let Some(listing_time_filter) = listing_time_fiters {
legacy_listing_table(
self.stream.clone(),
glob_storage.clone(),
Expand All @@ -374,9 +374,6 @@ impl TableProvider for StandardTableProvider {
} else {
None
};
time_filters = new_time_filters;

remote_table = listing_tables;
}

let mut manifest_files = collect_from_snapshot(
Expand All @@ -390,7 +387,7 @@ impl TableProvider for StandardTableProvider {

if manifest_files.is_empty() {
return final_plan(
vec![remote_table, memory_exec],
vec![listing_exec, memory_exec],
projection,
self.schema.clone(),
);
Expand Down Expand Up @@ -432,7 +429,7 @@ impl TableProvider for StandardTableProvider {
if manifest_files.is_empty() {
QUERY_CACHE_HIT.with_label_values(&[&self.stream]).inc();
return final_plan(
vec![remote_table, memory_exec, cache_exec, hot_tier_exec],
vec![listing_exec, memory_exec, cache_exec, hot_tier_exec],
projection,
self.schema.clone(),
);
Expand All @@ -454,7 +451,7 @@ impl TableProvider for StandardTableProvider {

Ok(final_plan(
vec![
remote_table,
listing_exec,
memory_exec,
cache_exec,
hot_tier_exec,
Expand Down Expand Up @@ -746,48 +743,50 @@ fn is_overlapping_query(
/// For manifest time filter, we will manifest lower bound and OG upper bound
fn return_listing_time_filters(
manifest_list: &[ManifestItem],
time_filters: Vec<PartialTimeFilter>,
) -> (Option<Vec<PartialTimeFilter>>, Vec<PartialTimeFilter>) {
time_filters: &mut Vec<PartialTimeFilter>,
) -> Option<Vec<PartialTimeFilter>> {
// vec to hold timestamps for listing
let mut vec_listing_timestamps = Vec::new();

let Some(first_entry_lower_bound) =
manifest_list.iter().map(|file| file.time_lower_bound).min()
else {
return (None, time_filters);
};
let mut first_entry_lower_bound = manifest_list
.iter()
.map(|file| file.time_lower_bound.naive_utc())
.min()?;

let mut new_time_filters = vec![PartialTimeFilter::Low(Bound::Included(
first_entry_lower_bound.naive_utc(),
first_entry_lower_bound,
))];

time_filters.into_iter().for_each(|filter| {
time_filters.iter_mut().for_each(|filter| {
match filter {
// since we've already determined that there is a need to list tables,
// we just need to check whether the filter's upper bound is < manifest lower bound
PartialTimeFilter::High(Bound::Included(upper))
| PartialTimeFilter::High(Bound::Excluded(upper)) => {
if upper.lt(&first_entry_lower_bound.naive_utc()) {
if upper.lt(&&mut first_entry_lower_bound) {
// filter upper bound is less than manifest lower bound, continue using filter upper bound
vec_listing_timestamps.push(filter.clone());
} else {
// use manifest lower bound as excluded
vec_listing_timestamps.push(PartialTimeFilter::High(Bound::Excluded(
first_entry_lower_bound.naive_utc(),
first_entry_lower_bound,
)));
}
new_time_filters.push(filter.clone());
}
_ => {
vec_listing_timestamps.push(filter);
vec_listing_timestamps.push(filter.clone());
}
}
});

// update time_filters
*time_filters = new_time_filters;

if vec_listing_timestamps.len().gt(&0) {
(Some(vec_listing_timestamps), new_time_filters)
Some(vec_listing_timestamps)
} else {
(None, new_time_filters)
None
}
}

Expand Down

0 comments on commit 625e557

Please sign in to comment.