From 068f935c538057d33223cf2d04f9221e113b1f39 Mon Sep 17 00:00:00 2001
From: Ceng <441651826@qq.com>
Date: Fri, 27 Sep 2024 11:49:59 +0800
Subject: [PATCH] [NativeIO/Fix] Add error info of native writer && fix case of aux_sort_cols (#547)

* add error info of native writer && fix case of aux_sort_cols

Signed-off-by: zenghua

* fix clippy

Signed-off-by: zenghua

* do cargo fmt

Signed-off-by: zenghua

---------

Signed-off-by: zenghua
Co-authored-by: zenghua
---
 rust/lakesoul-io/src/async_writer/mod.rs      |   1 -
 .../src/async_writer/multipart_writer.rs      |   9 +-
 .../src/async_writer/partitioning_writer.rs   |  37 ++--
 .../src/async_writer/sort_writer.rs           |   8 +-
 rust/lakesoul-io/src/filter/parser.rs         |   7 +-
 rust/lakesoul-io/src/helpers.rs               |  37 ++--
 rust/lakesoul-io/src/lakesoul_io_config.rs    |   4 +-
 rust/lakesoul-io/src/lakesoul_writer.rs       | 201 ++++++++++--------
 rust/lakesoul-io/src/transform.rs             |  42 ++--
 9 files changed, 178 insertions(+), 168 deletions(-)

diff --git a/rust/lakesoul-io/src/async_writer/mod.rs b/rust/lakesoul-io/src/async_writer/mod.rs
index bbbe8b000..cb9d164f7 100644
--- a/rust/lakesoul-io/src/async_writer/mod.rs
+++ b/rust/lakesoul-io/src/async_writer/mod.rs
@@ -32,7 +32,6 @@ use datafusion::{
 use datafusion_common::{DataFusionError, Result};
 use parquet::format::FileMetaData;
 
-
 // The result of a flush operation with format (partition_desc, file_path, file_meta)
 pub type WriterFlushResult = Result<Vec<(String, String, FileMetaData)>>;
 
diff --git a/rust/lakesoul-io/src/async_writer/multipart_writer.rs b/rust/lakesoul-io/src/async_writer/multipart_writer.rs
index f249809b3..646a47f76 100644
--- a/rust/lakesoul-io/src/async_writer/multipart_writer.rs
+++ b/rust/lakesoul-io/src/async_writer/multipart_writer.rs
@@ -15,10 +15,13 @@ use tokio::io::{AsyncWrite, AsyncWriteExt};
 use url::Url;
 
 use crate::{
-    constant::TBD_PARTITION_DESC, helpers::get_batch_memory_size, lakesoul_io_config::{create_session_context, LakeSoulIOConfig}, transform::{uniform_record_batch, uniform_schema}
+    constant::TBD_PARTITION_DESC,
+    helpers::get_batch_memory_size,
+    lakesoul_io_config::{create_session_context, LakeSoulIOConfig},
+    transform::{uniform_record_batch, uniform_schema},
 };
 
-use super::{AsyncBatchWriter, WriterFlushResult, InMemBuf};
+use super::{AsyncBatchWriter, InMemBuf, WriterFlushResult};
 
 /// An async writer using object_store's multi-part upload feature for cloud storage.
 /// This writer uses a `VecDeque` as `std::io::Write` for arrow-rs's ArrowWriter.
@@ -169,7 +172,6 @@ impl MultiPartAsyncWriter {
 
 #[async_trait::async_trait]
 impl AsyncBatchWriter for MultiPartAsyncWriter {
-
     async fn write_record_batch(&mut self, batch: RecordBatch) -> Result<()> {
         let batch = uniform_record_batch(batch)?;
         self.num_rows += batch.num_rows() as u64;
@@ -213,5 +215,4 @@ impl AsyncBatchWriter for MultiPartAsyncWriter {
     fn buffered_size(&self) -> u64 {
         self.buffered_size
     }
-
 }
diff --git a/rust/lakesoul-io/src/async_writer/partitioning_writer.rs b/rust/lakesoul-io/src/async_writer/partitioning_writer.rs
index 288d213ab..461e634aa 100644
--- a/rust/lakesoul-io/src/async_writer/partitioning_writer.rs
+++ b/rust/lakesoul-io/src/async_writer/partitioning_writer.rs
@@ -13,29 +13,26 @@ use datafusion::{
         PhysicalSortExpr,
     },
     physical_plan::{
-        projection::ProjectionExec,
-        sorts::sort::SortExec,
-        stream::RecordBatchReceiverStream,
-        ExecutionPlan, Partitioning, PhysicalExpr,
+        projection::ProjectionExec, sorts::sort::SortExec, stream::RecordBatchReceiverStream, ExecutionPlan,
+        Partitioning, PhysicalExpr,
     },
 };
 use datafusion_common::{DataFusionError, Result};
 
 use rand::distributions::DistString;
-use tokio::{
-    sync::mpsc::Sender,
-    task::JoinHandle,
-};
+use tokio::{sync::mpsc::Sender, task::JoinHandle};
 use tokio_stream::StreamExt;
 use tracing::debug;
 
 use crate::{
-    helpers::{columnar_values_to_partition_desc, columnar_values_to_sub_path, get_batch_memory_size, get_columnar_values},
+    helpers::{
+        columnar_values_to_partition_desc, columnar_values_to_sub_path, get_batch_memory_size, get_columnar_values,
+    },
     lakesoul_io_config::{create_session_context, LakeSoulIOConfig, LakeSoulIOConfigBuilder},
     repartition::RepartitionByRangeAndHashExec,
 };
 
-use super::{AsyncBatchWriter, WriterFlushResult, MultiPartAsyncWriter, ReceiverStreamExec};
+use super::{AsyncBatchWriter, MultiPartAsyncWriter, ReceiverStreamExec, WriterFlushResult};
 
 // type PartitionedWriterInfo = Arc>>>;
 
@@ -75,7 +72,7 @@ impl PartitioningAsyncWriter {
             task_context.clone(),
             config.clone().into(),
             Arc::new(config.range_partitions.clone()),
-            write_id.clone()
+            write_id.clone(),
         ));
         // // In a separate task, wait for each input to be done
         // // (and pass along any errors, including panic!s)
@@ -198,7 +195,6 @@ impl PartitioningAsyncWriter {
 
         let mut err = None;
 
-
         let mut partitioned_writer = HashMap::>::new();
         let mut flush_join_handle_list = Vec::new();
         // let mut partitioned_flush_result_locked = partitioned_flush_result.lock().await;
@@ -230,7 +226,6 @@ impl PartitioningAsyncWriter {
                         // row_count += batch_excluding_range.num_rows();
                         async_writer.write_record_batch(batch_excluding_range).await?;
                     }
-
                 }
                 // received abort signal
                 Err(e) => {
@@ -256,19 +251,13 @@ impl PartitioningAsyncWriter {
             }
             Ok(flush_join_handle_list)
         } else {
             for (partition_desc, writer) in partitioned_writer.into_iter() {
                 let flush_result = tokio::spawn(async move {
-                    let writer_flush_results =writer.flush_and_close().await?;
-                    Ok(
-                        writer_flush_results.into_iter().map(
-                            |(_, path, file_metadata)|
-                            {
-                                (partition_desc.clone(), path, file_metadata)
-                            }
-                        ).collect::<Vec<_>>()
-                    )
+                    let writer_flush_results = writer.flush_and_close().await?;
+                    Ok(writer_flush_results
+                        .into_iter()
+                        .map(|(_, path, file_metadata)| (partition_desc.clone(), path, file_metadata))
+                        .collect::<Vec<_>>())
                 });
                 flush_join_handle_list.push(flush_result);
             }
diff --git a/rust/lakesoul-io/src/async_writer/sort_writer.rs b/rust/lakesoul-io/src/async_writer/sort_writer.rs
index c10fa4ce1..de4d513e7 100644
--- a/rust/lakesoul-io/src/async_writer/sort_writer.rs
+++ b/rust/lakesoul-io/src/async_writer/sort_writer.rs
@@ -12,10 +12,8 @@ use datafusion::{
         PhysicalSortExpr,
     },
     physical_plan::{
-        projection::ProjectionExec,
-        sorts::sort::SortExec,
-        stream::RecordBatchReceiverStream,
-        ExecutionPlan, PhysicalExpr,
+        projection::ProjectionExec, sorts::sort::SortExec, stream::RecordBatchReceiverStream, ExecutionPlan,
+        PhysicalExpr,
     },
 };
 use datafusion_common::{DataFusionError, Result};
@@ -24,7 +22,7 @@ use tokio_stream::StreamExt;
 
 use crate::{helpers::get_batch_memory_size, lakesoul_io_config::LakeSoulIOConfig};
 
-use super::{AsyncBatchWriter, WriterFlushResult, MultiPartAsyncWriter, ReceiverStreamExec};
+use super::{AsyncBatchWriter, MultiPartAsyncWriter, ReceiverStreamExec, WriterFlushResult};
 
 /// Wrap the above async writer with a SortExec to
 /// sort the batches before write to async writer
diff --git a/rust/lakesoul-io/src/filter/parser.rs b/rust/lakesoul-io/src/filter/parser.rs
index 0d3369e0c..6ab559938 100644
--- a/rust/lakesoul-io/src/filter/parser.rs
+++ b/rust/lakesoul-io/src/filter/parser.rs
@@ -229,7 +229,6 @@ impl Parser {
     }
 
     pub(crate) fn parse_proto(plan: &Plan, df_schema: &DFSchema) -> Result<Expr> {
-
         let function_extension = plan
             .extensions
             .iter()
@@ -733,7 +732,10 @@ fn _from_nullability(nullability: Nullability) -> bool {
 mod tests {
     use std::result::Result;
 
-    use datafusion::{logical_expr::{LogicalPlan, TableScan}, prelude::{ParquetReadOptions, SessionContext}};
+    use datafusion::{
+        logical_expr::{LogicalPlan, TableScan},
+        prelude::{ParquetReadOptions, SessionContext},
+    };
     use prost::Message;
 
     use super::*;
@@ -750,7 +752,6 @@
 
     #[tokio::test]
     async fn tt() {
-
         let ctx = SessionContext::new();
         let options = ParquetReadOptions::default();
         let table_path = "/var/folders/_b/qyl87wbn1119cvw8kts6fqtw0000gn/T/lakeSource/type/part-00000-97db3149-f99e-404a-aa9a-2af4ab3f7a44_00000.c000.parquet";
diff --git a/rust/lakesoul-io/src/helpers.rs b/rust/lakesoul-io/src/helpers.rs
index 8999a5d16..398ef72ce 100644
--- a/rust/lakesoul-io/src/helpers.rs
+++ b/rust/lakesoul-io/src/helpers.rs
@@ -32,9 +32,9 @@ use url::Url;
 
 use crate::{
     constant::{
-        DATE32_FORMAT, FLINK_TIMESTAMP_FORMAT, LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING,
-        TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT,
-        TIMESTAMP_SECOND_FORMAT, LAKESOUL_COMMA, LAKESOUL_EQ
+        DATE32_FORMAT, FLINK_TIMESTAMP_FORMAT, LAKESOUL_COMMA, LAKESOUL_EMPTY_STRING, LAKESOUL_EQ,
+        LAKESOUL_NULL_STRING, TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT,
+        TIMESTAMP_SECOND_FORMAT,
     },
     filter::parser::Parser,
     lakesoul_io_config::LakeSoulIOConfig,
@@ -169,12 +169,10 @@ pub fn format_scalar_value(v: &ScalarValue) -> String {
         }
         ScalarValue::Decimal128(Some(s), _, _) => format!("{}", s),
         ScalarValue::Decimal256(Some(s), _, _) => format!("{}", s),
-        ScalarValue::Binary(e)
-        | ScalarValue::FixedSizeBinary(_, e)
-        | ScalarValue::LargeBinary(e) => match e {
-            Some(bytes) => hex::encode(bytes),
-            None => LAKESOUL_NULL_STRING.to_string(),
-        }
+        ScalarValue::Binary(e) | ScalarValue::FixedSizeBinary(_, e) | ScalarValue::LargeBinary(e) => match e {
+            Some(bytes) => hex::encode(bytes),
+            None => LAKESOUL_NULL_STRING.to_string(),
+        },
         other => other.to_string(),
     }
 }
@@ -192,7 +190,7 @@ pub fn into_scalar_value(val: &str, data_type: &DataType) -> Result<ScalarValue
         },
         DataType::Decimal128(p, s) => Ok(ScalarValue::Decimal128(None, *p, *s)),
         DataType::Decimal256(p, s) => Ok(ScalarValue::Decimal256(None, *p, *s)),
-        DataType::Binary=> Ok(ScalarValue::Binary(None)),
+        DataType::Binary => Ok(ScalarValue::Binary(None)),
         DataType::FixedSizeBinary(size) => Ok(ScalarValue::FixedSizeBinary(*size, None)),
         DataType::LargeBinary => Ok(ScalarValue::LargeBinary(None)),
         _ => Ok(ScalarValue::Null),
@@ -204,7 +202,9 @@ pub fn into_scalar_value(val: &str, data_type: &DataType) -> Result<ScalarValue
             if val.eq(LAKESOUL_EMPTY_STRING) {
                 Ok(ScalarValue::Utf8(Some("".to_string())))
             } else {
-                Ok(ScalarValue::Utf8(Some(val.replace(LAKESOUL_EQ, "=").replace(LAKESOUL_COMMA, ","))))
+                Ok(ScalarValue::Utf8(Some(
+                    val.replace(LAKESOUL_EQ, "=").replace(LAKESOUL_COMMA, ","),
+                )))
             }
         }
         DataType::Timestamp(unit, timezone) => match unit {
@@ -264,7 +264,7 @@ pub fn into_scalar_value(val: &str, data_type: &DataType) -> Result<ScalarValue
         },
         DataType::Decimal128(p, s) => Ok(ScalarValue::Decimal128(Some(val.parse::<i128>().unwrap()), *p, *s)),
         DataType::Decimal256(p, s) => Ok(ScalarValue::Decimal256(Some(i256::from_string(val).unwrap()), *p, *s)),
-        DataType::Binary=> Ok(ScalarValue::Binary(Some(hex::decode(val).unwrap()))),
+        DataType::Binary => Ok(ScalarValue::Binary(Some(hex::decode(val).unwrap()))),
         DataType::FixedSizeBinary(size) => Ok(ScalarValue::FixedSizeBinary(*size, Some(hex::decode(val).unwrap()))),
         DataType::LargeBinary => Ok(ScalarValue::LargeBinary(Some(hex::decode(val).unwrap()))),
         _ => ScalarValue::try_from_string(val.to_string(), data_type),
@@ -526,7 +526,11 @@ pub fn timestamp_str_to_unix_time(value: &str, fmt: &str) -> Result<Duration> {
     Ok(datetime.signed_duration_since(epoch_time.naive_utc()))
 }
 
-pub fn column_with_name_and_name2index<'a>(schema: &'a SchemaRef, name: &str, name_to_index: &Option<HashMap<String, usize>>) -> Option<(usize, &'a Field)> {
+pub fn column_with_name_and_name2index<'a>(
+    schema: &'a SchemaRef,
+    name: &str,
+    name_to_index: &Option<HashMap<String, usize>>,
+) -> Option<(usize, &'a Field)> {
     if let Some(name_to_index) = name_to_index {
         name_to_index.get(name).map(|index| (*index, schema.field(*index)))
     } else {
@@ -535,12 +539,11 @@ pub fn column_with_name_and_name2index<'a>(schema: &'a SchemaRef, name: &str, na
 }
 
 pub fn get_batch_memory_size(batch: &RecordBatch) -> Result<usize> {
-    Ok(
-        batch.columns()
+    Ok(batch
+        .columns()
         .iter()
         .map(|array| array.to_data().get_slice_memory_size())
         .collect::<Result<Vec<usize>, ArrowError>>()?
         .into_iter()
-        .sum()
-    )
+        .sum())
 }
diff --git a/rust/lakesoul-io/src/lakesoul_io_config.rs b/rust/lakesoul-io/src/lakesoul_io_config.rs
index e4b719ed7..c725a0fe4 100644
--- a/rust/lakesoul-io/src/lakesoul_io_config.rs
+++ b/rust/lakesoul-io/src/lakesoul_io_config.rs
@@ -47,11 +47,9 @@ pub static OPTION_DEFAULT_VALUE_KEEP_ORDERS: &str = "false";
 
 pub static OPTION_KEY_MEM_LIMIT: &str = "mem_limit";
 pub static OPTION_KEY_POOL_SIZE: &str = "pool_size";
-pub static OPTION_KEY_HASH_BUCKET_ID : &str = "hash_bucket_id";
+pub static OPTION_KEY_HASH_BUCKET_ID: &str = "hash_bucket_id";
 pub static OPTION_KEY_MAX_FILE_SIZE: &str = "max_file_size";
 
-
-
 #[derive(Debug, Derivative)]
 #[derivative(Default, Clone)]
 pub struct LakeSoulIOConfig {
diff --git a/rust/lakesoul-io/src/lakesoul_writer.rs b/rust/lakesoul-io/src/lakesoul_writer.rs
index d77667d16..38704aee5 100644
--- a/rust/lakesoul-io/src/lakesoul_writer.rs
+++ b/rust/lakesoul-io/src/lakesoul_writer.rs
@@ -20,7 +20,6 @@ use crate::helpers::get_batch_memory_size;
 use crate::lakesoul_io_config::{IOSchema, LakeSoulIOConfig};
 use crate::transform::uniform_schema;
 
-
 pub type SendableWriter = Box<dyn AsyncBatchWriter + Send>;
 
 // inner is sort writer
@@ -38,24 +37,9 @@ impl SyncSendableMutableLakeSoulWriter {
     pub fn try_new(config: LakeSoulIOConfig, runtime: Runtime) -> Result<Self> {
         let runtime = Arc::new(runtime);
         runtime.clone().block_on(async move {
-            // if aux sort cols exist, we need to adjust the schema of final writer
-            // to exclude all aux sort cols
-            let writer_schema: SchemaRef = if !config.aux_sort_cols.is_empty() {
-                let schema = config.target_schema.0.clone();
-                // O(nm), n = number of target schema fields, m = number of aux sort cols
-                let proj_indices = schema
-                    .fields
-                    .iter()
-                    .filter(|f| !config.aux_sort_cols.contains(f.name()))
-                    .map(|f| schema.index_of(f.name().as_str()).map_err(DataFusionError::ArrowError))
-                    .collect::<Result<Vec<usize>>>()?;
-                Arc::new(schema.project(proj_indices.borrow())?)
-            } else {
-                config.target_schema.0.clone()
-            };
             let writer_config = config.clone();
             let mut config = config.clone();
-            let writer = Self::create_writer(writer_schema, writer_config).await?;
+            let writer = Self::create_writer(writer_config).await?;
             let schema = writer.schema();
 
             if let Some(mem_limit) = config.mem_limit() {
@@ -63,7 +47,7 @@ impl SyncSendableMutableLakeSoulWriter {
                     config.max_file_size = Some((mem_limit as f64 * 0.15) as u64);
                 } else if !config.primary_keys.is_empty() && !config.keep_ordering() {
                     config.max_file_size = Some((mem_limit as f64 * 0.2) as u64);
-                } 
+                }
             }
 
             Ok(SyncSendableMutableLakeSoulWriter {
@@ -76,9 +60,25 @@ impl SyncSendableMutableLakeSoulWriter {
         })
     }
 
-    async fn create_writer(writer_schema: SchemaRef, config: LakeSoulIOConfig) -> Result<Box<dyn AsyncBatchWriter + Send>> {
+    async fn create_writer(config: LakeSoulIOConfig) -> Result<Box<dyn AsyncBatchWriter + Send>> {
+        // if aux sort cols exist, we need to adjust the schema of final writer
+        // to exclude all aux sort cols
+        let writer_schema: SchemaRef = if !config.aux_sort_cols.is_empty() {
+            let schema = config.target_schema.0.clone();
+            // O(nm), n = number of target schema fields, m = number of aux sort cols
+            let proj_indices = schema
+                .fields
+                .iter()
+                .filter(|f| !config.aux_sort_cols.contains(f.name()))
+                .map(|f| schema.index_of(f.name().as_str()).map_err(DataFusionError::ArrowError))
+                .collect::<Result<Vec<usize>>>()?;
+            Arc::new(schema.project(proj_indices.borrow())?)
+        } else {
+            config.target_schema.0.clone()
+        };
+
         let mut writer_config = config.clone();
-        let writer : Box<dyn AsyncBatchWriter + Send> = if config.use_dynamic_partition {
+        let writer: Box<dyn AsyncBatchWriter + Send> = if config.use_dynamic_partition {
             Box::new(PartitioningAsyncWriter::try_new(writer_config)?)
         } else if !writer_config.primary_keys.is_empty() && !writer_config.keep_ordering() {
             // sort primary key table
@@ -113,7 +113,7 @@ impl SyncSendableMutableLakeSoulWriter {
     pub fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }
-    
+
     pub fn config(&self) -> &LakeSoulIOConfig {
         &self.config
     }
@@ -123,85 +123,99 @@ impl SyncSendableMutableLakeSoulWriter {
     // and upload concurrently in background, we only need blocking method here
     // for ffi callers
     pub fn write_batch(&mut self, record_batch: RecordBatch) -> Result<()> {
-
         let runtime = self.runtime.clone();
-        runtime.block_on(async move {
-            self.write_batch_async(record_batch, false).await
-        })
+        runtime.block_on(async move { self.write_batch_async(record_batch, false).await })
     }
 
     #[async_recursion::async_recursion(?Send)]
     async fn write_batch_async(&mut self, record_batch: RecordBatch, do_spill: bool) -> Result<()> {
         debug!(record_batch_row=?record_batch.num_rows(), do_spill=?do_spill, "write_batch_async");
-        let schema = self.schema();
-        let config = self.config().clone();
-        if let Some(max_file_size) = self.config().max_file_size {
-            // if max_file_size is set, we need to split batch into multiple files
-            let in_progress_writer = match &mut self.in_progress {
-                Some(writer) => writer,
-                x =>
-                    x.insert(
-                        Arc::new(Mutex::new(
-                            Self::create_writer(schema, config).await?
-                        ))
-                    )
-            };
-            let mut guard = in_progress_writer.lock().await;
-
-            let batch_memory_size = get_batch_memory_size(&record_batch)? as u64;
-            let batch_rows = record_batch.num_rows() as u64;
-            // If would exceed max_file_size, split batch
-            if !do_spill && guard.buffered_size() + batch_memory_size > max_file_size {
-                let to_write = (batch_rows * (max_file_size - guard.buffered_size())) / batch_memory_size;
-                if to_write + 1 < batch_rows {
-                    let to_write = to_write as usize + 1;
-                    let a = record_batch.slice(0, to_write);
-                    let b = record_batch.slice(to_write, record_batch.num_rows() - to_write);
-                    drop(guard);
-                    self.write_batch_async(a, true).await?;
-                    return self.write_batch_async(b, false).await;
-                }
-            }
-            guard.write_record_batch(record_batch).await?;
-
-            if do_spill {
-                dbg!(format!("spilling writer with size: {}", guard.buffered_size()));
+        let config = self.config().clone();
+        if let Some(max_file_size) = self.config().max_file_size {
+            // if max_file_size is set, we need to split batch into multiple files
+            let in_progress_writer = match &mut self.in_progress {
+                Some(writer) => writer,
+                x => x.insert(Arc::new(Mutex::new(Self::create_writer(config).await?))),
+            };
+            let mut guard = in_progress_writer.lock().await;
+
+            let batch_memory_size = get_batch_memory_size(&record_batch)? as u64;
+            let batch_rows = record_batch.num_rows() as u64;
+            // If would exceed max_file_size, split batch
+            if !do_spill && guard.buffered_size() + batch_memory_size > max_file_size {
+                let to_write = (batch_rows * (max_file_size - guard.buffered_size())) / batch_memory_size;
+                if to_write + 1 < batch_rows {
+                    let to_write = to_write as usize + 1;
+                    let a = record_batch.slice(0, to_write);
+                    let b = record_batch.slice(to_write, record_batch.num_rows() - to_write);
                     drop(guard);
-                    if let Some(writer) = self.in_progress.take() {
-                        let inner_writer = match Arc::try_unwrap(writer) {
-                            Ok(inner) => inner,
-                            Err(_) => {
-                                return Err(DataFusionError::Internal("Cannot get ownership of inner writer".to_string()))
-                            },
-                        };
-                        let writer = inner_writer.into_inner();
-                        let results = writer.flush_and_close().await?;
-                        self.flush_results.extend(results);
-                    }
+                    self.write_batch_async(a, true).await?;
+                    return self.write_batch_async(b, false).await;
+                }
+            }
+            let rb_schema = record_batch.schema();
+            guard.write_record_batch(record_batch).await.map_err(|e| {
+                DataFusionError::Internal(format!(
+                    "err={}, config={:?}, batch_schema={:?}",
+                    e,
+                    self.config.clone(),
+                    rb_schema
+                ))
+            })?;
+
+            if do_spill {
+                dbg!(format!("spilling writer with size: {}", guard.buffered_size()));
+                drop(guard);
+                if let Some(writer) = self.in_progress.take() {
+                    let inner_writer = match Arc::try_unwrap(writer) {
+                        Ok(inner) => inner,
+                        Err(_) => {
+                            return Err(DataFusionError::Internal(
+                                "Cannot get ownership of inner writer".to_string(),
+                            ))
+                        }
+                    };
+                    let writer = inner_writer.into_inner();
+                    let results = writer.flush_and_close().await.map_err(|e| {
+                        DataFusionError::Internal(format!(
+                            "err={}, config={:?}, batch_schema={:?}",
+                            e,
+                            self.config.clone(),
+                            rb_schema
+                        ))
+                    })?;
+                    self.flush_results.extend(results);
                 }
-            Ok(())
-        } else if let Some(inner_writer) = &self.in_progress {
-            let inner_writer = inner_writer.clone();
-            let mut writer = inner_writer.lock().await;
-            writer.write_record_batch(record_batch).await
-        } else {
-            Err(DataFusionError::Internal("Invalid state of inner writer".to_string()))
             }
-        
+            Ok(())
+        } else if let Some(inner_writer) = &self.in_progress {
+            let inner_writer = inner_writer.clone();
+            let mut writer = inner_writer.lock().await;
+            writer.write_record_batch(record_batch).await
+        } else {
+            Err(DataFusionError::Internal("Invalid state of inner writer".to_string()))
+        }
     }
 
     pub fn flush_and_close(self) -> Result<Vec<u8>> {
         if let Some(inner_writer) = self.in_progress {
             let inner_writer = match Arc::try_unwrap(inner_writer) {
                 Ok(inner) => inner,
-                Err(_) => return Err(DataFusionError::Internal("Cannot get ownership of inner writer".to_string())),
+                Err(_) => {
+                    return Err(DataFusionError::Internal(
+                        "Cannot get ownership of inner writer".to_string(),
+                    ))
+                }
             };
             let runtime = self.runtime;
             runtime.block_on(async move {
                 let writer = inner_writer.into_inner();
-                
+
                 let mut grouped_results: HashMap<String, Vec<String>> = HashMap::new();
-                let results = writer.flush_and_close().await?;
+                let results = writer
+                    .flush_and_close()
+                    .await
+                    .map_err(|e| DataFusionError::Internal(format!("err={}, config={:?}", e, self.config.clone())))?;
                 for (partition_desc, file, _) in self.flush_results.into_iter().chain(results) {
                     match grouped_results.get_mut(&partition_desc) {
                         Some(files) => {
@@ -220,7 +234,6 @@ impl SyncSendableMutableLakeSoulWriter {
                     summary += files.join("\x02").as_str();
                 }
                 Ok(summary.into_bytes())
-
             })
         } else {
             Ok(vec![])
@@ -231,7 +244,11 @@ impl SyncSendableMutableLakeSoulWriter {
         if let Some(inner_writer) = self.in_progress {
             let inner_writer = match Arc::try_unwrap(inner_writer) {
                 Ok(inner) => inner,
-                Err(_) => return Err(DataFusionError::Internal("Cannot get ownership of inner writer".to_string())),
+                Err(_) => {
+                    return Err(DataFusionError::Internal(
+                        "Cannot get ownership of inner writer".to_string(),
+                    ))
+                }
             };
             let runtime = self.runtime;
             runtime.block_on(async move {
@@ -264,9 +281,9 @@ mod tests {
     use datafusion::error::Result;
     use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
     use rand::{distributions::DistString, Rng};
-    use tracing_subscriber::layer::SubscriberExt;
     use std::{fs::File, sync::Arc};
     use tokio::{runtime::Builder, time::Instant};
+    use tracing_subscriber::layer::SubscriberExt;
 
     use super::SortAsyncWriter;
@@ -546,7 +563,10 @@ mod tests {
                 Arc::new(StringArray::from(
                     (0..num_rows)
                         .into_iter()
-                        .map(|_| rand::distributions::Alphanumeric.sample_string(&mut rng, len_rng.gen_range(str_len..str_len * 3)))
+                        .map(|_| {
+                            rand::distributions::Alphanumeric
+                                .sample_string(&mut rng, len_rng.gen_range(str_len..str_len * 3))
+                        })
                         .collect::<Vec<_>>(),
                 )) as ArrayRef,
                 true,
@@ -621,7 +641,6 @@ mod tests {
     #[cfg(feature = "dhat-heap")]
     #[global_allocator]
     static ALLOC: dhat::Alloc = dhat::Alloc;
-
 
     #[tracing::instrument]
     #[test]
@@ -629,10 +648,14 @@ fn writer_profiling() {
         use tracing_subscriber::fmt;
         tracing_subscriber::fmt::init();
-
-        let subscriber = fmt::layer()
-            .event_format(fmt::format::Format::default().with_level(true).with_source_location(true).with_file(true));
-            // .with_max_level(Level::TRACE);
+
+        let subscriber = fmt::layer().event_format(
+            fmt::format::Format::default()
+                .with_level(true)
+                .with_source_location(true)
+                .with_file(true),
+        );
+        // .with_max_level(Level::TRACE);
         tracing_subscriber::registry().with(subscriber);
 
         #[cfg(feature = "dhat-heap")]
diff --git a/rust/lakesoul-io/src/transform.rs b/rust/lakesoul-io/src/transform.rs
index 600e1edf8..c1c8e72cb 100644
--- a/rust/lakesoul-io/src/transform.rs
+++ b/rust/lakesoul-io/src/transform.rs
@@ -19,7 +19,9 @@ use crate::constant::{
     ARROW_CAST_OPTIONS, FLINK_TIMESTAMP_FORMAT, LAKESOUL_EMPTY_STRING, LAKESOUL_NULL_STRING,
     TIMESTAMP_MICROSECOND_FORMAT, TIMESTAMP_MILLSECOND_FORMAT, TIMESTAMP_NANOSECOND_FORMAT, TIMESTAMP_SECOND_FORMAT,
 };
-use crate::helpers::{column_with_name_and_name2index, date_str_to_epoch_days, timestamp_str_to_unix_time, into_scalar_value};
+use crate::helpers::{
+    column_with_name_and_name2index, date_str_to_epoch_days, into_scalar_value, timestamp_str_to_unix_time,
+};
 
 /// adjust time zone to UTC
 pub fn uniform_field(orig_field: &FieldRef) -> FieldRef {
@@ -83,18 +85,17 @@ pub fn transform_record_batch(
 ) -> Result<RecordBatch> {
     let num_rows = batch.num_rows();
     let orig_schema = batch.schema();
-    let name_to_index =
-        if orig_schema.fields().len() > crate::constant::NUM_COLUMN_OPTIMIZE_THRESHOLD {
-            Some(HashMap::<String, usize>::from_iter(
-                orig_schema
+    let name_to_index = if orig_schema.fields().len() > crate::constant::NUM_COLUMN_OPTIMIZE_THRESHOLD {
+        Some(HashMap::<String, usize>::from_iter(
+            orig_schema
                 .fields()
                 .iter()
                 .enumerate()
-                .map(|(idx, field)| (field.name().clone(), idx))
-            ))
-        } else {
-            None
-        };
+                .map(|(idx, field)| (field.name().clone(), idx)),
+        ))
+    } else {
+        None
+    };
     let mut transform_arrays = Vec::new();
     let mut fields = vec![];
     // O(nm) n = orig_schema.fields().len(), m = target_schema.fields().len()
@@ -327,18 +328,15 @@ pub fn make_default_array(datatype: &DataType, value: &String, num_rows: usize)
             .map_err(|e| External(Box::new(e)))?;
             num_rows
         ])),
-        data_type => {
-            match into_scalar_value(value, data_type) {
-                Ok(scalar) => scalar.to_array_of_size(num_rows)?,
-                Err(_) => {
-                    println!(
-                        "make_default_array() datatype not match, datatype={:?}, value={:?}",
-                        datatype, value
-                    );
-                    new_null_array(datatype, num_rows)
-                }
-            }
-        }
+        data_type => match into_scalar_value(value, data_type) {
+            Ok(scalar) => scalar.to_array_of_size(num_rows)?,
+            Err(_) => {
+                println!(
+                    "make_default_array() datatype not match, datatype={:?}, value={:?}",
+                    datatype, value
+                );
+                new_null_array(datatype, num_rows)
+            }
+        },
     })
 }
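
The main functional change in this patch is wrapping native-writer failures with the active config and batch schema before they cross the FFI boundary, via map_err into DataFusionError::Internal. Below is a minimal, self-contained sketch of that error-wrapping pattern; it uses a hypothetical DemoConfig and a fake failing write in place of the crate's real LakeSoulIOConfig and Arrow/object_store writers, so it illustrates the technique rather than the crate's actual API.

    use datafusion_common::{DataFusionError, Result};

    /// Hypothetical stand-in for the writer configuration (the real type is LakeSoulIOConfig).
    #[derive(Debug)]
    struct DemoConfig {
        max_file_size: Option<u64>,
    }

    /// Pretend lower-level write that fails, standing in for the parquet/object_store layer.
    fn inner_write() -> Result<()> {
        Err(DataFusionError::Internal("parquet column type mismatch".to_string()))
    }

    /// Same map_err pattern as the patch: keep the original error text and append the
    /// config and batch schema that were in effect, so FFI callers get actionable context.
    fn write_with_context(config: &DemoConfig, batch_schema: &str) -> Result<()> {
        inner_write().map_err(|e| {
            DataFusionError::Internal(format!(
                "err={}, config={:?}, batch_schema={:?}",
                e, config, batch_schema
            ))
        })
    }

    fn main() {
        let config = DemoConfig { max_file_size: Some(1 << 20) };
        if let Err(e) = write_with_context(&config, "id: Int64, name: Utf8") {
            // The surfaced message now carries the failure plus the writer state that produced it.
            eprintln!("{e}");
        }
    }

The same idea appears twice in lakesoul_writer.rs above: once around write_record_batch (attaching the batch schema) and once around flush_and_close (attaching only the config), which keeps the cheap context capture out of the hot path until an error actually occurs.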