From 23e3f8375edd5bfc61e2c417e3a9613249de0d58 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Wed, 16 Oct 2024 09:44:09 -0400 Subject: [PATCH] Add parquet metadata func --- Cargo.lock | 193 ++++++++++++- Cargo.toml | 1 + .../datafusion-functions-parquet/Cargo.toml | 19 ++ .../datafusion-functions-parquet/src/lib.rs | 261 ++++++++++++++++++ src/execution/local.rs | 6 + src/execution/mod.rs | 30 -- 6 files changed, 465 insertions(+), 45 deletions(-) create mode 100644 crates/datafusion-functions-parquet/Cargo.toml create mode 100644 crates/datafusion-functions-parquet/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 4d694d3..88e6964 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,6 +23,12 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "adler32" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" + [[package]] name = "ahash" version = "0.8.11" @@ -137,6 +143,34 @@ version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" +[[package]] +name = "apache-avro" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ceb7c683b2f8f40970b70e39ff8be514c95b96fcb9c4af87e1ed2cb2e10801a0" +dependencies = [ + "bzip2", + "crc32fast", + "digest", + "lazy_static", + "libflate", + "log", + "num-bigint", + "quad-rand", + "rand", + "regex-lite", + "serde", + "serde_json", + "snap", + "strum 0.25.0", + "strum_macros 0.25.3", + "thiserror", + "typed-builder", + "uuid", + "xz2", + "zstd 0.12.4", +] + [[package]] name = "arrayref" version = "0.3.9" @@ -427,8 +461,8 @@ dependencies = [ "pin-project-lite", "tokio", "xz2", - "zstd", - "zstd-safe", + "zstd 0.13.2", + "zstd-safe 7.2.1", ] [[package]] @@ -837,8 +871,8 @@ version = "7.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7" dependencies = [ - "strum", - "strum_macros", + "strum 0.26.3", + "strum_macros 0.26.4", "unicode-width", ] @@ -910,6 +944,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "core2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" +dependencies = [ + "memchr", +] + [[package]] name = "cpufeatures" version = "0.2.14" @@ -997,6 +1040,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "dary_heap" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7762d17f1241643615821a8455a0b2c3e803784b058693d990b11f2dce25a0ca" + [[package]] name = "dashmap" version = "6.1.0" @@ -1018,6 +1067,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4fd4a99fc70d40ef7e52b243b4a399c3f8d353a40d5ecb200deee05e49c61bb" dependencies = [ "ahash", + "apache-avro", "arrow", "arrow-array", "arrow-ipc", @@ -1050,6 +1100,7 @@ dependencies = [ "indexmap 2.5.0", "itertools 0.12.1", "log", + "num-traits", "num_cpus", "object_store", "parking_lot", @@ -1064,7 +1115,7 @@ dependencies = [ "url", "uuid", "xz2", - "zstd", + "zstd 0.13.2", ] [[package]] @@ -1088,6 +1139,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44fdbc877e3e40dcf88cc8f283d9f5c8851f0a3aa07fee657b1b75ac1ad49b9c" dependencies = [ "ahash", + "apache-avro", "arrow", "arrow-array", "arrow-buffer", @@ -1148,8 +1200,8 @@ dependencies = [ "paste", "serde_json", "sqlparser 0.49.0", - "strum", - "strum_macros", + "strum 0.26.3", + "strum_macros 0.26.4", ] [[package]] @@ -1235,6 +1287,16 @@ dependencies = [ "rand", ] +[[package]] +name = "datafusion-functions-parquet" +version = "0.1.0" +dependencies = [ + "arrow", + "async-trait", + "datafusion", + "parquet", +] + [[package]] name = "datafusion-optimizer" version = "41.0.0" @@ -1388,7 +1450,7 @@ dependencies = [ "log", "regex", "sqlparser 0.49.0", - "strum", + "strum 0.26.3", ] [[package]] @@ -1417,7 +1479,7 @@ dependencies = [ "rustc_version", "serde", "serde_json", - "strum", + "strum 0.26.3", "thiserror", "tracing", "url", @@ -1525,6 +1587,7 @@ dependencies = [ "datafusion", "datafusion-common", "datafusion-functions-json", + "datafusion-functions-parquet", "deltalake", "directories", "env_logger", @@ -1541,7 +1604,7 @@ dependencies = [ "prost", "ratatui", "serde", - "strum", + "strum 0.26.3", "tempfile", "tokio", "tokio-stream", @@ -2374,6 +2437,30 @@ version = "0.2.158" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" +[[package]] +name = "libflate" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45d9dfdc14ea4ef0900c1cddbc8dcd553fbaacd8a4a282cf4018ae9dd04fb21e" +dependencies = [ + "adler32", + "core2", + "crc32fast", + "dary_heap", + "libflate_lz77", +] + +[[package]] +name = "libflate_lz77" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e0d73b369f386f1c44abd9c570d5318f55ccde816ff4b562fa452e5182863d" +dependencies = [ + "core2", + "hashbrown 0.14.5", + "rle-decode-fast", +] + [[package]] name = "libm" version = "0.2.8" @@ -2745,7 +2832,7 @@ dependencies = [ "thrift", "tokio", "twox-hash", - "zstd", + "zstd 0.13.2", "zstd-sys", ] @@ -2936,6 +3023,12 @@ dependencies = [ "prost", ] +[[package]] +name = "quad-rand" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b76f1009795ca44bb5aaae8fd3f18953e209259c33d9b059b1f53d58ab7511db" + [[package]] name = "quick-xml" version = "0.36.1" @@ -3053,8 +3146,8 @@ dependencies = [ "itertools 0.13.0", "lru", "paste", - "strum", - "strum_macros", + "strum 0.26.3", + "strum_macros 0.26.4", "unicode-segmentation", "unicode-truncate", "unicode-width", @@ -3103,6 +3196,12 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-lite" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" + [[package]] name = "regex-syntax" version = "0.8.4" @@ -3169,6 +3268,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rle-decode-fast" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" + [[package]] name = "roaring" version = "0.10.6" @@ -3573,13 +3678,32 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "strum" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" + [[package]] name = "strum" version = "0.26.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" dependencies = [ - "strum_macros", + "strum_macros 0.26.4", +] + +[[package]] +name = "strum_macros" +version = "0.25.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.77", ] [[package]] @@ -4007,6 +4131,26 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "typed-builder" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34085c17941e36627a879208083e25d357243812c30e7d7387c3b954f30ade16" +dependencies = [ + "typed-builder-macro", +] + +[[package]] +name = "typed-builder-macro" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", +] + [[package]] name = "typenum" version = "1.17.0" @@ -4521,13 +4665,32 @@ version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" +[[package]] +name = "zstd" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c" +dependencies = [ + "zstd-safe 6.0.6", +] + [[package]] name = "zstd" version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcf2b778a664581e31e389454a7072dab1647606d44f7feea22cd5abb9c9f3f9" dependencies = [ - "zstd-safe", + "zstd-safe 7.2.1", +] + +[[package]] +name = "zstd-safe" +version = "6.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee98ffd0b48ee95e6c5168188e44a54550b1564d9d530ee21d5f0eaed1069581" +dependencies = [ + "libc", + "zstd-sys", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 25ebeb5..07dabc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ crossterm = { version = "0.28.1", features = ["event-stream"] } datafusion = "41.0.0" datafusion-common = "41.0.0" datafusion-functions-json = { version = "0.41.0", optional = true } +datafusion-functions-parquet = { version = "0.1.0", path = "crates/datafusion-functions-parquet" } deltalake = { version = "0.19.0", features = ["datafusion"], optional = true } directories = "5.0.1" env_logger = "0.11.5" diff --git a/crates/datafusion-functions-parquet/Cargo.toml b/crates/datafusion-functions-parquet/Cargo.toml new file mode 100644 index 0000000..f2c247e --- /dev/null +++ b/crates/datafusion-functions-parquet/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "datafusion-functions-parquet" +version = "0.1.0" +edition = "2021" + +[dependencies] +arrow = { version = "52.2.0" } +async-trait = "0.1.41" +datafusion = { version = "41.0.0", features = [ + "avro", + "crypto_expressions", + "datetime_expressions", + "encoding_expressions", + "parquet", + "regex_expressions", + "unicode_expressions", + "compression", +] } +parquet = { version = "52.2.0", default-features = false } diff --git a/crates/datafusion-functions-parquet/src/lib.rs b/crates/datafusion-functions-parquet/src/lib.rs new file mode 100644 index 0000000..2708861 --- /dev/null +++ b/crates/datafusion-functions-parquet/src/lib.rs @@ -0,0 +1,261 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{Int64Array, StringArray}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::record_batch::RecordBatch; +use async_trait::async_trait; + +use datafusion::catalog::Session; +use datafusion::common::{plan_err, Column}; +use datafusion::datasource::function::TableFunctionImpl; +use datafusion::datasource::TableProvider; +use datafusion::error::Result; +use datafusion::logical_expr::Expr; +use datafusion::physical_plan::memory::MemoryExec; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::scalar::ScalarValue; +use parquet::basic::ConvertedType; +use parquet::file::reader::FileReader; +use parquet::file::serialized_reader::SerializedFileReader; +use parquet::file::statistics::Statistics; +use std::fs::File; +use std::sync::Arc; + +/// PARQUET_META table function +struct ParquetMetadataTable { + schema: SchemaRef, + batch: RecordBatch, +} + +#[async_trait] +impl TableProvider for ParquetMetadataTable { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> arrow::datatypes::SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> datafusion::logical_expr::TableType { + datafusion::logical_expr::TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + Ok(Arc::new(MemoryExec::try_new( + &[vec![self.batch.clone()]], + TableProvider::schema(self), + projection.cloned(), + )?)) + } +} + +fn convert_parquet_statistics( + value: &Statistics, + converted_type: ConvertedType, +) -> (String, String) { + match (value, converted_type) { + (Statistics::Boolean(val), _) => (val.min().to_string(), val.max().to_string()), + (Statistics::Int32(val), _) => (val.min().to_string(), val.max().to_string()), + (Statistics::Int64(val), _) => (val.min().to_string(), val.max().to_string()), + (Statistics::Int96(val), _) => (val.min().to_string(), val.max().to_string()), + (Statistics::Float(val), _) => (val.min().to_string(), val.max().to_string()), + (Statistics::Double(val), _) => (val.min().to_string(), val.max().to_string()), + (Statistics::ByteArray(val), ConvertedType::UTF8) => { + let min_bytes = val.min(); + let max_bytes = val.max(); + let min = min_bytes + .as_utf8() + .map(|v| v.to_string()) + .unwrap_or_else(|_| min_bytes.to_string()); + + let max = max_bytes + .as_utf8() + .map(|v| v.to_string()) + .unwrap_or_else(|_| max_bytes.to_string()); + (min, max) + } + (Statistics::ByteArray(val), _) => (val.min().to_string(), val.max().to_string()), + (Statistics::FixedLenByteArray(val), ConvertedType::UTF8) => { + let min_bytes = val.min(); + let max_bytes = val.max(); + let min = min_bytes + .as_utf8() + .map(|v| v.to_string()) + .unwrap_or_else(|_| min_bytes.to_string()); + + let max = max_bytes + .as_utf8() + .map(|v| v.to_string()) + .unwrap_or_else(|_| max_bytes.to_string()); + (min, max) + } + (Statistics::FixedLenByteArray(val), _) => (val.min().to_string(), val.max().to_string()), + } +} + +pub struct ParquetMetadataFunc {} + +impl TableFunctionImpl for ParquetMetadataFunc { + fn call(&self, exprs: &[Expr]) -> Result> { + let filename = match exprs.first() { + Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) => s, // single quote: parquet_metadata('x.parquet') + Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet") + _ => { + return plan_err!("parquet_metadata requires string argument as its input"); + } + }; + + let file = File::open(filename.clone())?; + let reader = SerializedFileReader::new(file)?; + let metadata = reader.metadata(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("filename", DataType::Utf8, true), + Field::new("row_group_id", DataType::Int64, true), + Field::new("row_group_num_rows", DataType::Int64, true), + Field::new("row_group_num_columns", DataType::Int64, true), + Field::new("row_group_bytes", DataType::Int64, true), + Field::new("column_id", DataType::Int64, true), + Field::new("file_offset", DataType::Int64, true), + Field::new("num_values", DataType::Int64, true), + Field::new("path_in_schema", DataType::Utf8, true), + Field::new("type", DataType::Utf8, true), + Field::new("stats_min", DataType::Utf8, true), + Field::new("stats_max", DataType::Utf8, true), + Field::new("stats_null_count", DataType::Int64, true), + Field::new("stats_distinct_count", DataType::Int64, true), + Field::new("stats_min_value", DataType::Utf8, true), + Field::new("stats_max_value", DataType::Utf8, true), + Field::new("compression", DataType::Utf8, true), + Field::new("encodings", DataType::Utf8, true), + Field::new("index_page_offset", DataType::Int64, true), + Field::new("dictionary_page_offset", DataType::Int64, true), + Field::new("data_page_offset", DataType::Int64, true), + Field::new("total_compressed_size", DataType::Int64, true), + Field::new("total_uncompressed_size", DataType::Int64, true), + ])); + + // construct recordbatch from metadata + let mut filename_arr = vec![]; + let mut row_group_id_arr = vec![]; + let mut row_group_num_rows_arr = vec![]; + let mut row_group_num_columns_arr = vec![]; + let mut row_group_bytes_arr = vec![]; + let mut column_id_arr = vec![]; + let mut file_offset_arr = vec![]; + let mut num_values_arr = vec![]; + let mut path_in_schema_arr = vec![]; + let mut type_arr = vec![]; + let mut stats_min_arr = vec![]; + let mut stats_max_arr = vec![]; + let mut stats_null_count_arr = vec![]; + let mut stats_distinct_count_arr = vec![]; + let mut stats_min_value_arr = vec![]; + let mut stats_max_value_arr = vec![]; + let mut compression_arr = vec![]; + let mut encodings_arr = vec![]; + let mut index_page_offset_arr = vec![]; + let mut dictionary_page_offset_arr = vec![]; + let mut data_page_offset_arr = vec![]; + let mut total_compressed_size_arr = vec![]; + let mut total_uncompressed_size_arr = vec![]; + for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() { + for (col_idx, column) in row_group.columns().iter().enumerate() { + filename_arr.push(filename.clone()); + row_group_id_arr.push(rg_idx as i64); + row_group_num_rows_arr.push(row_group.num_rows()); + row_group_num_columns_arr.push(row_group.num_columns() as i64); + row_group_bytes_arr.push(row_group.total_byte_size()); + column_id_arr.push(col_idx as i64); + file_offset_arr.push(column.file_offset()); + num_values_arr.push(column.num_values()); + path_in_schema_arr.push(column.column_path().to_string()); + type_arr.push(column.column_type().to_string()); + let converted_type = column.column_descr().converted_type(); + + if let Some(s) = column.statistics() { + let (min_val, max_val) = if s.has_min_max_set() { + let (min_val, max_val) = convert_parquet_statistics(s, converted_type); + (Some(min_val), Some(max_val)) + } else { + (None, None) + }; + stats_min_arr.push(min_val.clone()); + stats_max_arr.push(max_val.clone()); + stats_null_count_arr.push(Some(s.null_count() as i64)); + stats_distinct_count_arr.push(s.distinct_count().map(|c| c as i64)); + stats_min_value_arr.push(min_val); + stats_max_value_arr.push(max_val); + } else { + stats_min_arr.push(None); + stats_max_arr.push(None); + stats_null_count_arr.push(None); + stats_distinct_count_arr.push(None); + stats_min_value_arr.push(None); + stats_max_value_arr.push(None); + }; + compression_arr.push(format!("{:?}", column.compression())); + encodings_arr.push(format!("{:?}", column.encodings())); + index_page_offset_arr.push(column.index_page_offset()); + dictionary_page_offset_arr.push(column.dictionary_page_offset()); + data_page_offset_arr.push(column.data_page_offset()); + total_compressed_size_arr.push(column.compressed_size()); + total_uncompressed_size_arr.push(column.uncompressed_size()); + } + } + + let rb = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(filename_arr)), + Arc::new(Int64Array::from(row_group_id_arr)), + Arc::new(Int64Array::from(row_group_num_rows_arr)), + Arc::new(Int64Array::from(row_group_num_columns_arr)), + Arc::new(Int64Array::from(row_group_bytes_arr)), + Arc::new(Int64Array::from(column_id_arr)), + Arc::new(Int64Array::from(file_offset_arr)), + Arc::new(Int64Array::from(num_values_arr)), + Arc::new(StringArray::from(path_in_schema_arr)), + Arc::new(StringArray::from(type_arr)), + Arc::new(StringArray::from(stats_min_arr)), + Arc::new(StringArray::from(stats_max_arr)), + Arc::new(Int64Array::from(stats_null_count_arr)), + Arc::new(Int64Array::from(stats_distinct_count_arr)), + Arc::new(StringArray::from(stats_min_value_arr)), + Arc::new(StringArray::from(stats_max_value_arr)), + Arc::new(StringArray::from(compression_arr)), + Arc::new(StringArray::from(encodings_arr)), + Arc::new(Int64Array::from(index_page_offset_arr)), + Arc::new(Int64Array::from(dictionary_page_offset_arr)), + Arc::new(Int64Array::from(data_page_offset_arr)), + Arc::new(Int64Array::from(total_compressed_size_arr)), + Arc::new(Int64Array::from(total_uncompressed_size_arr)), + ], + )?; + + let parquet_metadata = ParquetMetadataTable { schema, batch: rb }; + Ok(Arc::new(parquet_metadata)) + } +} diff --git a/src/execution/local.rs b/src/execution/local.rs index 7a01a14..deb65c8 100644 --- a/src/execution/local.rs +++ b/src/execution/local.rs @@ -90,6 +90,12 @@ impl ExecutionContext { extension.register_on_ctx(config, &mut session_ctx)?; } + // Register Parquet Metadata Function + session_ctx.register_udtf( + "parquet_metadata", + Arc::new(datafusion_functions_parquet::ParquetMetadataFunc {}), + ); + Ok(Self { config: config.clone(), session_ctx, diff --git a/src/execution/mod.rs b/src/execution/mod.rs index 09eda4d..863367d 100644 --- a/src/execution/mod.rs +++ b/src/execution/mod.rs @@ -32,12 +32,6 @@ use crate::config::AppConfig; use color_eyre::eyre::Result; use datafusion::prelude::*; -// #[cfg(feature = "flightsql")] -// use { -// arrow_flight::sql::client::FlightSqlServiceClient, tokio::sync::Mutex, -// tonic::transport::Channel, -// }; - pub enum AppType { Cli, Tui, @@ -58,7 +52,6 @@ impl AppExecution { context, #[cfg(feature = "flightsql")] flightsql_context: FlightSQLContext::default(), - // flightsql_client: Mutex::new(None), } } @@ -95,27 +88,4 @@ impl AppExecution { pub fn with_flightsql_ctx(&mut self, flightsql_ctx: FlightSQLContext) { self.flightsql_context = flightsql_ctx; } - - // Create FlightSQL client from users FlightSQL config - // #[cfg(feature = "flightsql")] - // pub async fn create_flightsql_client(&self, config: FlightSQLConfig) -> Result<()> { - // use color_eyre::eyre::eyre; - // use log::info; - // - // let url = Box::leak(config.connection_url.into_boxed_str()); - // info!("Connecting to FlightSQL host: {}", url); - // let channel = Channel::from_static(url).connect().await; - // match channel { - // Ok(c) => { - // let client = FlightSqlServiceClient::new(c); - // let mut guard = self.flightsql_context.client().lock().await; - // *guard = Some(client); - // Ok(()) - // } - // Err(e) => Err(eyre!( - // "Error creating channel for FlightSQL client: {:?}", - // e - // )), - // } - // } }