Skip to content

Commit

Permalink
Propagate schema on empty result
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk committed Aug 27, 2024
1 parent 2d2205b commit 34bf726
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kamu-engine-datafusion"
version = "0.8.0"
version = "0.8.1"
authors = ["Kamu Data Inc. <[email protected]>"]
license-file = "LICENSE.txt"
edition = "2021"
Expand Down
29 changes: 10 additions & 19 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,20 +166,17 @@ impl Engine {

let new_watermark = Self::compute_new_watermark(&request);

if num_rows == 0 {
Ok(TransformResponseSuccess {
new_offset_interval: None,
new_watermark,
})
} else {
Ok(TransformResponseSuccess {
new_offset_interval: Some(OffsetInterval {
Ok(TransformResponseSuccess {
new_offset_interval: if num_rows != 0 {
Some(OffsetInterval {
start: request.next_offset,
end: request.next_offset + num_rows - 1,
}),
new_watermark,
})
}
})
} else {
None
},
new_watermark,
})
}

#[tracing::instrument(level = "info", skip_all, fields(dataset_id = %input.dataset_id, dataset_alias = %input.dataset_alias, query_alias = %input.query_alias))]
Expand Down Expand Up @@ -618,13 +615,7 @@ impl Engine {
.unwrap()
.value(0);

if num_records > 0 {
tracing::info!(?path, num_records, "Produced parquet file");
} else {
// Clean up empty file
tracing::info!("Produced empty result",);
let _ = std::fs::remove_file(path);
}
tracing::info!(?path, num_records, "Produced parquet file");
Ok(num_records as u64)
}
}
2 changes: 0 additions & 2 deletions tests/tests/test_transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,6 @@ async fn test_query_common(opts: TestQueryCommonOpts) {
if let Some(expected_data) = expected_data {
let actual_data = read_data_pretty(&new_data_path).await;
assert_eq!(expected_data.trim(), actual_data);
} else {
assert!(!new_data_path.exists());
}

if let Some(expected_schema) = opts.expected_schema {
Expand Down

0 comments on commit 34bf726

Please sign in to comment.