Commit: init
parmesant committed Oct 1, 2024
1 parent 715a546 commit dad3fe3
Showing 9 changed files with 765 additions and 414 deletions.
1,035 changes: 679 additions & 356 deletions Cargo.lock

Large diffs are not rendered by default.

45 changes: 23 additions & 22 deletions server/Cargo.toml
@@ -10,18 +10,19 @@ build = "build.rs"
[dependencies]
### apache arrow/datafusion dependencies
# arrow = "51.0.0"
arrow-schema = { version = "52.1.0", features = ["serde"] }
arrow-array = { version = "52.1.0" }
arrow-json = "52.1.0"
arrow-ipc = { version = "52.1.0", features = ["zstd"] }
arrow-select = "52.1.0"
datafusion = { git = "https://github.com/apache/datafusion.git", rev = "a64df83502821f18067fb4ff65dd217815b305c9" }
object_store = { version = "0.10.2", features = ["cloud", "aws"] } # cannot update object_store as datafusion has not caught up
parquet = "52.1.0"
arrow-flight = { version = "52.1.0", features = [ "tls" ] }
tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] }
tonic-web = "0.11.0"
tower-http = { version = "0.4.4", features = ["cors"] }
arrow-schema = { version = "53.0.0", features = ["serde"] }
arrow-array = { version = "53.0.0" }
arrow-json = "53.0.0"
arrow-ipc = { version = "53.0.0", features = ["zstd"] }
arrow-select = "53.0.0"
# datafusion = { git = "https://github.com/apache/datafusion.git", rev = "a64df83502821f18067fb4ff65dd217815b305c9" }
datafusion = "42.0.0"
object_store = { version = "0.11.0", features = ["cloud", "aws"] } # cannot update object_store as datafusion has not caught up
parquet = "53.0.0"
arrow-flight = { version = "53.0.0", features = [ "tls" ] }
tonic = {version = "0.12.1", features = ["tls", "transport", "gzip", "zstd"] }
tonic-web = "0.12.1"
tower-http = { version = "0.6.1", features = ["cors"] }

### actix dependencies
actix-web-httpauth = "0.8"
@@ -53,8 +54,8 @@ clap = { version = "4.1", default-features = false, features = [
"error-context",
] }
clokwerk = "0.4"
crossterm = "0.27.0"
derive_more = "0.99"
crossterm = "0.28.1"
derive_more = "0.99.18"
env_logger = "0.11.3"
fs_extra = "1.3"
futures = "0.3"
@@ -68,7 +69,7 @@ log = "0.4"
num_cpus = "1.15"
once_cell = "1.17.1"
prometheus = { version = "0.13", features = ["process"] }
rand = "0.8"
rand = "0.8.5"
regex = "1.7.3"
relative-path = { version = "1.7", features = ["serde"] }
reqwest = { version = "0.11.27", default-features = false, features = [
@@ -81,8 +82,8 @@ semver = "1.0"
serde = { version = "1.0", features = ["rc", "derive"] }
serde_json = "1.0"
static-files = "0.2"
sysinfo = "0.30.11"
thiserror = "1"
sysinfo = "0.31.4"
thiserror = "1.0.64"
thread-priority = "1.0.0"
tokio = { version = "1.28", default-features = false, features = [
"sync",
@@ -97,13 +98,13 @@ xz2 = { version = "*", features = ["static"] }
nom = "7.1.3"
humantime = "2.1.0"
human-size = "0.4"
openid = { version = "0.14.0", default-features = false, features = ["rustls"] }
openid = { version = "0.15.0", default-features = false, features = ["rustls"] }
url = "2.4.0"
http-auth-basic = "0.3.3"
serde_repr = "0.1.17"
hashlru = { version = "0.11.0", features = ["serde"] }
path-clean = "1.0.1"
prost = "0.12.3"
prost = "0.13.3"
prometheus-parse = "0.2.5"
sha2 = "0.10.8"

@@ -113,13 +114,13 @@ sha1_smol = { version = "1.0", features = ["std"] }
static-files = "0.2"
ureq = "2.6"
vergen = { version = "8.1", features = ["build", "git", "cargo", "gitcl"] }
zip = { version = "1.1.1", default-features = false, features = ["deflate"] }
zip = { version = "2.2.0", default-features = false, features = ["deflate"] }
url = "2.4.0"
prost-build = "0.12.3"
prost-build = "0.13.3"

[dev-dependencies]
maplit = "1.0"
rstest = "0.19.0"
rstest = "0.23.0"

[package.metadata.parseable_ui]
assets-url = "https://github.com/parseablehq/console/releases/download/v0.9.6/build.zip"
50 changes: 33 additions & 17 deletions server/src/catalog/column.rs
@@ -136,44 +136,60 @@ pub struct Column {
impl TryFrom<&Statistics> for TypedStatistics {
type Error = parquet::errors::ParquetError;
fn try_from(value: &Statistics) -> Result<Self, Self::Error> {
if !value.has_min_max_set() {
if value.min_bytes_opt().is_none() || value.max_bytes_opt().is_none() {
return Err(parquet::errors::ParquetError::General(
"min max is not set".to_string(),
));
}

let res = match value {
Statistics::Boolean(stats) => TypedStatistics::Bool(BoolType {
min: *stats.min(),
max: *stats.max(),
min: *stats.min_opt().expect("Boolean stats min not set"),
max: *stats.max_opt().expect("Boolean stats max not set"),
}),
Statistics::Int32(stats) => TypedStatistics::Int(Int64Type {
min: *stats.min() as i64,
max: *stats.max() as i64,
min: *stats.min_opt().expect("Int32 stats min not set") as i64,
max: *stats.max_opt().expect("Int32 stats max not set") as i64,
}),
Statistics::Int64(stats) => TypedStatistics::Int(Int64Type {
min: *stats.min(),
max: *stats.max(),
min: *stats.min_opt().expect("Int64 stats min not set"),
max: *stats.max_opt().expect("Int64 stats max not set"),
}),
Statistics::Int96(stats) => TypedStatistics::Int(Int64Type {
min: stats.min().to_i64(),
max: stats.max().to_i64(),
min: stats.min_opt().expect("Int96 stats min not set").to_i64(),
max: stats.max_opt().expect("Int96 stats max not set").to_i64(),
}),
Statistics::Float(stats) => TypedStatistics::Float(Float64Type {
min: *stats.min() as f64,
max: *stats.max() as f64,
min: *stats.min_opt().expect("Float32 stats min not set") as f64,
max: *stats.max_opt().expect("Float32 stats max not set") as f64,
}),
Statistics::Double(stats) => TypedStatistics::Float(Float64Type {
min: *stats.min(),
max: *stats.max(),
min: *stats.min_opt().expect("Float64 stats min not set"),
max: *stats.max_opt().expect("Float64 stats max not set"),
}),
Statistics::ByteArray(stats) => TypedStatistics::String(Utf8Type {
min: stats.min().as_utf8()?.to_owned(),
max: stats.max().as_utf8()?.to_owned(),
min: stats
.min_opt()
.expect("Utf8 stats min not set")
.as_utf8()?
.to_owned(),
max: stats
.max_opt()
.expect("Utf8 stats max not set")
.as_utf8()?
.to_owned(),
}),
Statistics::FixedLenByteArray(stats) => TypedStatistics::String(Utf8Type {
min: stats.min().as_utf8()?.to_owned(),
max: stats.max().as_utf8()?.to_owned(),
min: stats
.min_opt()
.expect("Utf8 stats min not set")
.as_utf8()?
.to_owned(),
max: stats
.max_opt()
.expect("Utf8 stats max not set")
.as_utf8()?
.to_owned(),
}),
};

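Background for this hunk: parquet 53 replaces `Statistics::has_min_max_set` and the panicking `min()`/`max()` accessors with `min_opt()`/`max_opt()`, which return `Option`s. A minimal sketch of the new accessors, assuming a `Statistics` value taken from a row group's column chunk metadata:

use parquet::file::statistics::Statistics;

// Sketch only: mirrors the Int64 arm of the migration above.
// `min_opt()`/`max_opt()` return `Option<&T>`, so absent statistics
// surface as `None` instead of a panic.
fn int64_min_max(stats: &Statistics) -> Option<(i64, i64)> {
    match stats {
        Statistics::Int64(s) => Some((*s.min_opt()?, *s.max_opt()?)),
        _ => None,
    }
}
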
16 changes: 13 additions & 3 deletions server/src/query.rs
@@ -26,9 +26,9 @@ use datafusion::arrow::record_batch::RecordBatch;

use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::SessionStateBuilder;
use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringifiedPlan};
use datafusion::prelude::*;
use itertools::Itertools;
@@ -81,12 +81,22 @@ impl Query {
let runtime_config = runtime_config.with_memory_limit(pool_size, fraction);
let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());

let config = SessionConfig::default()
let mut config = SessionConfig::default()
.with_parquet_pruning(true)
.with_prefer_existing_sort(true)
.with_round_robin_repartition(true);

config.options_mut().execution.parquet.enable_page_index = true;
config.options_mut().execution.parquet.pushdown_filters = true;
config.options_mut().execution.parquet.reorder_filters = true;
config.options_mut().execution.parquet.schema_force_view_types = true;

let state = SessionStateBuilder::new()
.with_default_features()
.with_config(config)
.with_runtime_env(runtime)
.build();

let state = SessionState::new_with_config_rt(config, runtime);
let schema_provider = Arc::new(GlobalSchemaProvider {
storage: storage.get_object_store(),
});
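Background for this hunk: datafusion 42 replaces `SessionState::new_with_config_rt` with `SessionStateBuilder`, and the parquet reader knobs (page index, filter pushdown, view types) are toggled through `options_mut()` before the state is built. A minimal sketch of the same construction:

use std::sync::Arc;

use datafusion::execution::context::SessionState;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::SessionConfig;

// Sketch only: builds a session state the datafusion 42 way.
fn build_state() -> SessionState {
    let mut config = SessionConfig::default().with_parquet_pruning(true);
    // Reader options now live on the mutable options tree.
    config.options_mut().execution.parquet.pushdown_filters = true;

    SessionStateBuilder::new()
        .with_default_features() // default catalog, functions, planners
        .with_config(config)
        .with_runtime_env(Arc::new(RuntimeEnv::default()))
        .build()
}
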
4 changes: 2 additions & 2 deletions server/src/query/listing_table_builder.rs
@@ -25,7 +25,7 @@ use datafusion::{
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
},
error::DataFusionError,
logical_expr::{col, Expr},
logical_expr::{col, SortExpr},
};
use futures_util::{future, stream::FuturesUnordered, Future, TryStreamExt};
use itertools::Itertools;
@@ -188,7 +188,7 @@ impl ListingTableBuilder {
if self.listing.is_empty() {
return Ok(None);
}
let file_sort_order: Vec<Vec<Expr>>;
let file_sort_order: Vec<Vec<SortExpr>>;
let file_format = ParquetFormat::default().with_enable_pruning(true);
if let Some(time_partition) = time_partition {
file_sort_order = vec![vec![col(time_partition).sort(true, false)]];
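Background for this hunk: in datafusion 42, `Expr::sort(asc, nulls_first)` yields a dedicated `SortExpr` instead of an `Expr` variant, which is why `file_sort_order` is retyped to `Vec<Vec<SortExpr>>`. A minimal sketch:

use datafusion::logical_expr::{col, SortExpr};

// Sketch only: ascending sort on the time partition column, nulls last.
fn file_sort_order(time_partition: &str) -> Vec<Vec<SortExpr>> {
    vec![vec![col(time_partition).sort(true, false)]]
}
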
19 changes: 10 additions & 9 deletions server/src/query/stream_schema_provider.rs
@@ -27,10 +27,11 @@ use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef, SortOptions};
use bytes::Bytes;
use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
use datafusion::catalog::Session;
use datafusion::common::stats::Precision;
use datafusion::logical_expr::utils::conjunction;
use datafusion::{
catalog::schema::SchemaProvider,
catalog::SchemaProvider,
common::{
tree_node::{TreeNode, TreeNodeRecursion},
ToDFSchema,
@@ -122,7 +123,7 @@ async fn create_parquet_physical_plan(
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
state: &SessionState,
state: &dyn Session,
time_partition: Option<String>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let filters = if let Some(expr) = conjunction(filters.to_vec()) {
@@ -149,7 +150,7 @@
// create the execution plan
let plan = file_format
.create_physical_plan(
state,
state.as_any().downcast_ref::<SessionState>().unwrap(), // Remove this when ParquetFormat catches up
FileScanConfig {
object_store_url,
file_schema: schema.clone(),
@@ -288,7 +289,7 @@ impl TableProvider for StandardTableProvider {

async fn scan(
&self,
state: &SessionState,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
@@ -496,7 +497,7 @@ async fn get_cache_exectuion_plan(
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
state: &SessionState,
state: &dyn Session,
time_partition: Option<String>,
) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
let (cached, remainder) = cache_manager
@@ -545,7 +546,7 @@ async fn get_hottier_exectuion_plan(
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
state: &SessionState,
state: &dyn Session,
time_partition: Option<String>,
) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
let (hot_tier_files, remainder) = hot_tier_manager
@@ -594,7 +595,7 @@ async fn legacy_listing_table(
object_store: Arc<dyn ObjectStore>,
time_filters: &[PartialTimeFilter],
schema: Arc<Schema>,
state: &SessionState,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
@@ -869,7 +870,7 @@ fn extract_timestamp_bound(
time_partition: Option<String>,
) -> Option<(Operator, NaiveDateTime)> {
Some((
binexpr.op.clone(),
binexpr.op,
extract_from_lit(binexpr, time_partition)?,
))
}
@@ -942,7 +943,7 @@ trait ManifestExt: ManifestFile {
return None;
};
/* `BinaryExp` doesn't implement `Copy` */
Some((expr.op.clone(), value))
Some((expr.op, value))
}

let Some(col) = self.find_matching_column(partial_filter) else {
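Background for these hunks: datafusion 42 passes `&dyn Session` through `TableProvider::scan` and related planning hooks instead of the concrete `SessionState`. APIs that still want the concrete type, such as `ParquetFormat::create_physical_plan` above, can recover it with a downcast; a minimal sketch:

use datafusion::catalog::Session;
use datafusion::execution::context::SessionState;

// Sketch only: returns `None` if some other `Session` implementation
// is ever passed in, rather than panicking like the `unwrap()` above.
fn concrete_state(state: &dyn Session) -> Option<&SessionState> {
    state.as_any().downcast_ref::<SessionState>()
}
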
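The change from `binexpr.op.clone()` to `binexpr.op` relies on `Operator` deriving `Copy` (unlike `BinaryExpr`, as the comment above notes), so the operator can be moved out of a borrowed expression without cloning; a sketch:

use datafusion::logical_expr::{BinaryExpr, Operator};

// Sketch only: no clone needed because `Operator: Copy`.
fn op_of(expr: &BinaryExpr) -> Operator {
    expr.op
}
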
4 changes: 2 additions & 2 deletions server/src/utils/arrow/flight.rs
@@ -39,8 +39,8 @@ use futures::{stream, TryStreamExt};
use tonic::{Request, Response, Status};

use arrow_flight::FlightClient;
use http::Uri;
use tonic::transport::Channel;
// use http::Uri;
use tonic::transport::{Channel, Uri};

pub type DoGetStream = stream::BoxStream<'static, Result<FlightData, Status>>;

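Background for this hunk: tonic 0.12 moves to the http 1.x types, so a `Uri` imported from an older `http` crate elsewhere in the dependency tree no longer matches what `Channel::builder` expects; importing the `Uri` re-exported by `tonic::transport` keeps the versions aligned. A minimal sketch:

use tonic::transport::{Channel, Uri};

// Sketch only: parse an endpoint and open a channel using tonic's own Uri.
async fn connect(url: &str) -> Result<Channel, Box<dyn std::error::Error>> {
    let uri: Uri = url.parse()?;
    Ok(Channel::builder(uri).connect().await?)
}
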
4 changes: 2 additions & 2 deletions server/src/utils/arrow/merged_reader.rs
@@ -49,8 +49,8 @@ impl MergedRecordReader {
log::error!("Invalid file detected, removing it: {:?}", file);
fs::remove_file(file).unwrap();
} else {
let reader =
StreamReader::try_new(File::open(file).unwrap(), None).map_err(|_| ())?;
let reader = StreamReader::try_new(BufReader::new(File::open(file).unwrap()), None)
.map_err(|_| ())?;
readers.push(reader);
}
}
2 changes: 1 addition & 1 deletion server/src/utils/arrow/reverse_reader.rs
@@ -139,7 +139,7 @@ pub fn get_reverse_reader<T: Read + Seek>(
// reset reader
reader.rewind()?;

Ok(StreamReader::try_new(OffsetReader::new(reader, messages), None).unwrap())
Ok(StreamReader::try_new(BufReader::new(OffsetReader::new(reader, messages)), None).unwrap())
}

pub fn reverse(rb: &RecordBatch) -> RecordBatch {
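Background for the two IPC hunks above: both read paths now wrap the underlying reader in a `std::io::BufReader`, presumably because the arrow-ipc 53 readers no longer buffer input internally, so an unbuffered `File` or `OffsetReader` would issue many small reads. A minimal sketch, assuming `path` points at a valid Arrow IPC stream file:

use std::fs::File;
use std::io::BufReader;

use arrow_ipc::reader::StreamReader;

// Sketch only: open an IPC stream with an explicit read buffer.
fn open_stream(path: &str) -> StreamReader<BufReader<File>> {
    let file = File::open(path).expect("file should exist");
    StreamReader::try_new(BufReader::new(file), None).expect("valid IPC stream")
}
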
