From e22bf4e42f3a26f8c9755c17da3604ab98dceb30 Mon Sep 17 00:00:00 2001 From: Kamil Koczurek Date: Fri, 20 Sep 2024 11:20:28 +0200 Subject: [PATCH] Implement healthcheck service --- Cargo.lock | 27 ++ Cargo.toml | 36 +-- core/healthcheck/Cargo.toml | 25 ++ core/healthcheck/src/lib.rs | 3 + core/healthcheck/src/service.rs | 15 ++ core/healthcheck/src/service/rest.rs | 234 ++++++++++++++++++ core/market/src/matcher.rs | 24 +- core/market/src/protocol/discovery.rs | 9 + core/market/src/protocol/discovery/builder.rs | 2 + core/model/src/market.rs | 11 + core/serv-api/web/src/middleware/auth/mod.rs | 2 +- core/serv/src/main.rs | 3 + core/version/Cargo.toml | 6 +- core/version/src/service/rest.rs | 7 +- 14 files changed, 381 insertions(+), 23 deletions(-) create mode 100644 core/healthcheck/Cargo.toml create mode 100644 core/healthcheck/src/lib.rs create mode 100644 core/healthcheck/src/service.rs create mode 100644 core/healthcheck/src/service/rest.rs diff --git a/Cargo.lock b/Cargo.lock index b68d33147d..c465bca019 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9345,6 +9345,28 @@ dependencies = [ "ya-service-bus", ] +[[package]] +name = "ya-healthcheck" +version = "0.1.0" +dependencies = [ + "actix-web", + "anyhow", + "chrono", + "http 1.1.0", + "log", + "problem_details", + "serde", + "serde_json", + "tokio", + "ya-client", + "ya-core-model", + "ya-net", + "ya-service-api", + "ya-service-api-interfaces", + "ya-service-api-web", + "ya-service-bus", +] + [[package]] name = "ya-identity" version = "0.3.0" @@ -10361,8 +10383,10 @@ dependencies = [ "chrono", "diesel", "diesel_migrations", + "http 1.1.0", "log", "metrics 0.12.1", + "problem_details", "self_update", "serde", "serde_json", @@ -10372,9 +10396,11 @@ dependencies = [ "ya-client", "ya-compile-time-utils", "ya-core-model", + "ya-net", "ya-persistence", "ya-service-api", "ya-service-api-interfaces", + "ya-service-api-web", "ya-service-bus", ] @@ -10460,6 +10486,7 @@ dependencies = [ "ya-fd-metrics", "ya-file-logging", "ya-gsb-api", + "ya-healthcheck", "ya-identity", "ya-market", "ya-metrics", diff --git a/Cargo.toml b/Cargo.toml index 9265a79f4f..56b8d522e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,9 +17,9 @@ erc20-driver = ['ya-erc20-driver'] static-openssl = ["openssl/vendored", "openssl-probe"] tos = [] framework-test = [ - 'ya-exe-unit/framework-test', - 'ya-payment/framework-test', - 'ya-identity/framework-test', + 'ya-exe-unit/framework-test', + 'ya-payment/framework-test', + 'ya-identity/framework-test', ] # Temporary to make goth integration tests work central-net = ['ya-net/central-net'] @@ -61,13 +61,14 @@ ya-utils-futures.workspace = true ya-utils-process = { workspace = true, features = ["lock"] } ya-utils-networking.workspace = true ya-fd-metrics = { path = "utils/fd-metrics" } +ya-healthcheck = { path = "core/healthcheck" } ya-version = { path = "core/version" } ya-vpn.workspace = true ya-client.workspace = true ya-client-model.workspace = true gftp = { workspace = true, optional = true } # just to enable gftp build for cargo-deb -ya-provider = { path = "agent/provider", optional = true } # just to enable conditionally running some tests +ya-provider = { path = "agent/provider", optional = true } # just to enable conditionally running some tests ya-exe-unit = { version = "0.4", optional = true, path = "exe-unit" } # just to enable conditionally running some tests actix-rt.workspace = true @@ -80,25 +81,25 @@ directories = "2.0.2" dotenv = "0.15.0" futures = "0.3" lazy_static = "1.4" -libsqlite3-sys = {workspace = true} +libsqlite3-sys = { workspace = true } log = "0.4" metrics = "0.12" mime_guess = { version = "2.0", optional = true } num_cpus = "1" -openssl-probe = {version = "0.1", optional = true} +openssl-probe = { version = "0.1", optional = true } openssl.workspace = true rust-embed = { version = "8.5", optional = true } serde = "1.0" serde_json = "1.0" structopt = "0.3" -tokio = {version = "1", features = ["net"]} -tokio-stream = {version = "0.1.8", features = ["io-util"]} -tokio-util = {version = "0.7", features = ["codec"]} +tokio = { version = "1", features = ["net"] } +tokio-stream = { version = "0.1.8", features = ["io-util"] } +tokio-util = { version = "0.7", features = ["codec"] } url = "2.1.1" [dev-dependencies] erc20_processor = { workspace = true } -ya-test-framework.path= "test-utils/test-framework" +ya-test-framework.path = "test-utils/test-framework" ya-exe-unit = { version = "0.4", path = "exe-unit" } @@ -239,6 +240,7 @@ members = [ "test-utils/test-framework/framework-macro", "test-utils/test-framework/framework-basic", "test-utils/test-framework/framework-mocks", + "core/healthcheck", ] [workspace.dependencies] @@ -251,19 +253,19 @@ actix-web = "4" actix = { version = "0.13", default-features = false } derive_more = "0.99.11" -erc20_payment_lib = {git = "https://github.com/golemfactory/erc20_payment_lib", rev = "4200567b931af64f4fb1f6b756dd6d051576b64f"} -erc20_processor = {git = "https://github.com/golemfactory/erc20_payment_lib", rev = "4200567b931af64f4fb1f6b756dd6d051576b64f"} +erc20_payment_lib = { git = "https://github.com/golemfactory/erc20_payment_lib", rev = "4200567b931af64f4fb1f6b756dd6d051576b64f" } +erc20_processor = { git = "https://github.com/golemfactory/erc20_payment_lib", rev = "4200567b931af64f4fb1f6b756dd6d051576b64f" } #erc20_payment_lib = { path = "../../payments/erc20_payment_lib/crates/erc20_payment_lib" } #erc20_processor = { path = "../../payments/erc20_payment_lib" } #erc20_payment_lib = { version = "0.4.7" } #erc20_processor = { version = "0.4.7" } -gftp = {version = "0.4.1", path = "core/gftp"} +gftp = { version = "0.4.1", path = "core/gftp" } hex = "0.4.3" -libsqlite3-sys = {version = "0.26.0", features = ["bundled"]} +libsqlite3-sys = { version = "0.26.0", features = ["bundled"] } openssl = "0.10" rand = "0.8.5" regex = "1.10.4" -strum = {version = "0.24", features = ["derive"]} +strum = { version = "0.24", features = ["derive"] } trust-dns-resolver = "0.22" url = "2.3.1" @@ -295,7 +297,7 @@ ya-utils-path.path = "utils/path" ya-utils-process.path = "utils/process" ya-identity.path = "core/identity" -ya-market.path="core/market" +ya-market.path = "core/market" ya-activity.path = "core/activity" ya-net.path = "core/net" ya-persistence.path = "core/persistence" @@ -306,7 +308,7 @@ ya-vpn.path = "core/vpn" ya-gsb-api.path = "core/gsb-api" ya-payment-driver.path = "core/payment-driver/base" -ya-dummy-driver.path= "core/payment-driver/dummy" +ya-dummy-driver.path = "core/payment-driver/dummy" ya-erc20-driver.path = "core/payment-driver/erc20" ya-service-api.path = "core/serv-api" diff --git a/core/healthcheck/Cargo.toml b/core/healthcheck/Cargo.toml new file mode 100644 index 0000000000..4af7cac70c --- /dev/null +++ b/core/healthcheck/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "ya-healthcheck" +version = "0.1.0" +description = "Node health monitoring" +authors = ["Golem Factory "] +edition = "2021" + +[dependencies] +ya-service-api-web.workspace = true +ya-client.workspace = true +ya-core-model = { workspace = true, features = ["version"] } +ya-net = { workspace = true, features = ["service"] } +ya-service-api.workspace = true +ya-service-api-interfaces.workspace = true +ya-service-bus = { workspace = true } + +actix-web = "4" +anyhow = "1.0" +chrono = { version = "0.4", features = ["serde"] } +log = "0.4" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tokio = { version = "1", features = ["time", "sync"] } +problem_details = "0.6.0" +http = "1.1.0" diff --git a/core/healthcheck/src/lib.rs b/core/healthcheck/src/lib.rs new file mode 100644 index 0000000000..309555f15f --- /dev/null +++ b/core/healthcheck/src/lib.rs @@ -0,0 +1,3 @@ +mod service; + +pub use service::HealthcheckService; diff --git a/core/healthcheck/src/service.rs b/core/healthcheck/src/service.rs new file mode 100644 index 0000000000..ce1acefe5f --- /dev/null +++ b/core/healthcheck/src/service.rs @@ -0,0 +1,15 @@ +use ya_service_api_interfaces::Service; + +mod rest; + +pub struct HealthcheckService; + +impl Service for HealthcheckService { + type Cli = (); +} + +impl HealthcheckService { + pub fn rest(_ctx: &C) -> actix_web::Scope { + rest::web_scope() + } +} diff --git a/core/healthcheck/src/service/rest.rs b/core/healthcheck/src/service/rest.rs new file mode 100644 index 0000000000..7499e12497 --- /dev/null +++ b/core/healthcheck/src/service/rest.rs @@ -0,0 +1,234 @@ +use actix_web::{HttpResponse, Responder}; +use chrono::Utc; +use serde_json::json; + +use std::time::Duration; + +use ya_core_model::market::local::BUS_ID as MARKET_BUS_ID; +use ya_core_model::net::local::BUS_ID as NET_BUS_ID; +use ya_core_model::payment::local::BUS_ID as PAYMENT_BUS_ID; + +use ya_core_model::market::GetLastBcastTs; +use ya_core_model::net::local::ListNeighbours; +use ya_core_model::payment::local::{PaymentDriverStatus, PaymentDriverStatusError}; + +use ya_service_bus::{timeout::IntoTimeoutFuture, typed::service, RpcEndpoint}; + +pub const HEALTHCHECK_API_PATH: &str = "/healthcheck"; + +pub fn web_scope() -> actix_web::Scope { + actix_web::web::scope(HEALTHCHECK_API_PATH).service(healthcheck) +} + +async fn payment_healthcheck() -> Result<(), HttpResponse> { + let result = service(PAYMENT_BUS_ID) + .call(PaymentDriverStatus { + driver: None, + network: None, + }) + .timeout(Some(Duration::from_secs(5))) + .await; + + let result = match result { + Ok(ok) => ok, + Err(_elapsed) => return Err(errors::internal("internal-timeout", "payments-check")), + }; + + let result = match result { + Ok(resp) => resp, + Err(gsb_err) => { + log::warn!("Healtcheck failed due to {gsb_err}"); + return Err(errors::internal("gsb-error", "payments-check")); + } + }; + + let status_properties = match result { + Ok(props) => props, + Err( + payment_err @ (PaymentDriverStatusError::NoDriver(_) + | PaymentDriverStatusError::NoNetwork(_) + | PaymentDriverStatusError::Internal(_)), + ) => { + log::warn!("Healtcheck failed due to {payment_err}"); + return Err(errors::internal("payments-service-error", "payments-check")); + } + }; + + if !status_properties.is_empty() { + return Err(errors::payments(status_properties)); + } + + Ok(()) +} + +async fn relay_healtcheck() -> Result<(), HttpResponse> { + let result = service(NET_BUS_ID) + .send(ListNeighbours { size: 8 }) + .timeout(Some(Duration::from_secs(5))) + .await; + + let result = match result { + Ok(ok) => ok, + Err(_elapsed) => return Err(errors::internal("internal-timeout", "relay-check")), + }; + + let result = match result { + Ok(ok) => ok, + Err(gsb_err) => { + log::warn!("Healtcheck failed due to {gsb_err}"); + return Err(errors::internal("gsb-error", "relay-check")); + } + }; + + let _gsb_remote_ping = match result { + Ok(ok) => ok, + Err(net_err) => { + log::warn!("Healtcheck failed due to {net_err}"); + return Err(errors::internal("net-service-error", "relay-check")); + } + }; + + Ok(()) +} + +async fn market_healthcheck() -> Result<(), HttpResponse> { + let result = service(MARKET_BUS_ID) + .call(GetLastBcastTs) + .timeout(Some(Duration::from_secs(5))) + .await; + + let result = match result { + Ok(ok) => ok, + Err(_elapsed) => return Err(errors::internal("internal-timeout", "market-check")), + }; + + let result = match result { + Ok(ok) => ok, + Err(gsb_err) => { + log::warn!("Healtcheck failed due to {gsb_err}"); + return Err(errors::internal("gsb-error", "market-check")); + } + }; + + let bcast_ts = match result { + Ok(ok) => ok, + Err(market_err) => { + log::warn!("Healtcheck failed due to {market_err}"); + return Err(errors::internal("market-service-error", "market-check")); + } + }; + + let last_bcast_age = Utc::now() - bcast_ts; + if last_bcast_age > chrono::Duration::minutes(2) { + return Err(errors::market_bcast_timeout(last_bcast_age)); + } + + Ok(()) +} + +#[actix_web::get("")] +async fn healthcheck() -> impl Responder { + if let Err(response) = payment_healthcheck().await { + return response; + } + if let Err(response) = relay_healtcheck().await { + return response; + } + if let Err(response) = market_healthcheck().await { + return response; + } + + HttpResponse::Ok().json(json!({"status": "ok"})) +} + +mod errors { + use actix_web::HttpResponse; + use http::Uri; + use problem_details::ProblemDetails; + use serde_json::Value; + use std::collections::HashMap; + use std::iter::FromIterator; + use std::str::FromStr; + use ya_client::model::payment::DriverStatusProperty; + + const CONTENT_TYPE_PROBLEM_JSON: (&str, &str) = ("Content-Type", "application/problem+json"); + + pub fn internal(instance: &str, step: &str) -> HttpResponse { + let extensions = HashMap::::from_iter(std::iter::once(( + "step".to_string(), + step.to_string(), + ))); + + let problem = ProblemDetails::new() + .with_type(Uri::from_static("/healthcheck/internal-error")) + .with_instance( + Uri::from_str(&format!("/healthcheck/internal-error/{instance}",)) + .expect("Invalid URI"), + ) + .with_extensions(extensions); + + HttpResponse::InternalServerError() + .insert_header(CONTENT_TYPE_PROBLEM_JSON) + .json(problem) + } + + pub fn payments(props: Vec) -> HttpResponse { + let extensions = HashMap::::from_iter([ + ( + "step".to_string(), + Value::String("payments-check".to_string()), + ), + ( + "problems".to_string(), + Value::Array( + props + .into_iter() + .map(|prop| match prop { + DriverStatusProperty::CantSign { .. } => "Can't sign transaction", + DriverStatusProperty::InsufficientGas { .. } => "Insufficient gas", + DriverStatusProperty::InsufficientToken { .. } => "Insufficient token", + DriverStatusProperty::InvalidChainId { .. } => "Misconfigured chain", + DriverStatusProperty::RpcError { .. } => "Persistent RPC issues", + DriverStatusProperty::TxStuck { .. } => "Stuck transaction", + }) + .map(ToOwned::to_owned) + .map(Value::String) + .collect(), + ), + ), + ]); + + let problem = ProblemDetails::new() + .with_detail("One on more issues blocking the operation of payments have been detected. Run `yagna payment driver status` to diagnose") + .with_type(Uri::from_static("/healthcheck/payment-driver-errors")) + .with_instance(Uri::from_static("/healthcheck/payment-driver-errors")) + .with_extensions(extensions); + + HttpResponse::InternalServerError() + .insert_header(CONTENT_TYPE_PROBLEM_JSON) + .json(problem) + } + + pub fn market_bcast_timeout(last_bcast_age: chrono::Duration) -> HttpResponse { + let extensions = HashMap::::from_iter([ + ( + "step".to_string(), + Value::String("market-check".to_string()), + ), + ( + "lastBcastAgeSecs".to_string(), + Value::Number(last_bcast_age.num_seconds().into()), + ), + ]); + + let problem = ProblemDetails::new() + .with_detail("Last received market broadcast is too old") + .with_type(Uri::from_static("/healthcheck/market-bcast-timeout")) + .with_instance(Uri::from_static("/healthcheck/market-bcast-timeout")) + .with_extensions(extensions); + + HttpResponse::InternalServerError() + .insert_header(CONTENT_TYPE_PROBLEM_JSON) + .json(problem) + } +} diff --git a/core/market/src/matcher.rs b/core/market/src/matcher.rs index db2f6b10ce..c570f3a8b3 100644 --- a/core/market/src/matcher.rs +++ b/core/market/src/matcher.rs @@ -1,9 +1,13 @@ use actix::prelude::*; -use chrono::{TimeZone, Utc}; +use chrono::{DateTime, TimeZone, Utc}; use metrics::counter; use std::str::FromStr; use std::sync::Arc; +use std::time::Duration; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; +use ya_core_model::market::{GetLastBcastTs, RpcMessageError}; +use ya_service_bus::timeout::IntoTimeoutFuture; +use ya_service_bus::typed::ServiceBinder; use ya_client::model::market::{NewDemand, NewOffer}; use ya_service_api_web::middleware::Identity; @@ -122,6 +126,24 @@ impl Matcher { .await .map_err(|e| MatcherInitError::ExpirationTrackerError(e.to_string()))?; + let discovery = self.discovery.clone(); + async fn handler( + _: (), + discovery: Discovery, + caller: String, + _msg: GetLastBcastTs, + ) -> Result, RpcMessageError> { + log::debug!("Got GetLastBcastTs from {caller}"); + + discovery + .get_last_bcast_ts() + .timeout(Some(Duration::from_secs(5))) + .await + .map_err(|_| RpcMessageError::Timeout) + } + + ServiceBinder::new(local_prefix, &(), discovery).bind_with_processor(handler); + Ok(()) } diff --git a/core/market/src/protocol/discovery.rs b/core/market/src/protocol/discovery.rs index 7bd139fb12..61f5fa3a96 100644 --- a/core/market/src/protocol/discovery.rs +++ b/core/market/src/protocol/discovery.rs @@ -1,4 +1,5 @@ //! Discovery protocol interface +use chrono::{DateTime, Utc}; use futures::TryFutureExt; use metrics::{counter, timing, value}; use std::collections::HashSet; @@ -64,6 +65,8 @@ pub struct DiscoveryImpl { /// with central NET implementation in future. net_type: net::NetType, ban_cache: BanCache, + + last_bcast_ts: Mutex>, } struct BanCache { @@ -124,6 +127,10 @@ impl Discovery { self.inner.net_type == net::NetType::Hybrid } + pub async fn get_last_bcast_ts(&self) -> DateTime { + *self.inner.last_bcast_ts.lock().await + } + pub async fn bcast_offers(&self, offer_ids: Vec) -> Result<(), DiscoveryError> { if offer_ids.is_empty() { return Ok(()); @@ -454,6 +461,8 @@ impl Discovery { let num_ids_received = msg.offer_ids.len(); log::trace!("Received {num_ids_received} Offers from [{caller}]."); + *self.inner.last_bcast_ts.lock().await = Utc::now(); + if msg.offer_ids.is_empty() { return Ok(()); } diff --git a/core/market/src/protocol/discovery/builder.rs b/core/market/src/protocol/discovery/builder.rs index a02f73ea25..d842bd4669 100644 --- a/core/market/src/protocol/discovery/builder.rs +++ b/core/market/src/protocol/discovery/builder.rs @@ -1,3 +1,4 @@ +use chrono::Utc; use std::any::{Any, TypeId}; use std::collections::HashMap; use std::sync::Arc; @@ -85,6 +86,7 @@ impl DiscoveryBuilder { lazy_binder_prefix: Mutex::new(None), config: self.config.clone().unwrap(), net_type: net::Config::from_env().unwrap().net_type, + last_bcast_ts: Mutex::new(Utc::now()), offers_receiving_queue: sender, ban_cache: BanCache::new(self.config.unwrap().bcast_node_ban_timeout), }), diff --git a/core/model/src/market.rs b/core/model/src/market.rs index 5454ea8df6..a9163a4d01 100644 --- a/core/model/src/market.rs +++ b/core/model/src/market.rs @@ -58,6 +58,17 @@ impl RpcMessage for ListAgreements { type Error = RpcMessageError; } +/// Returns the Agreement. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct GetLastBcastTs; + +impl RpcMessage for GetLastBcastTs { + const ID: &'static str = "GetLastBcastTs"; + type Item = DateTime; + type Error = RpcMessageError; +} + /// Error message for market service bus API. #[derive(thiserror::Error, Clone, Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] diff --git a/core/serv-api/web/src/middleware/auth/mod.rs b/core/serv-api/web/src/middleware/auth/mod.rs index 3542186f87..39a069cffa 100644 --- a/core/serv-api/web/src/middleware/auth/mod.rs +++ b/core/serv-api/web/src/middleware/auth/mod.rs @@ -84,7 +84,7 @@ where let cache = self.cache.clone(); let service = self.service.clone(); - let allowed_uris = vec!["/metrics-api", "/version", "/dashboard"]; + let allowed_uris = vec!["/metrics-api", "/version/get", "/dashboard"]; for uri in allowed_uris { if req.uri().to_string().starts_with(uri) { diff --git a/core/serv/src/main.rs b/core/serv/src/main.rs index 05d842ee20..2eb5d0a7c5 100644 --- a/core/serv/src/main.rs +++ b/core/serv/src/main.rs @@ -4,6 +4,7 @@ use actix_web::{middleware, web, App, HttpResponse, HttpServer, Responder}; use anyhow::{Context, Result}; use futures::prelude::*; use metrics::{counter, gauge}; +use ya_healthcheck::HealthcheckService; #[cfg(feature = "static-openssl")] extern crate openssl_probe; @@ -241,6 +242,8 @@ impl TryFrom for ServiceContext { enum Services { #[enable(gsb, cli)] Db(PersistenceService), + #[enable(rest)] + Healthcheck(HealthcheckService), // Metrics service must be activated before all other services // to that will use it. Identity service is used by the Metrics, // so must be initialized before. diff --git a/core/version/Cargo.toml b/core/version/Cargo.toml index 9519d06a0a..40c9c35f1e 100644 --- a/core/version/Cargo.toml +++ b/core/version/Cargo.toml @@ -6,13 +6,15 @@ authors = ["Golem Factory "] edition = "2018" [dependencies] +ya-service-api-web.workspace = true ya-client.workspace = true ya-compile-time-utils.workspace = true ya-core-model = { workspace = true, features = ["version"] } +ya-net = { workspace = true, features = ["service"] } ya-persistence.workspace = true ya-service-api.workspace = true ya-service-api-interfaces.workspace = true -ya-service-bus = { workspace = true } +ya-service-bus = { workspace = true } actix-web = "4" anyhow = "1.0" @@ -27,3 +29,5 @@ serde_json = "1.0" structopt = "0.3.21" thiserror = "^1.0" tokio = { version = "1", features = ["time", "sync"] } +problem_details = "0.6.0" +http = "1.1.0" diff --git a/core/version/src/service/rest.rs b/core/version/src/service/rest.rs index aca69be54f..30475eaf6f 100644 --- a/core/version/src/service/rest.rs +++ b/core/version/src/service/rest.rs @@ -1,11 +1,12 @@ -use crate::db::dao::ReleaseDAO; - -use ya_client::model::ErrorMessage; use ya_persistence::executor::DbExecutor; +use crate::db::dao::ReleaseDAO; + use actix_web::web::Data; use actix_web::{web, HttpResponse, Responder}; +use ya_client::model::ErrorMessage; + pub const VERSION_API_PATH: &str = "/version"; pub fn web_scope(db: DbExecutor) -> actix_web::Scope {