Improve REST query error handling
sergiimk committed Jun 17, 2024
1 parent 8cf31a7 commit d404508
Showing 4 changed files with 297 additions and 41 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
### Changed
- The `/query` REST API endpoint will:
  - return `404 Not Found` when a queried dataset does not exist
  - return `400 Bad Request` on invalid SQL
  - return `422 Unprocessable Content` on unrecognized request body fields

## [0.188.0] - 2024-06-14
### Added
- New repository `AccessTokenRepository` to work with new access tokens
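
For illustration, here is a minimal client-side sketch of the error contract described in the changelog entry above. The base URL and the `population` alias are hypothetical; the dataset ID reuses the seeded "does-not-exist" ID asserted by the new tests below.

```rust
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    // Hypothetical URL of a local kamu API server.
    let query_url = "http://localhost:8080/query";

    // Invalid SQL is rejected with 400 Bad Request, with the planner error in the body.
    let res = client
        .post(query_url)
        .json(&json!({ "query": "select foobar(1)" }))
        .send()
        .await?;
    assert_eq!(res.status(), reqwest::StatusCode::BAD_REQUEST);

    // Referencing a dataset that does not exist (here via an alias mapped to a
    // non-existent dataset ID) is rejected with 404 Not Found.
    let res = client
        .post(query_url)
        .json(&json!({
            "query": "select 1 from \"population\"",
            "aliases": [{
                "alias": "population",
                "id": "did:odf:fed011ba79f25e520298ba6945dd6197083a366364bef178d5899b100c434748d88e5"
            }]
        }))
        .send()
        .await?;
    assert_eq!(res.status(), reqwest::StatusCode::NOT_FOUND);

    // A request body with an unrecognized field is rejected with 422 Unprocessable Content.
    let res = client
        .post(query_url)
        .json(&json!({ "query": "select 1", "unknownField": true }))
        .send()
        .await?;
    assert_eq!(res.status(), reqwest::StatusCode::UNPROCESSABLE_ENTITY);

    Ok(())
}
```
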
41 changes: 24 additions & 17 deletions src/adapter/http/src/data/query_handler.rs
@@ -22,6 +22,7 @@ use axum::extract::{self, Extension, Query};
use axum::response::Json;
use datafusion::arrow::array::RecordBatch;
use datafusion::common::DFSchema;
use datafusion::error::DataFusionError;
use kamu::domain::*;
use kamu_data_utils::data::format::*;
use opendatafabric::{DatasetID, Multihash};
@@ -41,11 +42,17 @@ pub async fn dataset_query_handler_post(
) -> Result<Json<QueryResponseBody>, ApiError> {
let query_svc = catalog.get_one::<dyn QueryService>().unwrap();

let res = query_svc
let res = match query_svc
.sql_statement(&body.query, body.to_options())
.await
.int_err()
.api_err()?;
{
Ok(res) => res,
Err(QueryError::DatasetNotFound(err)) => Err(ApiError::not_found(err))?,
Err(QueryError::DataFusionError(err @ DataFusionError::Plan(_))) => {
Err(ApiError::bad_request(err))?
}
Err(e) => Err(e.int_err().api_err())?,
};

// Apply pagination limits
let df = res
@@ -80,7 +87,7 @@ pub async fn dataset_query_handler_post(
let json = serialize_data(&record_batches, body.data_format).api_err()?;
let data = serde_json::value::RawValue::from_string(json).unwrap();

let result_hash = if body.include_result_hash {
let data_hash = if body.include_data_hash {
Some(kamu_data_utils::data::hash::get_batches_logical_hash(
&arrow_schema,
&record_batches,
@@ -93,7 +100,7 @@ pub async fn dataset_query_handler_post(
data,
schema,
state,
result_hash,
data_hash,
}))
}

@@ -110,7 +117,7 @@ pub async fn dataset_query_handler(

// TODO: Sanity limits
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct QueryRequestBody {
/// SQL query
query: String,
@@ -140,8 +147,8 @@ pub struct QueryRequestBody

/// Whether to include a logical hash of the resulting data batch.
/// See: https://docs.kamu.dev/odf/spec/#physical-and-logical-hashes
#[serde(default = "QueryRequestBody::default_include_result_hash")]
include_result_hash: bool,
#[serde(default = "QueryRequestBody::default_include_data_hash")]
include_data_hash: bool,

/// Pagination: skips first N records
#[serde(default)]
@@ -164,7 +171,7 @@ impl QueryRequestBody {
true
}

fn default_include_result_hash() -> bool {
fn default_include_data_hash() -> bool {
true
}

@@ -183,7 +190,7 @@ impl QueryRequestBody {
/////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct QueryRequestParams {
query: String,
#[serde(default)]
@@ -200,8 +207,8 @@ pub struct QueryRequestParams {
include_schema: bool,
#[serde(default = "QueryRequestBody::default_include_state")]
include_state: bool,
#[serde(default = "QueryRequestBody::default_include_result_hash")]
include_result_hash: bool,
#[serde(default = "QueryRequestBody::default_include_data_hash")]
include_data_hash: bool,
}

impl From<QueryRequestParams> for QueryRequestBody {
@@ -214,7 +221,7 @@ impl From<QueryRequestParams> for QueryRequestBody {
as_of_state: None,
include_schema: v.include_schema,
include_state: v.include_state,
include_result_hash: v.include_result_hash,
include_data_hash: v.include_data_hash,
skip: v.skip,
limit: v.limit,
}
@@ -232,13 +239,13 @@ pub struct QueryResponseBody {
#[serde(skip_serializing_if = "Option::is_none")]
state: Option<QueryState>,
#[serde(skip_serializing_if = "Option::is_none")]
result_hash: Option<Multihash>,
data_hash: Option<Multihash>,
}

/////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct QueryDatasetAlias {
pub alias: String,
pub id: DatasetID,
@@ -247,13 +254,13 @@ pub struct QueryDatasetAlias {
/////////////////////////////////////////////////////////////////////////////////

#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct QueryState {
pub inputs: Vec<QueryDatasetState>,
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct QueryDatasetState {
pub id: DatasetID,
pub block_hash: Multihash,
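
The `deny_unknown_fields` attributes added to the request types above are what produce the new `422 Unprocessable Content` responses: serde now fails to deserialize a body containing an unrecognized field, and axum's `Json` extractor turns that rejection into a 422. A minimal sketch of the serde side, with the struct trimmed to two fields for illustration:

```rust
use serde::Deserialize;

// Trimmed-down stand-in for `QueryRequestBody` (illustration only).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
struct QueryRequestBody {
    query: String,
    #[serde(default)]
    skip: u64,
}

fn main() {
    // Unknown fields are now a hard deserialization error instead of being silently ignored.
    let body = r#"{ "query": "select 1", "includeResultHash": true }"#;
    let err = serde_json::from_str::<QueryRequestBody>(body).unwrap_err();

    // Prints something like: unknown field `includeResultHash`, expected `query` or `skip`
    println!("{err}");
}
```

Note that, combined with the `includeResultHash` to `includeDataHash` rename, clients still sending the old field name will now receive a 422 instead of having the field silently ignored.
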
128 changes: 118 additions & 10 deletions src/adapter/http/tests/tests/test_data_query.rs
@@ -252,7 +252,7 @@ async fn test_data_query_handler_full() {
"population": 100,
}],
"schema": "{\"fields\":[{\"name\":\"offset\",\"data_type\":\"Int64\",\"nullable\":true,\"dict_id\":0,\"dict_is_ordered\":false,\"metadata\":{}},{\"name\":\"city\",\"data_type\":\"Utf8\",\"nullable\":false,\"dict_id\":0,\"dict_is_ordered\":false,\"metadata\":{}},{\"name\":\"population\",\"data_type\":\"UInt64\",\"nullable\":false,\"dict_id\":0,\"dict_is_ordered\":false,\"metadata\":{}}],\"metadata\":{}}",
"resultHash": "f9680c001200b3483eecc3d5c6b50ee6b8cba11b51c08f89ea1f53d3a334c743199f5fe656e",
"dataHash": "f9680c001200b3483eecc3d5c6b50ee6b8cba11b51c08f89ea1f53d3a334c743199f5fe656e",
"state": {
"inputs": [{
"id": "did:odf:fed01df230b49615d175307d580c33d6fda61fc7b9aec91df0f5c1a5ebe3b8cbfee02",
@@ -268,6 +268,114 @@ async fn test_data_query_handler_full() {

/////////////////////////////////////////////////////////////////////////////////////////

#[test_group::group(engine, datafusion)]
#[test_log::test(tokio::test)]
async fn test_data_query_handler_invalid_sql() {
let harness = Harness::new().await;

let client = async move {
let cl = reqwest::Client::new();

let query = format!("select foobar(offset) from \"{}\"", harness.dataset_alias);

let query_url = format!("{}query", harness.root_url);
let res = cl
.post(&query_url)
.json(&json!({
"query": query
}))
.send()
.await
.unwrap();

let status = res.status();
let body = res.text().await.unwrap();
assert_eq!(status, 400, "Unexpected response: {status} {body}");
assert_eq!(
body,
"Error during planning: Invalid function 'foobar'.\nDid you mean 'floor'?"
);
};

await_client_server_flow!(harness.server_harness.api_server_run(), client);
}

/////////////////////////////////////////////////////////////////////////////////////////

#[test_group::group(engine, datafusion)]
#[test_log::test(tokio::test)]
async fn test_data_query_handler_dataset_does_not_exist() {
let harness = Harness::new().await;

let client = async move {
let cl = reqwest::Client::new();

let query_url = format!("{}query", harness.root_url);
let res = cl
.post(&query_url)
.json(&json!({
"query": "select offset, city, population from does_not_exist"
}))
.send()
.await
.unwrap();

let status = res.status();
let body = res.text().await.unwrap();
assert_eq!(status, 400, "Unexpected response: {status} {body}");
assert_eq!(
body,
"Error during planning: table 'kamu.kamu.does_not_exist' not found"
);
};

await_client_server_flow!(harness.server_harness.api_server_run(), client);
}

/////////////////////////////////////////////////////////////////////////////////////////

#[test_group::group(engine, datafusion)]
#[test_log::test(tokio::test)]
async fn test_data_query_handler_dataset_does_not_exist_bad_alias() {
let harness = Harness::new().await;

let client = async move {
let cl = reqwest::Client::new();

let query = format!(
"select offset, city, population from \"{}\"",
harness.dataset_alias
);

let query_url = format!("{}query", harness.root_url);
let res = cl
.post(&query_url)
.json(&json!({
"query": query,
"aliases": [{
"alias": harness.dataset_alias,
"id": DatasetID::new_seeded_ed25519(b"does-not-exist"),
}]
}))
.send()
.await
.unwrap();

let status = res.status();
let body = res.text().await.unwrap();
assert_eq!(status, 404, "Unexpected response: {status} {body}");
assert_eq!(
body,
"Dataset not found: \
did:odf:fed011ba79f25e520298ba6945dd6197083a366364bef178d5899b100c434748d88e5"
);
};

await_client_server_flow!(harness.server_harness.api_server_run(), client);
}

/////////////////////////////////////////////////////////////////////////////////////////

#[test_group::group(engine, datafusion)]
#[test_log::test(tokio::test)]
async fn test_data_query_handler_ranges() {
@@ -290,7 +398,7 @@ async fn test_data_query_handler_ranges() {
("limit", "1"),
("includeSchema", "false"),
("includeState", "false"),
("includeResultHash", "false"),
("includeDataHash", "false"),
])
.send()
.await
@@ -315,7 +423,7 @@ async fn test_data_query_handler_ranges() {
("skip", "1"),
("includeSchema", "false"),
("includeState", "false"),
("includeResultHash", "false"),
("includeDataHash", "false"),
])
.send()
.await
@@ -358,7 +466,7 @@ async fn test_data_query_handler_data_formats() {
("dataFormat", "json-aos"),
("includeSchema", "false"),
("includeState", "false"),
("includeResultHash", "false"),
("includeDataHash", "false"),
])
.send()
.await
@@ -386,7 +494,7 @@ async fn test_data_query_handler_data_formats() {
("dataFormat", "json-soa"),
("includeSchema", "false"),
("includeState", "false"),
("includeResultHash", "false"),
("includeDataHash", "false"),
])
.send()
.await
@@ -410,7 +518,7 @@ async fn test_data_query_handler_data_formats() {
("dataFormat", "json-aoa"),
("includeSchema", "false"),
("includeState", "false"),
("includeResultHash", "false"),
("includeDataHash", "false"),
])
.send()
.await
@@ -452,7 +560,7 @@ async fn test_data_query_handler_schema_formats() {
("schemaFormat", "arrow-json"),
("includeSchema", "true"),
("includeState", "false"),
("includeResultHash", "false"),
("includeDataHash", "false"),
])
.send()
.await
@@ -474,7 +582,7 @@ async fn test_data_query_handler_schema_formats() {
("schemaFormat", "ArrowJson"),
("includeSchema", "true"),
("includeState", "false"),
("includeResultHash", "false"),
("includeDataHash", "false"),
])
.send()
.await
@@ -496,7 +604,7 @@ async fn test_data_query_handler_schema_formats() {
("schemaFormat", "parquet"),
("includeSchema", "true"),
("includeState", "false"),
("includeResultHash", "false"),
("includeDataHash", "false"),
])
.send()
.await
@@ -525,7 +633,7 @@ async fn test_data_query_handler_schema_formats() {
("schemaFormat", "parquet-json"),
("includeSchema", "true"),
("includeState", "false"),
("includeResultHash", "false"),
("includeDataHash", "false"),
])
.send()
.await