From 625e5575c4aa7084f00531ca132f7031b67614a4 Mon Sep 17 00:00:00 2001 From: anant Date: Thu, 29 Aug 2024 22:02:43 +0530 Subject: [PATCH] code cleanup --- server/src/query/stream_schema_provider.rs | 47 +++++++++++----------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index b2041ffb..e5d6055d 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -296,7 +296,7 @@ impl TableProvider for StandardTableProvider { let mut memory_exec = None; let mut cache_exec = None; let mut hot_tier_exec = None; - let mut remote_table = None; + let mut listing_exec = None; let object_store = state .runtime_env() .object_store_registry @@ -354,10 +354,10 @@ impl TableProvider for StandardTableProvider { // Is query timerange is overlapping with older data. // if true, then get listing table time filters and execution plan separately if is_overlapping_query(&merged_snapshot.manifest_list, &time_filters) { - let (listing_time_fiters, new_time_filters) = - return_listing_time_filters(&merged_snapshot.manifest_list, time_filters); + let listing_time_fiters = + return_listing_time_filters(&merged_snapshot.manifest_list, &mut time_filters); - let listing_tables = if let Some(listing_time_filter) = listing_time_fiters { + listing_exec = if let Some(listing_time_filter) = listing_time_fiters { legacy_listing_table( self.stream.clone(), glob_storage.clone(), @@ -374,9 +374,6 @@ impl TableProvider for StandardTableProvider { } else { None }; - time_filters = new_time_filters; - - remote_table = listing_tables; } let mut manifest_files = collect_from_snapshot( @@ -390,7 +387,7 @@ impl TableProvider for StandardTableProvider { if manifest_files.is_empty() { return final_plan( - vec![remote_table, memory_exec], + vec![listing_exec, memory_exec], projection, self.schema.clone(), ); @@ -432,7 +429,7 @@ impl TableProvider for StandardTableProvider { if manifest_files.is_empty() { QUERY_CACHE_HIT.with_label_values(&[&self.stream]).inc(); return final_plan( - vec![remote_table, memory_exec, cache_exec, hot_tier_exec], + vec![listing_exec, memory_exec, cache_exec, hot_tier_exec], projection, self.schema.clone(), ); @@ -454,7 +451,7 @@ impl TableProvider for StandardTableProvider { Ok(final_plan( vec![ - remote_table, + listing_exec, memory_exec, cache_exec, hot_tier_exec, @@ -746,48 +743,50 @@ fn is_overlapping_query( /// For manifest time filter, we will manifest lower bound and OG upper bound fn return_listing_time_filters( manifest_list: &[ManifestItem], - time_filters: Vec, -) -> (Option>, Vec) { + time_filters: &mut Vec, +) -> Option> { // vec to hold timestamps for listing let mut vec_listing_timestamps = Vec::new(); - let Some(first_entry_lower_bound) = - manifest_list.iter().map(|file| file.time_lower_bound).min() - else { - return (None, time_filters); - }; + let mut first_entry_lower_bound = manifest_list + .iter() + .map(|file| file.time_lower_bound.naive_utc()) + .min()?; let mut new_time_filters = vec![PartialTimeFilter::Low(Bound::Included( - first_entry_lower_bound.naive_utc(), + first_entry_lower_bound, ))]; - time_filters.into_iter().for_each(|filter| { + time_filters.iter_mut().for_each(|filter| { match filter { // since we've already determined that there is a need to list tables, // we just need to check whether the filter's upper bound is < manifest lower bound PartialTimeFilter::High(Bound::Included(upper)) | PartialTimeFilter::High(Bound::Excluded(upper)) => { - if upper.lt(&first_entry_lower_bound.naive_utc()) { + if upper.lt(&&mut first_entry_lower_bound) { // filter upper bound is less than manifest lower bound, continue using filter upper bound vec_listing_timestamps.push(filter.clone()); } else { // use manifest lower bound as excluded vec_listing_timestamps.push(PartialTimeFilter::High(Bound::Excluded( - first_entry_lower_bound.naive_utc(), + first_entry_lower_bound, ))); } new_time_filters.push(filter.clone()); } _ => { - vec_listing_timestamps.push(filter); + vec_listing_timestamps.push(filter.clone()); } } }); + // update time_filters + *time_filters = new_time_filters; + if vec_listing_timestamps.len().gt(&0) { - (Some(vec_listing_timestamps), new_time_filters) + Some(vec_listing_timestamps) } else { - (None, new_time_filters) + None } }