From 2a7768d47a9a2de2a5e3ea281c2a57717311fe5c Mon Sep 17 00:00:00 2001 From: GoHalo Date: Fri, 30 Aug 2024 20:32:29 +0800 Subject: [PATCH 1/3] refact: define hudi error types for hudi-core crate --- Cargo.toml | 1 + crates/core/Cargo.toml | 2 +- crates/core/src/config/internal.rs | 16 +++- crates/core/src/config/mod.rs | 3 +- crates/core/src/config/read.rs | 21 +++-- crates/core/src/config/table.rs | 67 +++++++++++---- crates/core/src/config/utils.rs | 7 +- crates/core/src/file_group/mod.rs | 21 +++-- crates/core/src/file_group/reader.rs | 2 +- crates/core/src/lib.rs | 46 +++++++++++ crates/core/src/storage/mod.rs | 66 ++++++++------- crates/core/src/storage/utils.rs | 37 ++++++--- crates/core/src/table/builder.rs | 23 +++--- crates/core/src/table/fs_view.rs | 7 +- crates/core/src/table/mod.rs | 15 ++-- crates/core/src/table/partition.rs | 32 ++++---- crates/core/src/table/timeline.rs | 30 +++---- python/src/internal.rs | 117 ++++++++++++++++++++------- 18 files changed, 353 insertions(+), 160 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b8dd7224..a590f335 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,7 @@ serde = { version = "1.0.203", features = ["derive"] } serde_json = { version = "1" } # "stdlib" +thiserror = { version = "2.0.3" } anyhow = { version = "1.0.86" } bytes = { version = "1" } paste = { version = "1.0.15" } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 90fae7dc..d7c9d06e 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -48,7 +48,7 @@ serde = { workspace = true } serde_json = { workspace = true } # "stdlib" -anyhow = { workspace = true } +thiserror = { workspace = true } bytes = { workspace = true } paste = { workspace = true } strum = { workspace = true } diff --git a/crates/core/src/config/internal.rs b/crates/core/src/config/internal.rs index 89fd3149..8174cb32 100644 --- a/crates/core/src/config/internal.rs +++ b/crates/core/src/config/internal.rs @@ -21,10 +21,13 @@ use std::collections::HashMap; use std::str::FromStr; -use anyhow::{anyhow, Result}; use strum_macros::EnumIter; -use crate::config::{ConfigParser, HudiConfigValue}; +use crate::{ + config::{ConfigParser, HudiConfigValue}, + Error::{ConfNotFound, InvalidConf}, + Result, +}; /// Configurations for internal use. 
/// @@ -64,11 +67,16 @@ impl ConfigParser for HudiInternalConfig { let get_result = configs .get(self.as_ref()) .map(|v| v.as_str()) - .ok_or(anyhow!("Config '{}' not found", self.as_ref())); + .ok_or(ConfNotFound(self.as_ref().to_string())); match self { Self::SkipConfigValidation => get_result - .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e))) + .and_then(|v| { + bool::from_str(v).map_err(|e| InvalidConf { + item: Self::SkipConfigValidation.as_ref(), + source: Box::new(e), + }) + }) .map(HudiConfigValue::Boolean), } } diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs index 7b557b4d..f586a896 100644 --- a/crates/core/src/config/mod.rs +++ b/crates/core/src/config/mod.rs @@ -21,8 +21,7 @@ use std::any::type_name; use std::collections::HashMap; use std::sync::Arc; -use crate::storage::utils::parse_uri; -use anyhow::Result; +use crate::{storage::utils::parse_uri, Result}; use serde::{Deserialize, Serialize}; use url::Url; diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs index e67617dd..c6c7de61 100644 --- a/crates/core/src/config/read.rs +++ b/crates/core/src/config/read.rs @@ -21,10 +21,14 @@ use std::collections::HashMap; use std::str::FromStr; -use crate::config::{ConfigParser, HudiConfigValue}; -use anyhow::{anyhow, Result}; use strum_macros::EnumIter; +use crate::{ + config::{ConfigParser, HudiConfigValue}, + Error::{ConfNotFound, InvalidConf}, + Result, +}; + /// Configurations for reading Hudi tables. /// /// **Example** @@ -37,6 +41,7 @@ use strum_macros::EnumIter; /// HudiTable::new_with_options("/tmp/hudi_data", options) /// ``` /// + #[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)] pub enum HudiReadConfig { /// Define input splits @@ -74,11 +79,16 @@ impl ConfigParser for HudiReadConfig { let get_result = configs .get(self.as_ref()) .map(|v| v.as_str()) - .ok_or(anyhow!("Config '{}' not found", self.as_ref())); + .ok_or(ConfNotFound(self.as_ref().to_string())); match self { Self::InputPartitions => get_result - .and_then(|v| usize::from_str(v).map_err(|e| anyhow!(e))) + .and_then(|v| { + usize::from_str(v).map_err(|e| InvalidConf { + item: Self::InputPartitions.as_ref(), + source: Box::new(e), + }) + }) .map(HudiConfigValue::UInteger), Self::AsOfTimestamp => get_result.map(|v| HudiConfigValue::String(v.to_string())), } @@ -90,7 +100,6 @@ mod tests { use crate::config::read::HudiReadConfig::InputPartitions; use crate::config::ConfigParser; use std::collections::HashMap; - use std::num::ParseIntError; #[test] fn parse_valid_config_value() { @@ -103,7 +112,7 @@ mod tests { fn parse_invalid_config_value() { let options = HashMap::from([(InputPartitions.as_ref().to_string(), "foo".to_string())]); let value = InputPartitions.parse_value(&options); - assert!(value.err().unwrap().is::()); + assert!(value.is_err()); assert_eq!( InputPartitions .parse_value_or_default(&options) diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs index 98cca7d8..196f11f5 100644 --- a/crates/core/src/config/table.rs +++ b/crates/core/src/config/table.rs @@ -21,11 +21,13 @@ use std::collections::HashMap; use std::str::FromStr; -use anyhow::anyhow; -use anyhow::Result; use strum_macros::{AsRefStr, EnumIter}; -use crate::config::{ConfigParser, HudiConfigValue}; +use crate::{ + config::{ConfigParser, HudiConfigValue}, + Error::{self, ConfNotFound, InvalidConf, Unsupported}, + Result, +}; /// Configurations for Hudi tables, most of them are persisted in `hoodie.properties`. 
/// @@ -146,7 +148,7 @@ impl ConfigParser for HudiTableConfig { let get_result = configs .get(self.as_ref()) .map(|v| v.as_str()) - .ok_or(anyhow!("Config '{}' not found", self.as_ref())); + .ok_or(ConfNotFound(self.as_ref().to_string())); match self { Self::BaseFileFormat => get_result @@ -154,24 +156,49 @@ impl ConfigParser for HudiTableConfig { .map(|v| HudiConfigValue::String(v.as_ref().to_string())), Self::BasePath => get_result.map(|v| HudiConfigValue::String(v.to_string())), Self::Checksum => get_result - .and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e))) + .and_then(|v| { + isize::from_str(v).map_err(|e| InvalidConf { + item: Self::Checksum.as_ref(), + source: Box::new(e), + }) + }) .map(HudiConfigValue::Integer), Self::DatabaseName => get_result.map(|v| HudiConfigValue::String(v.to_string())), Self::DropsPartitionFields => get_result - .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e))) + .and_then(|v| { + bool::from_str(v).map_err(|e| InvalidConf { + item: Self::DropsPartitionFields.as_ref(), + source: Box::new(e), + }) + }) .map(HudiConfigValue::Boolean), Self::IsHiveStylePartitioning => get_result - .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e))) + .and_then(|v| { + bool::from_str(v).map_err(|e| InvalidConf { + item: Self::IsHiveStylePartitioning.as_ref(), + source: Box::new(e), + }) + }) .map(HudiConfigValue::Boolean), Self::IsPartitionPathUrlencoded => get_result - .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e))) + .and_then(|v| { + bool::from_str(v).map_err(|e| InvalidConf { + item: Self::IsPartitionPathUrlencoded.as_ref(), + source: Box::new(e), + }) + }) .map(HudiConfigValue::Boolean), Self::KeyGeneratorClass => get_result.map(|v| HudiConfigValue::String(v.to_string())), Self::PartitionFields => get_result .map(|v| HudiConfigValue::List(v.split(',').map(str::to_string).collect())), Self::PrecombineField => get_result.map(|v| HudiConfigValue::String(v.to_string())), Self::PopulatesMetaFields => get_result - .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e))) + .and_then(|v| { + bool::from_str(v).map_err(|e| InvalidConf { + item: Self::PopulatesMetaFields.as_ref(), + source: Box::new(e), + }) + }) .map(HudiConfigValue::Boolean), Self::RecordKeyFields => get_result .map(|v| HudiConfigValue::List(v.split(',').map(str::to_string).collect())), @@ -180,10 +207,20 @@ impl ConfigParser for HudiTableConfig { .and_then(TableTypeValue::from_str) .map(|v| HudiConfigValue::String(v.as_ref().to_string())), Self::TableVersion => get_result - .and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e))) + .and_then(|v| { + isize::from_str(v).map_err(|e| InvalidConf { + item: Self::TableVersion.as_ref(), + source: Box::new(e), + }) + }) .map(HudiConfigValue::Integer), Self::TimelineLayoutVersion => get_result - .and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e))) + .and_then(|v| { + isize::from_str(v).map_err(|e| InvalidConf { + item: Self::TimelineLayoutVersion.as_ref(), + source: Box::new(e), + }) + }) .map(HudiConfigValue::Integer), } } @@ -199,13 +236,13 @@ pub enum TableTypeValue { } impl FromStr for TableTypeValue { - type Err = anyhow::Error; + type Err = Error; fn from_str(s: &str) -> Result { match s.to_ascii_lowercase().as_str() { "copy_on_write" | "copy-on-write" | "cow" => Ok(Self::CopyOnWrite), "merge_on_read" | "merge-on-read" | "mor" => Ok(Self::MergeOnRead), - _ => Err(anyhow!("Unsupported table type: {}", s)), + v => Err(Unsupported(format!("unsupported table type {}", v))), } } } @@ -218,12 +255,12 @@ pub enum BaseFileFormatValue { } impl 
FromStr for BaseFileFormatValue { - type Err = anyhow::Error; + type Err = Error; fn from_str(s: &str) -> Result { match s.to_ascii_lowercase().as_str() { "parquet" => Ok(Self::Parquet), - _ => Err(anyhow!("Unsupported base file format: {}", s)), + v => Err(Unsupported(format!("unsupported base file format {}", v))), } } } diff --git a/crates/core/src/config/utils.rs b/crates/core/src/config/utils.rs index 800a81d0..fe6ed4dc 100644 --- a/crates/core/src/config/utils.rs +++ b/crates/core/src/config/utils.rs @@ -18,11 +18,12 @@ */ //! Config utilities. -use anyhow::{Context, Result}; use bytes::Bytes; use std::collections::HashMap; use std::io::{BufRead, BufReader, Cursor}; +use crate::{Error, Result}; + /// Returns an empty iterator to represent an empty set of options. pub fn empty_options<'a>() -> std::iter::Empty<(&'a str, &'a str)> { std::iter::empty::<(&str, &str)>() @@ -57,7 +58,7 @@ pub fn parse_data_for_options(data: &Bytes, split_chars: &str) -> Result Result Result<(String, String)> { let err_msg = format!("Failed to parse file name '{}' for base file.", file_name); - let (name, _) = file_name.rsplit_once('.').ok_or(anyhow!(err_msg.clone()))?; + let (name, _) = file_name + .rsplit_once('.') + .ok_or(Internal(err_msg.clone()))?; let parts: Vec<&str> = name.split('_').collect(); - let file_group_id = parts.first().ok_or(anyhow!(err_msg.clone()))?.to_string(); - let commit_time = parts.get(2).ok_or(anyhow!(err_msg.clone()))?.to_string(); + let file_group_id = parts + .first() + .ok_or(Internal(err_msg.clone()))? + .to_string(); + let commit_time = parts + .get(2) + .ok_or(Internal(err_msg.clone()))? + .to_string(); Ok((file_group_id, commit_time)) } @@ -189,11 +196,11 @@ impl FileGroup { pub fn add_base_file(&mut self, base_file: BaseFile) -> Result<&Self> { let commit_time = base_file.commit_time.as_str(); if self.file_slices.contains_key(commit_time) { - Err(anyhow!( + Err(Internal(format!( "Commit time {0} is already present in File Group {1}", commit_time.to_owned(), self.id, - )) + ))) } else { self.file_slices.insert( commit_time.to_owned(), diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs index 1c7d748d..eeb283ae 100644 --- a/crates/core/src/file_group/reader.rs +++ b/crates/core/src/file_group/reader.rs @@ -21,7 +21,7 @@ use crate::config::utils::split_hudi_options_from_others; use crate::config::HudiConfigs; use crate::file_group::FileSlice; use crate::storage::Storage; -use anyhow::Result; +use crate::Result; use arrow_array::RecordBatch; use std::sync::Arc; diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 3190dec6..ece11cd7 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -48,3 +48,49 @@ pub mod file_group; pub mod storage; pub mod table; pub mod util; + +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum Error { + #[error("Config '{0}' not found")] + ConfNotFound(String), + + #[error("Invalid config item '{item}', {source:?}")] + InvalidConf { + item: &'static str, + source: Box, + }, + + #[error("Parse url '{url}' failed, {source}")] + UrlParse { + url: String, + source: url::ParseError, + }, + + #[error("Invalid file path '{name}', {detail}")] + InvalidPath { name: String, detail: String }, + + #[error("{0}")] + Unsupported(String), + + #[error("{0}")] + Internal(String), + + #[error(transparent)] + Utf8Error(#[from] std::str::Utf8Error), + + #[error(transparent)] + Store(#[from] object_store::Error), + + #[error(transparent)] + StorePath(#[from] object_store::path::Error), + + 
#[error(transparent)] + Parquet(#[from] parquet::errors::ParquetError), + + #[error(transparent)] + Arrow(#[from] arrow::error::ArrowError), +} + +type Result = std::result::Result; diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs index 9a4aae14..3d15ad41 100644 --- a/crates/core/src/storage/mod.rs +++ b/crates/core/src/storage/mod.rs @@ -22,11 +22,6 @@ use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; -use crate::config::table::HudiTableConfig; -use crate::config::HudiConfigs; -use crate::storage::file_info::FileInfo; -use crate::storage::utils::join_url_segments; -use anyhow::{anyhow, Context, Result}; use arrow::compute::concat_batches; use arrow::record_batch::RecordBatch; use async_recursion::async_recursion; @@ -39,6 +34,12 @@ use parquet::arrow::ParquetRecordBatchStreamBuilder; use parquet::file::metadata::ParquetMetaData; use url::Url; +use crate::config::table::HudiTableConfig; +use crate::config::HudiConfigs; +use crate::storage::file_info::FileInfo; +use crate::storage::utils::join_url_segments; +use crate::{Error, Result}; + pub mod file_info; pub mod file_stats; pub mod utils; @@ -60,10 +61,10 @@ impl Storage { hudi_configs: Arc, ) -> Result> { if !hudi_configs.contains(HudiTableConfig::BasePath) { - return Err(anyhow!( + return Err(Error::Internal(format!( "Failed to create storage: {} is required.", HudiTableConfig::BasePath.as_ref() - )); + ))); } let base_url = hudi_configs.get(HudiTableConfig::BasePath)?.to_url()?; @@ -75,7 +76,7 @@ impl Storage { options, hudi_configs, })), - Err(e) => Err(anyhow!("Failed to create storage: {}", e)), + Err(e) => Err(Error::Store(e)), } } @@ -100,15 +101,23 @@ impl Storage { runtime_env.register_object_store(self.base_url.as_ref(), self.object_store.clone()); } - #[cfg(test)] - async fn get_file_info(&self, relative_path: &str) -> Result { + fn get_relative_path(&self, relative_path: &str) -> Result<(Url, ObjPath)> { let obj_url = join_url_segments(&self.base_url, &[relative_path])?; let obj_path = ObjPath::from_url_path(obj_url.path())?; + Ok((obj_url, obj_path)) + } + + #[cfg(test)] + async fn get_file_info(&self, relative_path: &str) -> Result { + let (obj_url, obj_path) = self.get_relative_path(relative_path)?; let meta = self.object_store.head(&obj_path).await?; let uri = obj_url.to_string(); let name = obj_path .filename() - .ok_or(anyhow!("Failed to get file name for {}", obj_path))? + .ok_or(Error::InvalidPath { + name: obj_path.to_string(), + detail: "failed to get file name".to_string(), + })? 
.to_string(); Ok(FileInfo { uri, @@ -118,8 +127,7 @@ impl Storage { } pub async fn get_parquet_file_metadata(&self, relative_path: &str) -> Result { - let obj_url = join_url_segments(&self.base_url, &[relative_path])?; - let obj_path = ObjPath::from_url_path(obj_url.path())?; + let (_, obj_path) = self.get_relative_path(relative_path)?; let obj_store = self.object_store.clone(); let meta = obj_store.head(&obj_path).await?; let reader = ParquetObjectReader::new(obj_store, meta); @@ -128,8 +136,7 @@ impl Storage { } pub async fn get_file_data(&self, relative_path: &str) -> Result { - let obj_url = join_url_segments(&self.base_url, &[relative_path])?; - let obj_path = ObjPath::from_url_path(obj_url.path())?; + let (_, obj_path) = self.get_relative_path(relative_path)?; let result = self.object_store.get(&obj_path).await?; let bytes = result.bytes().await?; Ok(bytes) @@ -143,8 +150,7 @@ impl Storage { } pub async fn get_parquet_file_data(&self, relative_path: &str) -> Result { - let obj_url = join_url_segments(&self.base_url, &[relative_path])?; - let obj_path = ObjPath::from_url_path(obj_url.path())?; + let (_, obj_path) = self.get_relative_path(relative_path)?; let obj_store = self.object_store.clone(); let meta = obj_store.head(&obj_path).await?; @@ -156,16 +162,14 @@ impl Storage { let mut batches = Vec::new(); while let Some(r) = stream.next().await { - let batch = r.context("Failed to read record batch.")?; - batches.push(batch) + batches.push(r?) } if batches.is_empty() { return Ok(RecordBatch::new_empty(schema.clone())); } - concat_batches(&schema, &batches) - .map_err(|e| anyhow!("Failed to concat record batches: {}", e)) + Ok(concat_batches(&schema, &batches)?) } pub async fn list_dirs(&self, subdir: Option<&str>) -> Result> { @@ -174,7 +178,10 @@ impl Storage { for dir in dir_paths { dirs.push( dir.filename() - .ok_or(anyhow!("Failed to get file name for {}", dir))? + .ok_or(Error::InvalidPath { + name: dir.to_string(), + detail: "failed to get file name".to_string(), + })? .to_string(), ) } @@ -203,10 +210,10 @@ impl Storage { let name = obj_meta .location .filename() - .ok_or(anyhow!( - "Failed to get file name for {:?}", - obj_meta.location - ))? + .ok_or(Error::InvalidPath { + name: obj_meta.location.to_string(), + detail: "failed to get file name".to_string(), + })? .to_string(); let uri = join_url_segments(&prefix_url, &[&name])?.to_string(); file_info.push(FileInfo { @@ -241,9 +248,10 @@ pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Result Result<(String, String)> { let path = Path::new(filename); let stem = path .file_stem() .and_then(|s| s.to_str()) - .ok_or_else(|| anyhow!("No file stem found"))? + .ok_or_else(|| InvalidPath { + name: filename.to_string(), + detail: "no file stem found".to_string(), + })? .to_string(); let extension = path @@ -44,13 +49,20 @@ pub fn split_filename(filename: &str) -> Result<(String, String)> { /// Parses a URI string into a URL. pub fn parse_uri(uri: &str) -> Result { - let mut url = Url::parse(uri) - .or(Url::from_file_path(PathBuf::from_str(uri)?)) - .map_err(|_| anyhow!("Failed to parse uri: {}", uri))?; + let mut url = match Url::parse(uri) { + Ok(url) => url, + Err(source) => Url::from_directory_path(uri).map_err(|_| UrlParse { + url: uri.to_string(), + source, + })?, + }; if url.path().ends_with('/') { url.path_segments_mut() - .map_err(|_| anyhow!("Failed to parse uri: {}", uri))? + .map_err(|_| InvalidPath { + name: uri.to_string(), + detail: "parse uri failed".to_string(), + })? 
.pop(); } @@ -73,7 +85,10 @@ pub fn join_url_segments(base_url: &Url, segments: &[&str]) -> Result { for &seg in segments { let segs: Vec<_> = seg.split('/').filter(|&s| !s.is_empty()).collect(); url.path_segments_mut() - .map_err(|_| ParseError::RelativeUrlWithoutBase)? + .map_err(|_| UrlParse { + url: base_url.to_string(), + source: ParseError::RelativeUrlWithoutBase, + })? .extend(segs); } diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index c037a0c5..7b900b22 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -17,7 +17,6 @@ * under the License. */ -use anyhow::{anyhow, Context, Result}; use paste::paste; use std::collections::HashMap; use std::env; @@ -38,6 +37,7 @@ use crate::storage::Storage; use crate::table::fs_view::FileSystemView; use crate::table::timeline::Timeline; use crate::table::Table; +use crate::{Error, Result}; /// Builder for creating a [Table] instance. #[derive(Debug, Clone)] @@ -112,13 +112,10 @@ impl TableBuilder { let hudi_configs = Arc::from(hudi_configs); let storage_options = Arc::from(self.storage_options.clone()); - let timeline = Timeline::new(hudi_configs.clone(), storage_options.clone()) - .await - .context("Failed to load timeline")?; + let timeline = Timeline::new(hudi_configs.clone(), storage_options.clone()).await?; - let file_system_view = FileSystemView::new(hudi_configs.clone(), storage_options.clone()) - .await - .context("Failed to load file system view")?; + let file_system_view = + FileSystemView::new(hudi_configs.clone(), storage_options.clone()).await?; Ok(Table { hudi_configs, @@ -253,22 +250,26 @@ impl TableBuilder { // additional validation let table_type = hudi_configs.get(TableType)?.to::(); if TableTypeValue::from_str(&table_type)? != CopyOnWrite { - return Err(anyhow!("Only support copy-on-write table.")); + return Err(Error::Unsupported( + "Only support copy-on-write table.".to_string(), + )); } let table_version = hudi_configs.get(TableVersion)?.to::(); if !(5..=6).contains(&table_version) { - return Err(anyhow!("Only support table version 5 and 6.")); + return Err(Error::Unsupported( + "Only support table version 5 and 6.".to_string(), + )); } let drops_partition_cols = hudi_configs .get_or_default(DropsPartitionFields) .to::(); if drops_partition_cols { - return Err(anyhow!( + return Err(Error::Unsupported(format!( "Only support when `{}` is disabled", DropsPartitionFields.as_ref() - )); + ))); } Ok(()) diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs index f2309e34..2976e761 100644 --- a/crates/core/src/table/fs_view.rs +++ b/crates/core/src/table/fs_view.rs @@ -24,10 +24,11 @@ use crate::config::HudiConfigs; use crate::file_group::{BaseFile, FileGroup, FileSlice}; use crate::storage::file_info::FileInfo; use crate::storage::{get_leaf_dirs, Storage}; + use crate::table::partition::PartitionPruner; -use anyhow::Result; +use crate::{Error, Result}; use dashmap::DashMap; -use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::stream::{self, StreamExt}; /// A view of the Hudi table's data files (files stored outside the `.hoodie/` directory) in the file system. It provides APIs to load and /// access the file groups and file slices. 
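The `@@ -134` hunk that follows swaps the error annotation inside the concurrent partition-loading stream from `Ok::<_, anyhow::Error>` to `Ok::<_, Error>`. For reference, here is a minimal self-contained sketch of that pattern, pinning a stream's error type so `try_collect` can short-circuit on the crate's own enum; `load_partition` and the one-variant `Error` enum are hypothetical stand-ins, and `futures`, `thiserror`, and `tokio` are assumed as dependencies:

```rust
use futures::stream::{self, StreamExt, TryStreamExt};

#[derive(Debug, thiserror::Error)]
enum Error {
    #[error("{0}")]
    Internal(String),
}

// Hypothetical stand-in for `FileSystemView::load_file_groups_for_partition`.
async fn load_partition(path: &str) -> Result<usize, Error> {
    if path.is_empty() {
        return Err(Error::Internal("empty partition path".to_string()));
    }
    Ok(path.len())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let paths = vec!["2024/08/30", "2024/08/31"];
    // Load up to 10 partitions concurrently; results arrive out of order.
    let loaded: Vec<(&str, usize)> = stream::iter(paths)
        .map(|path| async move {
            let file_groups = load_partition(path).await?;
            // The turbofish pins the item type to `Result<_, Error>`.
            Ok::<_, Error>((path, file_groups))
        })
        .buffer_unordered(10)
        // `try_collect` stops at the first `Err` and returns it.
        .try_collect()
        .await?;
    println!("{:?}", loaded);
    Ok(())
}
```

The turbofish annotation is what lets inference settle on the custom error type for every `?` inside the async block.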
@@ -134,7 +135,7 @@ impl FileSystemView { .map(|path| async move { let file_groups = Self::load_file_groups_for_partition(&self.storage, &path).await?; - Ok::<_, anyhow::Error>((path, file_groups)) + Ok::<_, Error>((path, file_groups)) }) // TODO parameterize the parallelism for partition loading .buffer_unordered(10) diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 4d69bbfe..a2fc7e98 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -87,7 +87,6 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use anyhow::{anyhow, Context, Result}; use arrow::record_batch::RecordBatch; use arrow_schema::{Field, Schema}; use url::Url; @@ -102,6 +101,7 @@ use crate::table::builder::TableBuilder; use crate::table::fs_view::FileSystemView; use crate::table::partition::PartitionPruner; use crate::table::timeline::Timeline; +use crate::{Error, Result}; pub mod builder; mod fs_view; @@ -161,7 +161,6 @@ impl Table { .register_object_store(runtime_env.clone()); } - /// Get the latest [Schema] of the table. pub async fn get_schema(&self) -> Result { self.timeline.get_latest_schema().await } @@ -259,16 +258,18 @@ impl Table { timestamp: &str, filters: &[(&str, &str, &str)], ) -> Result> { - let file_slices = self - .get_file_slices_as_of(timestamp, filters) - .await - .context(format!("Failed to get file slices as of {}", timestamp))?; + let file_slices = self.get_file_slices_as_of(timestamp, filters).await?; let mut batches = Vec::new(); let fg_reader = self.create_file_group_reader(); for f in file_slices { match fg_reader.read_file_slice(&f).await { Ok(batch) => batches.push(batch), - Err(e) => return Err(anyhow!("Failed to read file slice {:?} - {}", f, e)), + Err(e) => { + return Err(Error::Internal(format!( + "Failed to read file slice {:?} - {}", + f, e + ))) + } } } Ok(batches) diff --git a/crates/core/src/table/partition.rs b/crates/core/src/table/partition.rs index 17927ebe..8edeafbe 100644 --- a/crates/core/src/table/partition.rs +++ b/crates/core/src/table/partition.rs @@ -18,8 +18,7 @@ */ use crate::config::table::HudiTableConfig; use crate::config::HudiConfigs; -use anyhow::Result; -use anyhow::{anyhow, Context}; +use crate::{Error, Result}; use arrow_array::{ArrayRef, Scalar, StringArray}; use arrow_cast::{cast_with_options, CastOptions}; use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq}; @@ -120,11 +119,11 @@ impl PartitionPruner { let parts: Vec<&str> = partition_path.split('/').collect(); if parts.len() != self.schema.fields().len() { - return Err(anyhow!( + return Err(Error::Internal(format!( "Partition path should have {} part(s) but got {}", self.schema.fields().len(), parts.len() - )); + ))); } self.schema @@ -134,14 +133,17 @@ impl PartitionPruner { .map(|(field, part)| { let value = if self.is_hive_style { let (name, value) = part.split_once('=').ok_or_else(|| { - anyhow!("Partition path should be hive-style but got {}", part) + Error::Internal(format!( + "Partition path should be hive-style but got {}", + part + )) })?; if name != field.name() { - return Err(anyhow!( + return Err(Error::Internal(format!( "Partition path should contain {} but got {}", field.name(), name - )); + ))); } value } else { @@ -177,13 +179,13 @@ impl Operator { } impl FromStr for Operator { - type Err = anyhow::Error; + type Err = crate::Error; fn from_str(s: &str) -> Result { Operator::TOKEN_OP_PAIRS .iter() .find_map(|&(token, op)| if token == s { Some(op) } else { None }) - .ok_or_else(|| anyhow!("Unsupported operator: {}", s)) + 
.ok_or_else(|| Error::Internal(format!("Unsupported operator: {}", s))) } } @@ -196,21 +198,17 @@ pub struct PartitionFilter { } impl TryFrom<((&str, &str, &str), &Schema)> for PartitionFilter { - type Error = anyhow::Error; + type Error = crate::Error; fn try_from((filter, partition_schema): ((&str, &str, &str), &Schema)) -> Result { let (field_name, operator_str, value_str) = filter; - let field: &Field = partition_schema - .field_with_name(field_name) - .with_context(|| format!("Field '{}' not found in partition schema", field_name))?; + let field: &Field = partition_schema.field_with_name(field_name)?; - let operator = Operator::from_str(operator_str) - .with_context(|| format!("Unsupported operator: {}", operator_str))?; + let operator = Operator::from_str(operator_str)?; let value = &[value_str]; - let value = Self::cast_value(value, field.data_type()) - .with_context(|| format!("Unable to cast {:?} as {:?}", value, field.data_type()))?; + let value = Self::cast_value(value, field.data_type())?; let field = field.clone(); Ok(PartitionFilter { diff --git a/crates/core/src/table/timeline.rs b/crates/core/src/table/timeline.rs index 2993c951..c1558685 100644 --- a/crates/core/src/table/timeline.rs +++ b/crates/core/src/table/timeline.rs @@ -23,7 +23,6 @@ use std::fmt::Debug; use std::path::PathBuf; use std::sync::Arc; -use anyhow::{anyhow, Context, Result}; use arrow_schema::Schema; use parquet::arrow::parquet_to_arrow_schema; use serde_json::{Map, Value}; @@ -32,6 +31,7 @@ use crate::config::HudiConfigs; use crate::file_group::FileGroup; use crate::storage::utils::split_filename; use crate::storage::Storage; +use crate::{Error, Result}; /// The [State] of an [Instant] represents the status of the action performed on the table. #[allow(dead_code)] @@ -80,7 +80,10 @@ impl Instant { commit_file_path.push(self.file_name()); commit_file_path .to_str() - .ok_or(anyhow!("Failed to get file path for {:?}", self)) + .ok_or(Error::Internal(format!( + "Failed to get file path for {:?}", + self + ))) .map(|s| s.to_string()) } @@ -140,10 +143,11 @@ impl Timeline { .storage .get_file_data(instant.relative_path()?.as_str()) .await?; - let json: Value = serde_json::from_slice(&bytes)?; + let json: Value = serde_json::from_slice(&bytes) + .map_err(|e| Error::Internal(format!("Invalid instant {:?}", e)))?; let commit_metadata = json .as_object() - .ok_or_else(|| anyhow!("Expected JSON object"))? + .ok_or_else(|| Error::Internal("Expected JSON object".to_string()))? .clone(); Ok(commit_metadata) } @@ -167,17 +171,15 @@ impl Timeline { .and_then(|first_value| first_value["path"].as_str()); if let Some(path) = parquet_path { - let parquet_meta = self - .storage - .get_parquet_file_metadata(path) - .await - .context("Failed to get parquet file metadata")?; - - parquet_to_arrow_schema(parquet_meta.file_metadata().schema_descr(), None) - .context("Failed to resolve the latest schema") + let parquet_meta = self.storage.get_parquet_file_metadata(path).await?; + + Ok(parquet_to_arrow_schema( + parquet_meta.file_metadata().schema_descr(), + None, + )?) 
} else { - Err(anyhow!( - "Failed to resolve the latest schema: no file path found" + Err(Error::Internal( + "Failed to resolve the latest schema: no file path found".to_string(), )) } } diff --git a/python/src/internal.rs b/python/src/internal.rs index 37201cd4..0acc6520 100644 --- a/python/src/internal.rs +++ b/python/src/internal.rs @@ -18,47 +18,68 @@ */ use std::collections::HashMap; +use std::convert::From; use std::path::PathBuf; use std::sync::OnceLock; use anyhow::Context; use arrow::pyarrow::ToPyArrow; +//<<<<<<< HEAD use pyo3::{pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python}; +//======= +//use pyo3::{exceptions::PyOSError, pyclass, pymethods, PyErr, PyObject, PyResult, Python}; +//>>>>>>> 2f6f596 (refact: define hudi error types for hudi-core crate) use tokio::runtime::Runtime; use hudi::file_group::reader::FileGroupReader; use hudi::file_group::FileSlice; use hudi::table::builder::TableBuilder; use hudi::table::Table; -use hudi::util::convert_vec_to_slice; -use hudi::util::vec_to_slice; - -#[cfg(not(tarpaulin))] -#[derive(Clone, Debug)] -#[pyclass] -pub struct HudiFileGroupReader { - inner: FileGroupReader, -} - -#[cfg(not(tarpaulin))] -#[pymethods] -impl HudiFileGroupReader { - #[new] - #[pyo3(signature = (base_uri, options=None))] - fn new(base_uri: &str, options: Option>) -> PyResult { - let inner = FileGroupReader::new_with_options(base_uri, options.unwrap_or_default())?; - Ok(HudiFileGroupReader { inner }) - } - - fn read_file_slice_by_base_file_path( - &self, - relative_path: &str, - py: Python, - ) -> PyResult { - rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path))? - .to_pyarrow(py) - } -} +//<<<<<<< HEAD +//use hudi::util::convert_vec_to_slice; +//use hudi::util::vec_to_slice; +// +//#[cfg(not(tarpaulin))] +//#[derive(Clone, Debug)] +//#[pyclass] +//pub struct HudiFileGroupReader { +// inner: FileGroupReader, +//} +// +//#[cfg(not(tarpaulin))] +//#[pymethods] +//impl HudiFileGroupReader { +// #[new] +// #[pyo3(signature = (base_uri, options=None))] +// fn new(base_uri: &str, options: Option>) -> PyResult { +// let inner = FileGroupReader::new_with_options(base_uri, options.unwrap_or_default())?; +// Ok(HudiFileGroupReader { inner }) +// } +// +// fn read_file_slice_by_base_file_path( +// &self, +// relative_path: &str, +// py: Python, +// ) -> PyResult { +// rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path))? 
+// .to_pyarrow(py) +//======= +//use hudi::Error::Internal; +// +//struct HoodieError(hudi::Error); +// +//impl From for PyErr { +// fn from(err: HoodieError) -> PyErr { +// PyOSError::new_err(err.0.to_string()) +// } +//} +// +//impl From for HoodieError { +// fn from(err: hudi::Error) -> Self { +// Self(err) +//>>>>>>> 2f6f596 (refact: define hudi error types for hudi-core crate) +// } +//} #[cfg(not(tarpaulin))] #[derive(Clone, Debug)] @@ -127,6 +148,7 @@ pub struct HudiTable { #[pymethods] impl HudiTable { #[new] +<<<<<<< HEAD #[pyo3(signature = (base_uri, options=None))] fn new_with_options( base_uri: &str, @@ -134,6 +156,12 @@ impl HudiTable { ) -> PyResult { let inner: Table = rt().block_on(Table::new_with_options( base_uri, +======= + #[pyo3(signature = (table_uri, options = None))] + fn new(table_uri: &str, options: Option>) -> Result { + let _table = rt().block_on(Table::new_with_options( + table_uri, +>>>>>>> 2f6f596 (refact: define hudi error types for hudi-core crate) options.unwrap_or_default(), ))?; Ok(HudiTable { inner }) @@ -147,6 +175,7 @@ impl HudiTable { self.inner.storage_options() } +<<<<<<< HEAD fn get_schema(&self, py: Python) -> PyResult { rt().block_on(self.inner.get_schema())?.to_pyarrow(py) } @@ -163,6 +192,19 @@ impl HudiTable { filters: Option>, py: Python, ) -> PyResult>> { +======= + fn get_schema(&self, py: Python) -> Result { + rt().block_on(self._table.get_schema())? + .to_pyarrow(py) + .map_err(|e| HoodieError(Internal(e.to_string()))) + } + + fn split_file_slices( + &self, + n: usize, + py: Python, + ) -> Result>, HoodieError> { +>>>>>>> 2f6f596 (refact: define hudi error types for hudi-core crate) py.allow_threads(|| { let file_slices = rt().block_on( self.inner @@ -175,12 +217,16 @@ impl HudiTable { }) } +<<<<<<< HEAD #[pyo3(signature = (filters=None))] fn get_file_slices( &self, filters: Option>, py: Python, ) -> PyResult> { +======= + fn get_file_slices(&self, py: Python) -> Result, HoodieError> { +>>>>>>> 2f6f596 (refact: define hudi error types for hudi-core crate) py.allow_threads(|| { let file_slices = rt().block_on( self.inner @@ -190,6 +236,7 @@ impl HudiTable { }) } +<<<<<<< HEAD fn create_file_group_reader(&self) -> PyResult { let fg_reader = self.inner.create_file_group_reader(); Ok(HudiFileGroupReader { inner: fg_reader }) @@ -206,6 +253,18 @@ impl HudiTable { .read_snapshot(vec_to_slice!(filters.unwrap_or_default())), )? .to_pyarrow(py) +======= + fn read_file_slice(&self, relative_path: &str, py: Python) -> Result { + rt().block_on(self._table.read_file_slice_by_path(relative_path))? + .to_pyarrow(py) + .map_err(|e| HoodieError(Internal(e.to_string()))) + } + + fn read_snapshot(&self, py: Python) -> Result { + rt().block_on(self._table.read_snapshot())? 
+ .to_pyarrow(py) + .map_err(|e| HoodieError(Internal(e.to_string()))) +>>>>>>> 2f6f596 (refact: define hudi error types for hudi-core crate) } } From 1249a4e9cf80f2888b4cc2617c252d040148131d Mon Sep 17 00:00:00 2001 From: GoHalo Date: Fri, 27 Sep 2024 15:29:25 +0800 Subject: [PATCH 2/3] fix conficts Signed-off-by: GoHalo --- Cargo.toml | 1 - crates/core/src/file_group/mod.rs | 10 +- crates/core/src/storage/mod.rs | 22 ++-- crates/core/src/storage/utils.rs | 1 + crates/core/src/table/fs_view.rs | 2 +- crates/core/src/table/partition.rs | 4 +- python/Cargo.toml | 1 - python/src/internal.rs | 167 ++++++++++++----------------- 8 files changed, 81 insertions(+), 127 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a590f335..971e9eb6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,7 +62,6 @@ serde_json = { version = "1" } # "stdlib" thiserror = { version = "2.0.3" } -anyhow = { version = "1.0.86" } bytes = { version = "1" } paste = { version = "1.0.15" } once_cell = { version = "1.19.0" } diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs index 580e2ad6..0cd4614d 100644 --- a/crates/core/src/file_group/mod.rs +++ b/crates/core/src/file_group/mod.rs @@ -54,14 +54,8 @@ impl BaseFile { .rsplit_once('.') .ok_or(Internal(err_msg.clone()))?; let parts: Vec<&str> = name.split('_').collect(); - let file_group_id = parts - .first() - .ok_or(Internal(err_msg.clone()))? - .to_string(); - let commit_time = parts - .get(2) - .ok_or(Internal(err_msg.clone()))? - .to_string(); + let file_group_id = parts.first().ok_or(Internal(err_msg.clone()))?.to_string(); + let commit_time = parts.get(2).ok_or(Internal(err_msg.clone()))?.to_string(); Ok((file_group_id, commit_time)) } diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs index 3d15ad41..89fe6f60 100644 --- a/crates/core/src/storage/mod.rs +++ b/crates/core/src/storage/mod.rs @@ -101,15 +101,10 @@ impl Storage { runtime_env.register_object_store(self.base_url.as_ref(), self.object_store.clone()); } - fn get_relative_path(&self, relative_path: &str) -> Result<(Url, ObjPath)> { - let obj_url = join_url_segments(&self.base_url, &[relative_path])?; - let obj_path = ObjPath::from_url_path(obj_url.path())?; - Ok((obj_url, obj_path)) - } - #[cfg(test)] async fn get_file_info(&self, relative_path: &str) -> Result { - let (obj_url, obj_path) = self.get_relative_path(relative_path)?; + let obj_url = join_url_segments(&self.base_url, &[relative_path])?; + let obj_path = ObjPath::from_url_path(obj_url.path())?; let meta = self.object_store.head(&obj_path).await?; let uri = obj_url.to_string(); let name = obj_path @@ -127,7 +122,8 @@ impl Storage { } pub async fn get_parquet_file_metadata(&self, relative_path: &str) -> Result { - let (_, obj_path) = self.get_relative_path(relative_path)?; + let obj_url = join_url_segments(&self.base_url, &[relative_path])?; + let obj_path = ObjPath::from_url_path(obj_url.path())?; let obj_store = self.object_store.clone(); let meta = obj_store.head(&obj_path).await?; let reader = ParquetObjectReader::new(obj_store, meta); @@ -136,7 +132,8 @@ impl Storage { } pub async fn get_file_data(&self, relative_path: &str) -> Result { - let (_, obj_path) = self.get_relative_path(relative_path)?; + let obj_url = join_url_segments(&self.base_url, &[relative_path])?; + let obj_path = ObjPath::from_url_path(obj_url.path())?; let result = self.object_store.get(&obj_path).await?; let bytes = result.bytes().await?; Ok(bytes) @@ -150,7 +147,8 @@ impl Storage { } pub async fn 
get_parquet_file_data(&self, relative_path: &str) -> Result { - let (_, obj_path) = self.get_relative_path(relative_path)?; + let obj_url = join_url_segments(&self.base_url, &[relative_path])?; + let obj_path = ObjPath::from_url_path(obj_url.path())?; let obj_store = self.object_store.clone(); let meta = obj_store.head(&obj_path).await?; @@ -301,10 +299,6 @@ mod tests { result.is_err(), "Should return error when no base path is invalid." ); - assert!(result - .unwrap_err() - .to_string() - .contains("Failed to create storage")); } #[tokio::test] diff --git a/crates/core/src/storage/utils.rs b/crates/core/src/storage/utils.rs index a619b142..8c1624d5 100644 --- a/crates/core/src/storage/utils.rs +++ b/crates/core/src/storage/utils.rs @@ -26,6 +26,7 @@ use crate::{ Result, }; +/// Splits a filename into a stem and an extension. pub fn split_filename(filename: &str) -> Result<(String, String)> { let path = Path::new(filename); diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs index 2976e761..9262c782 100644 --- a/crates/core/src/table/fs_view.rs +++ b/crates/core/src/table/fs_view.rs @@ -28,7 +28,7 @@ use crate::storage::{get_leaf_dirs, Storage}; use crate::table::partition::PartitionPruner; use crate::{Error, Result}; use dashmap::DashMap; -use futures::stream::{self, StreamExt}; +use futures::stream::{self, StreamExt, TryStreamExt}; /// A view of the Hudi table's data files (files stored outside the `.hoodie/` directory) in the file system. It provides APIs to load and /// access the file groups and file slices. diff --git a/crates/core/src/table/partition.rs b/crates/core/src/table/partition.rs index 8edeafbe..189c772a 100644 --- a/crates/core/src/table/partition.rs +++ b/crates/core/src/table/partition.rs @@ -288,7 +288,7 @@ mod tests { assert!(filter .unwrap_err() .to_string() - .contains("not found in partition schema")); + .contains("Unable to get field named")); } #[test] @@ -309,7 +309,7 @@ mod tests { let filter_tuple = ("count", "=", "not_a_number"); let filter = PartitionFilter::try_from((filter_tuple, &schema)); assert!(filter.is_err()); - assert!(filter.unwrap_err().to_string().contains("Unable to cast")); + assert!(filter.unwrap_err().to_string().contains("Cannot cast string")); } #[test] diff --git a/python/Cargo.toml b/python/Cargo.toml index 90336672..945b3072 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -39,7 +39,6 @@ arrow = { workspace = true } arrow-schema = { workspace = true } # "stdlib" -anyhow = { workspace = true } # runtime / async futures = { workspace = true } diff --git a/python/src/internal.rs b/python/src/internal.rs index 0acc6520..0b3b829b 100644 --- a/python/src/internal.rs +++ b/python/src/internal.rs @@ -22,64 +22,66 @@ use std::convert::From; use std::path::PathBuf; use std::sync::OnceLock; -use anyhow::Context; use arrow::pyarrow::ToPyArrow; -//<<<<<<< HEAD -use pyo3::{pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python}; -//======= -//use pyo3::{exceptions::PyOSError, pyclass, pymethods, PyErr, PyObject, PyResult, Python}; -//>>>>>>> 2f6f596 (refact: define hudi error types for hudi-core crate) +use pyo3::{ + exceptions::PyOSError, pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python, +}; use tokio::runtime::Runtime; use hudi::file_group::reader::FileGroupReader; use hudi::file_group::FileSlice; use hudi::table::builder::TableBuilder; use hudi::table::Table; -//<<<<<<< HEAD -//use hudi::util::convert_vec_to_slice; -//use hudi::util::vec_to_slice; -// -//#[cfg(not(tarpaulin))] 
-//#[derive(Clone, Debug)] -//#[pyclass] -//pub struct HudiFileGroupReader { -// inner: FileGroupReader, -//} -// -//#[cfg(not(tarpaulin))] -//#[pymethods] -//impl HudiFileGroupReader { -// #[new] -// #[pyo3(signature = (base_uri, options=None))] -// fn new(base_uri: &str, options: Option>) -> PyResult { -// let inner = FileGroupReader::new_with_options(base_uri, options.unwrap_or_default())?; -// Ok(HudiFileGroupReader { inner }) -// } -// -// fn read_file_slice_by_base_file_path( -// &self, -// relative_path: &str, -// py: Python, -// ) -> PyResult { -// rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path))? -// .to_pyarrow(py) -//======= -//use hudi::Error::Internal; -// -//struct HoodieError(hudi::Error); -// -//impl From for PyErr { -// fn from(err: HoodieError) -> PyErr { -// PyOSError::new_err(err.0.to_string()) -// } -//} -// -//impl From for HoodieError { -// fn from(err: hudi::Error) -> Self { -// Self(err) -//>>>>>>> 2f6f596 (refact: define hudi error types for hudi-core crate) -// } -//} +use hudi::util::convert_vec_to_slice; +use hudi::util::vec_to_slice; + +pub struct HoodieError(hudi::Error); + +impl From for PyErr { + fn from(err: HoodieError) -> PyErr { + PyOSError::new_err(err.0.to_string()) + } +} + +impl From for HoodieError { + fn from(err: hudi::Error) -> Self { + Self(err) + } +} + +impl From for HoodieError { + fn from(err: pyo3::PyErr) -> Self { + Self(hudi::Error::Internal(err.to_string())) + } +} + +#[cfg(not(tarpaulin))] +#[derive(Clone, Debug)] +#[pyclass] +pub struct HudiFileGroupReader { + inner: FileGroupReader, +} + +#[cfg(not(tarpaulin))] +#[pymethods] +impl HudiFileGroupReader { + #[new] + #[pyo3(signature = (base_uri, options=None))] + fn new(base_uri: &str, options: Option>) -> Result { + let inner = FileGroupReader::new_with_options(base_uri, options.unwrap_or_default())?; + Ok(HudiFileGroupReader { inner }) + } + + fn read_file_slice_by_base_file_path( + &self, + relative_path: &str, + py: Python, + ) -> Result { + rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path))? + .to_pyarrow(py) + .map_err(HoodieError::from) + } +} #[cfg(not(tarpaulin))] #[derive(Clone, Debug)] @@ -104,16 +106,15 @@ pub struct HudiFileSlice { #[cfg(not(tarpaulin))] #[pymethods] impl HudiFileSlice { - fn base_file_relative_path(&self) -> PyResult { + fn base_file_relative_path(&self) -> Result { PathBuf::from(&self.partition_path) .join(&self.base_file_name) .to_str() .map(String::from) - .context(format!( + .ok_or(HoodieError(hudi::Error::Internal(format!( "Failed to get base file relative path for file slice: {:?}", self - )) - .map_err(PyErr::from) + )))) } } @@ -148,20 +149,13 @@ pub struct HudiTable { #[pymethods] impl HudiTable { #[new] -<<<<<<< HEAD #[pyo3(signature = (base_uri, options=None))] fn new_with_options( base_uri: &str, options: Option>, - ) -> PyResult { + ) -> Result { let inner: Table = rt().block_on(Table::new_with_options( base_uri, -======= - #[pyo3(signature = (table_uri, options = None))] - fn new(table_uri: &str, options: Option>) -> Result { - let _table = rt().block_on(Table::new_with_options( - table_uri, ->>>>>>> 2f6f596 (refact: define hudi error types for hudi-core crate) options.unwrap_or_default(), ))?; Ok(HudiTable { inner }) @@ -175,14 +169,16 @@ impl HudiTable { self.inner.storage_options() } -<<<<<<< HEAD - fn get_schema(&self, py: Python) -> PyResult { - rt().block_on(self.inner.get_schema())?.to_pyarrow(py) + fn get_schema(&self, py: Python) -> Result { + rt().block_on(self.inner.get_schema())? 
+ .to_pyarrow(py) + .map_err(HoodieError::from) } - fn get_partition_schema(&self, py: Python) -> PyResult { + fn get_partition_schema(&self, py: Python) -> Result { rt().block_on(self.inner.get_partition_schema())? .to_pyarrow(py) + .map_err(HoodieError::from) } #[pyo3(signature = (n, filters=None))] @@ -191,20 +187,7 @@ impl HudiTable { n: usize, filters: Option>, py: Python, - ) -> PyResult>> { -======= - fn get_schema(&self, py: Python) -> Result { - rt().block_on(self._table.get_schema())? - .to_pyarrow(py) - .map_err(|e| HoodieError(Internal(e.to_string()))) - } - - fn split_file_slices( - &self, - n: usize, - py: Python, ) -> Result>, HoodieError> { ->>>>>>> 2f6f596 (refact: define hudi error types for hudi-core crate) py.allow_threads(|| { let file_slices = rt().block_on( self.inner @@ -217,16 +200,12 @@ impl HudiTable { }) } -<<<<<<< HEAD #[pyo3(signature = (filters=None))] fn get_file_slices( &self, filters: Option>, py: Python, - ) -> PyResult> { -======= - fn get_file_slices(&self, py: Python) -> Result, HoodieError> { ->>>>>>> 2f6f596 (refact: define hudi error types for hudi-core crate) + ) -> Result, HoodieError> { py.allow_threads(|| { let file_slices = rt().block_on( self.inner @@ -236,7 +215,6 @@ impl HudiTable { }) } -<<<<<<< HEAD fn create_file_group_reader(&self) -> PyResult { let fg_reader = self.inner.create_file_group_reader(); Ok(HudiFileGroupReader { inner: fg_reader }) @@ -247,24 +225,13 @@ impl HudiTable { &self, filters: Option>, py: Python, - ) -> PyResult { + ) -> Result { rt().block_on( self.inner .read_snapshot(vec_to_slice!(filters.unwrap_or_default())), )? .to_pyarrow(py) -======= - fn read_file_slice(&self, relative_path: &str, py: Python) -> Result { - rt().block_on(self._table.read_file_slice_by_path(relative_path))? - .to_pyarrow(py) - .map_err(|e| HoodieError(Internal(e.to_string()))) - } - - fn read_snapshot(&self, py: Python) -> Result { - rt().block_on(self._table.read_snapshot())? 
- .to_pyarrow(py) - .map_err(|e| HoodieError(Internal(e.to_string()))) ->>>>>>> 2f6f596 (refact: define hudi error types for hudi-core crate) + .map_err(HoodieError::from) } } @@ -276,7 +243,7 @@ pub fn build_hudi_table( hudi_options: Option>, storage_options: Option>, options: Option>, -) -> PyResult { +) -> Result { let inner = rt().block_on( TableBuilder::from_base_uri(&base_uri) .with_hudi_options(hudi_options.unwrap_or_default()) From 02c659b3399cef272d09edb2acc860424f03a69a Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Wed, 4 Dec 2024 19:11:04 -0600 Subject: [PATCH 3/3] update names and define python error --- crates/core/src/config/internal.rs | 6 +- crates/core/src/config/read.rs | 12 ++- crates/core/src/config/table.rs | 26 +++--- crates/core/src/config/utils.rs | 8 +- crates/core/src/file_group/mod.rs | 2 +- crates/core/src/lib.rs | 12 +-- crates/core/src/storage/mod.rs | 15 ++-- crates/core/src/storage/utils.rs | 2 +- crates/core/src/table/builder.rs | 8 +- crates/core/src/table/fs_view.rs | 4 +- crates/core/src/table/mod.rs | 4 +- crates/core/src/table/partition.rs | 19 +++-- crates/core/src/table/timeline.rs | 10 +-- python/Cargo.toml | 3 +- python/src/internal.rs | 130 ++++++++++++++++------------- 15 files changed, 143 insertions(+), 118 deletions(-) diff --git a/crates/core/src/config/internal.rs b/crates/core/src/config/internal.rs index 8174cb32..4fdc2765 100644 --- a/crates/core/src/config/internal.rs +++ b/crates/core/src/config/internal.rs @@ -25,7 +25,7 @@ use strum_macros::EnumIter; use crate::{ config::{ConfigParser, HudiConfigValue}, - Error::{ConfNotFound, InvalidConf}, + CoreError::{ConfigNotFound, InvalidConfig}, Result, }; @@ -67,12 +67,12 @@ impl ConfigParser for HudiInternalConfig { let get_result = configs .get(self.as_ref()) .map(|v| v.as_str()) - .ok_or(ConfNotFound(self.as_ref().to_string())); + .ok_or(ConfigNotFound(self.as_ref().to_string())); match self { Self::SkipConfigValidation => get_result .and_then(|v| { - bool::from_str(v).map_err(|e| InvalidConf { + bool::from_str(v).map_err(|e| InvalidConfig { item: Self::SkipConfigValidation.as_ref(), source: Box::new(e), }) diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs index c6c7de61..a9389538 100644 --- a/crates/core/src/config/read.rs +++ b/crates/core/src/config/read.rs @@ -25,7 +25,7 @@ use strum_macros::EnumIter; use crate::{ config::{ConfigParser, HudiConfigValue}, - Error::{ConfNotFound, InvalidConf}, + CoreError::{ConfigNotFound, InvalidConfig}, Result, }; @@ -79,12 +79,12 @@ impl ConfigParser for HudiReadConfig { let get_result = configs .get(self.as_ref()) .map(|v| v.as_str()) - .ok_or(ConfNotFound(self.as_ref().to_string())); + .ok_or(ConfigNotFound(self.as_ref().to_string())); match self { Self::InputPartitions => get_result .and_then(|v| { - usize::from_str(v).map_err(|e| InvalidConf { + usize::from_str(v).map_err(|e| InvalidConfig { item: Self::InputPartitions.as_ref(), source: Box::new(e), }) @@ -99,6 +99,7 @@ impl ConfigParser for HudiReadConfig { mod tests { use crate::config::read::HudiReadConfig::InputPartitions; use crate::config::ConfigParser; + use crate::CoreError::InvalidConfig; use std::collections::HashMap; #[test] @@ -112,7 +113,10 @@ mod tests { fn parse_invalid_config_value() { let options = HashMap::from([(InputPartitions.as_ref().to_string(), "foo".to_string())]); let value = InputPartitions.parse_value(&options); - assert!(value.is_err()); + assert!(matches!( + value.unwrap_err(), + InvalidConfig { 
item: _, source: _ } + )); assert_eq!( InputPartitions .parse_value_or_default(&options) diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs index 196f11f5..bb19066c 100644 --- a/crates/core/src/config/table.rs +++ b/crates/core/src/config/table.rs @@ -25,7 +25,7 @@ use strum_macros::{AsRefStr, EnumIter}; use crate::{ config::{ConfigParser, HudiConfigValue}, - Error::{self, ConfNotFound, InvalidConf, Unsupported}, + CoreError::{self, ConfigNotFound, InvalidConfig, Unsupported}, Result, }; @@ -148,7 +148,7 @@ impl ConfigParser for HudiTableConfig { let get_result = configs .get(self.as_ref()) .map(|v| v.as_str()) - .ok_or(ConfNotFound(self.as_ref().to_string())); + .ok_or(ConfigNotFound(self.as_ref().to_string())); match self { Self::BaseFileFormat => get_result @@ -157,7 +157,7 @@ impl ConfigParser for HudiTableConfig { Self::BasePath => get_result.map(|v| HudiConfigValue::String(v.to_string())), Self::Checksum => get_result .and_then(|v| { - isize::from_str(v).map_err(|e| InvalidConf { + isize::from_str(v).map_err(|e| InvalidConfig { item: Self::Checksum.as_ref(), source: Box::new(e), }) @@ -166,7 +166,7 @@ impl ConfigParser for HudiTableConfig { Self::DatabaseName => get_result.map(|v| HudiConfigValue::String(v.to_string())), Self::DropsPartitionFields => get_result .and_then(|v| { - bool::from_str(v).map_err(|e| InvalidConf { + bool::from_str(v).map_err(|e| InvalidConfig { item: Self::DropsPartitionFields.as_ref(), source: Box::new(e), }) @@ -174,7 +174,7 @@ impl ConfigParser for HudiTableConfig { .map(HudiConfigValue::Boolean), Self::IsHiveStylePartitioning => get_result .and_then(|v| { - bool::from_str(v).map_err(|e| InvalidConf { + bool::from_str(v).map_err(|e| InvalidConfig { item: Self::IsHiveStylePartitioning.as_ref(), source: Box::new(e), }) @@ -182,7 +182,7 @@ impl ConfigParser for HudiTableConfig { .map(HudiConfigValue::Boolean), Self::IsPartitionPathUrlencoded => get_result .and_then(|v| { - bool::from_str(v).map_err(|e| InvalidConf { + bool::from_str(v).map_err(|e| InvalidConfig { item: Self::IsPartitionPathUrlencoded.as_ref(), source: Box::new(e), }) @@ -194,7 +194,7 @@ impl ConfigParser for HudiTableConfig { Self::PrecombineField => get_result.map(|v| HudiConfigValue::String(v.to_string())), Self::PopulatesMetaFields => get_result .and_then(|v| { - bool::from_str(v).map_err(|e| InvalidConf { + bool::from_str(v).map_err(|e| InvalidConfig { item: Self::PopulatesMetaFields.as_ref(), source: Box::new(e), }) @@ -208,7 +208,7 @@ impl ConfigParser for HudiTableConfig { .map(|v| HudiConfigValue::String(v.as_ref().to_string())), Self::TableVersion => get_result .and_then(|v| { - isize::from_str(v).map_err(|e| InvalidConf { + isize::from_str(v).map_err(|e| InvalidConfig { item: Self::TableVersion.as_ref(), source: Box::new(e), }) @@ -216,7 +216,7 @@ impl ConfigParser for HudiTableConfig { .map(HudiConfigValue::Integer), Self::TimelineLayoutVersion => get_result .and_then(|v| { - isize::from_str(v).map_err(|e| InvalidConf { + isize::from_str(v).map_err(|e| InvalidConfig { item: Self::TimelineLayoutVersion.as_ref(), source: Box::new(e), }) @@ -236,13 +236,13 @@ pub enum TableTypeValue { } impl FromStr for TableTypeValue { - type Err = Error; + type Err = CoreError; fn from_str(s: &str) -> Result { match s.to_ascii_lowercase().as_str() { "copy_on_write" | "copy-on-write" | "cow" => Ok(Self::CopyOnWrite), "merge_on_read" | "merge-on-read" | "mor" => Ok(Self::MergeOnRead), - v => Err(Unsupported(format!("unsupported table type {}", v))), + v => 
Err(Unsupported(format!("Unsupported table type {}", v))), } } } @@ -255,12 +255,12 @@ pub enum BaseFileFormatValue { } impl FromStr for BaseFileFormatValue { - type Err = Error; + type Err = CoreError; fn from_str(s: &str) -> Result { match s.to_ascii_lowercase().as_str() { "parquet" => Ok(Self::Parquet), - v => Err(Unsupported(format!("unsupported base file format {}", v))), + v => Err(Unsupported(format!("Unsupported base file format {}", v))), } } } diff --git a/crates/core/src/config/utils.rs b/crates/core/src/config/utils.rs index fe6ed4dc..85784ec4 100644 --- a/crates/core/src/config/utils.rs +++ b/crates/core/src/config/utils.rs @@ -22,7 +22,7 @@ use bytes::Bytes; use std::collections::HashMap; use std::io::{BufRead, BufReader, Cursor}; -use crate::{Error, Result}; +use crate::{CoreError, Result}; /// Returns an empty iterator to represent an empty set of options. pub fn empty_options<'a>() -> std::iter::Empty<(&'a str, &'a str)> { @@ -58,7 +58,7 @@ pub fn parse_data_for_options(data: &Bytes, split_chars: &str) -> Result Result, }, @@ -81,10 +81,10 @@ pub enum Error { Utf8Error(#[from] std::str::Utf8Error), #[error(transparent)] - Store(#[from] object_store::Error), + ObjectStore(#[from] object_store::Error), #[error(transparent)] - StorePath(#[from] object_store::path::Error), + ObjectStorePath(#[from] object_store::path::Error), #[error(transparent)] Parquet(#[from] parquet::errors::ParquetError), @@ -93,4 +93,4 @@ pub enum Error { Arrow(#[from] arrow::error::ArrowError), } -type Result = std::result::Result; +type Result = std::result::Result; diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs index 89fe6f60..0a9f0e15 100644 --- a/crates/core/src/storage/mod.rs +++ b/crates/core/src/storage/mod.rs @@ -38,7 +38,7 @@ use crate::config::table::HudiTableConfig; use crate::config::HudiConfigs; use crate::storage::file_info::FileInfo; use crate::storage::utils::join_url_segments; -use crate::{Error, Result}; +use crate::{CoreError, Result}; pub mod file_info; pub mod file_stats; @@ -61,7 +61,7 @@ impl Storage { hudi_configs: Arc, ) -> Result> { if !hudi_configs.contains(HudiTableConfig::BasePath) { - return Err(Error::Internal(format!( + return Err(CoreError::Internal(format!( "Failed to create storage: {} is required.", HudiTableConfig::BasePath.as_ref() ))); @@ -76,7 +76,7 @@ impl Storage { options, hudi_configs, })), - Err(e) => Err(Error::Store(e)), + Err(e) => Err(CoreError::ObjectStore(e)), } } @@ -109,7 +109,7 @@ impl Storage { let uri = obj_url.to_string(); let name = obj_path .filename() - .ok_or(Error::InvalidPath { + .ok_or(CoreError::InvalidPath { name: obj_path.to_string(), detail: "failed to get file name".to_string(), })? @@ -176,7 +176,7 @@ impl Storage { for dir in dir_paths { dirs.push( dir.filename() - .ok_or(Error::InvalidPath { + .ok_or(CoreError::InvalidPath { name: dir.to_string(), detail: "failed to get file name".to_string(), })? @@ -208,7 +208,7 @@ impl Storage { let name = obj_meta .location .filename() - .ok_or(Error::InvalidPath { + .ok_or(CoreError::InvalidPath { name: obj_meta.location.to_string(), detail: "failed to get file name".to_string(), })? @@ -246,7 +246,7 @@ pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Result(); if TableTypeValue::from_str(&table_type)? 
-            return Err(Error::Unsupported(
+            return Err(CoreError::Unsupported(
                 "Only support copy-on-write table.".to_string(),
             ));
         }
 
         let table_version = hudi_configs.get(TableVersion)?.to::<isize>();
         if !(5..=6).contains(&table_version) {
-            return Err(Error::Unsupported(
+            return Err(CoreError::Unsupported(
                 "Only support table version 5 and 6.".to_string(),
             ));
         }
@@ -266,7 +266,7 @@ impl TableBuilder {
             .get_or_default(DropsPartitionFields)
             .to::<bool>();
         if drops_partition_cols {
-            return Err(Error::Unsupported(format!(
+            return Err(CoreError::Unsupported(format!(
                 "Only support when `{}` is disabled",
                 DropsPartitionFields.as_ref()
             )));
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 9262c782..ee68eecd 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -26,7 +26,7 @@ use crate::storage::file_info::FileInfo;
 use crate::storage::{get_leaf_dirs, Storage};
 use crate::table::partition::PartitionPruner;
-use crate::{Error, Result};
+use crate::{CoreError, Result};
 
 use dashmap::DashMap;
 use futures::stream::{self, StreamExt, TryStreamExt};
@@ -135,7 +135,7 @@ impl FileSystemView {
             .map(|path| async move {
                 let file_groups =
                     Self::load_file_groups_for_partition(&self.storage, &path).await?;
-                Ok::<_, Error>((path, file_groups))
+                Ok::<_, CoreError>((path, file_groups))
             })
             // TODO parameterize the parallelism for partition loading
             .buffer_unordered(10)
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index a2fc7e98..4836d1c9 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -101,7 +101,7 @@ use crate::table::builder::TableBuilder;
 use crate::table::fs_view::FileSystemView;
 use crate::table::partition::PartitionPruner;
 use crate::table::timeline::Timeline;
-use crate::{Error, Result};
+use crate::{CoreError, Result};
 
 pub mod builder;
 mod fs_view;
@@ -265,7 +265,7 @@ impl Table {
             match fg_reader.read_file_slice(&f).await {
                 Ok(batch) => batches.push(batch),
                 Err(e) => {
-                    return Err(Error::Internal(format!(
+                    return Err(CoreError::Internal(format!(
                         "Failed to read file slice {:?} - {}",
                         f, e
                     )))
diff --git a/crates/core/src/table/partition.rs b/crates/core/src/table/partition.rs
index 189c772a..8f035213 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -18,7 +18,7 @@
  */
 use crate::config::table::HudiTableConfig;
 use crate::config::HudiConfigs;
-use crate::{Error, Result};
+use crate::{CoreError, Result};
 use arrow_array::{ArrayRef, Scalar, StringArray};
 use arrow_cast::{cast_with_options, CastOptions};
 use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
@@ -119,7 +119,7 @@ impl PartitionPruner {
         let parts: Vec<&str> = partition_path.split('/').collect();
 
         if parts.len() != self.schema.fields().len() {
-            return Err(Error::Internal(format!(
+            return Err(CoreError::Internal(format!(
                 "Partition path should have {} part(s) but got {}",
                 self.schema.fields().len(),
                 parts.len()
@@ -133,13 +133,13 @@ impl PartitionPruner {
             .map(|(field, part)| {
                 let value = if self.is_hive_style {
                     let (name, value) = part.split_once('=').ok_or_else(|| {
-                        Error::Internal(format!(
+                        CoreError::Internal(format!(
                             "Partition path should be hive-style but got {}",
                             part
                         ))
                     })?;
                     if name != field.name() {
-                        return Err(Error::Internal(format!(
+                        return Err(CoreError::Internal(format!(
                             "Partition path should contain {} but got {}",
                             field.name(),
                             name
@@ -179,13 +179,13 @@ impl Operator {
 }
 
 impl FromStr for Operator {
-    type Err = crate::Error;
+    type Err = crate::CoreError;
 
     fn from_str(s: &str) -> Result<Self> {
         Operator::TOKEN_OP_PAIRS
             .iter()
             .find_map(|&(token, op)| if token == s { Some(op) } else { None })
-            .ok_or_else(|| Error::Internal(format!("Unsupported operator: {}", s)))
+            .ok_or_else(|| CoreError::Internal(format!("Unsupported operator: {}", s)))
     }
 }
@@ -198,7 +198,7 @@ pub struct PartitionFilter {
 }
 
 impl TryFrom<((&str, &str, &str), &Schema)> for PartitionFilter {
-    type Error = crate::Error;
+    type Error = crate::CoreError;
 
     fn try_from((filter, partition_schema): ((&str, &str, &str), &Schema)) -> Result<Self> {
         let (field_name, operator_str, value_str) = filter;
@@ -309,7 +309,10 @@ mod tests {
         let filter_tuple = ("count", "=", "not_a_number");
         let filter = PartitionFilter::try_from((filter_tuple, &schema));
         assert!(filter.is_err());
-        assert!(filter.unwrap_err().to_string().contains("Cannot cast string"));
+        assert!(filter
+            .unwrap_err()
+            .to_string()
+            .contains("Cannot cast string"));
     }
 
     #[test]
diff --git a/crates/core/src/table/timeline.rs b/crates/core/src/table/timeline.rs
index c1558685..d147abf4 100644
--- a/crates/core/src/table/timeline.rs
+++ b/crates/core/src/table/timeline.rs
@@ -31,7 +31,7 @@ use crate::config::HudiConfigs;
 use crate::file_group::FileGroup;
 use crate::storage::utils::split_filename;
 use crate::storage::Storage;
-use crate::{Error, Result};
+use crate::{CoreError, Result};
 
 /// The [State] of an [Instant] represents the status of the action performed on the table.
 #[allow(dead_code)]
@@ -80,7 +80,7 @@ impl Instant {
         commit_file_path.push(self.file_name());
         commit_file_path
             .to_str()
-            .ok_or(Error::Internal(format!(
+            .ok_or(CoreError::Internal(format!(
                 "Failed to get file path for {:?}",
                 self
             )))
@@ -144,10 +144,10 @@ impl Timeline {
             .get_file_data(instant.relative_path()?.as_str())
             .await?;
         let json: Value = serde_json::from_slice(&bytes)
-            .map_err(|e| Error::Internal(format!("Invalid instant {:?}", e)))?;
+            .map_err(|e| CoreError::Internal(format!("Invalid instant {:?}", e)))?;
         let commit_metadata = json
             .as_object()
-            .ok_or_else(|| Error::Internal("Expected JSON object".to_string()))?
+            .ok_or_else(|| CoreError::Internal("Expected JSON object".to_string()))?
             .clone();
         Ok(commit_metadata)
     }
@@ -178,7 +178,7 @@ impl Timeline {
                 None,
             )?)
         } else {
-            Err(Error::Internal(
+            Err(CoreError::Internal(
                 "Failed to resolve the latest schema: no file path found".to_string(),
             ))
         }
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 945b3072..1861c654 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -39,6 +39,7 @@ arrow = { workspace = true }
 arrow-schema = { workspace = true }
 
 # "stdlib"
+thiserror = { workspace = true }
 
 # runtime / async
 futures = { workspace = true }
@@ -46,4 +47,4 @@ tokio = { workspace = true }
 
 [dependencies.pyo3]
 version = "0.22.2"
-features = ["extension-module", "abi3", "abi3-py39", "anyhow"]
+features = ["extension-module", "abi3", "abi3-py39"]
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 0b3b829b..549a503b 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -23,9 +23,7 @@ use std::path::PathBuf;
 use std::sync::OnceLock;
 
 use arrow::pyarrow::ToPyArrow;
-use pyo3::{
-    exceptions::PyOSError, pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python,
-};
+use pyo3::{pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python};
 use tokio::runtime::Runtime;
 
 use hudi::file_group::reader::FileGroupReader;
@@ -34,24 +32,28 @@ use hudi::table::builder::TableBuilder;
 use hudi::table::Table;
 use hudi::util::convert_vec_to_slice;
 use hudi::util::vec_to_slice;
+use hudi::CoreError;
+use pyo3::create_exception;
+use pyo3::exceptions::PyException;
 
-pub struct HoodieError(hudi::Error);
+create_exception!(_internal, HudiCoreError, PyException);
 
-impl From<HoodieError> for PyErr {
-    fn from(err: HoodieError) -> PyErr {
-        PyOSError::new_err(err.0.to_string())
-    }
+fn convert_to_py_err(err: CoreError) -> PyErr {
+    // TODO(xushiyan): match and map all sub types
+    HudiCoreError::new_err(err.to_string())
 }
 
-impl From<hudi::Error> for HoodieError {
-    fn from(err: hudi::Error) -> Self {
-        Self(err)
-    }
+#[derive(thiserror::Error, Debug)]
+pub enum PythonError {
+    #[error("Error in Hudi core")]
+    HudiCore(#[from] CoreError),
 }
 
-impl From<pyo3::PyErr> for HoodieError {
-    fn from(err: pyo3::PyErr) -> Self {
-        Self(hudi::Error::Internal(err.to_string()))
+impl From<PythonError> for PyErr {
+    fn from(err: PythonError) -> PyErr {
+        match err {
+            PythonError::HudiCore(err) => convert_to_py_err(err),
+        }
     }
 }
 
@@ -67,8 +69,9 @@ pub struct HudiFileGroupReader {
 impl HudiFileGroupReader {
     #[new]
     #[pyo3(signature = (base_uri, options=None))]
-    fn new(base_uri: &str, options: Option<HashMap<String, String>>) -> Result<Self, HoodieError> {
-        let inner = FileGroupReader::new_with_options(base_uri, options.unwrap_or_default())?;
+    fn new(base_uri: &str, options: Option<HashMap<String, String>>) -> PyResult<Self> {
+        let inner = FileGroupReader::new_with_options(base_uri, options.unwrap_or_default())
+            .map_err(PythonError::from)?;
         Ok(HudiFileGroupReader { inner })
     }
 
@@ -76,10 +79,10 @@ impl HudiFileGroupReader {
         &self,
         relative_path: &str,
         py: Python,
-    ) -> Result<PyObject, HoodieError> {
-        rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path))?
+    ) -> PyResult<PyObject> {
+        rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path))
+            .map_err(PythonError::from)?
             .to_pyarrow(py)
-            .map_err(HoodieError::from)
     }
 }
 
@@ -106,15 +109,18 @@ pub struct HudiFileSlice {
 #[cfg(not(tarpaulin))]
 #[pymethods]
 impl HudiFileSlice {
-    fn base_file_relative_path(&self) -> Result<String, HoodieError> {
-        PathBuf::from(&self.partition_path)
+    fn base_file_relative_path(&self) -> PyResult<String> {
+        Ok(PathBuf::from(&self.partition_path)
             .join(&self.base_file_name)
             .to_str()
             .map(String::from)
-            .ok_or(HoodieError(hudi::Error::Internal(format!(
-                "Failed to get base file relative path for file slice: {:?}",
-                self
-            ))))
+            .ok_or_else(|| {
+                CoreError::Internal(format!(
+                    "Failed to get base file relative path for file slice: {:?}",
+                    self
+                ))
+            })
+            .map_err(PythonError::from)?)
     }
 }
 
@@ -153,11 +159,13 @@ impl HudiTable {
     fn new_with_options(
         base_uri: &str,
        options: Option<HashMap<String, String>>,
-    ) -> Result<Self, HoodieError> {
-        let inner: Table = rt().block_on(Table::new_with_options(
-            base_uri,
-            options.unwrap_or_default(),
-        ))?;
+    ) -> PyResult<Self> {
+        let inner: Table = rt()
+            .block_on(Table::new_with_options(
+                base_uri,
+                options.unwrap_or_default(),
+            ))
+            .map_err(PythonError::from)?;
         Ok(HudiTable { inner })
     }
 
@@ -169,16 +177,16 @@ impl HudiTable {
         self.inner.storage_options()
     }
 
-    fn get_schema(&self, py: Python) -> Result<PyObject, HoodieError> {
-        rt().block_on(self.inner.get_schema())?
+    fn get_schema(&self, py: Python) -> PyResult<PyObject> {
+        rt().block_on(self.inner.get_schema())
+            .map_err(PythonError::from)?
             .to_pyarrow(py)
-            .map_err(HoodieError::from)
     }
 
-    fn get_partition_schema(&self, py: Python) -> Result<PyObject, HoodieError> {
-        rt().block_on(self.inner.get_partition_schema())?
+    fn get_partition_schema(&self, py: Python) -> PyResult<PyObject> {
+        rt().block_on(self.inner.get_partition_schema())
+            .map_err(PythonError::from)?
             .to_pyarrow(py)
-            .map_err(HoodieError::from)
     }
 
     #[pyo3(signature = (n, filters=None))]
@@ -187,12 +195,14 @@ impl HudiTable {
         n: usize,
         filters: Option<Vec<(String, String, String)>>,
         py: Python,
-    ) -> Result<Vec<Vec<HudiFileSlice>>, HoodieError> {
+    ) -> PyResult<Vec<Vec<HudiFileSlice>>> {
         py.allow_threads(|| {
-            let file_slices = rt().block_on(
-                self.inner
-                    .get_file_slices_splits(n, vec_to_slice!(filters.unwrap_or_default())),
-            )?;
+            let file_slices = rt()
+                .block_on(
+                    self.inner
+                        .get_file_slices_splits(n, vec_to_slice!(filters.unwrap_or_default())),
+                )
+                .map_err(PythonError::from)?;
             Ok(file_slices
                 .iter()
                 .map(|inner_vec| inner_vec.iter().map(convert_file_slice).collect())
@@ -205,12 +215,14 @@ impl HudiTable {
         &self,
         filters: Option<Vec<(String, String, String)>>,
         py: Python,
-    ) -> Result<Vec<HudiFileSlice>, HoodieError> {
+    ) -> PyResult<Vec<HudiFileSlice>> {
        py.allow_threads(|| {
-            let file_slices = rt().block_on(
-                self.inner
-                    .get_file_slices(vec_to_slice!(filters.unwrap_or_default())),
-            )?;
+            let file_slices = rt()
+                .block_on(
+                    self.inner
+                        .get_file_slices(vec_to_slice!(filters.unwrap_or_default())),
+                )
+                .map_err(PythonError::from)?;
             Ok(file_slices.iter().map(convert_file_slice).collect())
         })
     }
 
@@ -225,13 +237,13 @@ impl HudiTable {
         &self,
         filters: Option<Vec<(String, String, String)>>,
         py: Python,
-    ) -> Result<PyObject, HoodieError> {
+    ) -> PyResult<PyObject> {
         rt().block_on(
             self.inner
                 .read_snapshot(vec_to_slice!(filters.unwrap_or_default())),
-        )?
+        )
+        .map_err(PythonError::from)?
         .to_pyarrow(py)
-        .map_err(HoodieError::from)
     }
 }
 
@@ -243,14 +255,16 @@ pub fn build_hudi_table(
     hudi_options: Option<HashMap<String, String>>,
     storage_options: Option<HashMap<String, String>>,
     options: Option<HashMap<String, String>>,
-) -> Result<HudiTable, HoodieError> {
-    let inner = rt().block_on(
-        TableBuilder::from_base_uri(&base_uri)
-            .with_hudi_options(hudi_options.unwrap_or_default())
-            .with_storage_options(storage_options.unwrap_or_default())
-            .with_options(options.unwrap_or_default())
-            .build(),
-    )?;
+) -> PyResult<HudiTable> {
+    let inner = rt()
+        .block_on(
+            TableBuilder::from_base_uri(&base_uri)
+                .with_hudi_options(hudi_options.unwrap_or_default())
+                .with_storage_options(storage_options.unwrap_or_default())
+                .with_options(options.unwrap_or_default())
+                .build(),
+        )
+        .map_err(PythonError::from)?;
     Ok(HudiTable { inner })
 }
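
For reviewers, a minimal sketch of what the typed error surface enables for Rust
callers once this series lands. The `open_table` helper and its error handling
below are illustrative only and not part of this patch; `hudi::CoreError`, its
`ConfigNotFound` and `Unsupported` variants, and `Table::new_with_options` are
taken from the diffs above:

    use std::collections::HashMap;

    use hudi::table::Table;
    use hudi::CoreError;

    // With a concrete error enum, callers can branch on specific failure
    // modes instead of string-matching an anyhow error chain.
    async fn open_table(base_uri: &str) -> Option<Table> {
        let options: HashMap<String, String> = HashMap::new();
        match Table::new_with_options(base_uri, options).await {
            Ok(table) => Some(table),
            Err(CoreError::ConfigNotFound(key)) => {
                eprintln!("missing required table config: {}", key);
                None
            }
            Err(CoreError::Unsupported(reason)) => {
                eprintln!("table not supported: {}", reason);
                None
            }
            Err(other) => {
                eprintln!("failed to load table: {}", other);
                None
            }
        }
    }

The Python binding follows the same idea: `convert_to_py_err` currently maps
every `CoreError` to the single `HudiCoreError` exception, with per-variant
mapping left as the TODO noted in the diff.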