From 2f6f596720abcd19b9eea71f5ee85b4d5b249e0c Mon Sep 17 00:00:00 2001
From: GoHalo
Date: Fri, 30 Aug 2024 20:32:29 +0800
Subject: [PATCH] refact: define hudi error types for hudi-core crate

---
 crates/core/Cargo.toml             |  2 +-
 crates/core/src/config/internal.rs | 16 +++++--
 crates/core/src/config/mod.rs      |  2 +-
 crates/core/src/config/read.rs     | 20 ++++++---
 crates/core/src/config/table.rs    | 67 +++++++++++++++++++++++-------
 crates/core/src/file_group/mod.rs  | 21 ++++++----
 crates/core/src/lib.rs             | 43 +++++++++++++++++++
 crates/core/src/storage/mod.rs     | 53 +++++++++++++----------
 crates/core/src/storage/utils.rs   | 34 +++++++++++----
 crates/core/src/table/fs_view.rs   |  4 +-
 crates/core/src/table/mod.rs       | 45 ++++++++++----------
 crates/core/src/table/timeline.rs  | 30 ++++++-------
 python/src/internal.rs             | 43 +++++++++++++++----
 13 files changed, 267 insertions(+), 113 deletions(-)

diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index dcaf547..71cf877 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -48,7 +48,6 @@ serde = { workspace = true }
 serde_json = { workspace = true }
 
 # "stdlib"
-anyhow = { workspace = true }
 bytes = { workspace = true }
 strum = { workspace = true }
 strum_macros = { workspace = true }
@@ -65,6 +64,7 @@ datafusion = { workspace = true, optional = true }
 datafusion-expr = { workspace = true, optional = true }
 datafusion-common = { workspace = true, optional = true }
 datafusion-physical-expr = { workspace = true, optional = true }
+thiserror = "1.0.63"
 
 [dev-dependencies]
 hudi-tests = { path = "../tests" }
diff --git a/crates/core/src/config/internal.rs b/crates/core/src/config/internal.rs
index d6ad814..7d7aff8 100644
--- a/crates/core/src/config/internal.rs
+++ b/crates/core/src/config/internal.rs
@@ -20,10 +20,13 @@
 use std::collections::HashMap;
 use std::str::FromStr;
 
-use anyhow::{anyhow, Result};
 use strum_macros::EnumIter;
 
-use crate::config::{ConfigParser, HudiConfigValue};
+use crate::{
+    config::{ConfigParser, HudiConfigValue},
+    Error::{ConfNotFound, InvalidConf},
+    Result,
+};
 
 #[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
 pub enum HudiInternalConfig {
@@ -51,11 +54,16 @@ impl ConfigParser for HudiInternalConfig {
         let get_result = configs
             .get(self.as_ref())
             .map(|v| v.as_str())
-            .ok_or(anyhow!("Config '{}' not found", self.as_ref()));
+            .ok_or(ConfNotFound(self.as_ref().to_string()));
 
         match self {
             Self::SkipConfigValidation => get_result
-                .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+                .and_then(|v| {
+                    bool::from_str(v).map_err(|e| InvalidConf {
+                        item: Self::SkipConfigValidation.as_ref(),
+                        source: Box::new(e),
+                    })
+                })
                 .map(HudiConfigValue::Boolean),
         }
     }
diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs
index 2b37dc7..7ba4f36 100644
--- a/crates/core/src/config/mod.rs
+++ b/crates/core/src/config/mod.rs
@@ -20,7 +20,7 @@ use std::any::type_name;
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use anyhow::Result;
+use crate::Result;
 
 pub mod internal;
 pub mod read;
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index 195aa70..59bae1f 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -20,10 +20,14 @@
 use std::collections::HashMap;
 use std::str::FromStr;
 
-use crate::config::{ConfigParser, HudiConfigValue};
-use anyhow::{anyhow, Result};
 use strum_macros::EnumIter;
 
+use crate::{
+    config::{ConfigParser, HudiConfigValue},
+    Error::{ConfNotFound, InvalidConf},
+    Result,
+};
+
 #[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
 pub enum HudiReadConfig {
     InputPartitions,
@@ -53,11 +57,16 @@ impl ConfigParser for HudiReadConfig {
         let get_result = configs
             .get(self.as_ref())
             .map(|v| v.as_str())
-            .ok_or(anyhow!("Config '{}' not found", self.as_ref()));
+            .ok_or(ConfNotFound(self.as_ref().to_string()));
 
         match self {
             Self::InputPartitions => get_result
-                .and_then(|v| usize::from_str(v).map_err(|e| anyhow!(e)))
+                .and_then(|v| {
+                    usize::from_str(v).map_err(|e| InvalidConf {
+                        item: Self::InputPartitions.as_ref(),
+                        source: Box::new(e),
+                    })
+                })
                 .map(HudiConfigValue::UInteger),
             Self::AsOfTimestamp => get_result.map(|v| HudiConfigValue::String(v.to_string())),
         }
@@ -69,7 +78,6 @@ mod tests {
     use crate::config::read::HudiReadConfig::InputPartitions;
     use crate::config::ConfigParser;
     use std::collections::HashMap;
-    use std::num::ParseIntError;
 
     #[test]
     fn parse_valid_config_value() {
@@ -82,7 +90,7 @@ mod tests {
     fn parse_invalid_config_value() {
         let options = HashMap::from([(InputPartitions.as_ref().to_string(), "foo".to_string())]);
         let value = InputPartitions.parse_value(&options);
-        assert!(value.err().unwrap().is::<ParseIntError>());
+        assert!(value.is_err());
         assert_eq!(
             InputPartitions
                 .parse_value_or_default(&options)
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index a23ecbd..a797e3a 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -20,11 +20,13 @@
 use std::collections::HashMap;
 use std::str::FromStr;
 
-use anyhow::anyhow;
-use anyhow::Result;
 use strum_macros::{AsRefStr, EnumIter};
 
-use crate::config::{ConfigParser, HudiConfigValue};
+use crate::{
+    config::{ConfigParser, HudiConfigValue},
+    Error::{self, ConfNotFound, InvalidConf, Unsupported},
+    Result,
+};
 
 #[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
 pub enum HudiTableConfig {
@@ -87,31 +89,56 @@ impl ConfigParser for HudiTableConfig {
         let get_result = configs
             .get(self.as_ref())
             .map(|v| v.as_str())
-            .ok_or(anyhow!("Config '{}' not found", self.as_ref()));
+            .ok_or(ConfNotFound(self.as_ref().to_string()));
 
         match self {
             Self::BaseFileFormat => get_result
                 .and_then(BaseFileFormatValue::from_str)
                 .map(|v| HudiConfigValue::String(v.as_ref().to_string())),
             Self::Checksum => get_result
-                .and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
+                .and_then(|v| {
+                    isize::from_str(v).map_err(|e| InvalidConf {
+                        item: Self::Checksum.as_ref(),
+                        source: Box::new(e),
+                    })
+                })
                 .map(HudiConfigValue::Integer),
             Self::DatabaseName => get_result.map(|v| HudiConfigValue::String(v.to_string())),
             Self::DropsPartitionFields => get_result
-                .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+                .and_then(|v| {
+                    bool::from_str(v).map_err(|e| InvalidConf {
+                        item: Self::DropsPartitionFields.as_ref(),
+                        source: Box::new(e),
+                    })
+                })
                 .map(HudiConfigValue::Boolean),
             Self::IsHiveStylePartitioning => get_result
-                .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+                .and_then(|v| {
+                    bool::from_str(v).map_err(|e| InvalidConf {
+                        item: Self::IsHiveStylePartitioning.as_ref(),
+                        source: Box::new(e),
+                    })
+                })
                 .map(HudiConfigValue::Boolean),
             Self::IsPartitionPathUrlencoded => get_result
-                .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+                .and_then(|v| {
+                    bool::from_str(v).map_err(|e| InvalidConf {
+                        item: Self::IsPartitionPathUrlencoded.as_ref(),
+                        source: Box::new(e),
+                    })
+                })
                 .map(HudiConfigValue::Boolean),
             Self::KeyGeneratorClass => get_result.map(|v| HudiConfigValue::String(v.to_string())),
             Self::PartitionFields => get_result
                 .map(|v| HudiConfigValue::List(v.split(',').map(str::to_string).collect())),
             Self::PrecombineField => get_result.map(|v| HudiConfigValue::String(v.to_string())),
             Self::PopulatesMetaFields => get_result
-                .and_then(|v| bool::from_str(v).map_err(|e| anyhow!(e)))
+                .and_then(|v| {
+                    bool::from_str(v).map_err(|e| InvalidConf {
+                        item: Self::PopulatesMetaFields.as_ref(),
+                        source: Box::new(e),
+                    })
+                })
                 .map(HudiConfigValue::Boolean),
             Self::RecordKeyFields => get_result
                 .map(|v| HudiConfigValue::List(v.split(',').map(str::to_string).collect())),
@@ -120,10 +147,20 @@ impl ConfigParser for HudiTableConfig {
                 .and_then(TableTypeValue::from_str)
                 .map(|v| HudiConfigValue::String(v.as_ref().to_string())),
             Self::TableVersion => get_result
-                .and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
+                .and_then(|v| {
+                    isize::from_str(v).map_err(|e| InvalidConf {
+                        item: Self::TableVersion.as_ref(),
+                        source: Box::new(e),
+                    })
+                })
                 .map(HudiConfigValue::Integer),
             Self::TimelineLayoutVersion => get_result
-                .and_then(|v| isize::from_str(v).map_err(|e| anyhow!(e)))
+                .and_then(|v| {
+                    isize::from_str(v).map_err(|e| InvalidConf {
+                        item: Self::TimelineLayoutVersion.as_ref(),
+                        source: Box::new(e),
+                    })
+                })
                 .map(HudiConfigValue::Integer),
         }
     }
@@ -138,13 +175,13 @@ pub enum TableTypeValue {
 }
 
 impl FromStr for TableTypeValue {
-    type Err = anyhow::Error;
+    type Err = Error;
 
     fn from_str(s: &str) -> Result<Self> {
         match s.to_ascii_lowercase().as_str() {
             "copy_on_write" | "copy-on-write" | "cow" => Ok(Self::CopyOnWrite),
             "merge_on_read" | "merge-on-read" | "mor" => Ok(Self::MergeOnRead),
-            _ => Err(anyhow!("Unsupported table type: {}", s)),
+            v => Err(Unsupported(format!("unsupported table type {}", v))),
         }
     }
 }
@@ -156,12 +193,12 @@ pub enum BaseFileFormatValue {
 }
 
 impl FromStr for BaseFileFormatValue {
-    type Err = anyhow::Error;
+    type Err = Error;
 
     fn from_str(s: &str) -> Result<Self> {
         match s.to_ascii_lowercase().as_str() {
             "parquet" => Ok(Self::Parquet),
-            _ => Err(anyhow!("Unsupported base file format: {}", s)),
+            v => Err(Unsupported(format!("unsupported base file format {}", v))),
         }
     }
 }
diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs
index 3dd1af3..9c071fb 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -23,11 +23,10 @@
 use std::fmt::Formatter;
 use std::hash::{Hash, Hasher};
 use std::path::PathBuf;
 
-use anyhow::{anyhow, Result};
-
 use crate::storage::file_info::FileInfo;
 use crate::storage::file_stats::FileStats;
 use crate::storage::Storage;
+use crate::{Error::Internal, Result};
 
 #[derive(Clone, Debug)]
 pub struct BaseFile {
@@ -40,10 +39,18 @@ pub struct BaseFile {
 impl BaseFile {
     fn parse_file_name(file_name: &str) -> Result<(String, String)> {
         let err_msg = format!("Failed to parse file name '{}' for base file.", file_name);
-        let (name, _) = file_name.rsplit_once('.').ok_or(anyhow!(err_msg.clone()))?;
+        let (name, _) = file_name
+            .rsplit_once('.')
+            .ok_or(Internal(err_msg.clone()))?;
         let parts: Vec<&str> = name.split('_').collect();
-        let file_group_id = parts.first().ok_or(anyhow!(err_msg.clone()))?.to_string();
-        let commit_time = parts.get(2).ok_or(anyhow!(err_msg.clone()))?.to_string();
+        let file_group_id = parts
+            .first()
+            .ok_or(Internal(err_msg.clone()))?
+            .to_string();
+        let commit_time = parts
+            .get(2)
+            .ok_or(Internal(err_msg.clone()))?
+            .to_string();
         Ok((file_group_id, commit_time))
     }
@@ -162,11 +169,11 @@ impl FileGroup {
     pub fn add_base_file(&mut self, base_file: BaseFile) -> Result<&Self> {
         let commit_time = base_file.commit_time.as_str();
         if self.file_slices.contains_key(commit_time) {
-            Err(anyhow!(
+            Err(Internal(format!(
                 "Commit time {0} is already present in File Group {1}",
                 commit_time.to_owned(),
                 self.id,
-            ))
+            )))
         } else {
             self.file_slices.insert(
                 commit_time.to_owned(),
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 9b492e9..c4122e2 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -21,3 +21,46 @@ pub mod config;
 pub mod file_group;
 pub mod storage;
 pub mod table;
+
+use thiserror::Error;
+
+#[derive(Error, Debug)]
+pub enum Error {
+    #[error("Config '{0}' not found")]
+    ConfNotFound(String),
+
+    #[error("Invalid config item '{item}', {source:?}")]
+    InvalidConf {
+        item: &'static str,
+        source: Box<dyn std::error::Error + Send + Sync>,
+    },
+
+    #[error("Parse url '{url}' failed, {source}")]
+    UrlParse {
+        url: String,
+        source: url::ParseError,
+    },
+
+    #[error("Invalid file path '{name}', {detail}")]
+    InvalidPath { name: String, detail: String },
+
+    #[error("{0}")]
+    Unsupported(String),
+
+    #[error("{0}")]
+    Internal(String),
+
+    #[error(transparent)]
+    Store(#[from] object_store::Error),
+
+    #[error(transparent)]
+    StorePath(#[from] object_store::path::Error),
+
+    #[error(transparent)]
+    Parquet(#[from] parquet::errors::ParquetError),
+
+    #[error(transparent)]
+    Arrow(#[from] arrow::error::ArrowError),
+}
+
+type Result<T, E = Error> = std::result::Result<T, E>;
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index c7eb1ec..c2ab284 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -21,7 +21,6 @@ use std::collections::HashMap;
 use std::path::PathBuf;
 use std::sync::Arc;
 
-use anyhow::{anyhow, Context, Result};
 use arrow::compute::concat_batches;
 use arrow::record_batch::RecordBatch;
 use async_recursion::async_recursion;
@@ -36,6 +35,7 @@ use url::Url;
 
 use crate::storage::file_info::FileInfo;
 use crate::storage::utils::join_url_segments;
+use crate::{Error, Result};
 
 pub mod file_info;
 pub mod file_stats;
@@ -54,7 +54,7 @@ impl Storage {
                 base_url,
                 object_store: Arc::new(object_store),
             })),
-            Err(e) => Err(anyhow!("Failed to create storage: {}", e)),
+            Err(e) => Err(Error::Store(e)),
         }
     }
@@ -66,15 +66,23 @@ impl Storage {
         runtime_env.register_object_store(self.base_url.as_ref(), self.object_store.clone());
     }
 
-    #[cfg(test)]
-    async fn get_file_info(&self, relative_path: &str) -> Result<FileInfo> {
+    fn get_relative_path(&self, relative_path: &str) -> Result<(Url, ObjPath)> {
         let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
         let obj_path = ObjPath::from_url_path(obj_url.path())?;
+        Ok((obj_url, obj_path))
+    }
+
+    #[cfg(test)]
+    async fn get_file_info(&self, relative_path: &str) -> Result<FileInfo> {
+        let (obj_url, obj_path) = self.get_relative_path(relative_path)?;
         let meta = self.object_store.head(&obj_path).await?;
         let uri = obj_url.to_string();
         let name = obj_path
             .filename()
-            .ok_or(anyhow!("Failed to get file name for {}", obj_path))?
+            .ok_or(Error::InvalidPath {
+                name: obj_path.to_string(),
+                detail: "failed to get file name".to_string(),
+            })?
             .to_string();
         Ok(FileInfo {
             uri,
@@ -84,8 +92,7 @@ impl Storage {
     }
 
     pub async fn get_parquet_file_metadata(&self, relative_path: &str) -> Result<ParquetMetaData> {
-        let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
-        let obj_path = ObjPath::from_url_path(obj_url.path())?;
+        let (_, obj_path) = self.get_relative_path(relative_path)?;
         let obj_store = self.object_store.clone();
         let meta = obj_store.head(&obj_path).await?;
         let reader = ParquetObjectReader::new(obj_store, meta);
@@ -94,16 +101,14 @@ impl Storage {
     }
 
     pub async fn get_file_data(&self, relative_path: &str) -> Result<Bytes> {
-        let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
-        let obj_path = ObjPath::from_url_path(obj_url.path())?;
+        let (_, obj_path) = self.get_relative_path(relative_path)?;
         let result = self.object_store.get(&obj_path).await?;
         let bytes = result.bytes().await?;
         Ok(bytes)
     }
 
     pub async fn get_parquet_file_data(&self, relative_path: &str) -> Result<RecordBatch> {
-        let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
-        let obj_path = ObjPath::from_url_path(obj_url.path())?;
+        let (_, obj_path) = self.get_relative_path(relative_path)?;
         let obj_store = self.object_store.clone();
         let meta = obj_store.head(&obj_path).await?;
@@ -115,16 +120,14 @@ impl Storage {
         let mut batches = Vec::new();
         while let Some(r) = stream.next().await {
-            let batch = r.context("Failed to read record batch.")?;
-            batches.push(batch)
+            batches.push(r?)
         }
 
         if batches.is_empty() {
             return Ok(RecordBatch::new_empty(schema.clone()));
         }
-        concat_batches(&schema, &batches)
-            .map_err(|e| anyhow!("Failed to concat record batches: {}", e))
+        Ok(concat_batches(&schema, &batches)?)
     }
 
     pub async fn list_dirs(&self, subdir: Option<&str>) -> Result<Vec<String>> {
@@ -133,7 +136,10 @@ impl Storage {
         for dir in dir_paths {
             dirs.push(
                 dir.filename()
-                    .ok_or(anyhow!("Failed to get file name for {}", dir))?
+                    .ok_or(Error::InvalidPath {
+                        name: dir.to_string(),
+                        detail: "failed to get file name".to_string(),
+                    })?
                     .to_string(),
             )
         }
@@ -162,10 +168,10 @@ impl Storage {
             let name = obj_meta
                 .location
                 .filename()
-                .ok_or(anyhow!(
-                    "Failed to get file name for {:?}",
-                    obj_meta.location
-                ))?
+                .ok_or(Error::InvalidPath {
+                    name: obj_meta.location.to_string(),
+                    detail: "failed to get file name".to_string(),
+                })?
                 .to_string();
             let uri = join_url_segments(&prefix_url, &[&name])?.to_string();
             file_info.push(FileInfo {
@@ -191,9 +197,10 @@ pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Result<Vec<String>> {
diff --git a/crates/core/src/storage/utils.rs b/crates/core/src/storage/utils.rs
--- a/crates/core/src/storage/utils.rs
+++ b/crates/core/src/storage/utils.rs
@@ -20,15 +20,21 @@
 use std::path::{Path, PathBuf};
 use std::str::FromStr;
 
-use anyhow::{anyhow, Result};
 use url::{ParseError, Url};
+use crate::{
+    Error::{InvalidPath, UrlParse},
+    Result,
+};
 
 pub fn split_filename(filename: &str) -> Result<(String, String)> {
     let path = Path::new(filename);
     let stem = path
         .file_stem()
         .and_then(|s| s.to_str())
-        .ok_or_else(|| anyhow!("No file stem found"))?
+        .ok_or_else(|| InvalidPath {
+            name: filename.to_string(),
+            detail: "no file stem found".to_string(),
+        })?
         .to_string();
 
     let extension = path
@@ -42,13 +48,20 @@ pub fn split_filename(filename: &str) -> Result<(String, String)> {
 }
 
 pub fn parse_uri(uri: &str) -> Result<Url> {
-    let mut url = Url::parse(uri)
-        .or(Url::from_file_path(PathBuf::from_str(uri)?))
-        .map_err(|_| anyhow!("Failed to parse uri: {}", uri))?;
+    let mut url = match Url::parse(uri) {
+        Ok(url) => url,
+        Err(source) => Url::from_directory_path(uri).map_err(|_| UrlParse {
+            url: uri.to_string(),
+            source,
+        })?,
+    };
 
     if url.path().ends_with('/') {
         url.path_segments_mut()
-            .map_err(|_| anyhow!("Failed to parse uri: {}", uri))?
+            .map_err(|_| InvalidPath {
+                name: uri.to_string(),
+                detail: "parse uri failed".to_string(),
+            })?
             .pop();
     }
 
@@ -69,7 +82,10 @@ pub fn join_url_segments(base_url: &Url, segments: &[&str]) -> Result<Url> {
     for &seg in segments {
         let segs: Vec<_> = seg.split('/').filter(|&s| !s.is_empty()).collect();
         url.path_segments_mut()
-            .map_err(|_| ParseError::RelativeUrlWithoutBase)?
+            .map_err(|_| UrlParse {
+                url: base_url.to_string(),
+                source: ParseError::RelativeUrlWithoutBase,
+            })?
             .extend(segs);
     }
 
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 65cc2a9..78b66d3 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -20,7 +20,6 @@
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
-use anyhow::{anyhow, Result};
 use arrow::record_batch::RecordBatch;
 use dashmap::DashMap;
 use url::Url;
@@ -29,6 +28,7 @@
 use crate::config::HudiConfigs;
 use crate::file_group::{BaseFile, FileGroup, FileSlice};
 use crate::storage::file_info::FileInfo;
 use crate::storage::{get_leaf_dirs, Storage};
+use crate::{Error::Internal, Result};
 
 #[derive(Clone, Debug)]
 #[allow(dead_code)]
@@ -83,7 +83,7 @@ impl FileSystemView {
                 Ok(file_groups) => {
                     partition_to_file_groups.insert(p, file_groups);
                 }
-                Err(e) => return Err(anyhow!("Failed to load partitions: {}", e)),
+                Err(e) => return Err(Internal(format!("Failed to load partitions: {}", e))),
             }
         }
         Ok(partition_to_file_groups)
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index e4e1a93..f9f1823 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -23,7 +23,6 @@ use std::io::{BufRead, BufReader};
 use std::str::FromStr;
 use std::sync::Arc;
 
-use anyhow::{anyhow, Context, Result};
 use arrow::record_batch::RecordBatch;
 use arrow_schema::Schema;
 use strum::IntoEnumIterator;
@@ -43,6 +42,7 @@
 use crate::storage::utils::{empty_options, parse_uri};
 use crate::storage::Storage;
 use crate::table::fs_view::FileSystemView;
 use crate::table::timeline::Timeline;
+use crate::{Error, Result};
 
 mod fs_view;
 mod timeline;
@@ -69,20 +69,15 @@ impl Table {
     {
         let base_url = Arc::new(parse_uri(base_uri)?);
 
-        let (configs, extra_options) = Self::load_configs(base_url.clone(), all_options)
-            .await
-            .context("Failed to load table properties")?;
+        let (configs, extra_options) = Self::load_configs(base_url.clone(), all_options).await?;
         let configs = Arc::new(configs);
         let extra_options = Arc::new(extra_options);
 
-        let timeline = Timeline::new(base_url.clone(), extra_options.clone(), configs.clone())
-            .await
-            .context("Failed to load timeline")?;
+        let timeline =
+            Timeline::new(base_url.clone(), extra_options.clone(), configs.clone()).await?;
 
         let file_system_view =
-            FileSystemView::new(base_url.clone(), extra_options.clone(), configs.clone())
-                .await
-                .context("Failed to load file system view")?;
+            FileSystemView::new(base_url.clone(), extra_options.clone(), configs.clone()).await?;
 
         Ok(Table {
             base_url,
@@ -133,7 +128,8 @@ impl Table {
         let cursor = std::io::Cursor::new(data);
         let lines = BufReader::new(cursor).lines();
         for line in lines {
-            let line = line?;
+            let line =
+                line.map_err(|e| Error::Internal(format!("Invalid hoodie.properties {:?}", e)))?;
             let trimmed_line = line.trim();
             if trimmed_line.is_empty() || trimmed_line.starts_with('#') {
                 continue;
@@ -177,20 +173,24 @@ impl Table {
         // additional validation
         let table_type = hudi_configs.get(TableType)?.to::<String>();
         if TableTypeValue::from_str(&table_type)? != CopyOnWrite {
-            return Err(anyhow!("Only support copy-on-write table."));
+            return Err(Error::Unsupported(
+                "Only support copy-on-write table".to_string(),
+            ));
         }
 
         let table_version = hudi_configs.get(TableVersion)?.to::<isize>();
         if !(5..=6).contains(&table_version) {
-            return Err(anyhow!("Only support table version 5 and 6."));
+            return Err(Error::Unsupported(
+                "Only support table version 5 and 6".to_string(),
+            ));
         }
 
         let drops_partition_cols = hudi_configs.get(DropsPartitionFields)?.to::<bool>();
         if drops_partition_cols {
-            return Err(anyhow!(
+            return Err(Error::Unsupported(format!(
                 "Only support when `{}` is disabled",
                 DropsPartitionFields.as_ref()
-            ));
+            )));
         }
 
         Ok(())
@@ -226,8 +226,7 @@ impl Table {
         let excludes = self.timeline.get_replaced_file_groups().await?;
         self.file_system_view
             .load_file_slices_stats_as_of(timestamp, &excludes)
-            .await
-            .context("Fail to load file slice stats.")?;
+            .await?;
         self.file_system_view
             .get_file_slices_as_of(timestamp, &excludes)
     }
@@ -244,15 +243,17 @@ impl Table {
     }
 
     async fn read_snapshot_as_of(&self, timestamp: &str) -> Result<Vec<RecordBatch>> {
-        let file_slices = self
-            .get_file_slices_as_of(timestamp)
-            .await
-            .context(format!("Failed to get file slices as of {}", timestamp))?;
+        let file_slices = self.get_file_slices_as_of(timestamp).await?;
         let mut batches = Vec::new();
         for f in file_slices {
             match self.file_system_view.read_file_slice_unchecked(&f).await {
                 Ok(batch) => batches.push(batch),
-                Err(e) => return Err(anyhow!("Failed to read file slice {:?} - {}", f, e)),
+                Err(e) => {
+                    return Err(Error::Internal(format!(
+                        "Failed to read file slice {:?} - {}",
+                        f, e
+                    )))
+                }
             }
         }
         Ok(batches)
diff --git a/crates/core/src/table/timeline.rs b/crates/core/src/table/timeline.rs
index e502cd6..7dbbe6b 100644
--- a/crates/core/src/table/timeline.rs
+++ b/crates/core/src/table/timeline.rs
@@ -23,7 +23,6 @@ use std::fmt::Debug;
 use std::path::PathBuf;
 use std::sync::Arc;
 
-use anyhow::{anyhow, Context, Result};
 use arrow_schema::Schema;
 use parquet::arrow::parquet_to_arrow_schema;
 use serde_json::{Map, Value};
@@ -33,6 +32,7 @@
 use crate::config::HudiConfigs;
 use crate::file_group::FileGroup;
 use crate::storage::utils::split_filename;
 use crate::storage::Storage;
+use crate::{Error, Result};
 
 #[allow(dead_code)]
 #[derive(Clone, Debug, Eq, PartialEq)]
@@ -79,7 +79,10 @@ impl Instant {
         commit_file_path.push(self.file_name());
         commit_file_path
             .to_str()
-            .ok_or(anyhow!("Failed to get file path for {:?}", self))
+            .ok_or(Error::Internal(format!(
+                "Failed to get file path for {:?}",
+                self
+            )))
             .map(|s| s.to_string())
     }
@@ -139,10 +142,11 @@ impl Timeline {
             .storage
             .get_file_data(instant.relative_path()?.as_str())
             .await?;
-        let json: Value = serde_json::from_slice(&bytes)?;
+        let json: Value = serde_json::from_slice(&bytes)
+            .map_err(|e| Error::Internal(format!("Invalid instant {:?}", e)))?;
         let commit_metadata = json
             .as_object()
-            .ok_or_else(|| anyhow!("Expected JSON object"))?
+            .ok_or_else(|| Error::Internal("Expected JSON object".to_string()))?
             .clone();
         Ok(commit_metadata)
     }
@@ -166,17 +170,15 @@ impl Timeline {
             .and_then(|first_value| first_value["path"].as_str());
 
         if let Some(path) = parquet_path {
-            let parquet_meta = self
-                .storage
-                .get_parquet_file_metadata(path)
-                .await
-                .context("Failed to get parquet file metadata")?;
-
-            parquet_to_arrow_schema(parquet_meta.file_metadata().schema_descr(), None)
-                .context("Failed to resolve the latest schema")
+            let parquet_meta = self.storage.get_parquet_file_metadata(path).await?;
+
+            Ok(parquet_to_arrow_schema(
+                parquet_meta.file_metadata().schema_descr(),
+                None,
+            )?)
         } else {
-            Err(anyhow!(
-                "Failed to resolve the latest schema: no file path found"
+            Err(Error::Internal(
+                "Failed to resolve the latest schema: no file path found".to_string(),
             ))
         }
     }
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 141a74e..2670861 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -17,16 +17,32 @@
  * under the License.
  */
 use std::collections::HashMap;
+use std::convert::From;
 use std::path::PathBuf;
 use std::sync::OnceLock;
 
 use anyhow::Context;
 use arrow::pyarrow::ToPyArrow;
-use pyo3::{pyclass, pymethods, PyErr, PyObject, PyResult, Python};
+use pyo3::{exceptions::PyOSError, pyclass, pymethods, PyErr, PyObject, PyResult, Python};
 use tokio::runtime::Runtime;
 
 use hudi::file_group::FileSlice;
 use hudi::table::Table;
+use hudi::Error::Internal;
+
+struct HoodieError(hudi::Error);
+
+impl From<HoodieError> for PyErr {
+    fn from(err: HoodieError) -> PyErr {
+        PyOSError::new_err(err.0.to_string())
+    }
+}
+
+impl From<hudi::Error> for HoodieError {
+    fn from(err: hudi::Error) -> Self {
+        Self(err)
+    }
+}
 
 #[cfg(not(tarpaulin))]
 #[derive(Clone, Debug)]
@@ -91,7 +107,7 @@ pub struct HudiTable {
 impl HudiTable {
     #[new]
     #[pyo3(signature = (table_uri, options = None))]
-    fn new(table_uri: &str, options: Option<HashMap<String, String>>) -> PyResult<Self> {
+    fn new(table_uri: &str, options: Option<HashMap<String, String>>) -> Result<Self, HoodieError> {
         let _table = rt().block_on(Table::new_with_options(
             table_uri,
             options.unwrap_or_default(),
@@ -99,11 +115,17 @@ impl HudiTable {
         Ok(HudiTable { _table })
     }
 
-    fn get_schema(&self, py: Python) -> PyResult<PyObject> {
-        rt().block_on(self._table.get_schema())?.to_pyarrow(py)
+    fn get_schema(&self, py: Python) -> Result<PyObject, HoodieError> {
+        rt().block_on(self._table.get_schema())?
+            .to_pyarrow(py)
+            .map_err(|e| HoodieError(Internal(e.to_string())))
     }
 
-    fn split_file_slices(&self, n: usize, py: Python) -> PyResult<Vec<Vec<HudiFileSlice>>> {
+    fn split_file_slices(
+        &self,
+        n: usize,
+        py: Python,
+    ) -> Result<Vec<Vec<HudiFileSlice>>, HoodieError> {
         py.allow_threads(|| {
             let file_slices = rt().block_on(self._table.split_file_slices(n))?;
             Ok(file_slices
                 .iter()
                 .map(|inner_vec| inner_vec.iter().map(convert_file_slice).collect())
                 .collect())
         })
     }
 
-    fn get_file_slices(&self, py: Python) -> PyResult<Vec<HudiFileSlice>> {
+    fn get_file_slices(&self, py: Python) -> Result<Vec<HudiFileSlice>, HoodieError> {
         py.allow_threads(|| {
             let file_slices = rt().block_on(self._table.get_file_slices())?;
             Ok(file_slices.iter().map(convert_file_slice).collect())
         })
     }
 
-    fn read_file_slice(&self, relative_path: &str, py: Python) -> PyResult<PyObject> {
+    fn read_file_slice(&self, relative_path: &str, py: Python) -> Result<PyObject, HoodieError> {
         rt().block_on(self._table.read_file_slice_by_path(relative_path))?
             .to_pyarrow(py)
+            .map_err(|e| HoodieError(Internal(e.to_string())))
     }
 
-    fn read_snapshot(&self, py: Python) -> PyResult<PyObject> {
-        rt().block_on(self._table.read_snapshot())?.to_pyarrow(py)
+    fn read_snapshot(&self, py: Python) -> Result<PyObject, HoodieError> {
+        rt().block_on(self._table.read_snapshot())?
+            .to_pyarrow(py)
+            .map_err(|e| HoodieError(Internal(e.to_string())))
     }
 }
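
For reviewers, a minimal sketch of what the new public `hudi::Error` enables
downstream: callers can match on typed variants instead of downcasting
`anyhow::Error`. The caller function, table URI, and empty option map below are
hypothetical; `Table::new_with_options` and the variant names are taken from
the diff above.

use std::collections::HashMap;

use hudi::table::Table;
use hudi::Error;

// Hypothetical caller that branches on the failure mode.
async fn open_table(base_uri: &str) -> Option<Table> {
    match Table::new_with_options(base_uri, HashMap::<String, String>::new()).await {
        Ok(table) => Some(table),
        // A key missing from hoodie.properties surfaces as ConfNotFound.
        Err(Error::ConfNotFound(key)) => {
            eprintln!("required table config '{}' is missing", key);
            None
        }
        // MOR tables, unsupported table versions, etc. surface as Unsupported.
        Err(Error::Unsupported(reason)) => {
            eprintln!("table is not readable by hudi-rs yet: {}", reason);
            None
        }
        // Object-store failures arrive through the #[from] conversions.
        Err(other) => {
            eprintln!("failed to open table: {}", other);
            None
        }
    }
}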