Handle unparsable SQL error
sergiimk committed Jul 3, 2024
1 parent 254ccb1 commit ec8ff0f
Showing 5 changed files with 197 additions and 5 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
### Fixed
- GQL API regression where unparsable SQL resulted in an internal error
- REST API `/query` endpoint now returns `400 Bad Request` for unparsable SQL

## [0.189.4] - 2024-07-02
### Fixed
- GQL access token list pagination
153 changes: 150 additions & 3 deletions src/adapter/graphql/tests/tests/test_gql_data.rs
@@ -160,11 +160,13 @@ async fn create_test_dataset(
.unwrap();
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Tail
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[test_group::group(engine, datafusion)]
#[test_log::test(tokio::test)]
-async fn test_dataset_schema_local_fs() {
+async fn test_dataset_tail_schema() {
let tempdir = tempfile::tempdir().unwrap();
let catalog = create_catalog_with_local_workspace(tempdir.path(), true).await;
create_test_dataset(&catalog, tempdir.path(), None).await;
@@ -223,7 +225,7 @@ async fn test_dataset_schema_local_fs()

#[test_group::group(engine, datafusion)]
#[test_log::test(tokio::test)]
-async fn test_dataset_tail_local_fs() {
+async fn test_dataset_tail_some() {
let tempdir = tempfile::tempdir().unwrap();
let catalog = create_catalog_with_local_workspace(tempdir.path(), true).await;
create_test_dataset(&catalog, tempdir.path(), None).await;
@@ -264,7 +266,7 @@ async fn test_dataset_tail_local_fs() {

#[test_group::group(engine, datafusion)]
#[test_log::test(tokio::test)]
-async fn test_dataset_tail_empty_local_fs() {
+async fn test_dataset_tail_empty() {
let tempdir = tempfile::tempdir().unwrap();
let catalog = create_catalog_with_local_workspace(tempdir.path(), true).await;
create_test_dataset(&catalog, tempdir.path(), None).await;
@@ -302,3 +304,148 @@ async fn test_dataset_tail_empty_local_fs() {
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Query
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[test_group::group(engine, datafusion)]
#[test_log::test(tokio::test)]
async fn test_data_query_some() {
let tempdir = tempfile::tempdir().unwrap();
let catalog = create_catalog_with_local_workspace(tempdir.path(), true).await;
create_test_dataset(&catalog, tempdir.path(), None).await;

let schema = kamu_adapter_graphql::schema_quiet();
let res = schema
.execute(
async_graphql::Request::new(indoc::indoc!(
r#"
{
data {
query(
query: "select * from \"kamu/foo\" order by offset",
queryDialect: SQL_DATA_FUSION,
schemaFormat: ARROW_JSON,
dataFormat: JSON,
) {
... on DataQueryResultSuccess {
data { content }
}
}
}
}
"#
))
.data(catalog),
)
.await;
assert!(res.is_ok(), "{res:?}");
let json = serde_json::to_string(&res.data).unwrap();
let json = serde_json::from_str::<serde_json::Value>(&json).unwrap();
let data = &json["data"]["query"]["data"]["content"];
let data = serde_json::from_str::<serde_json::Value>(data.as_str().unwrap()).unwrap();
assert_eq!(
data,
serde_json::json!([
{"offset": 0, "blah": "a"},
{"offset": 1, "blah": "b"},
{"offset": 2, "blah": "c"},
{"offset": 3, "blah": "d"},
])
);
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[test_group::group(engine, datafusion)]
#[test_log::test(tokio::test)]
async fn test_data_query_error_sql_unparsable() {
let tempdir = tempfile::tempdir().unwrap();
let catalog = create_catalog_with_local_workspace(tempdir.path(), true).await;

let schema = kamu_adapter_graphql::schema_quiet();
let res = schema
.execute(
async_graphql::Request::new(indoc::indoc!(
r#"
{
data {
query(
query: "select ???",
queryDialect: SQL_DATA_FUSION,
schemaFormat: ARROW_JSON,
dataFormat: JSON,
) {
... on DataQueryResultError {
errorMessage
errorKind
}
}
}
}
"#
))
.data(catalog),
)
.await;
assert!(res.is_ok(), "{res:?}");
let json = serde_json::to_string(&res.data).unwrap();
tracing::debug!(?json, "Response data");
let json = serde_json::from_str::<serde_json::Value>(&json).unwrap();
let data = &json["data"]["query"];
assert_eq!(
*data,
serde_json::json!({
"errorMessage": "sql parser error: Expected end of statement, found: ?",
"errorKind": "INVALID_SQL",
})
);
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[test_group::group(engine, datafusion)]
#[test_log::test(tokio::test)]
async fn test_data_query_error_sql_missing_function() {
let tempdir = tempfile::tempdir().unwrap();
let catalog = create_catalog_with_local_workspace(tempdir.path(), true).await;

let schema = kamu_adapter_graphql::schema_quiet();
let res = schema
.execute(
async_graphql::Request::new(indoc::indoc!(
r#"
{
data {
query(
query: "select foobar(1)",
queryDialect: SQL_DATA_FUSION,
schemaFormat: ARROW_JSON,
dataFormat: JSON,
) {
... on DataQueryResultError {
errorMessage
errorKind
}
}
}
}
"#
))
.data(catalog),
)
.await;
assert!(res.is_ok(), "{res:?}");
let json = serde_json::to_string(&res.data).unwrap();
tracing::debug!(?json, "Response data");
let json = serde_json::from_str::<serde_json::Value>(&json).unwrap();
let data = &json["data"]["query"];
assert_eq!(
*data,
serde_json::json!({
"errorMessage": "Invalid function 'foobar'.\nDid you mean 'floor'?",
"errorKind": "INVALID_SQL",
})
);
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
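For context: the GQL adapter change that produces these typed errors is not among the files shown in this commit view — only its tests are. As a rough sketch only, with type and field names inferred from the assertions above (`errorMessage`, `errorKind: INVALID_SQL`) rather than taken from the actual kamu sources, the mapping presumably looks something like:

```rust
use datafusion::error::DataFusionError;

// Hypothetical shapes inferred from the test assertions above;
// not the actual kamu GQL types.
enum DataQueryResultErrorKind {
    InvalidSql,
    InternalError,
}

struct DataQueryResultError {
    error_message: String,
    error_kind: DataQueryResultErrorKind,
}

fn map_query_error(err: DataFusionError) -> DataQueryResultError {
    match err {
        // Unparsable SQL (this commit) and planning errors such as a
        // missing function are the caller's mistake, not a server fault.
        DataFusionError::SQL(e, _) => DataQueryResultError {
            error_message: e.to_string(),
            error_kind: DataQueryResultErrorKind::InvalidSql,
        },
        DataFusionError::Plan(msg) => DataQueryResultError {
            error_message: msg,
            error_kind: DataQueryResultErrorKind::InvalidSql,
        },
        // Everything else still surfaces as an internal error.
        other => DataQueryResultError {
            error_message: other.to_string(),
            error_kind: DataQueryResultErrorKind::InternalError,
        },
    }
}
```

The catch-all arm is the regression being fixed: before this commit, the SQL parse error fell through to the internal-error path.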
3 changes: 3 additions & 0 deletions src/adapter/http/src/data/query_handler.rs
@@ -48,6 +48,9 @@ pub async fn dataset_query_handler_post(
{
Ok(res) => res,
Err(QueryError::DatasetNotFound(err)) => Err(ApiError::not_found(err))?,
Err(QueryError::DataFusionError(DataFusionError::SQL(err, _))) => {
Err(ApiError::bad_request(err))?
}
Err(QueryError::DataFusionError(err @ DataFusionError::Plan(_))) => {
Err(ApiError::bad_request(err))?
}
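The two `bad_request` arms above encode a simple classification rule: parse and planning failures are the caller's fault, everything else is the server's. Below is a minimal standalone sketch of that rule, assuming the `http` crate for status codes — only the two 400 arms are confirmed by the diff; the catch-all 500 is implied by the surrounding handler:

```rust
use datafusion::error::DataFusionError;
use http::StatusCode;

// Map a DataFusion error to the HTTP status the /query endpoint should
// return: parse and planning failures are client errors, the rest are not.
fn status_for(err: &DataFusionError) -> StatusCode {
    match err {
        // New in this commit: unparsable SQL -> 400 Bad Request
        DataFusionError::SQL(_, _) => StatusCode::BAD_REQUEST,
        // Already handled: planning errors (e.g. unknown function) -> 400
        DataFusionError::Plan(_) => StatusCode::BAD_REQUEST,
        // Anything else remains an internal server error
        _ => StatusCode::INTERNAL_SERVER_ERROR,
    }
}
```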
34 changes: 33 additions & 1 deletion src/adapter/http/tests/tests/test_data_query.rs
Expand Up @@ -266,7 +266,39 @@ async fn test_data_query_handler_full() {

#[test_group::group(engine, datafusion)]
#[test_log::test(tokio::test)]
-async fn test_data_query_handler_invalid_sql() {
+async fn test_data_query_handler_error_sql_unparsable() {
let harness = Harness::new().await;

let client = async move {
let cl = reqwest::Client::new();

let query_url = format!("{}query", harness.root_url);
let res = cl
.post(&query_url)
.json(&json!({
"query": "select ???"
}))
.send()
.await
.unwrap();

let status = res.status();
let body = res.text().await.unwrap();
assert_eq!(status, 400, "Unexpected response: {status} {body}");
assert_eq!(
body,
"sql parser error: Expected end of statement, found: ?"
);
};

await_client_server_flow!(harness.server_harness.api_server_run(), client);
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[test_group::group(engine, datafusion)]
#[test_log::test(tokio::test)]
async fn test_data_query_handler_error_sql_missing_function() {
let harness = Harness::new().await;

let client = async move {
7 changes: 6 additions & 1 deletion src/infra/core/src/query_service_impl.rs
@@ -12,6 +12,7 @@ use std::convert::TryFrom;
use std::sync::Arc;

use datafusion::arrow;
use datafusion::error::DataFusionError;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::parquet::arrow::async_reader::ParquetObjectReader;
use datafusion::parquet::file::metadata::ParquetMetaData;
@@ -115,7 +116,11 @@ impl QueryServiceImpl {
// understand which datasets the query is using and populate the state and
// aliases for them
let mut table_refs = Vec::new();
-    for stmt in datafusion::sql::parser::DFParser::parse_sql(sql).int_err()? {
+
+    let statements = datafusion::sql::parser::DFParser::parse_sql(sql)
+        .map_err(|e| DataFusionError::SQL(e, None))?;
+
+    for stmt in statements {
match stmt {
Statement::Statement(stmt) => {
table_refs.append(&mut extract_table_refs(&stmt)?);
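This is the root-cause fix: `.int_err()?` previously converted a parser failure into an opaque internal error, whereas wrapping it in `DataFusionError::SQL` preserves its type so the HTTP and GQL layers can classify it as invalid SQL. A minimal sketch of the new behavior, reusing the same `DFParser` call as the diff:

```rust
use datafusion::error::DataFusionError;
use datafusion::sql::parser::DFParser;

// Surface parser failures as DataFusionError::SQL so callers can tell
// invalid SQL apart from genuine server faults.
fn parse_statements(sql: &str) -> Result<usize, DataFusionError> {
    let statements = DFParser::parse_sql(sql).map_err(|e| DataFusionError::SQL(e, None))?;
    Ok(statements.len())
}
```

With this in place, `parse_statements("select ???")` yields the `sql parser error: Expected end of statement, found: ?` message asserted in the tests above, rather than an internal error.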
