Skip to content

Commit

Permalink
Address @xushiyan's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
abyssnlp committed Aug 18, 2024
1 parent 65f0ae4 commit a28debc
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 58 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
name: PR

on:
pull_request_target:
pull_request:
types: [ opened, edited, reopened, synchronize ]
branches:
- main
Expand Down
7 changes: 7 additions & 0 deletions crates/tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ impl TestTable {
path_buf.to_str().unwrap().to_string()
}

pub fn s3_path(&self) -> String {
let bucket = std::env::var("INTEGRATION_TEST_S3_BUCKET")
.expect("INTEGRATION_TEST_S3_BUCKET not set");
let data_path = Path::new(format!("s3://{}", bucket).as_str()).join(self.as_ref());
data_path.to_str().unwrap().to_string()
}

pub fn url(&self) -> Url {
let path = self.path();
Url::from_file_path(path).unwrap()
Expand Down
1 change: 1 addition & 0 deletions integration_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ description = "Integration tests for hudi-rs"

[dependencies]
hudi = { features=["datafusion"], path="../crates/hudi" }
hudi-tests = { path="../crates/tests" }
datafusion = { version = "= 39.0.0" }
datafusion-expr = { version = "= 39.0.0" }
tokio = { version = "1", features = ["rt-multi-thread"] }
Expand Down
69 changes: 12 additions & 57 deletions integration_test/tests/hudi_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

use std::{path::Path, sync::Arc};
use std::sync::Arc;

use datafusion::{
arrow::{
Expand All @@ -28,9 +28,7 @@ use datafusion::{
scalar::ScalarValue,
};
use hudi::HudiDataSource;
use strum::IntoEnumIterator;
use strum_macros::{AsRefStr, EnumIter, EnumString};
use url::Url;
use hudi_tests::TestTable;

#[derive(PartialEq, Debug)]
struct Record {
Expand All @@ -39,43 +37,6 @@ struct Record {
is_active: bool,
}

/// Hudi test tables expected to exist under the integration-test S3 bucket.
///
/// `#[strum(serialize_all = "snake_case")]` makes `as_ref()` / `to_string()`
/// yield the snake_case variant name, which is used as the table's directory
/// name under the bucket root.
#[derive(Debug, EnumString, EnumIter, AsRefStr, PartialEq)]
#[strum(serialize_all = "snake_case")]
pub enum IntegrationTestTable {
    V6ComplexkeygenHivestyle,
    V6Empty,
    V6Nonpartitioned,
    V6SimplekeygenHivestyleNoMetafields,
    V6SimplekeygenNonhivestyle,
    V6SimplekeygenNonhivestyleOverwritetable, // 1, 21
    V6TimebasedkeygenNonhivestyle,
}

impl IntegrationTestTable {
    /// Returns the S3 URI of this table, `s3://<bucket>/<table_name>`.
    ///
    /// Reads the bucket name from the `INTEGRATION_TEST_S3_BUCKET`
    /// environment variable and panics when it is unset — acceptable in
    /// test code where a missing bucket is a setup error.
    pub fn path(&self) -> String {
        let bucket = std::env::var("INTEGRATION_TEST_S3_BUCKET")
            .expect("INTEGRATION_TEST_S3_BUCKET not set");
        // Plain string formatting instead of `Path::join`: the latter uses
        // the platform separator (`\` on Windows), which would corrupt the
        // URI. Identical output to the Path-based version on Unix.
        format!("s3://{}/{}", bucket, self.as_ref())
    }

    /// Parses the table's S3 URI (see [`Self::path`]) into a [`Url`].
    ///
    /// Panics if the assembled URI is not a valid URL, with a message
    /// explaining the invariant instead of a bare `unwrap`.
    pub fn url(&self) -> Url {
        let path = self.path();
        Url::parse(&path).expect("table path should be a valid s3:// URL")
    }
}

/// Sanity check: every integration-test table produces a well-formed URL
/// whose textual form matches its raw path and whose scheme is `s3`.
#[test]
fn test_table_path() {
    for table in IntegrationTestTable::iter() {
        let raw_path = table.path();
        let parsed = table.url();

        assert_eq!(raw_path, parsed.as_str());
        assert_eq!(parsed.scheme(), "s3");
    }
}

async fn prepare_session_context(
path: &str,
mut options: Vec<(&str, &str)>,
Expand Down Expand Up @@ -122,17 +83,13 @@ where
#[tokio::test]
async fn test_datafusion_read_tables() {
for (t, n_cols, n_rows) in &[
(IntegrationTestTable::V6ComplexkeygenHivestyle, 21, 4),
(IntegrationTestTable::V6Nonpartitioned, 21, 4),
(
IntegrationTestTable::V6SimplekeygenHivestyleNoMetafields,
21,
4,
),
(IntegrationTestTable::V6SimplekeygenNonhivestyle, 21, 4),
(IntegrationTestTable::V6TimebasedkeygenNonhivestyle, 22, 4),
(TestTable::V6ComplexkeygenHivestyle, 21, 4),
(TestTable::V6Nonpartitioned, 21, 4),
(TestTable::V6SimplekeygenHivestyleNoMetafields, 21, 4),
(TestTable::V6SimplekeygenNonhivestyle, 21, 4),
(TestTable::V6TimebasedkeygenNonhivestyle, 22, 4),
] {
let ctx = prepare_session_context(&t.path(), vec![], t.as_ref()).await;
let ctx = prepare_session_context(&t.s3_path(), vec![], t.as_ref()).await;
let df_rows = ctx
.sql(&format!("select count(*) from {}", t.as_ref()))
.await
Expand Down Expand Up @@ -208,16 +165,14 @@ async fn test_datafusion_read_tables() {
actual_data.push(Record {
id: *id,
name: name.unwrap().to_string(),
is_active
is_active,
});
}
});

assert!(
actual_data
.iter()
.all(|record| expected_data.contains(record))
);
assert!(actual_data
.iter()
.all(|record| expected_data.contains(record)));

let df_schema = ctx
.sql(&format!(
Expand Down

0 comments on commit a28debc

Please sign in to comment.