fix conflicts
Signed-off-by: GoHalo <[email protected]>
gohalo committed Dec 3, 2024
1 parent 2a7768d commit 1249a4e
Showing 8 changed files with 81 additions and 127 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
@@ -62,7 +62,6 @@ serde_json = { version = "1" }

# "stdlib"
thiserror = { version = "2.0.3" }
anyhow = { version = "1.0.86" }
bytes = { version = "1" }
paste = { version = "1.0.15" }
once_cell = { version = "1.19.0" }
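Note: dropping `anyhow` from the core crate pairs with the switch to a dedicated error type in `hudi-core` (the `hudi::Error::Internal` variant referenced throughout this commit), built on the `thiserror` dependency kept just above. A minimal sketch of what such a type can look like; everything beyond the `Internal` variant is illustrative, not the crate's actual definition:

use thiserror::Error;

/// Sketch of a crate-level error type in the style this commit adopts.
#[derive(Debug, Error)]
pub enum Error {
    /// Catch-all variant, mirroring the `hudi::Error::Internal(String)` used below.
    #[error("Internal error: {0}")]
    Internal(String),
}

/// Crate-wide alias, a common companion to such an enum.
pub type Result<T, E = Error> = std::result::Result<T, E>;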
10 changes: 2 additions & 8 deletions crates/core/src/file_group/mod.rs
@@ -54,14 +54,8 @@ impl BaseFile {
.rsplit_once('.')
.ok_or(Internal(err_msg.clone()))?;
let parts: Vec<&str> = name.split('_').collect();
let file_group_id = parts
.first()
.ok_or(Internal(err_msg.clone()))?
.to_string();
let commit_time = parts
.get(2)
.ok_or(Internal(err_msg.clone()))?
.to_string();
let file_group_id = parts.first().ok_or(Internal(err_msg.clone()))?.to_string();
let commit_time = parts.get(2).ok_or(Internal(err_msg.clone()))?.to_string();
Ok((file_group_id, commit_time))
}

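Note: the hunk above only reflows the parse onto single lines; the logic is unchanged. A standalone sketch of that parse, assuming Hudi's base file naming convention `<file_group_id>_<write_token>_<commit_time>.<extension>` (implied by the `parts.first()` / `parts.get(2)` indices), with `Option` standing in for the crate's `Result` and the sample filename invented for illustration:

fn parse_base_file_name(file_name: &str) -> Option<(String, String)> {
    // Drop the extension, e.g. ".parquet".
    let (name, _extension) = file_name.rsplit_once('.')?;
    // Layout: <file_group_id>_<write_token>_<commit_time>
    let parts: Vec<&str> = name.split('_').collect();
    let file_group_id = parts.first()?.to_string();
    let commit_time = parts.get(2)?.to_string();
    Some((file_group_id, commit_time))
}

fn main() {
    let name = "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402123035233.parquet";
    let (file_group_id, commit_time) = parse_base_file_name(name).unwrap();
    assert_eq!(file_group_id, "5a226868-2934-4f84-a16f-55124630c68d-0");
    assert_eq!(commit_time, "20240402123035233");
}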
22 changes: 8 additions & 14 deletions crates/core/src/storage/mod.rs
@@ -101,15 +101,10 @@ impl Storage {
runtime_env.register_object_store(self.base_url.as_ref(), self.object_store.clone());
}

fn get_relative_path(&self, relative_path: &str) -> Result<(Url, ObjPath)> {
let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
let obj_path = ObjPath::from_url_path(obj_url.path())?;
Ok((obj_url, obj_path))
}

#[cfg(test)]
async fn get_file_info(&self, relative_path: &str) -> Result<FileInfo> {
let (obj_url, obj_path) = self.get_relative_path(relative_path)?;
let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
let obj_path = ObjPath::from_url_path(obj_url.path())?;
let meta = self.object_store.head(&obj_path).await?;
let uri = obj_url.to_string();
let name = obj_path
@@ -127,7 +122,8 @@ impl Storage {
}

pub async fn get_parquet_file_metadata(&self, relative_path: &str) -> Result<ParquetMetaData> {
let (_, obj_path) = self.get_relative_path(relative_path)?;
let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
let obj_path = ObjPath::from_url_path(obj_url.path())?;
let obj_store = self.object_store.clone();
let meta = obj_store.head(&obj_path).await?;
let reader = ParquetObjectReader::new(obj_store, meta);
@@ -136,7 +132,8 @@ impl Storage {
}

pub async fn get_file_data(&self, relative_path: &str) -> Result<Bytes> {
let (_, obj_path) = self.get_relative_path(relative_path)?;
let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
let obj_path = ObjPath::from_url_path(obj_url.path())?;
let result = self.object_store.get(&obj_path).await?;
let bytes = result.bytes().await?;
Ok(bytes)
@@ -150,7 +147,8 @@ impl Storage {
}

pub async fn get_parquet_file_data(&self, relative_path: &str) -> Result<RecordBatch> {
let (_, obj_path) = self.get_relative_path(relative_path)?;
let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
let obj_path = ObjPath::from_url_path(obj_url.path())?;
let obj_store = self.object_store.clone();
let meta = obj_store.head(&obj_path).await?;

@@ -301,10 +299,6 @@ mod tests {
result.is_err(),
"Should return error when no base path is invalid."
);
assert!(result
.unwrap_err()
.to_string()
.contains("Failed to create storage"));
}

#[tokio::test]
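Note: with the `get_relative_path` helper removed, each call site inlines the same two-step resolution: join the relative path onto the table's base URL, then derive the object-store key from the URL path. A self-contained sketch of that pattern using the `url` and `object_store` crates; `join_url_segments` below is a simplified stand-in for the crate's helper of the same name:

use object_store::path::Path as ObjPath;
use url::Url;

// Simplified stand-in for the crate's `join_url_segments`: append relative
// segments to a base URL, tolerating a trailing slash on the base.
fn join_url_segments(base: &Url, segments: &[&str]) -> Result<Url, url::ParseError> {
    let joined = format!(
        "{}/{}",
        base.as_str().trim_end_matches('/'),
        segments.join("/")
    );
    Url::parse(&joined)
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let base = Url::parse("s3://bucket/tables/trips")?;
    // Step 1: resolve the relative path against the base URL.
    let obj_url = join_url_segments(&base, &["2024/03/01/abc.parquet"])?;
    // Step 2: turn the URL's path component into an object-store key.
    let obj_path = ObjPath::from_url_path(obj_url.path())?;
    assert_eq!(obj_path.to_string(), "tables/trips/2024/03/01/abc.parquet");
    Ok(())
}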
1 change: 1 addition & 0 deletions crates/core/src/storage/utils.rs
@@ -26,6 +26,7 @@ use crate::{
Result,
};

/// Splits a filename into a stem and an extension.
pub fn split_filename(filename: &str) -> Result<(String, String)> {
let path = Path::new(filename);

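Note: the new doc comment describes existing behavior, and the hunk cuts off right after `Path::new`. A plausible sketch of the rest of the helper, assuming it maps `Path::file_stem` / `Path::extension` to the returned pair (the real function returns the crate's `Result` with its own error type rather than `Option`):

use std::path::Path;

fn split_filename(filename: &str) -> Option<(String, String)> {
    let path = Path::new(filename);
    // e.g. "20240402123035233.commit" -> ("20240402123035233", "commit")
    let stem = path.file_stem()?.to_str()?.to_string();
    let extension = path.extension()?.to_str()?.to_string();
    Some((stem, extension))
}

fn main() {
    assert_eq!(
        split_filename("20240402123035233.commit"),
        Some(("20240402123035233".to_string(), "commit".to_string()))
    );
}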
2 changes: 1 addition & 1 deletion crates/core/src/table/fs_view.rs
@@ -28,7 +28,7 @@ use crate::storage::{get_leaf_dirs, Storage};
use crate::table::partition::PartitionPruner;
use crate::{Error, Result};
use dashmap::DashMap;
use futures::stream::{self, StreamExt};
use futures::stream::{self, StreamExt, TryStreamExt};

/// A view of the Hudi table's data files (files stored outside the `.hoodie/` directory) in the file system. It provides APIs to load and
/// access the file groups and file slices.
4 changes: 2 additions & 2 deletions crates/core/src/table/partition.rs
@@ -288,7 +288,7 @@ mod tests {
assert!(filter
.unwrap_err()
.to_string()
.contains("not found in partition schema"));
.contains("Unable to get field named"));
}

#[test]
@@ -309,7 +309,7 @@
let filter_tuple = ("count", "=", "not_a_number");
let filter = PartitionFilter::try_from((filter_tuple, &schema));
assert!(filter.is_err());
assert!(filter.unwrap_err().to_string().contains("Unable to cast"));
assert!(filter.unwrap_err().to_string().contains("Cannot cast string"));
}

#[test]
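Note: both assertions now match arrow-rs's own error text ("Unable to get field named ..." from `Schema::field_with_name`, "Cannot cast string ..." from a non-`safe` cast) instead of messages formatted by this crate. A minimal reproduction of the two messages, assuming the `arrow` crate; wording beyond the asserted substrings is arrow's and not guaranteed stable:

use arrow::array::StringArray;
use arrow::compute::{cast_with_options, CastOptions};
use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // Looking up a missing field: "Unable to get field named ...".
    let schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
    let err = schema.field_with_name("missing").unwrap_err();
    assert!(err.to_string().contains("Unable to get field named"));

    // Failing a string-to-number cast with safe=false: "Cannot cast string ...".
    let values = StringArray::from(vec!["not_a_number"]);
    let options = CastOptions { safe: false, ..Default::default() };
    let err = cast_with_options(&values, &DataType::Int32, &options).unwrap_err();
    assert!(err.to_string().contains("Cannot cast string"));
}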
1 change: 0 additions & 1 deletion python/Cargo.toml
@@ -39,7 +39,6 @@ arrow = { workspace = true }
arrow-schema = { workspace = true }

# "stdlib"
anyhow = { workspace = true }

# runtime / async
futures = { workspace = true }
167 changes: 67 additions & 100 deletions python/src/internal.rs
@@ -22,64 +22,66 @@ use std::convert::From;
use std::path::PathBuf;
use std::sync::OnceLock;

use anyhow::Context;
use arrow::pyarrow::ToPyArrow;
//<<<<<<< HEAD
use pyo3::{pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python};
//=======
//use pyo3::{exceptions::PyOSError, pyclass, pymethods, PyErr, PyObject, PyResult, Python};
//>>>>>>> 2f6f596 (refact: define hudi error types for hudi-core crate)
use pyo3::{
exceptions::PyOSError, pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python,
};
use tokio::runtime::Runtime;

use hudi::file_group::reader::FileGroupReader;
use hudi::file_group::FileSlice;
use hudi::table::builder::TableBuilder;
use hudi::table::Table;
//<<<<<<< HEAD
//use hudi::util::convert_vec_to_slice;
//use hudi::util::vec_to_slice;
//
//#[cfg(not(tarpaulin))]
//#[derive(Clone, Debug)]
//#[pyclass]
//pub struct HudiFileGroupReader {
// inner: FileGroupReader,
//}
//
//#[cfg(not(tarpaulin))]
//#[pymethods]
//impl HudiFileGroupReader {
// #[new]
// #[pyo3(signature = (base_uri, options=None))]
// fn new(base_uri: &str, options: Option<HashMap<String, String>>) -> PyResult<Self> {
// let inner = FileGroupReader::new_with_options(base_uri, options.unwrap_or_default())?;
// Ok(HudiFileGroupReader { inner })
// }
//
// fn read_file_slice_by_base_file_path(
// &self,
// relative_path: &str,
// py: Python,
// ) -> PyResult<PyObject> {
// rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path))?
// .to_pyarrow(py)
//=======
//use hudi::Error::Internal;
//
//struct HoodieError(hudi::Error);
//
//impl From<HoodieError> for PyErr {
// fn from(err: HoodieError) -> PyErr {
// PyOSError::new_err(err.0.to_string())
// }
//}
//
//impl From<hudi::Error> for HoodieError {
// fn from(err: hudi::Error) -> Self {
// Self(err)
//>>>>>>> 2f6f596 (refact: define hudi error types for hudi-core crate)
// }
//}
use hudi::util::convert_vec_to_slice;
use hudi::util::vec_to_slice;

pub struct HoodieError(hudi::Error);

impl From<HoodieError> for PyErr {
fn from(err: HoodieError) -> PyErr {
PyOSError::new_err(err.0.to_string())
}
}

impl From<hudi::Error> for HoodieError {
fn from(err: hudi::Error) -> Self {
Self(err)
}
}

impl From<pyo3::PyErr> for HoodieError {
fn from(err: pyo3::PyErr) -> Self {
Self(hudi::Error::Internal(err.to_string()))
}
}

#[cfg(not(tarpaulin))]
#[derive(Clone, Debug)]
#[pyclass]
pub struct HudiFileGroupReader {
inner: FileGroupReader,
}

#[cfg(not(tarpaulin))]
#[pymethods]
impl HudiFileGroupReader {
#[new]
#[pyo3(signature = (base_uri, options=None))]
fn new(base_uri: &str, options: Option<HashMap<String, String>>) -> Result<Self, HoodieError> {
let inner = FileGroupReader::new_with_options(base_uri, options.unwrap_or_default())?;
Ok(HudiFileGroupReader { inner })
}

fn read_file_slice_by_base_file_path(
&self,
relative_path: &str,
py: Python,
) -> Result<PyObject, HoodieError> {
rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path))?
.to_pyarrow(py)
.map_err(HoodieError::from)
}
}

#[cfg(not(tarpaulin))]
#[derive(Clone, Debug)]
@@ -104,16 +106,15 @@ pub struct HudiFileSlice {
#[cfg(not(tarpaulin))]
#[pymethods]
impl HudiFileSlice {
fn base_file_relative_path(&self) -> PyResult<String> {
fn base_file_relative_path(&self) -> Result<String, HoodieError> {
PathBuf::from(&self.partition_path)
.join(&self.base_file_name)
.to_str()
.map(String::from)
.context(format!(
.ok_or(HoodieError(hudi::Error::Internal(format!(
"Failed to get base file relative path for file slice: {:?}",
self
))
.map_err(PyErr::from)
))))
}
}

@@ -148,20 +149,13 @@ pub struct HudiTable {
#[pymethods]
impl HudiTable {
#[new]
<<<<<<< HEAD
#[pyo3(signature = (base_uri, options=None))]
fn new_with_options(
base_uri: &str,
options: Option<HashMap<String, String>>,
) -> PyResult<Self> {
) -> Result<Self, HoodieError> {
let inner: Table = rt().block_on(Table::new_with_options(
base_uri,
=======
#[pyo3(signature = (table_uri, options = None))]
fn new(table_uri: &str, options: Option<HashMap<String, String>>) -> Result<Self, HoodieError> {
let _table = rt().block_on(Table::new_with_options(
table_uri,
>>>>>>> 2f6f596 (refact: define hudi error types for hudi-core crate)
options.unwrap_or_default(),
))?;
Ok(HudiTable { inner })
@@ -175,14 +169,16 @@ impl HudiTable {
self.inner.storage_options()
}

<<<<<<< HEAD
fn get_schema(&self, py: Python) -> PyResult<PyObject> {
rt().block_on(self.inner.get_schema())?.to_pyarrow(py)
fn get_schema(&self, py: Python) -> Result<PyObject, HoodieError> {
rt().block_on(self.inner.get_schema())?
.to_pyarrow(py)
.map_err(HoodieError::from)
}

fn get_partition_schema(&self, py: Python) -> PyResult<PyObject> {
fn get_partition_schema(&self, py: Python) -> Result<PyObject, HoodieError> {
rt().block_on(self.inner.get_partition_schema())?
.to_pyarrow(py)
.map_err(HoodieError::from)
}

#[pyo3(signature = (n, filters=None))]
@@ -191,20 +187,7 @@ impl HudiTable {
n: usize,
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<Vec<Vec<HudiFileSlice>>> {
=======
fn get_schema(&self, py: Python) -> Result<PyObject, HoodieError> {
rt().block_on(self._table.get_schema())?
.to_pyarrow(py)
.map_err(|e| HoodieError(Internal(e.to_string())))
}

fn split_file_slices(
&self,
n: usize,
py: Python,
) -> Result<Vec<Vec<HudiFileSlice>>, HoodieError> {
>>>>>>> 2f6f596 (refact: define hudi error types for hudi-core crate)
py.allow_threads(|| {
let file_slices = rt().block_on(
self.inner
@@ -217,16 +200,12 @@ impl HudiTable {
})
}

<<<<<<< HEAD
#[pyo3(signature = (filters=None))]
fn get_file_slices(
&self,
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<Vec<HudiFileSlice>> {
=======
fn get_file_slices(&self, py: Python) -> Result<Vec<HudiFileSlice>, HoodieError> {
>>>>>>> 2f6f596 (refact: define hudi error types for hudi-core crate)
) -> Result<Vec<HudiFileSlice>, HoodieError> {
py.allow_threads(|| {
let file_slices = rt().block_on(
self.inner
Expand All @@ -236,7 +215,6 @@ impl HudiTable {
})
}

<<<<<<< HEAD
fn create_file_group_reader(&self) -> PyResult<HudiFileGroupReader> {
let fg_reader = self.inner.create_file_group_reader();
Ok(HudiFileGroupReader { inner: fg_reader })
@@ -247,24 +225,13 @@ impl HudiTable {
&self,
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<PyObject> {
) -> Result<PyObject, HoodieError> {
rt().block_on(
self.inner
.read_snapshot(vec_to_slice!(filters.unwrap_or_default())),
)?
.to_pyarrow(py)
=======
fn read_file_slice(&self, relative_path: &str, py: Python) -> Result<PyObject, HoodieError> {
rt().block_on(self._table.read_file_slice_by_path(relative_path))?
.to_pyarrow(py)
.map_err(|e| HoodieError(Internal(e.to_string())))
}

fn read_snapshot(&self, py: Python) -> Result<PyObject, HoodieError> {
rt().block_on(self._table.read_snapshot())?
.to_pyarrow(py)
.map_err(|e| HoodieError(Internal(e.to_string())))
>>>>>>> 2f6f596 (refact: define hudi error types for hudi-core crate)
.map_err(HoodieError::from)
}
}

Expand All @@ -276,7 +243,7 @@ pub fn build_hudi_table(
hudi_options: Option<HashMap<String, String>>,
storage_options: Option<HashMap<String, String>>,
options: Option<HashMap<String, String>>,
) -> PyResult<HudiTable> {
) -> Result<HudiTable, HoodieError> {
let inner = rt().block_on(
TableBuilder::from_base_uri(&base_uri)
.with_hudi_options(hudi_options.unwrap_or_default())
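Note on the error-handling shape this file settles on: both `hudi::Error` and `pyo3::PyErr` are foreign types from this crate's point of view, so the orphan rule forbids a direct `impl From<hudi::Error> for PyErr`; the local `HoodieError` newtype is the standard bridge, letting `?` convert core-crate errors and the binding layer hand Python an `OSError`. A self-contained sketch of the pattern with stand-in types (`CoreError` playing `hudi::Error`, `ForeignErr` playing `PyErr`):

use std::fmt;

#[derive(Debug)]
struct CoreError(String); // stand-in for hudi::Error

#[derive(Debug)]
struct ForeignErr(String); // stand-in for pyo3::PyErr

impl fmt::Display for CoreError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

// The local newtype: because it is defined in this crate, implementing
// foreign traits for it does not trip the orphan rule.
struct HoodieError(CoreError);

impl From<CoreError> for HoodieError {
    fn from(err: CoreError) -> Self {
        Self(err)
    }
}

impl From<HoodieError> for ForeignErr {
    fn from(err: HoodieError) -> Self {
        ForeignErr(err.0.to_string())
    }
}

fn fallible() -> Result<(), HoodieError> {
    let res: Result<(), CoreError> = Err(CoreError("boom".into()));
    res?; // `?` applies From<CoreError> for HoodieError automatically
    Ok(())
}

fn main() {
    // At the boundary, HoodieError -> ForeignErr finishes the chain.
    let err: ForeignErr = fallible().unwrap_err().into();
    assert_eq!(err.0, "boom");
}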
