Skip to content

Commit

Permalink
feat(opensearch): refactor global search querybuilder and add case in…
Browse files Browse the repository at this point in the history
…sensitivity opensearch filters (#6476)
  • Loading branch information
tsdk02 authored Nov 8, 2024
1 parent 6823418 commit 529f1a7
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 84 deletions.
223 changes: 140 additions & 83 deletions crates/analytics/src/opensearch.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashSet;

use api_models::{
analytics::search::SearchIndex,
errors::types::{ApiError, ApiErrorResponse},
Expand Down Expand Up @@ -456,7 +458,8 @@ pub struct OpenSearchQueryBuilder {
pub count: Option<i64>,
pub filters: Vec<(String, Vec<String>)>,
pub time_range: Option<OpensearchTimeRange>,
pub search_params: Vec<AuthInfo>,
search_params: Vec<AuthInfo>,
case_sensitive_fields: HashSet<&'static str>,
}

impl OpenSearchQueryBuilder {
Expand All @@ -469,6 +472,12 @@ impl OpenSearchQueryBuilder {
count: Default::default(),
filters: Default::default(),
time_range: Default::default(),
case_sensitive_fields: HashSet::from([
"customer_email.keyword",
"search_tags.keyword",
"card_last_4.keyword",
"payment_id.keyword",
]),
}
}

Expand All @@ -490,48 +499,16 @@ impl OpenSearchQueryBuilder {

/// Returns the name of the keyword field that holds the status for `index`.
///
/// * Refund indexes (`Refunds`, `SessionizerRefunds`) use `refund_status.keyword`.
/// * Dispute indexes (`Disputes`, `SessionizerDisputes`) use `dispute_status.keyword`.
/// * Every other index uses `status.keyword`.
pub fn get_status_field(&self, index: &SearchIndex) -> &str {
    match index {
        // Each or-pattern covers both the plain and the sessionizer index, so
        // no separate single-variant arms are needed.
        SearchIndex::Refunds | SearchIndex::SessionizerRefunds => "refund_status.keyword",
        SearchIndex::Disputes | SearchIndex::SessionizerDisputes => "dispute_status.keyword",
        _ => "status.keyword",
    }
}

pub fn replace_status_field(&self, filters: &[Value], index: &SearchIndex) -> Vec<Value> {
filters
.iter()
.map(|filter| {
if let Some(terms) = filter.get("terms").and_then(|v| v.as_object()) {
let mut new_filter = filter.clone();
if let Some(new_terms) =
new_filter.get_mut("terms").and_then(|v| v.as_object_mut())
{
let key = "status.keyword";
if let Some(status_terms) = terms.get(key) {
new_terms.remove(key);
new_terms.insert(
self.get_status_field(index).to_string(),
status_terms.clone(),
);
}
}
new_filter
} else {
filter.clone()
}
})
.collect()
}

/// # Panics
///
/// This function will panic if:
///
/// * The structure of the JSON query is not as expected (e.g., missing keys or incorrect types).
///
/// Ensure that the input data and the structure of the query are valid and correctly handled.
pub fn construct_payload(&self, indexes: &[SearchIndex]) -> QueryResult<Vec<Value>> {
let mut query_obj = Map::new();
let mut bool_obj = Map::new();
pub fn build_filter_array(
&self,
case_sensitive_filters: Vec<&(String, Vec<String>)>,
) -> Vec<Value> {
let mut filter_array = Vec::new();

filter_array.push(json!({
Expand All @@ -542,13 +519,12 @@ impl OpenSearchQueryBuilder {
}
}));

let mut filters = self
.filters
.iter()
let case_sensitive_json_filters = case_sensitive_filters
.into_iter()
.map(|(k, v)| json!({"terms": {k: v}}))
.collect::<Vec<Value>>();

filter_array.append(&mut filters);
filter_array.extend(case_sensitive_json_filters);

if let Some(ref time_range) = self.time_range {
let range = json!(time_range);
Expand All @@ -559,8 +535,72 @@ impl OpenSearchQueryBuilder {
}));
}

let should_array = self
.search_params
filter_array
}

/// Sets the `must` array of `payload`'s `query.bool` object to the
/// case-insensitive filters plus the auth clause, and returns the payload.
///
/// * Each `(field, values)` entry of `case_insensitive_filters` becomes one
///   clause that matches when at least one of `values` equals `field`,
///   compared with `case_insensitive: true`. The generic `status.keyword`
///   field is replaced by the status field of `index`.
/// * A final clause requires at least one entry of `auth_array` to match.
///
/// Any existing `must` entry under `query.bool` is overwritten. If `payload`
/// has no `query.bool` object, it is returned unchanged.
pub fn build_case_insensitive_filters(
    &self,
    mut payload: Value,
    case_insensitive_filters: &[&(String, Vec<String>)],
    auth_array: Vec<Value>,
    index: &SearchIndex,
) -> Value {
    let mut must_array = case_insensitive_filters
        .iter()
        .map(|(k, v)| {
            // Borrow the field name instead of allocating a new `String` for
            // every filter (and, previously, for every value via `format!`).
            let key: &str = if k.as_str() == "status.keyword" {
                self.get_status_field(index)
            } else {
                k.as_str()
            };

            // One case-insensitive `term` query per accepted value.
            let value_clauses = v
                .iter()
                .map(|value| {
                    json!({
                        "term": {
                            key: {
                                "value": value,
                                "case_insensitive": true
                            }
                        }
                    })
                })
                .collect::<Vec<Value>>();

            json!({
                "bool": {
                    "must": [
                        {
                            "bool": {
                                "should": value_clauses,
                                "minimum_should_match": 1
                            }
                        }
                    ]
                }
            })
        })
        .collect::<Vec<Value>>();

    // Auth clause: at least one of the auth conditions has to match.
    must_array.push(json!({ "bool": {
        "must": [
            {
                "bool": {
                    "should": auth_array,
                    "minimum_should_match": 1
                }
            }
        ]
    }}));

    // Attach the clauses only when `query.bool` exists and is an object.
    if let Some(bool_map) = payload
        .get_mut("query")
        .and_then(|query| query.get_mut("bool"))
        .and_then(Value::as_object_mut)
    {
        bool_map.insert("must".to_string(), Value::Array(must_array));
    }

    payload
}

pub fn build_auth_array(&self) -> Vec<Value> {
self.search_params
.iter()
.map(|user_level| match user_level {
AuthInfo::OrgLevel { org_id } => {
Expand All @@ -579,11 +619,17 @@ impl OpenSearchQueryBuilder {
})
}
AuthInfo::MerchantLevel {
org_id: _,
org_id,
merchant_ids,
} => {
let must_clauses = vec![
// TODO: Add org_id field to the filters
json!({
"term": {
"organization_id.keyword": {
"value": org_id
}
}
}),
json!({
"terms": {
"merchant_id.keyword": merchant_ids
Expand All @@ -598,12 +644,18 @@ impl OpenSearchQueryBuilder {
})
}
AuthInfo::ProfileLevel {
org_id: _,
org_id,
merchant_id,
profile_ids,
} => {
let must_clauses = vec![
// TODO: Add org_id field to the filters
json!({
"term": {
"organization_id.keyword": {
"value": org_id
}
}
}),
json!({
"term": {
"merchant_id.keyword": {
Expand All @@ -625,55 +677,60 @@ impl OpenSearchQueryBuilder {
})
}
})
.collect::<Vec<Value>>();
.collect::<Vec<Value>>()
}

/// # Panics
///
/// This function will panic if:
///
/// * The structure of the JSON query is not as expected (e.g., missing keys or incorrect types).
///
/// Ensure that the input data and the structure of the query are valid and correctly handled.
pub fn construct_payload(&self, indexes: &[SearchIndex]) -> QueryResult<Vec<Value>> {
let mut query_obj = Map::new();
let mut bool_obj = Map::new();

let (case_sensitive_filters, case_insensitive_filters): (Vec<_>, Vec<_>) = self
.filters
.iter()
.partition(|(k, _)| self.case_sensitive_fields.contains(k.as_str()));

let filter_array = self.build_filter_array(case_sensitive_filters);

if !filter_array.is_empty() {
bool_obj.insert("filter".to_string(), Value::Array(filter_array));
}

let should_array = self.build_auth_array();

if !bool_obj.is_empty() {
query_obj.insert("bool".to_string(), Value::Object(bool_obj));
}

let mut query = Map::new();
query.insert("query".to_string(), Value::Object(query_obj));
let mut sort_obj = Map::new();
sort_obj.insert(
"@timestamp".to_string(),
json!({
"order": "desc"
}),
);

Ok(indexes
.iter()
.map(|index| {
let updated_query = query
.get("query")
.and_then(|q| q.get("bool"))
.and_then(|b| b.get("filter"))
.and_then(|f| f.as_array())
.map(|filters| self.replace_status_field(filters, index))
.unwrap_or_default();
let mut final_bool_obj = Map::new();
if !updated_query.is_empty() {
final_bool_obj.insert("filter".to_string(), Value::Array(updated_query));
}
if !should_array.is_empty() {
final_bool_obj.insert("should".to_string(), Value::Array(should_array.clone()));
final_bool_obj
.insert("minimum_should_match".to_string(), Value::Number(1.into()));
}
let mut final_query = Map::new();
if !final_bool_obj.is_empty() {
final_query.insert("bool".to_string(), Value::Object(final_bool_obj));
}

let mut sort_obj = Map::new();
sort_obj.insert(
"@timestamp".to_string(),
json!({
"order": "desc"
}),
);
let payload = json!({
"query": Value::Object(final_query),
let mut payload = json!({
"query": query_obj.clone(),
"sort": [
Value::Object(sort_obj)
Value::Object(sort_obj.clone())
]
});
payload = self.build_case_insensitive_filters(
payload,
&case_insensitive_filters,
should_array.clone(),
index,
);
payload
})
.collect::<Vec<Value>>())
Expand Down
12 changes: 11 additions & 1 deletion crates/analytics/src/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,17 @@ pub async fn search_results(
search_params: Vec<AuthInfo>,
) -> CustomResult<GetSearchResponse, OpenSearchError> {
let search_req = req.search_req;

if search_req.query.trim().is_empty()
&& search_req
.filters
.as_ref()
.map_or(true, |filters| filters.is_all_none())
{
return Err(OpenSearchError::BadRequestError(
"Both query and filters are empty".to_string(),
)
.into());
}
let mut query_builder = OpenSearchQueryBuilder::new(
OpenSearchQuery::Search(req.index),
search_req.query,
Expand Down

0 comments on commit 529f1a7

Please sign in to comment.