Skip to content

Commit

Permalink
implemented query deduplication: hash-keyed in-flight map with background execution and long-poll responses
Browse files Browse the repository at this point in the history
  • Loading branch information
vishalkrishnads committed Aug 22, 2024
1 parent 6c83c37 commit b286a73
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 48 deletions.
7 changes: 6 additions & 1 deletion server/src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,18 @@ use crate::storage;
use crate::sync;
use crate::users::dashboards::DASHBOARDS;
use crate::users::filters::FILTERS;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use actix_web::web::resource;
use actix_web::web::{resource, Data};
use actix_web::Resource;
use actix_web::Scope;
use actix_web::{web, App, HttpServer};
use actix_web_prometheus::PrometheusMetrics;
use actix_web_static_files::ResourceFiles;
use async_trait::async_trait;
use tokio::sync::Mutex;

use crate::{
handlers::http::{
Expand Down Expand Up @@ -84,7 +86,10 @@ impl ParseableServer for Server {
};

let create_app_fn = move || {
let query_map: query::QueryMap = Arc::new(Mutex::new(HashMap::new()));

App::new()
.app_data(Data::new(query_map))
.wrap(prometheus.clone())
.configure(|cfg| Server::configure_routes(cfg, oidc_client.clone()))
.wrap(actix_web::middleware::Logger::default())
Expand Down
148 changes: 106 additions & 42 deletions server/src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use actix_web::http::header::ContentType;
use actix_web::web::{self, Json};
use actix_web::{FromRequest, HttpRequest, Responder};
use actix_web::{FromRequest, HttpRequest, HttpResponse};
use chrono::{DateTime, Utc};
use datafusion::common::tree_node::TreeNode;
use datafusion::error::DataFusionError;
Expand All @@ -28,10 +28,13 @@ use http::StatusCode;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tokio::time::sleep;

use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;
use crate::rbac::map::SessionKey;
use arrow_array::RecordBatch;

use crate::event::commit_schema;
Expand All @@ -41,7 +44,7 @@ use crate::option::{Mode, CONFIG};
use crate::query::error::ExecuteError;
use crate::query::Query as LogicalQuery;
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::querycache::{CacheMetadata, QueryCacheManager};
use crate::querycache::{generate_hash, CacheMetadata, QueryCacheManager};
use crate::rbac::role::{Action, Permission};
use crate::rbac::Users;
use crate::response::QueryResponse;
Expand All @@ -50,7 +53,7 @@ use crate::storage::ObjectStorageError;
use crate::utils::actix::extract_session_key_from_req;

/// Query Request through http endpoint.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Query {
pub query: String,
Expand All @@ -64,81 +67,142 @@ pub struct Query {
pub filter_tags: Option<Vec<String>>,
}

pub type QueryMap = Arc<Mutex<HashMap<u64, QueryStatus>>>;
enum QueryStatus {
Processing,
Result(Query),
Result(QueryResponse),
}

pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
/// HTTP handler for `/query`: deduplicates identical in-flight queries and
/// executes them in a background task, long-polling up to 55s for a result.
///
/// Returns the query result (`200`) when available within the window, or
/// `202 Accepted` so the client can re-issue the same query and pick up the
/// cached result later.
pub async fn query(
    req: HttpRequest,
    query_request: Query,
    query_map: web::Data<QueryMap>,
) -> Result<HttpResponse, QueryError> {
    let session_state = QUERY_SESSION.state();

    // Identical queries (same start, end, and SQL text) share one map slot.
    let hash = generate_hash(
        &query_request.start_time,
        &query_request.end_time,
        &query_request.query,
    );

    // Fast path: the query is already being processed or has completed.
    //
    // NOTE: the guard must NOT be held across the 55s sleep below — holding
    // a tokio::sync::Mutex across that await would serialize every other
    // request on this endpoint. The guard is dropped before waiting and
    // re-acquired afterwards.
    {
        let map = query_map.lock().await;
        match map.get(&hash) {
            Some(QueryStatus::Result(response)) => {
                return Ok(response.clone().to_http()?);
            }
            Some(QueryStatus::Processing) => {
                drop(map); // release the lock before the long wait
                sleep(Duration::from_secs(55)).await;
                let map = query_map.lock().await;
                return match map.get(&hash) {
                    Some(QueryStatus::Result(response)) => Ok(response.clone().to_http()?),
                    _ => Ok(HttpResponse::Accepted().finish()),
                };
            }
            None => {}
        }
    }

    // Mark the query as in-flight before spawning so that concurrent
    // duplicates arriving next coalesce onto this execution.
    query_map.lock().await.insert(hash, QueryStatus::Processing);

    // Extract credentials on the request path and propagate the auth error
    // instead of panicking (`.unwrap()` would take the worker down on a
    // request without a valid session).
    let creds = extract_session_key_from_req(&req)?.to_owned();

    let query_request_clone = query_request.clone();
    let map_for_task = query_map.clone();

    // Execute in the background. The lock is taken only to record the
    // outcome — never while the query itself runs.
    tokio::spawn(async move {
        let result =
            process_query(query_request_clone, Arc::new(session_state), creds).await;

        let mut map = map_for_task.lock().await;
        match result {
            Ok(response) => {
                map.insert(hash, QueryStatus::Result(response));
            }
            Err(err) => {
                log::error!("Error processing query: {:?}", err);
                // Drop the Processing marker so the query can be retried.
                map.remove(&hash);
            }
        }
    });

    // Long-poll: give the background task up to 55 seconds, then check once
    // more — return the result if it finished in time, otherwise 202 so the
    // client can poll again with the same query.
    sleep(Duration::from_secs(55)).await;
    let map = query_map.lock().await;
    if let Some(QueryStatus::Result(response)) = map.get(&hash) {
        return Ok(response.clone().to_http()?);
    }
    Ok(HttpResponse::Accepted().finish())
}

async fn process_query(
query_request: Query,
session_state: Arc<SessionState>,
creds: SessionKey,
) -> Result<QueryResponse, QueryError> {
let raw_logical_plan = session_state
.create_logical_plan(&query_request.query)
.await?;
.await.unwrap();

// create a visitor to extract the table name
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);
let _stream = visitor

let visitor_clone = visitor.clone();
let stream = visitor
.top()
.ok_or_else(|| QueryError::MalformedQuery("Table Name not found in SQL"))?;

let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size).await?;

// deal with cached data
if let Ok(results) = get_results_from_cache(
query_cache_manager,
&query_request.start_time,
&query_request.end_time,
&query_request.query,
query_request.send_null,
query_request.fields,
)
.await
{
return results.to_http();
};

let tables = visitor.into_inner();
// Process the query
let tables = visitor_clone.into_inner();
update_schema_when_distributed(tables).await?;

let mut query: LogicalQuery = into_query(&query_request, &session_state).await?;

let creds = extract_session_key_from_req(&req)?;
let permissions = Users.get_permissions(&creds);

let table_name = query
.first_table_name()
let table_name = query.first_table_name()
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;

authorize_and_set_filter_tags(&mut query, permissions, &table_name)?;

let time = Instant::now();
let (records, fields) = query.execute(table_name.clone()).await?;
// deal with cache saving
if let Err(err) = put_results_in_cache(
query_cache_manager,
&table_name,

let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
.await?;

// Cache the results
put_results_in_cache(
&query_cache_manager,
stream,
&records,
query.start.to_rfc3339(),
query.end.to_rfc3339(),
query_request.query,
query_request.query.clone(),
)
.await
{
log::error!("{}", err);
};
.await?;

// Create the response
let response = QueryResponse {
records,
fields,
fill_null: query_request.send_null,
with_fields: query_request.fields,
}
.to_http()?;
};

let time = time.elapsed().as_secs_f64();

QUERY_EXECUTE_TIME
.with_label_values(&[&table_name])
.observe(time);
Expand Down
2 changes: 1 addition & 1 deletion server/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl Query {
}
}

#[derive(Debug, Default)]
/// Logical-plan visitor that collects the table names referenced by the
/// plan's table scans; `Clone` lets callers keep a copy of the collected
/// state before consuming the visitor.
#[derive(Debug, Default, Clone)]
pub(crate) struct TableScanVisitor {
    // Table names in the order they were encountered during the visit.
    tables: Vec<String>,
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/querycache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ impl QueryCacheManager {
}
}

fn generate_hash(start: &str, end: &str, query: &str) -> u64 {
pub fn generate_hash(start: &str, end: &str, query: &str) -> u64 {
let mut hasher = DefaultHasher::new();
start.hash(&mut hasher);
end.hash(&mut hasher);
Expand Down
7 changes: 4 additions & 3 deletions server/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ use crate::{
record_batches_to_json,
},
};
use actix_web::{web, Responder};
use actix_web::HttpResponse;
use datafusion::arrow::record_batch::RecordBatch;
use itertools::Itertools;
use serde_json::{json, Value};
use tonic::{Response, Status};

#[derive(Clone)]
pub struct QueryResponse {
pub records: Vec<RecordBatch>,
pub fields: Vec<String>,
Expand All @@ -37,7 +38,7 @@ pub struct QueryResponse {
}

impl QueryResponse {
pub fn to_http(&self) -> Result<impl Responder, QueryError> {
pub fn to_http(&self) -> Result<HttpResponse, QueryError> {
log::info!("{}", "Returning query results");
let records: Vec<&RecordBatch> = self.records.iter().collect();
let mut json_records = record_batches_to_json(&records)?;
Expand All @@ -62,7 +63,7 @@ impl QueryResponse {
Value::Array(values)
};

Ok(web::Json(response))
Ok(HttpResponse::Ok().json(response))
}

pub fn into_flight(self) -> Result<Response<DoGetStream>, Status> {
Expand Down

0 comments on commit b286a73

Please sign in to comment.