From 34bf726de3b3a5c86d5b87ad765f606bc5d949fe Mon Sep 17 00:00:00 2001 From: Sergii Mikhtoniuk Date: Mon, 26 Aug 2024 19:13:23 -0700 Subject: [PATCH] Propagate schema on empty result --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/engine.rs | 29 ++++++++++------------------- tests/tests/test_transform.rs | 2 -- 4 files changed, 12 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3fcef40..c636fa6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1845,7 +1845,7 @@ dependencies = [ [[package]] name = "kamu-engine-datafusion" -version = "0.8.0" +version = "0.8.1" dependencies = [ "async-trait", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 3c8a4db..ecb73c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kamu-engine-datafusion" -version = "0.8.0" +version = "0.8.1" authors = ["Kamu Data Inc. "] license-file = "LICENSE.txt" edition = "2021" diff --git a/src/engine.rs b/src/engine.rs index 1af8a3f..584d7dd 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -166,20 +166,17 @@ impl Engine { let new_watermark = Self::compute_new_watermark(&request); - if num_rows == 0 { - Ok(TransformResponseSuccess { - new_offset_interval: None, - new_watermark, - }) - } else { - Ok(TransformResponseSuccess { - new_offset_interval: Some(OffsetInterval { + Ok(TransformResponseSuccess { + new_offset_interval: if num_rows != 0 { + Some(OffsetInterval { start: request.next_offset, end: request.next_offset + num_rows - 1, - }), - new_watermark, - }) - } + }) + } else { + None + }, + new_watermark, + }) } #[tracing::instrument(level = "info", skip_all, fields(dataset_id = %input.dataset_id, dataset_alias = %input.dataset_alias, query_alias = %input.query_alias))] @@ -618,13 +615,7 @@ impl Engine { .unwrap() .value(0); - if num_records > 0 { - tracing::info!(?path, num_records, "Produced parquet file"); - } else { - // Clean up empty file - tracing::info!("Produced empty result",); - let _ = std::fs::remove_file(path); - } + tracing::info!(?path, num_records, "Produced parquet file"); Ok(num_records as u64) } } diff --git a/tests/tests/test_transform.rs b/tests/tests/test_transform.rs index 54da13a..503de20 100644 --- a/tests/tests/test_transform.rs +++ b/tests/tests/test_transform.rs @@ -299,8 +299,6 @@ async fn test_query_common(opts: TestQueryCommonOpts) { if let Some(expected_data) = expected_data { let actual_data = read_data_pretty(&new_data_path).await; assert_eq!(expected_data.trim(), actual_data); - } else { - assert!(!new_data_path.exists()); } if let Some(expected_schema) = opts.expected_schema {