Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(opensearch): refactor global search querybuilder and add case insensitivity opensearch filters #6476

Merged
merged 7 commits into from
Nov 8, 2024
223 changes: 140 additions & 83 deletions crates/analytics/src/opensearch.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashSet;

use api_models::{
analytics::search::SearchIndex,
errors::types::{ApiError, ApiErrorResponse},
Expand Down Expand Up @@ -456,7 +458,8 @@ pub struct OpenSearchQueryBuilder {
pub count: Option<i64>,
pub filters: Vec<(String, Vec<String>)>,
pub time_range: Option<OpensearchTimeRange>,
pub search_params: Vec<AuthInfo>,
search_params: Vec<AuthInfo>,
case_sensitive_fields: HashSet<&'static str>,
}

impl OpenSearchQueryBuilder {
Expand All @@ -469,6 +472,12 @@ impl OpenSearchQueryBuilder {
count: Default::default(),
filters: Default::default(),
time_range: Default::default(),
case_sensitive_fields: HashSet::from([
"customer_email.keyword",
"search_tags.keyword",
"card_last_4.keyword",
"payment_id.keyword",
]),
}
}

Expand All @@ -490,48 +499,16 @@ impl OpenSearchQueryBuilder {

pub fn get_status_field(&self, index: &SearchIndex) -> &str {
match index {
SearchIndex::Refunds => "refund_status.keyword",
SearchIndex::Disputes => "dispute_status.keyword",
SearchIndex::Refunds | SearchIndex::SessionizerRefunds => "refund_status.keyword",
SearchIndex::Disputes | SearchIndex::SessionizerDisputes => "dispute_status.keyword",
_ => "status.keyword",
}
}

pub fn replace_status_field(&self, filters: &[Value], index: &SearchIndex) -> Vec<Value> {
filters
.iter()
.map(|filter| {
if let Some(terms) = filter.get("terms").and_then(|v| v.as_object()) {
let mut new_filter = filter.clone();
if let Some(new_terms) =
new_filter.get_mut("terms").and_then(|v| v.as_object_mut())
{
let key = "status.keyword";
if let Some(status_terms) = terms.get(key) {
new_terms.remove(key);
new_terms.insert(
self.get_status_field(index).to_string(),
status_terms.clone(),
);
}
}
new_filter
} else {
filter.clone()
}
})
.collect()
}

/// # Panics
///
/// This function will panic if:
///
/// * The structure of the JSON query is not as expected (e.g., missing keys or incorrect types).
///
/// Ensure that the input data and the structure of the query are valid and correctly handled.
pub fn construct_payload(&self, indexes: &[SearchIndex]) -> QueryResult<Vec<Value>> {
let mut query_obj = Map::new();
let mut bool_obj = Map::new();
pub fn build_filter_array(
&self,
case_sensitive_filters: Vec<&(String, Vec<String>)>,
) -> Vec<Value> {
let mut filter_array = Vec::new();

filter_array.push(json!({
Expand All @@ -542,13 +519,12 @@ impl OpenSearchQueryBuilder {
}
}));

let mut filters = self
.filters
.iter()
let case_sensitive_json_filters = case_sensitive_filters
.into_iter()
.map(|(k, v)| json!({"terms": {k: v}}))
.collect::<Vec<Value>>();

filter_array.append(&mut filters);
filter_array.extend(case_sensitive_json_filters);

if let Some(ref time_range) = self.time_range {
let range = json!(time_range);
Expand All @@ -559,8 +535,72 @@ impl OpenSearchQueryBuilder {
}));
}

let should_array = self
.search_params
filter_array
}

pub fn build_case_insensitive_filters(
&self,
mut payload: Value,
case_insensitive_filters: &[&(String, Vec<String>)],
auth_array: Vec<Value>,
index: &SearchIndex,
) -> Value {
let mut must_array = case_insensitive_filters
.iter()
.map(|(k, v)| {
let key = if *k == "status.keyword" {
self.get_status_field(index).to_string()
} else {
k.clone()
};
json!({
"bool": {
"must": [
{
"bool": {
"should": v.iter().map(|value| {
json!({
"term": {
format!("{}", key): {
"value": value,
"case_insensitive": true
}
}
})
}).collect::<Vec<Value>>(),
"minimum_should_match": 1
}
}
]
}
})
})
.collect::<Vec<Value>>();

must_array.push(json!({ "bool": {
"must": [
{
"bool": {
"should": auth_array,
"minimum_should_match": 1
}
}
]
}}));

if let Some(query) = payload.get_mut("query") {
if let Some(bool_obj) = query.get_mut("bool") {
if let Some(bool_map) = bool_obj.as_object_mut() {
bool_map.insert("must".to_string(), Value::Array(must_array));
}
}
}

payload
}

pub fn build_auth_array(&self) -> Vec<Value> {
self.search_params
.iter()
.map(|user_level| match user_level {
AuthInfo::OrgLevel { org_id } => {
Expand All @@ -579,11 +619,17 @@ impl OpenSearchQueryBuilder {
})
}
AuthInfo::MerchantLevel {
org_id: _,
org_id,
merchant_ids,
} => {
let must_clauses = vec![
// TODO: Add org_id field to the filters
json!({
"term": {
"organization_id.keyword": {
"value": org_id
}
}
}),
json!({
"terms": {
"merchant_id.keyword": merchant_ids
Expand All @@ -598,12 +644,18 @@ impl OpenSearchQueryBuilder {
})
}
AuthInfo::ProfileLevel {
org_id: _,
org_id,
merchant_id,
profile_ids,
} => {
let must_clauses = vec![
// TODO: Add org_id field to the filters
json!({
"term": {
"organization_id.keyword": {
"value": org_id
}
}
}),
json!({
"term": {
"merchant_id.keyword": {
Expand All @@ -625,55 +677,60 @@ impl OpenSearchQueryBuilder {
})
}
})
.collect::<Vec<Value>>();
.collect::<Vec<Value>>()
}

/// # Panics
///
/// This function will panic if:
///
/// * The structure of the JSON query is not as expected (e.g., missing keys or incorrect types).
///
/// Ensure that the input data and the structure of the query are valid and correctly handled.
pub fn construct_payload(&self, indexes: &[SearchIndex]) -> QueryResult<Vec<Value>> {
let mut query_obj = Map::new();
let mut bool_obj = Map::new();

let (case_sensitive_filters, case_insensitive_filters): (Vec<_>, Vec<_>) = self
.filters
.iter()
.partition(|(k, _)| self.case_sensitive_fields.contains(k.as_str()));

let filter_array = self.build_filter_array(case_sensitive_filters);

if !filter_array.is_empty() {
bool_obj.insert("filter".to_string(), Value::Array(filter_array));
}

let should_array = self.build_auth_array();

if !bool_obj.is_empty() {
query_obj.insert("bool".to_string(), Value::Object(bool_obj));
}

let mut query = Map::new();
query.insert("query".to_string(), Value::Object(query_obj));
let mut sort_obj = Map::new();
sort_obj.insert(
"@timestamp".to_string(),
json!({
"order": "desc"
}),
);

Ok(indexes
.iter()
.map(|index| {
let updated_query = query
.get("query")
.and_then(|q| q.get("bool"))
.and_then(|b| b.get("filter"))
.and_then(|f| f.as_array())
.map(|filters| self.replace_status_field(filters, index))
.unwrap_or_default();
let mut final_bool_obj = Map::new();
if !updated_query.is_empty() {
final_bool_obj.insert("filter".to_string(), Value::Array(updated_query));
}
if !should_array.is_empty() {
final_bool_obj.insert("should".to_string(), Value::Array(should_array.clone()));
final_bool_obj
.insert("minimum_should_match".to_string(), Value::Number(1.into()));
}
let mut final_query = Map::new();
if !final_bool_obj.is_empty() {
final_query.insert("bool".to_string(), Value::Object(final_bool_obj));
}

let mut sort_obj = Map::new();
sort_obj.insert(
"@timestamp".to_string(),
json!({
"order": "desc"
}),
);
let payload = json!({
"query": Value::Object(final_query),
let mut payload = json!({
"query": query_obj.clone(),
"sort": [
Value::Object(sort_obj)
Value::Object(sort_obj.clone())
]
});
payload = self.build_case_insensitive_filters(
payload,
&case_insensitive_filters,
should_array.clone(),
index,
);
payload
})
.collect::<Vec<Value>>())
Expand Down
12 changes: 11 additions & 1 deletion crates/analytics/src/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,17 @@ pub async fn search_results(
search_params: Vec<AuthInfo>,
) -> CustomResult<GetSearchResponse, OpenSearchError> {
let search_req = req.search_req;

if search_req.query.trim().is_empty()
&& search_req
.filters
.as_ref()
.map_or(true, |filters| filters.is_all_none())
{
return Err(OpenSearchError::BadRequestError(
"Both query and filters are empty".to_string(),
)
.into());
}
let mut query_builder = OpenSearchQueryBuilder::new(
OpenSearchQuery::Search(req.index),
search_req.query,
Expand Down
Loading