Skip to content

Commit

Permalink
feat: json output format for http
Browse files Browse the repository at this point in the history
  • Loading branch information
Kev1n8 committed Oct 5, 2024
1 parent a283e13 commit 34c873c
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 0 deletions.
32 changes: 32 additions & 0 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use crate::http::error_result::ErrorResponse;
use crate::http::greptime_result_v1::GreptimedbV1Response;
use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
use crate::http::influxdb_result_v1::InfluxdbV1Response;
use crate::http::json_result::JsonResponse;
use crate::http::prometheus::{
build_info_query, format_query, instant_query, label_values_query, labels_query, range_query,
series_query,
Expand Down Expand Up @@ -97,6 +98,7 @@ pub mod error_result;
pub mod greptime_manage_resp;
pub mod greptime_result_v1;
pub mod influxdb_result_v1;
pub mod json_result;
pub mod table_result;

#[cfg(any(test, feature = "testing"))]
Expand Down Expand Up @@ -279,6 +281,7 @@ pub enum ResponseFormat {
#[default]
GreptimedbV1,
InfluxdbV1,
Json,
}

impl ResponseFormat {
Expand All @@ -289,6 +292,7 @@ impl ResponseFormat {
"table" => Some(ResponseFormat::Table),
"greptimedb_v1" => Some(ResponseFormat::GreptimedbV1),
"influxdb_v1" => Some(ResponseFormat::InfluxdbV1),
"json" => Some(ResponseFormat::Json),
_ => None,
}
}
Expand All @@ -300,6 +304,7 @@ impl ResponseFormat {
ResponseFormat::Table => "table",
ResponseFormat::GreptimedbV1 => "greptimedb_v1",
ResponseFormat::InfluxdbV1 => "influxdb_v1",
ResponseFormat::Json => "json",
}
}
}
Expand Down Expand Up @@ -356,6 +361,7 @@ pub enum HttpResponse {
Error(ErrorResponse),
GreptimedbV1(GreptimedbV1Response),
InfluxdbV1(InfluxdbV1Response),
Json(JsonResponse),
}

impl HttpResponse {
Expand All @@ -366,6 +372,7 @@ impl HttpResponse {
HttpResponse::Table(resp) => resp.with_execution_time(execution_time).into(),
HttpResponse::GreptimedbV1(resp) => resp.with_execution_time(execution_time).into(),
HttpResponse::InfluxdbV1(resp) => resp.with_execution_time(execution_time).into(),
HttpResponse::Json(resp) => resp.with_execution_time(execution_time).into(),
HttpResponse::Error(resp) => resp.with_execution_time(execution_time).into(),
}
}
Expand All @@ -375,6 +382,7 @@ impl HttpResponse {
HttpResponse::Csv(resp) => resp.with_limit(limit).into(),
HttpResponse::Table(resp) => resp.with_limit(limit).into(),
HttpResponse::GreptimedbV1(resp) => resp.with_limit(limit).into(),
HttpResponse::Json(resp) => resp.with_limit(limit).into(),
_ => self,
}
}
Expand Down Expand Up @@ -407,6 +415,7 @@ impl IntoResponse for HttpResponse {
HttpResponse::Table(resp) => resp.into_response(),
HttpResponse::GreptimedbV1(resp) => resp.into_response(),
HttpResponse::InfluxdbV1(resp) => resp.into_response(),
HttpResponse::Json(resp) => resp.into_response(),
HttpResponse::Error(resp) => resp.into_response(),
}
}
Expand Down Expand Up @@ -452,6 +461,12 @@ impl From<InfluxdbV1Response> for HttpResponse {
}
}

impl From<JsonResponse> for HttpResponse {
fn from(value: JsonResponse) -> Self {
HttpResponse::Json(value)
}
}

async fn serve_api(Extension(api): Extension<OpenApi>) -> impl IntoApiResponse {
Json(api)
}
Expand Down Expand Up @@ -1131,6 +1146,7 @@ mod test {
ResponseFormat::Csv,
ResponseFormat::Table,
ResponseFormat::Arrow,
ResponseFormat::Json,
] {
let recordbatches =
RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()]).unwrap();
Expand All @@ -1141,6 +1157,7 @@ mod test {
ResponseFormat::Table => TableResponse::from_output(outputs).await,
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, None).await,
ResponseFormat::Json => JsonResponse::from_output(outputs).await,
};

match json_resp {
Expand Down Expand Up @@ -1210,6 +1227,21 @@ mod test {
assert_eq!(rb.num_columns(), 2);
assert_eq!(rb.num_rows(), 4);
}

HttpResponse::Json(resp) => {
let output = &resp.output()[0];
if let GreptimeQueryOutput::Records(r) = output {
assert_eq!(r.num_rows(), 4);
assert_eq!(r.num_cols(), 2);
assert_eq!(r.schema.column_schemas[0].name, "numbers");
assert_eq!(r.schema.column_schemas[0].data_type, "UInt32");
assert_eq!(r.rows[0][0], serde_json::Value::from(1));
assert_eq!(r.rows[0][1], serde_json::Value::Null);
} else {
panic!("invalid output type");
}
}

HttpResponse::Error(err) => unreachable!("{err:?}"),
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/servers/src/http/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::http::csv_result::CsvResponse;
use crate::http::error_result::ErrorResponse;
use crate::http::greptime_result_v1::GreptimedbV1Response;
use crate::http::influxdb_result_v1::InfluxdbV1Response;
use crate::http::json_result::JsonResponse;
use crate::http::table_result::TableResponse;
use crate::http::{
ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput,
Expand Down Expand Up @@ -138,6 +139,7 @@ pub async fn sql(
ResponseFormat::Table => TableResponse::from_output(outputs).await,
ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await,
ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await,
ResponseFormat::Json => JsonResponse::from_output(outputs).await,
};

if let Some(limit) = query_params.limit {
Expand Down
135 changes: 135 additions & 0 deletions src/servers/src/http/json_result.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use axum::http::{header, HeaderValue};
use axum::response::{IntoResponse, Response};
use common_error::status_code::StatusCode;
use common_query::Output;
use mime_guess::mime;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::{json, Map};

use super::process_with_limit;
use crate::http::error_result::ErrorResponse;
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat};

/// The json format here is different from the default json output of greptime_result.
/// `JsonResponse` is intended to make it easier for user to consume data.
#[derive(Serialize, Deserialize, Debug, JsonSchema)]
pub struct JsonResponse {
output: Vec<GreptimeQueryOutput>,
execution_time_ms: u64,
}

impl JsonResponse {
pub async fn from_output(outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
match handler::from_output(outputs).await {
Err(err) => HttpResponse::Error(err),
Ok((output, _)) => {
if output.len() > 1 {
HttpResponse::Error(ErrorResponse::from_error_message(
StatusCode::InvalidArguments,
"cannot output multi-statements result in json format".to_string(),
))
} else {
HttpResponse::Json(JsonResponse {
output,
execution_time_ms: 0,
})
}
}
}
}

pub fn output(&self) -> &[GreptimeQueryOutput] {
&self.output
}

pub fn with_execution_time(mut self, execution_time: u64) -> Self {
self.execution_time_ms = execution_time;
self
}

pub fn execution_time_ms(&self) -> u64 {
self.execution_time_ms
}

pub fn with_limit(mut self, limit: usize) -> Self {
self.output = process_with_limit(self.output, limit);
self
}
}

impl IntoResponse for JsonResponse {
fn into_response(mut self) -> Response {
debug_assert!(
self.output.len() <= 1,
"self.output has extra elements: {}",
self.output.len()
);

let execution_time = self.execution_time_ms;
let payload = match self.output.pop() {
None => String::default(),
Some(GreptimeQueryOutput::AffectedRows(n)) => json!({
"data": [
{
"affectedrows": n
},
],
"execution_time_ms": execution_time,
})
.to_string(),

Some(GreptimeQueryOutput::Records(records)) => {
let mut data = Vec::new();
let schema = records.schema();

for row in records.rows.iter() {
let mut row_map = Map::new();
for (i, col) in schema.column_schemas.iter().enumerate() {
row_map.insert(col.name.clone(), row[i].clone());
}
data.push(row_map);
}

json!({
"data": data,
"execution_time_ms": execution_time,
})
.to_string()
}
};

let mut resp = (
[(
header::CONTENT_TYPE,
HeaderValue::from_static(mime::APPLICATION_JSON.as_ref()),
)],
payload,
)
.into_response();
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_FORMAT,
HeaderValue::from_static(ResponseFormat::Json.as_str()),
);
resp.headers_mut().insert(
&GREPTIME_DB_HEADER_EXECUTION_TIME,
HeaderValue::from(execution_time),
);
resp
}
}

0 comments on commit 34c873c

Please sign in to comment.