Skip to content

Commit

Permalink
feat: make drainer logs queryable with request_id and global_id (#2771)
Browse files Browse the repository at this point in the history
  • Loading branch information
dracarys18 authored Nov 6, 2023
1 parent d335879 commit ff73aba
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 8 deletions.
22 changes: 15 additions & 7 deletions crates/diesel_models/src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,21 @@ pub struct TypedSql {
}

impl TypedSql {
pub fn to_field_value_pairs(&self) -> crate::StorageResult<Vec<(&str, String)>> {
Ok(vec![(
"typed_sql",
serde_json::to_string(self)
.into_report()
.change_context(errors::DatabaseError::QueryGenerationFailed)?,
)])
pub fn to_field_value_pairs(
&self,
request_id: String,
global_id: String,
) -> crate::StorageResult<Vec<(&str, String)>> {
Ok(vec![
(
"typed_sql",
serde_json::to_string(self)
.into_report()
.change_context(errors::DatabaseError::QueryGenerationFailed)?,
),
("global_id", global_id),
("request_id", request_id),
])
}
}

Expand Down
16 changes: 16 additions & 0 deletions crates/drainer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::sync::{atomic, Arc};
use common_utils::signals::get_allowed_signals;
use diesel_models::kv;
use error_stack::{IntoReport, ResultExt};
use router_env::{instrument, tracing};
use tokio::sync::{mpsc, oneshot};

use crate::{connection::pg_connection, services::Store};
Expand Down Expand Up @@ -122,6 +123,7 @@ async fn drainer_handler(
active_tasks.fetch_add(1, atomic::Ordering::Release);

let stream_name = utils::get_drainer_stream_name(store.clone(), stream_index);

let drainer_result =
Box::pin(drainer(store.clone(), max_read_count, stream_name.as_str())).await;

Expand All @@ -130,13 +132,15 @@ async fn drainer_handler(
}

let flag_stream_name = utils::get_stream_key_flag(store.clone(), stream_index);

//TODO: USE THE RESULT FOR LOGGING
let output =
utils::make_stream_available(flag_stream_name.as_str(), store.redis_conn.as_ref()).await;
active_tasks.fetch_sub(1, atomic::Ordering::Release);
output
}

#[instrument(skip_all, fields(global_id, request_id, session_id))]
async fn drainer(
store: Arc<Store>,
max_read_count: u64,
Expand Down Expand Up @@ -174,9 +178,21 @@ async fn drainer(
}],
);

let session_id = common_utils::generate_id_with_default_len("drainer_session");

// TODO: Handle errors when deserialization fails and when DB error occurs
for entry in entries {
let typed_sql = entry.1.get("typed_sql").map_or(String::new(), Clone::clone);
let request_id = entry
.1
.get("request_id")
.map_or(String::new(), Clone::clone);
let global_id = entry.1.get("global_id").map_or(String::new(), Clone::clone);

tracing::Span::current().record("request_id", request_id);
tracing::Span::current().record("global_id", global_id);
tracing::Span::current().record("session_id", &session_id);

let result = serde_json::from_str::<kv::DBOperation>(&typed_sql);
let db_op = match result {
Ok(f) => f,
Expand Down
2 changes: 2 additions & 0 deletions crates/drainer/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub struct Store {
pub master_pool: PgPool,
pub redis_conn: Arc<redis_interface::RedisConnectionPool>,
pub config: StoreConfig,
pub request_id: Option<String>,
}

#[derive(Clone)]
Expand All @@ -30,6 +31,7 @@ impl Store {
drainer_stream_name: config.drainer.stream_name.clone(),
drainer_num_partitions: config.drainer.num_partitions,
},
request_id: None,
}
}

Expand Down
20 changes: 20 additions & 0 deletions crates/router/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub trait StorageInterface:
+ MasterKeyInterface
+ payment_link::PaymentLinkInterface
+ RedisConnInterface
+ RequestIdStore
+ business_profile::BusinessProfileInterface
+ organization::OrganizationInterface
+ routing_algorithm::RoutingAlgorithmInterface
Expand Down Expand Up @@ -118,6 +119,25 @@ impl StorageInterface for MockDb {
}
}

pub trait RequestIdStore {
fn add_request_id(&mut self, _request_id: String) {}
fn get_request_id(&self) -> Option<String> {
None
}
}

impl RequestIdStore for MockDb {}

impl RequestIdStore for Store {
fn add_request_id(&mut self, request_id: String) {
self.request_id = Some(request_id)
}

fn get_request_id(&self) -> Option<String> {
self.request_id.clone()
}
}

pub async fn get_and_deserialize_key<T>(
db: &dyn StorageInterface,
key: &str,
Expand Down
2 changes: 2 additions & 0 deletions crates/router/src/routes/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ impl AppStateInfo for AppState {
}
fn add_request_id(&mut self, request_id: RequestId) {
self.api_client.add_request_id(request_id);
self.store.add_request_id(request_id.to_string())
}

fn add_merchant_id(&mut self, merchant_id: Option<String>) {
self.api_client.add_merchant_id(merchant_id);
}
Expand Down
4 changes: 4 additions & 0 deletions crates/router_env/src/logger/formatter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const REQUEST_METHOD: &str = "request_method";
const REQUEST_URL_PATH: &str = "request_url_path";
const REQUEST_ID: &str = "request_id";
const WORKFLOW_ID: &str = "workflow_id";
const GLOBAL_ID: &str = "global_id";
const SESSION_ID: &str = "session_id";

/// Set of predefined implicit keys.
pub static IMPLICIT_KEYS: Lazy<rustc_hash::FxHashSet<&str>> = Lazy::new(|| {
Expand Down Expand Up @@ -85,6 +87,8 @@ pub static EXTRA_IMPLICIT_KEYS: Lazy<rustc_hash::FxHashSet<&str>> = Lazy::new(||
set.insert(REQUEST_METHOD);
set.insert(REQUEST_URL_PATH);
set.insert(REQUEST_ID);
set.insert(GLOBAL_ID);
set.insert(SESSION_ID);
set.insert(WORKFLOW_ID);

set
Expand Down
12 changes: 11 additions & 1 deletion crates/storage_impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct RouterStore<T: DatabaseStore> {
db_store: T,
cache_store: RedisStore,
master_encryption_key: StrongSecret<Vec<u8>>,
pub request_id: Option<String>,
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -103,6 +104,7 @@ impl<T: DatabaseStore> RouterStore<T> {
db_store,
cache_store,
master_encryption_key: encryption_key,
request_id: None,
})
}

Expand All @@ -128,6 +130,7 @@ impl<T: DatabaseStore> RouterStore<T> {
db_store,
cache_store,
master_encryption_key: encryption_key,
request_id: None,
})
}
}
Expand All @@ -138,6 +141,7 @@ pub struct KVRouterStore<T: DatabaseStore> {
drainer_stream_name: String,
drainer_num_partitions: u8,
ttl_for_kv: u32,
pub request_id: Option<String>,
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -179,11 +183,14 @@ impl<T: DatabaseStore> KVRouterStore<T> {
drainer_num_partitions: u8,
ttl_for_kv: u32,
) -> Self {
let request_id = store.request_id.clone();

Self {
router_store: store,
drainer_stream_name,
drainer_num_partitions,
ttl_for_kv,
request_id,
}
}

Expand All @@ -203,6 +210,9 @@ impl<T: DatabaseStore> KVRouterStore<T> {
where
R: crate::redis::kv_store::KvStorePartition,
{
let global_id = format!("{}", partition_key);
let request_id = self.request_id.clone().unwrap_or_default();

let shard_key = R::shard_key(partition_key, self.drainer_num_partitions);
let stream_name = self.get_drainer_stream_name(&shard_key);
self.router_store
Expand All @@ -212,7 +222,7 @@ impl<T: DatabaseStore> KVRouterStore<T> {
&stream_name,
&redis_interface::RedisEntryId::AutoGeneratedID,
redis_entry
.to_field_value_pairs()
.to_field_value_pairs(request_id, global_id)
.change_context(RedisError::JsonSerializationFailed)?,
)
.await
Expand Down

0 comments on commit ff73aba

Please sign in to comment.