Upgrade deps #950

Merged · 3 commits · Oct 3, 2024
1,035 changes: 679 additions & 356 deletions Cargo.lock

Large diffs are not rendered by default.

45 changes: 23 additions & 22 deletions server/Cargo.toml
@@ -10,18 +10,19 @@ build = "build.rs"
[dependencies]
### apache arrow/datafusion dependencies
# arrow = "51.0.0"
arrow-schema = { version = "52.1.0", features = ["serde"] }
arrow-array = { version = "52.1.0" }
arrow-json = "52.1.0"
arrow-ipc = { version = "52.1.0", features = ["zstd"] }
arrow-select = "52.1.0"
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "a64df83502821f18067fb4ff65dd217815b305c9" }
object_store = { version = "0.10.2", features = ["cloud", "aws"] } # cannot update object_store as datafusion has not caught up
parquet = "52.1.0"
arrow-flight = { version = "52.1.0", features = [ "tls" ] }
tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] }
tonic-web = "0.11.0"
tower-http = { version = "0.4.4", features = ["cors"] }
arrow-schema = { version = "53.0.0", features = ["serde"] }
arrow-array = { version = "53.0.0" }
arrow-json = "53.0.0"
arrow-ipc = { version = "53.0.0", features = ["zstd"] }
arrow-select = "53.0.0"
# datafusion = { git = "https://github.com/apache/datafusion.git", rev = "a64df83502821f18067fb4ff65dd217815b305c9" }
datafusion = "42.0.0"
object_store = { version = "0.11.0", features = ["cloud", "aws"] } # cannot update object_store as datafusion has not caught up
parquet = "53.0.0"
arrow-flight = { version = "53.0.0", features = [ "tls" ] }
tonic = {version = "0.12.1", features = ["tls", "transport", "gzip", "zstd"] }
tonic-web = "0.12.1"
tower-http = { version = "0.6.1", features = ["cors"] }

### actix dependencies
actix-web-httpauth = "0.8"
@@ -53,8 +54,8 @@ clap = { version = "4.1", default-features = false, features = [
"error-context",
] }
clokwerk = "0.4"
crossterm = "0.27.0"
derive_more = "0.99"
crossterm = "0.28.1"
derive_more = "0.99.18"
env_logger = "0.11.3"
fs_extra = "1.3"
futures = "0.3"
@@ -68,7 +69,7 @@ log = "0.4"
num_cpus = "1.15"
once_cell = "1.17.1"
prometheus = { version = "0.13", features = ["process"] }
rand = "0.8"
rand = "0.8.5"
regex = "1.7.3"
relative-path = { version = "1.7", features = ["serde"] }
reqwest = { version = "0.11.27", default-features = false, features = [
@@ -81,8 +82,8 @@ semver = "1.0"
serde = { version = "1.0", features = ["rc", "derive"] }
serde_json = "1.0"
static-files = "0.2"
sysinfo = "0.30.11"
thiserror = "1"
sysinfo = "0.31.4"
thiserror = "1.0.64"
thread-priority = "1.0.0"
tokio = { version = "1.28", default-features = false, features = [
"sync",
@@ -97,13 +98,13 @@ xz2 = { version = "*", features = ["static"] }
nom = "7.1.3"
humantime = "2.1.0"
human-size = "0.4"
openid = { version = "0.14.0", default-features = false, features = ["rustls"] }
openid = { version = "0.15.0", default-features = false, features = ["rustls"] }
url = "2.4.0"
http-auth-basic = "0.3.3"
serde_repr = "0.1.17"
hashlru = { version = "0.11.0", features = ["serde"] }
path-clean = "1.0.1"
prost = "0.12.3"
prost = "0.13.3"
prometheus-parse = "0.2.5"
sha2 = "0.10.8"

@@ -113,13 +114,13 @@ sha1_smol = { version = "1.0", features = ["std"] }
static-files = "0.2"
ureq = "2.6"
vergen = { version = "8.1", features = ["build", "git", "cargo", "gitcl"] }
zip = { version = "1.1.1", default-features = false, features = ["deflate"] }
zip = { version = "2.2.0", default-features = false, features = ["deflate"] }
url = "2.4.0"
prost-build = "0.12.3"
prost-build = "0.13.3"

[dev-dependencies]
maplit = "1.0"
rstest = "0.19.0"
rstest = "0.23.0"

[package.metadata.parseable_ui]
assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.6/build.zip"
50 changes: 33 additions & 17 deletions server/src/catalog/column.rs
@@ -136,44 +136,60 @@ pub struct Column {
impl TryFrom<&Statistics> for TypedStatistics {
type Error = parquet::errors::ParquetError;
fn try_from(value: &Statistics) -> Result<Self, Self::Error> {
-if !value.has_min_max_set() {
+if value.min_bytes_opt().is_none() || value.max_bytes_opt().is_none() {
return Err(parquet::errors::ParquetError::General(
"min max is not set".to_string(),
));
}

let res = match value {
Statistics::Boolean(stats) => TypedStatistics::Bool(BoolType {
-min: *stats.min(),
-max: *stats.max(),
+min: *stats.min_opt().expect("Boolean stats min not set"),
+max: *stats.max_opt().expect("Boolean stats max not set"),
}),
Statistics::Int32(stats) => TypedStatistics::Int(Int64Type {
-min: *stats.min() as i64,
-max: *stats.max() as i64,
+min: *stats.min_opt().expect("Int32 stats min not set") as i64,
+max: *stats.max_opt().expect("Int32 stats max not set") as i64,
}),
Statistics::Int64(stats) => TypedStatistics::Int(Int64Type {
-min: *stats.min(),
-max: *stats.max(),
+min: *stats.min_opt().expect("Int64 stats min not set"),
+max: *stats.max_opt().expect("Int64 stats max not set"),
}),
Statistics::Int96(stats) => TypedStatistics::Int(Int64Type {
-min: stats.min().to_i64(),
-max: stats.max().to_i64(),
+min: stats.min_opt().expect("Int96 stats min not set").to_i64(),
+max: stats.max_opt().expect("Int96 stats max not set").to_i64(),
}),
Statistics::Float(stats) => TypedStatistics::Float(Float64Type {
-min: *stats.min() as f64,
-max: *stats.max() as f64,
+min: *stats.min_opt().expect("Float32 stats min not set") as f64,
+max: *stats.max_opt().expect("Float32 stats max not set") as f64,
}),
Statistics::Double(stats) => TypedStatistics::Float(Float64Type {
-min: *stats.min(),
-max: *stats.max(),
+min: *stats.min_opt().expect("Float64 stats min not set"),
+max: *stats.max_opt().expect("Float64 stats max not set"),
}),
Statistics::ByteArray(stats) => TypedStatistics::String(Utf8Type {
-min: stats.min().as_utf8()?.to_owned(),
-max: stats.max().as_utf8()?.to_owned(),
+min: stats
+    .min_opt()
+    .expect("Utf8 stats min not set")
+    .as_utf8()?
+    .to_owned(),
+max: stats
+    .max_opt()
+    .expect("Utf8 stats max not set")
+    .as_utf8()?
+    .to_owned(),
}),
Statistics::FixedLenByteArray(stats) => TypedStatistics::String(Utf8Type {
-min: stats.min().as_utf8()?.to_owned(),
-max: stats.max().as_utf8()?.to_owned(),
+min: stats
+    .min_opt()
+    .expect("Utf8 stats min not set")
+    .as_utf8()?
+    .to_owned(),
+max: stats
+    .max_opt()
+    .expect("Utf8 stats max not set")
+    .as_utf8()?
+    .to_owned(),
}),
};

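Note on the pattern above: parquet 53 replaces the panicking `Statistics::min()`/`max()` accessors and the `has_min_max_set()` guard with `Option`-returning `min_opt()`/`max_opt()` (plus byte-level `min_bytes_opt()`/`max_bytes_opt()`). A minimal sketch of the new API, with an illustrative helper name:

use parquet::file::statistics::Statistics;

// Extract an i64 min/max pair from Int64 column statistics, if present.
// Missing statistics now surface as `None` instead of a panic.
fn int64_min_max(stats: &Statistics) -> Option<(i64, i64)> {
    match stats {
        Statistics::Int64(s) => Some((*s.min_opt()?, *s.max_opt()?)),
        _ => None,
    }
}

The `.expect(...)` calls in the diff are safe because the early `min_bytes_opt()`/`max_bytes_opt()` check already rejects statistics without a min/max.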
20 changes: 17 additions & 3 deletions server/src/query.rs
@@ -26,9 +26,9 @@ use datafusion::arrow::record_batch::RecordBatch;

use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion::error::DataFusionError;
-use datafusion::execution::context::SessionState;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::RuntimeEnv;
+use datafusion::execution::SessionStateBuilder;
use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringifiedPlan};
use datafusion::prelude::*;
use itertools::Itertools;
@@ -81,12 +81,26 @@ impl Query {
let runtime_config = runtime_config.with_memory_limit(pool_size, fraction);
let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());

-let config = SessionConfig::default()
+let mut config = SessionConfig::default()
.with_parquet_pruning(true)
.with_prefer_existing_sort(true)
.with_round_robin_repartition(true);

-let state = SessionState::new_with_config_rt(config, runtime);
+config.options_mut().execution.parquet.enable_page_index = true;
+config.options_mut().execution.parquet.pushdown_filters = true;
+config.options_mut().execution.parquet.reorder_filters = true;
+config
+    .options_mut()
+    .execution
+    .parquet
+    .schema_force_view_types = true;

+let state = SessionStateBuilder::new()
+    .with_default_features()
+    .with_config(config)
+    .with_runtime_env(runtime)
+    .build();

let schema_provider = Arc::new(GlobalSchemaProvider {
storage: storage.get_object_store(),
});
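DataFusion 42 removes `SessionState::new_with_config_rt`; session state is now assembled with `SessionStateBuilder`, and parquet scan options are toggled through typed fields on `options_mut()`. A minimal sketch of the new construction path (memory limits omitted, helper name illustrative):

use std::sync::Arc;

use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::SessionConfig;

fn build_state() -> datafusion::execution::context::SessionState {
    let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new()).unwrap());

    let mut config = SessionConfig::default().with_parquet_pruning(true);
    // Typed access to the parquet execution options.
    config.options_mut().execution.parquet.pushdown_filters = true;

    SessionStateBuilder::new()
        .with_default_features() // register built-in functions, planners, etc.
        .with_config(config)
        .with_runtime_env(runtime)
        .build()
}

The `with_default_features()` call matters: unlike the old constructor, a bare builder starts empty, so omitting it would drop the built-in SQL functions and planner rules.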
4 changes: 2 additions & 2 deletions server/src/query/listing_table_builder.rs
@@ -25,7 +25,7 @@ use datafusion::{
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
},
error::DataFusionError,
-logical_expr::{col, Expr},
+logical_expr::{col, SortExpr},
};
use futures_util::{future, stream::FuturesUnordered, Future, TryStreamExt};
use itertools::Itertools;
@@ -188,7 +188,7 @@ impl ListingTableBuilder {
if self.listing.is_empty() {
return Ok(None);
}
-let file_sort_order: Vec<Vec<Expr>>;
+let file_sort_order: Vec<Vec<SortExpr>>;
let file_format = ParquetFormat::default().with_enable_pruning(true);
if let Some(time_partition) = time_partition {
file_sort_order = vec![vec![col(time_partition).sort(true, false)]];
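DataFusion 42 also splits sorting out of the expression enum: `Expr::sort(...)` now returns a dedicated `SortExpr`, which is what `ListingOptions::with_file_sort_order` expects. A minimal sketch (column name illustrative):

use datafusion::logical_expr::{col, SortExpr};

// One inner Vec per lexicographic ordering of the files;
// `sort(true, false)` means ascending with nulls last.
fn file_sort_order(time_column: &str) -> Vec<Vec<SortExpr>> {
    vec![vec![col(time_column).sort(true, false)]]
}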
22 changes: 10 additions & 12 deletions server/src/query/stream_schema_provider.rs
@@ -27,10 +27,11 @@ use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef, SortOptions};
use bytes::Bytes;
use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
+use datafusion::catalog::Session;
use datafusion::common::stats::Precision;
use datafusion::logical_expr::utils::conjunction;
use datafusion::{
-catalog::schema::SchemaProvider,
+catalog::SchemaProvider,
common::{
tree_node::{TreeNode, TreeNodeRecursion},
ToDFSchema,
@@ -122,7 +123,7 @@ async fn create_parquet_physical_plan(
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
-state: &SessionState,
+state: &dyn Session,
time_partition: Option<String>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let filters = if let Some(expr) = conjunction(filters.to_vec()) {
@@ -149,7 +150,7 @@ async fn create_parquet_physical_plan(
// create the execution plan
let plan = file_format
.create_physical_plan(
-state,
+state.as_any().downcast_ref::<SessionState>().unwrap(), // Remove this when ParquetFormat catches up
FileScanConfig {
object_store_url,
file_schema: schema.clone(),
@@ -288,7 +289,7 @@ impl TableProvider for StandardTableProvider {

async fn scan(
&self,
-state: &SessionState,
+state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
@@ -496,7 +497,7 @@ async fn get_cache_exectuion_plan(
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
-state: &SessionState,
+state: &dyn Session,
time_partition: Option<String>,
) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
let (cached, remainder) = cache_manager
@@ -545,7 +546,7 @@ async fn get_hottier_exectuion_plan(
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
-state: &SessionState,
+state: &dyn Session,
time_partition: Option<String>,
) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
let (hot_tier_files, remainder) = hot_tier_manager
@@ -594,7 +595,7 @@ async fn legacy_listing_table(
object_store: Arc<dyn ObjectStore>,
time_filters: &[PartialTimeFilter],
schema: Arc<Schema>,
-state: &SessionState,
+state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
@@ -868,10 +869,7 @@ fn extract_timestamp_bound(
binexpr: BinaryExpr,
time_partition: Option<String>,
) -> Option<(Operator, NaiveDateTime)> {
-Some((
-    binexpr.op.clone(),
-    extract_from_lit(binexpr, time_partition)?,
-))
+Some((binexpr.op, extract_from_lit(binexpr, time_partition)?))
}

async fn collect_manifest_files(
@@ -942,7 +940,7 @@ trait ManifestExt: ManifestFile {
return None;
};
/* `BinaryExp` doesn't implement `Copy` */
-Some((expr.op.clone(), value))
+Some((expr.op, value))
}

let Some(col) = self.find_matching_column(partial_filter) else {
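The recurring `&SessionState` to `&dyn Session` change follows DataFusion 42's new `TableProvider::scan` signature, which takes the type-erased `datafusion::catalog::Session` trait. Code that still needs the concrete state, as with `ParquetFormat::create_physical_plan` above, bridges with a downcast. A minimal sketch of that bridge (helper name illustrative):

use datafusion::catalog::Session;
use datafusion::execution::context::SessionState;

// Recover the concrete state from the erased trait object. This only
// fails for a custom `Session` implementation, which the default
// `SessionContext` never supplies.
fn concrete_state(state: &dyn Session) -> &SessionState {
    state
        .as_any()
        .downcast_ref::<SessionState>()
        .expect("expected a SessionState-backed Session")
}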
4 changes: 2 additions & 2 deletions server/src/utils/arrow/flight.rs
@@ -39,8 +39,8 @@ use futures::{stream, TryStreamExt};
use tonic::{Request, Response, Status};

use arrow_flight::FlightClient;
-use http::Uri;
-use tonic::transport::Channel;
+// use http::Uri;
+use tonic::transport::{Channel, Uri};

pub type DoGetStream = stream::BoxStream<'static, Result<FlightData, Status>>;

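tonic 0.12 moves to the http 1.x / hyper 1.x stack, so a `Uri` taken from an older standalone `http` crate no longer satisfies `Channel::builder`; importing tonic's re-exported `Uri` keeps the two in lockstep. A minimal connection sketch (address and helper name illustrative):

use tonic::transport::{Channel, Uri};

// Build the channel from tonic's own `Uri` re-export to avoid mixing
// http 0.2 and http 1.x types across the dependency tree.
async fn connect(addr: &str) -> Result<Channel, Box<dyn std::error::Error>> {
    let uri: Uri = addr.parse()?;
    Ok(Channel::builder(uri).connect().await?)
}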
4 changes: 2 additions & 2 deletions server/src/utils/arrow/merged_reader.rs
@@ -49,8 +49,8 @@ impl MergedRecordReader {
log::error!("Invalid file detected, removing it: {:?}", file);
fs::remove_file(file).unwrap();
} else {
-let reader =
-    StreamReader::try_new(File::open(file).unwrap(), None).map_err(|_| ())?;
+let reader = StreamReader::try_new(BufReader::new(File::open(file).unwrap()), None)
+    .map_err(|_| ())?;
readers.push(reader);
}
}
2 changes: 1 addition & 1 deletion server/src/utils/arrow/reverse_reader.rs
@@ -139,7 +139,7 @@ pub fn get_reverse_reader<T: Read + Seek>(
// reset reader
reader.rewind()?;

-Ok(StreamReader::try_new(OffsetReader::new(reader, messages), None).unwrap())
+Ok(StreamReader::try_new(BufReader::new(OffsetReader::new(reader, messages)), None).unwrap())
}

pub fn reverse(rb: &RecordBatch) -> RecordBatch {
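Both IPC call sites now wrap their input in a `BufReader` before handing it to `StreamReader::try_new`: the arrow-ipc reader issues many small `Read` calls per message, so an unbuffered `File` (or the custom `OffsetReader`) pays a syscall for each one. A minimal sketch of the buffered pattern (path and helper name illustrative; arrow 53 also appears to ship a `try_new_buffered` convenience that wraps internally):

use std::{fs::File, io::BufReader};

use arrow_ipc::reader::StreamReader;

fn count_rows(path: &str) -> Result<usize, Box<dyn std::error::Error>> {
    // Buffering amortizes the reader's many small reads into page-sized I/O.
    let reader = StreamReader::try_new(BufReader::new(File::open(path)?), None)?;
    let mut rows = 0;
    for batch in reader {
        rows += batch?.num_rows();
    }
    Ok(rows)
}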