diff --git a/Cargo.lock b/Cargo.lock index bbb785a618a8..cb4c25ae998c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17846,6 +17846,7 @@ dependencies = [ "ip_network", "jsonrpsee", "log", + "serde", "serde_json", "substrate-prometheus-endpoint", "tokio", diff --git a/prdoc/pr_4802.prdoc b/prdoc/pr_4802.prdoc new file mode 100644 index 000000000000..5757c4cbae18 --- /dev/null +++ b/prdoc/pr_4802.prdoc @@ -0,0 +1,16 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Add `health/readiness endpoint` to the rpc server + +doc: + - audience: Node Operator + description: | + Add `/health/readiness endpoint` to the rpc server which returns HTTP status code 200 if the chain is synced + and can connect to the rest of the network otherwise status code 500 is returned. + The endpoint can be reached by performing a HTTP GET request to the + endpoint such as `$ curl /health/readiness` + +crates: + - name: sc-rpc-server + bump: patch diff --git a/substrate/client/rpc-servers/Cargo.toml b/substrate/client/rpc-servers/Cargo.toml index 7837c852a1c9..19369e295fc4 100644 --- a/substrate/client/rpc-servers/Cargo.toml +++ b/substrate/client/rpc-servers/Cargo.toml @@ -25,6 +25,7 @@ ip_network = "0.4.1" jsonrpsee = { version = "0.22", features = ["server"] } log = { workspace = true, default-features = true } prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus" } +serde = { workspace = true } serde_json = { workspace = true, default-features = true } tokio = { version = "1.22.0", features = ["parking_lot"] } tower = { version = "0.4.13", features = ["util"] } diff --git a/substrate/client/rpc-servers/src/lib.rs b/substrate/client/rpc-servers/src/lib.rs index ba1fcf5e3677..647a7ca7e435 100644 --- a/substrate/client/rpc-servers/src/lib.rs +++ b/substrate/client/rpc-servers/src/lib.rs @@ -32,12 +32,10 @@ use hyper::{ service::{make_service_fn, service_fn}, }; use jsonrpsee::{ - server::{ - middleware::http::ProxyGetRequestLayer, stop_channel, ws, PingConfig, StopHandle, - TowerServiceBuilder, - }, + server::{stop_channel, ws, PingConfig, StopHandle, TowerServiceBuilder}, Methods, RpcModule, }; +use middleware::NodeHealthProxyLayer; use tokio::net::TcpListener; use tower::Service; use utils::{build_rpc_api, format_cors, get_proxy_ip, host_filtering, try_into_cors}; @@ -132,8 +130,8 @@ where let http_middleware = tower::ServiceBuilder::new() .option_layer(host_filter) - // Proxy `GET /health` requests to internal `system_health` method. - .layer(ProxyGetRequestLayer::new("/health", "system_health")?) + // Proxy `GET /health, /health/readiness` requests to the internal `system_health` method. + .layer(NodeHealthProxyLayer::default()) .layer(try_into_cors(cors)?); let mut builder = jsonrpsee::server::Server::builder() diff --git a/substrate/client/rpc-servers/src/middleware/mod.rs b/substrate/client/rpc-servers/src/middleware/mod.rs index 88ed8b2f4335..0a14be4dacf5 100644 --- a/substrate/client/rpc-servers/src/middleware/mod.rs +++ b/substrate/client/rpc-servers/src/middleware/mod.rs @@ -32,9 +32,11 @@ use jsonrpsee::{ }; mod metrics; +mod node_health; mod rate_limit; pub use metrics::*; +pub use node_health::*; pub use rate_limit::*; const MAX_JITTER: Duration = Duration::from_millis(50); diff --git a/substrate/client/rpc-servers/src/middleware/node_health.rs b/substrate/client/rpc-servers/src/middleware/node_health.rs new file mode 100644 index 000000000000..d68ec14cb8fe --- /dev/null +++ b/substrate/client/rpc-servers/src/middleware/node_health.rs @@ -0,0 +1,199 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Middleware for handling `/health` and `/health/readiness` endpoints. + +use std::{ + error::Error, + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use futures::future::FutureExt; +use http::{HeaderValue, Method, StatusCode, Uri}; +use hyper::Body; +use jsonrpsee::types::{Response as RpcResponse, ResponseSuccess as RpcResponseSuccess}; +use tower::Service; + +const RPC_SYSTEM_HEALTH_CALL: &str = r#"{"jsonrpc":"2.0","method":"system_health","id":0}"#; +const HEADER_VALUE_JSON: HeaderValue = HeaderValue::from_static("application/json; charset=utf-8"); + +/// Layer that applies [`NodeHealthProxy`] which +/// proxies `/health` and `/health/readiness` endpoints. +#[derive(Debug, Clone, Default)] +pub struct NodeHealthProxyLayer; + +impl tower::Layer for NodeHealthProxyLayer { + type Service = NodeHealthProxy; + + fn layer(&self, service: S) -> Self::Service { + NodeHealthProxy::new(service) + } +} + +/// Middleware that proxies `/health` and `/health/readiness` endpoints. +pub struct NodeHealthProxy(S); + +impl NodeHealthProxy { + /// Creates a new [`NodeHealthProxy`]. + pub fn new(service: S) -> Self { + Self(service) + } +} + +impl tower::Service> for NodeHealthProxy +where + S: Service, Response = http::Response>, + S::Response: 'static, + S::Error: Into> + 'static, + S::Future: Send + 'static, +{ + type Response = S::Response; + type Error = Box; + type Future = + Pin> + Send + 'static>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.0.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, mut req: http::Request) -> Self::Future { + let maybe_intercept = InterceptRequest::from_http(&req); + + // Modify the request and proxy it to `system_health` + if let InterceptRequest::Health | InterceptRequest::Readiness = maybe_intercept { + // RPC methods are accessed with `POST`. + *req.method_mut() = Method::POST; + // Precautionary remove the URI. + *req.uri_mut() = Uri::from_static("/"); + + // Requests must have the following headers: + req.headers_mut().insert(http::header::CONTENT_TYPE, HEADER_VALUE_JSON); + req.headers_mut().insert(http::header::ACCEPT, HEADER_VALUE_JSON); + + // Adjust the body to reflect the method call. + req = req.map(|_| Body::from(RPC_SYSTEM_HEALTH_CALL)); + } + + // Call the inner service and get a future that resolves to the response. + let fut = self.0.call(req); + + async move { + let res = fut.await.map_err(|err| err.into())?; + + Ok(match maybe_intercept { + InterceptRequest::Deny => + http_response(StatusCode::METHOD_NOT_ALLOWED, Body::empty()), + InterceptRequest::No => res, + InterceptRequest::Health => { + let health = parse_rpc_response(res.into_body()).await?; + http_ok_response(serde_json::to_string(&health)?) + }, + InterceptRequest::Readiness => { + let health = parse_rpc_response(res.into_body()).await?; + if (!health.is_syncing && health.peers > 0) || !health.should_have_peers { + http_ok_response(Body::empty()) + } else { + http_internal_error() + } + }, + }) + } + .boxed() + } +} + +// NOTE: This is duplicated here to avoid dependency to the `RPC API`. +#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +struct Health { + /// Number of connected peers + pub peers: usize, + /// Is the node syncing + pub is_syncing: bool, + /// Should this node have any peers + /// + /// Might be false for local chains or when running without discovery. + pub should_have_peers: bool, +} + +fn http_ok_response>(body: S) -> hyper::Response { + http_response(StatusCode::OK, body) +} + +fn http_response>( + status_code: StatusCode, + body: S, +) -> hyper::Response { + hyper::Response::builder() + .status(status_code) + .header(http::header::CONTENT_TYPE, HEADER_VALUE_JSON) + .body(body.into()) + .expect("Header is valid; qed") +} + +fn http_internal_error() -> hyper::Response { + http_response(hyper::StatusCode::INTERNAL_SERVER_ERROR, Body::empty()) +} + +async fn parse_rpc_response(body: Body) -> Result> { + let bytes = hyper::body::to_bytes(body).await?; + + let raw_rp = serde_json::from_slice::>(&bytes)?; + let rp = RpcResponseSuccess::::try_from(raw_rp)?; + + Ok(rp.result) +} + +/// Whether the request should be treated as ordinary RPC call or be modified. +enum InterceptRequest { + /// Proxy `/health` to `system_health`. + Health, + /// Checks if node has at least one peer and is not doing major syncing. + /// + /// Returns HTTP status code 200 on success otherwise HTTP status code 500 is returned. + Readiness, + /// Treat as a ordinary RPC call and don't modify the request or response. + No, + /// Deny health or readiness calls that is not HTTP GET request. + /// + /// Returns HTTP status code 405. + Deny, +} + +impl InterceptRequest { + fn from_http(req: &http::Request) -> InterceptRequest { + match req.uri().path() { + "/health" => + if req.method() == http::Method::GET { + InterceptRequest::Health + } else { + InterceptRequest::Deny + }, + "/health/readiness" => + if req.method() == http::Method::GET { + InterceptRequest::Readiness + } else { + InterceptRequest::Deny + }, + // Forward all other requests to the RPC server. + _ => InterceptRequest::No, + } + } +}