diff --git a/crates/diesel_models/src/kv.rs b/crates/diesel_models/src/kv.rs index f1145a4b6e1f..81fa7a88ee3b 100644 --- a/crates/diesel_models/src/kv.rs +++ b/crates/diesel_models/src/kv.rs @@ -27,13 +27,21 @@ pub struct TypedSql { } impl TypedSql { - pub fn to_field_value_pairs(&self) -> crate::StorageResult> { - Ok(vec![( - "typed_sql", - serde_json::to_string(self) - .into_report() - .change_context(errors::DatabaseError::QueryGenerationFailed)?, - )]) + pub fn to_field_value_pairs( + &self, + request_id: String, + global_id: String, + ) -> crate::StorageResult> { + Ok(vec![ + ( + "typed_sql", + serde_json::to_string(self) + .into_report() + .change_context(errors::DatabaseError::QueryGenerationFailed)?, + ), + ("global_id", global_id), + ("request_id", request_id), + ]) } } diff --git a/crates/drainer/src/lib.rs b/crates/drainer/src/lib.rs index 7dcbc2c518cf..19abe9ba3aad 100644 --- a/crates/drainer/src/lib.rs +++ b/crates/drainer/src/lib.rs @@ -10,6 +10,7 @@ use std::sync::{atomic, Arc}; use common_utils::signals::get_allowed_signals; use diesel_models::kv; use error_stack::{IntoReport, ResultExt}; +use router_env::{instrument, tracing}; use tokio::sync::{mpsc, oneshot}; use crate::{connection::pg_connection, services::Store}; @@ -122,6 +123,7 @@ async fn drainer_handler( active_tasks.fetch_add(1, atomic::Ordering::Release); let stream_name = utils::get_drainer_stream_name(store.clone(), stream_index); + let drainer_result = Box::pin(drainer(store.clone(), max_read_count, stream_name.as_str())).await; @@ -130,6 +132,7 @@ async fn drainer_handler( } let flag_stream_name = utils::get_stream_key_flag(store.clone(), stream_index); + //TODO: USE THE RESULT FOR LOGGING let output = utils::make_stream_available(flag_stream_name.as_str(), store.redis_conn.as_ref()).await; @@ -137,6 +140,7 @@ async fn drainer_handler( output } +#[instrument(skip_all, fields(global_id, request_id, session_id))] async fn drainer( store: Arc, max_read_count: u64, @@ -174,9 +178,21 @@ async fn drainer( }], ); + let session_id = common_utils::generate_id_with_default_len("drainer_session"); + // TODO: Handle errors when deserialization fails and when DB error occurs for entry in entries { let typed_sql = entry.1.get("typed_sql").map_or(String::new(), Clone::clone); + let request_id = entry + .1 + .get("request_id") + .map_or(String::new(), Clone::clone); + let global_id = entry.1.get("global_id").map_or(String::new(), Clone::clone); + + tracing::Span::current().record("request_id", request_id); + tracing::Span::current().record("global_id", global_id); + tracing::Span::current().record("session_id", &session_id); + let result = serde_json::from_str::(&typed_sql); let db_op = match result { Ok(f) => f, diff --git a/crates/drainer/src/services.rs b/crates/drainer/src/services.rs index 6edec31f26d7..73f66f27dbf5 100644 --- a/crates/drainer/src/services.rs +++ b/crates/drainer/src/services.rs @@ -7,6 +7,7 @@ pub struct Store { pub master_pool: PgPool, pub redis_conn: Arc, pub config: StoreConfig, + pub request_id: Option, } #[derive(Clone)] @@ -30,6 +31,7 @@ impl Store { drainer_stream_name: config.drainer.stream_name.clone(), drainer_num_partitions: config.drainer.num_partitions, }, + request_id: None, } } diff --git a/crates/router/src/db.rs b/crates/router/src/db.rs index 210f3d21e8cc..b62ffd2c530f 100644 --- a/crates/router/src/db.rs +++ b/crates/router/src/db.rs @@ -76,6 +76,7 @@ pub trait StorageInterface: + MasterKeyInterface + payment_link::PaymentLinkInterface + RedisConnInterface + + RequestIdStore + business_profile::BusinessProfileInterface + organization::OrganizationInterface + routing_algorithm::RoutingAlgorithmInterface @@ -118,6 +119,25 @@ impl StorageInterface for MockDb { } } +pub trait RequestIdStore { + fn add_request_id(&mut self, _request_id: String) {} + fn get_request_id(&self) -> Option { + None + } +} + +impl RequestIdStore for MockDb {} + +impl RequestIdStore for Store { + fn add_request_id(&mut self, request_id: String) { + self.request_id = Some(request_id) + } + + fn get_request_id(&self) -> Option { + self.request_id.clone() + } +} + pub async fn get_and_deserialize_key( db: &dyn StorageInterface, key: &str, diff --git a/crates/router/src/routes/app.rs b/crates/router/src/routes/app.rs index 0369bb612668..268b2ed703bf 100644 --- a/crates/router/src/routes/app.rs +++ b/crates/router/src/routes/app.rs @@ -80,7 +80,9 @@ impl AppStateInfo for AppState { } fn add_request_id(&mut self, request_id: RequestId) { self.api_client.add_request_id(request_id); + self.store.add_request_id(request_id.to_string()) } + fn add_merchant_id(&mut self, merchant_id: Option) { self.api_client.add_merchant_id(merchant_id); } diff --git a/crates/router_env/src/logger/formatter.rs b/crates/router_env/src/logger/formatter.rs index ce2fd74e0e87..4fd94c221637 100644 --- a/crates/router_env/src/logger/formatter.rs +++ b/crates/router_env/src/logger/formatter.rs @@ -51,6 +51,8 @@ const REQUEST_METHOD: &str = "request_method"; const REQUEST_URL_PATH: &str = "request_url_path"; const REQUEST_ID: &str = "request_id"; const WORKFLOW_ID: &str = "workflow_id"; +const GLOBAL_ID: &str = "global_id"; +const SESSION_ID: &str = "session_id"; /// Set of predefined implicit keys. pub static IMPLICIT_KEYS: Lazy> = Lazy::new(|| { @@ -85,6 +87,8 @@ pub static EXTRA_IMPLICIT_KEYS: Lazy> = Lazy::new(|| set.insert(REQUEST_METHOD); set.insert(REQUEST_URL_PATH); set.insert(REQUEST_ID); + set.insert(GLOBAL_ID); + set.insert(SESSION_ID); set.insert(WORKFLOW_ID); set diff --git a/crates/storage_impl/src/lib.rs b/crates/storage_impl/src/lib.rs index 17d432c7932b..cef4a8981a43 100644 --- a/crates/storage_impl/src/lib.rs +++ b/crates/storage_impl/src/lib.rs @@ -31,6 +31,7 @@ pub struct RouterStore { db_store: T, cache_store: RedisStore, master_encryption_key: StrongSecret>, + pub request_id: Option, } #[async_trait::async_trait] @@ -103,6 +104,7 @@ impl RouterStore { db_store, cache_store, master_encryption_key: encryption_key, + request_id: None, }) } @@ -128,6 +130,7 @@ impl RouterStore { db_store, cache_store, master_encryption_key: encryption_key, + request_id: None, }) } } @@ -138,6 +141,7 @@ pub struct KVRouterStore { drainer_stream_name: String, drainer_num_partitions: u8, ttl_for_kv: u32, + pub request_id: Option, } #[async_trait::async_trait] @@ -179,11 +183,14 @@ impl KVRouterStore { drainer_num_partitions: u8, ttl_for_kv: u32, ) -> Self { + let request_id = store.request_id.clone(); + Self { router_store: store, drainer_stream_name, drainer_num_partitions, ttl_for_kv, + request_id, } } @@ -203,6 +210,9 @@ impl KVRouterStore { where R: crate::redis::kv_store::KvStorePartition, { + let global_id = format!("{}", partition_key); + let request_id = self.request_id.clone().unwrap_or_default(); + let shard_key = R::shard_key(partition_key, self.drainer_num_partitions); let stream_name = self.get_drainer_stream_name(&shard_key); self.router_store @@ -212,7 +222,7 @@ impl KVRouterStore { &stream_name, &redis_interface::RedisEntryId::AutoGeneratedID, redis_entry - .to_field_value_pairs() + .to_field_value_pairs(request_id, global_id) .change_context(RedisError::JsonSerializationFailed)?, ) .await