From 34c873c5172da4d8c377fd40d35ee2a83d4c0c66 Mon Sep 17 00:00:00 2001 From: kf zheng <100595273+kev1n8@users.noreply.github.com> Date: Sun, 6 Oct 2024 03:07:16 +0800 Subject: [PATCH] feat: json output format for http --- src/servers/src/http.rs | 32 +++++++ src/servers/src/http/handler.rs | 2 + src/servers/src/http/json_result.rs | 135 ++++++++++++++++++++++++++++ 3 files changed, 169 insertions(+) create mode 100644 src/servers/src/http/json_result.rs diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index a2b72b548b1..953ff9e73ae 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -59,6 +59,7 @@ use crate::http::error_result::ErrorResponse; use crate::http::greptime_result_v1::GreptimedbV1Response; use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2}; use crate::http::influxdb_result_v1::InfluxdbV1Response; +use crate::http::json_result::JsonResponse; use crate::http::prometheus::{ build_info_query, format_query, instant_query, label_values_query, labels_query, range_query, series_query, @@ -97,6 +98,7 @@ pub mod error_result; pub mod greptime_manage_resp; pub mod greptime_result_v1; pub mod influxdb_result_v1; +pub mod json_result; pub mod table_result; #[cfg(any(test, feature = "testing"))] @@ -279,6 +281,7 @@ pub enum ResponseFormat { #[default] GreptimedbV1, InfluxdbV1, + Json, } impl ResponseFormat { @@ -289,6 +292,7 @@ impl ResponseFormat { "table" => Some(ResponseFormat::Table), "greptimedb_v1" => Some(ResponseFormat::GreptimedbV1), "influxdb_v1" => Some(ResponseFormat::InfluxdbV1), + "json" => Some(ResponseFormat::Json), _ => None, } } @@ -300,6 +304,7 @@ impl ResponseFormat { ResponseFormat::Table => "table", ResponseFormat::GreptimedbV1 => "greptimedb_v1", ResponseFormat::InfluxdbV1 => "influxdb_v1", + ResponseFormat::Json => "json", } } } @@ -356,6 +361,7 @@ pub enum HttpResponse { Error(ErrorResponse), GreptimedbV1(GreptimedbV1Response), InfluxdbV1(InfluxdbV1Response), + Json(JsonResponse), } impl HttpResponse { @@ -366,6 +372,7 @@ impl HttpResponse { HttpResponse::Table(resp) => resp.with_execution_time(execution_time).into(), HttpResponse::GreptimedbV1(resp) => resp.with_execution_time(execution_time).into(), HttpResponse::InfluxdbV1(resp) => resp.with_execution_time(execution_time).into(), + HttpResponse::Json(resp) => resp.with_execution_time(execution_time).into(), HttpResponse::Error(resp) => resp.with_execution_time(execution_time).into(), } } @@ -375,6 +382,7 @@ impl HttpResponse { HttpResponse::Csv(resp) => resp.with_limit(limit).into(), HttpResponse::Table(resp) => resp.with_limit(limit).into(), HttpResponse::GreptimedbV1(resp) => resp.with_limit(limit).into(), + HttpResponse::Json(resp) => resp.with_limit(limit).into(), _ => self, } } @@ -407,6 +415,7 @@ impl IntoResponse for HttpResponse { HttpResponse::Table(resp) => resp.into_response(), HttpResponse::GreptimedbV1(resp) => resp.into_response(), HttpResponse::InfluxdbV1(resp) => resp.into_response(), + HttpResponse::Json(resp) => resp.into_response(), HttpResponse::Error(resp) => resp.into_response(), } } @@ -452,6 +461,12 @@ impl From for HttpResponse { } } +impl From for HttpResponse { + fn from(value: JsonResponse) -> Self { + HttpResponse::Json(value) + } +} + async fn serve_api(Extension(api): Extension) -> impl IntoApiResponse { Json(api) } @@ -1131,6 +1146,7 @@ mod test { ResponseFormat::Csv, ResponseFormat::Table, ResponseFormat::Arrow, + ResponseFormat::Json, ] { let recordbatches = RecordBatches::try_new(schema.clone(), vec![recordbatch.clone()]).unwrap(); @@ -1141,6 +1157,7 @@ mod test { ResponseFormat::Table => TableResponse::from_output(outputs).await, ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await, ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, None).await, + ResponseFormat::Json => JsonResponse::from_output(outputs).await, }; match json_resp { @@ -1210,6 +1227,21 @@ mod test { assert_eq!(rb.num_columns(), 2); assert_eq!(rb.num_rows(), 4); } + + HttpResponse::Json(resp) => { + let output = &resp.output()[0]; + if let GreptimeQueryOutput::Records(r) = output { + assert_eq!(r.num_rows(), 4); + assert_eq!(r.num_cols(), 2); + assert_eq!(r.schema.column_schemas[0].name, "numbers"); + assert_eq!(r.schema.column_schemas[0].data_type, "UInt32"); + assert_eq!(r.rows[0][0], serde_json::Value::from(1)); + assert_eq!(r.rows[0][1], serde_json::Value::Null); + } else { + panic!("invalid output type"); + } + } + HttpResponse::Error(err) => unreachable!("{err:?}"), } } diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 1befc222401..4925c79639c 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -39,6 +39,7 @@ use crate::http::csv_result::CsvResponse; use crate::http::error_result::ErrorResponse; use crate::http::greptime_result_v1::GreptimedbV1Response; use crate::http::influxdb_result_v1::InfluxdbV1Response; +use crate::http::json_result::JsonResponse; use crate::http::table_result::TableResponse; use crate::http::{ ApiState, Epoch, GreptimeOptionsConfigState, GreptimeQueryOutput, HttpRecordsOutput, @@ -138,6 +139,7 @@ pub async fn sql( ResponseFormat::Table => TableResponse::from_output(outputs).await, ResponseFormat::GreptimedbV1 => GreptimedbV1Response::from_output(outputs).await, ResponseFormat::InfluxdbV1 => InfluxdbV1Response::from_output(outputs, epoch).await, + ResponseFormat::Json => JsonResponse::from_output(outputs).await, }; if let Some(limit) = query_params.limit { diff --git a/src/servers/src/http/json_result.rs b/src/servers/src/http/json_result.rs new file mode 100644 index 00000000000..30c47aff433 --- /dev/null +++ b/src/servers/src/http/json_result.rs @@ -0,0 +1,135 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use axum::http::{header, HeaderValue}; +use axum::response::{IntoResponse, Response}; +use common_error::status_code::StatusCode; +use common_query::Output; +use mime_guess::mime; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Map}; + +use super::process_with_limit; +use crate::http::error_result::ErrorResponse; +use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; +use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat}; + +/// The json format here is different from the default json output of greptime_result. +/// `JsonResponse` is intended to make it easier for user to consume data. +#[derive(Serialize, Deserialize, Debug, JsonSchema)] +pub struct JsonResponse { + output: Vec, + execution_time_ms: u64, +} + +impl JsonResponse { + pub async fn from_output(outputs: Vec>) -> HttpResponse { + match handler::from_output(outputs).await { + Err(err) => HttpResponse::Error(err), + Ok((output, _)) => { + if output.len() > 1 { + HttpResponse::Error(ErrorResponse::from_error_message( + StatusCode::InvalidArguments, + "cannot output multi-statements result in json format".to_string(), + )) + } else { + HttpResponse::Json(JsonResponse { + output, + execution_time_ms: 0, + }) + } + } + } + } + + pub fn output(&self) -> &[GreptimeQueryOutput] { + &self.output + } + + pub fn with_execution_time(mut self, execution_time: u64) -> Self { + self.execution_time_ms = execution_time; + self + } + + pub fn execution_time_ms(&self) -> u64 { + self.execution_time_ms + } + + pub fn with_limit(mut self, limit: usize) -> Self { + self.output = process_with_limit(self.output, limit); + self + } +} + +impl IntoResponse for JsonResponse { + fn into_response(mut self) -> Response { + debug_assert!( + self.output.len() <= 1, + "self.output has extra elements: {}", + self.output.len() + ); + + let execution_time = self.execution_time_ms; + let payload = match self.output.pop() { + None => String::default(), + Some(GreptimeQueryOutput::AffectedRows(n)) => json!({ + "data": [ + { + "affectedrows": n + }, + ], + "execution_time_ms": execution_time, + }) + .to_string(), + + Some(GreptimeQueryOutput::Records(records)) => { + let mut data = Vec::new(); + let schema = records.schema(); + + for row in records.rows.iter() { + let mut row_map = Map::new(); + for (i, col) in schema.column_schemas.iter().enumerate() { + row_map.insert(col.name.clone(), row[i].clone()); + } + data.push(row_map); + } + + json!({ + "data": data, + "execution_time_ms": execution_time, + }) + .to_string() + } + }; + + let mut resp = ( + [( + header::CONTENT_TYPE, + HeaderValue::from_static(mime::APPLICATION_JSON.as_ref()), + )], + payload, + ) + .into_response(); + resp.headers_mut().insert( + &GREPTIME_DB_HEADER_FORMAT, + HeaderValue::from_static(ResponseFormat::Json.as_str()), + ); + resp.headers_mut().insert( + &GREPTIME_DB_HEADER_EXECUTION_TIME, + HeaderValue::from(execution_time), + ); + resp + } +}