From c73be4888049b85cba39e4929101dd62d690a079 Mon Sep 17 00:00:00 2001 From: Andrew Hauck Date: Fri, 1 Mar 2024 16:08:22 -0800 Subject: [PATCH 01/11] Ensure consistent usage of doc.rs links in rustdoc comments Co-authored-by: zegevlier --- .bleep | 2 +- pingora-boringssl/src/lib.rs | 2 +- pingora-core/src/lib.rs | 2 +- pingora-ketama/src/lib.rs | 2 +- pingora-openssl/src/ext.rs | 2 +- pingora-openssl/src/lib.rs | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.bleep b/.bleep index ff99b8e9..b0dfc982 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -20f290d8adc57ed489bcdd1b950833d0eb6bd8ed \ No newline at end of file +f1ab6c4424b4810991910b4ff5179d0cd0dbb9e0 \ No newline at end of file diff --git a/pingora-boringssl/src/lib.rs b/pingora-boringssl/src/lib.rs index 26dfaaba..869a09d4 100644 --- a/pingora-boringssl/src/lib.rs +++ b/pingora-boringssl/src/lib.rs @@ -15,7 +15,7 @@ //! The BoringSSL API compatibility layer. //! //! This crate aims at making [boring] APIs exchangeable with [openssl-rs](https://docs.rs/openssl/latest/openssl/). -//! In other words, this crate and `pingora-openssl` expose identical rust APIs. +//! In other words, this crate and [`pingora-openssl`](https://docs.rs/pingora-openssl) expose identical rust APIs. #![warn(clippy::all)] diff --git a/pingora-core/src/lib.rs b/pingora-core/src/lib.rs index cdeff85c..3435fe58 100644 --- a/pingora-core/src/lib.rs +++ b/pingora-core/src/lib.rs @@ -34,7 +34,7 @@ //! # Usage //! This crate provides low level service and protocol implementation and abstraction. //! -//! If looking to build a (reverse) proxy, see `pingora-proxy` crate. +//! If looking to build a (reverse) proxy, see [`pingora-proxy`](https://docs.rs/pingora-proxy) crate. //! //! # Optional features //! `boringssl`: Switch the internal TLS library from OpenSSL to BoringSSL. diff --git a/pingora-ketama/src/lib.rs b/pingora-ketama/src/lib.rs index 95200734..07877fe3 100644 --- a/pingora-ketama/src/lib.rs +++ b/pingora-ketama/src/lib.rs @@ -54,7 +54,7 @@ //! We've provided a health-aware example in //! `pingora-ketama/examples/health_aware_selector.rs`. //! -//! For a carefully crafted real-world example, see the pingora-load-balancer +//! For a carefully crafted real-world example, see the [`pingora-load-balancing`](https://docs.rs/pingora-load-balancing) //! crate. use std::cmp::Ordering; diff --git a/pingora-openssl/src/ext.rs b/pingora-openssl/src/ext.rs index f8cebb21..a30e5891 100644 --- a/pingora-openssl/src/ext.rs +++ b/pingora-openssl/src/ext.rs @@ -165,7 +165,7 @@ pub fn clear_error_stack() { /// Create a new [Ssl] from &[SslAcceptor] /// -/// this function is to unify the interface between this crate and `pingora-boringssl` +/// this function is to unify the interface between this crate and [`pingora-boringssl`](https://docs.rs/pingora-boringssl) pub fn ssl_from_acceptor(acceptor: &SslAcceptor) -> Result { Ssl::new(acceptor.context()) } diff --git a/pingora-openssl/src/lib.rs b/pingora-openssl/src/lib.rs index b12cee11..25f7cb71 100644 --- a/pingora-openssl/src/lib.rs +++ b/pingora-openssl/src/lib.rs @@ -15,7 +15,7 @@ //! The OpenSSL API compatibility layer. //! //! This crate aims at making [openssl] APIs interchangeable with [boring](https://docs.rs/boring/latest/boring/). -//! In other words, this crate and `pingora-boringssl` expose identical rust APIs. +//! In other words, this crate and [`pingora-boringssl`](https://docs.rs/pingora-boringssl) expose identical rust APIs. #![warn(clippy::all)] From ab7398c00ef591fffe2330a8c64a029fe9d2ea0c Mon Sep 17 00:00:00 2001 From: Andrew Hauck Date: Sun, 3 Mar 2024 13:54:37 -0800 Subject: [PATCH 02/11] Fix verify_result() in ssl client for boringssl Co-authored-by: afon --- .bleep | 2 +- pingora-core/src/protocols/ssl/client.rs | 34 ++++++++++++++++-------- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/.bleep b/.bleep index b0dfc982..e9bdbbdc 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -f1ab6c4424b4810991910b4ff5179d0cd0dbb9e0 \ No newline at end of file +9f410d52221da26c0651a99d071daf9b7acf87a2 \ No newline at end of file diff --git a/pingora-core/src/protocols/ssl/client.rs b/pingora-core/src/protocols/ssl/client.rs index abb6da6f..6fd6462a 100644 --- a/pingora-core/src/protocols/ssl/client.rs +++ b/pingora-core/src/protocols/ssl/client.rs @@ -17,11 +17,7 @@ use super::SslStream; use crate::protocols::raw_connect::ProxyDigest; use crate::protocols::{GetProxyDigest, GetTimingDigest, TimingDigest, IO}; -use crate::tls::{ - ssl, - ssl::ConnectConfiguration, - ssl_sys::{X509_V_ERR_INVALID_CALL, X509_V_OK}, -}; +use crate::tls::{ssl, ssl::ConnectConfiguration, ssl_sys::X509_V_ERR_INVALID_CALL}; use pingora_error::{Error, ErrorType::*, OrErr, Result}; use std::sync::Arc; @@ -43,13 +39,29 @@ pub async fn handshake( Err(e) => { let context = format!("TLS connect() failed: {e}, SNI: {domain}"); match e.code() { - ssl::ErrorCode::SSL => match stream.ssl().verify_result().as_raw() { - // X509_V_ERR_INVALID_CALL in case verify result was never set - X509_V_OK | X509_V_ERR_INVALID_CALL => { - Error::e_explain(TLSHandshakeFailure, context) + ssl::ErrorCode::SSL => { + // Unify the return type of `verify_result` for openssl + #[cfg(not(feature = "boringssl"))] + fn verify_result(stream: SslStream) -> Result<(), i32> { + match stream.ssl().verify_result().as_raw() { + crate::tls::ssl_sys::X509_V_OK => Ok(()), + e => Err(e), + } } - _ => Error::e_explain(InvalidCert, context), - }, + // Unify the return type of `verify_result` for boringssl + #[cfg(feature = "boringssl")] + fn verify_result(stream: SslStream) -> Result<(), i32> { + stream.ssl().verify_result().map_err(|e| e.as_raw()) + } + match verify_result(stream) { + Ok(()) => Error::e_explain(TLSHandshakeFailure, context), + // X509_V_ERR_INVALID_CALL in case verify result was never set + Err(X509_V_ERR_INVALID_CALL) => { + Error::e_explain(TLSHandshakeFailure, context) + } + _ => Error::e_explain(InvalidCert, context), + } + } /* likely network error, but still mark as TLS error */ _ => Error::e_explain(TLSHandshakeFailure, context), } From 44d41c1c802059a9a81022dd5bc34fc93cf7ece3 Mon Sep 17 00:00:00 2001 From: Andrew Hauck Date: Sun, 3 Mar 2024 14:00:09 -0800 Subject: [PATCH 03/11] Fix SslMethod reference in connectors tests for boringssl --- .bleep | 2 +- pingora-core/src/connectors/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.bleep b/.bleep index e9bdbbdc..193a7972 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -9f410d52221da26c0651a99d071daf9b7acf87a2 \ No newline at end of file +8e6a08593def12f43d50e83c0d35f9f6f9aca630 \ No newline at end of file diff --git a/pingora-core/src/connectors/mod.rs b/pingora-core/src/connectors/mod.rs index a5dd99d3..e35df37e 100644 --- a/pingora-core/src/connectors/mod.rs +++ b/pingora-core/src/connectors/mod.rs @@ -360,9 +360,9 @@ fn test_reusable_stream(stream: &mut Stream) -> bool { #[cfg(test)] mod tests { use pingora_error::ErrorType; - use pingora_openssl::ssl::SslMethod; use super::*; + use crate::tls::ssl::SslMethod; use crate::upstreams::peer::BasicPeer; // 192.0.2.1 is effectively a black hole From a87d59e24559ecfde786a26ddefad306e9d42530 Mon Sep 17 00:00:00 2001 From: Andrew Hauck Date: Mon, 4 Mar 2024 09:20:50 -0800 Subject: [PATCH 04/11] Revert "Fix verify_result() in ssl client for boringssl" This reverts commit 9f410d52221da26c0651a99d071daf9b7acf87a2. --- .bleep | 2 +- pingora-core/src/protocols/ssl/client.rs | 34 ++++++++---------------- 2 files changed, 12 insertions(+), 24 deletions(-) diff --git a/.bleep b/.bleep index 193a7972..70921017 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -8e6a08593def12f43d50e83c0d35f9f6f9aca630 \ No newline at end of file +f5828844181647e13067b3578ea7333c70ab671c \ No newline at end of file diff --git a/pingora-core/src/protocols/ssl/client.rs b/pingora-core/src/protocols/ssl/client.rs index 6fd6462a..abb6da6f 100644 --- a/pingora-core/src/protocols/ssl/client.rs +++ b/pingora-core/src/protocols/ssl/client.rs @@ -17,7 +17,11 @@ use super::SslStream; use crate::protocols::raw_connect::ProxyDigest; use crate::protocols::{GetProxyDigest, GetTimingDigest, TimingDigest, IO}; -use crate::tls::{ssl, ssl::ConnectConfiguration, ssl_sys::X509_V_ERR_INVALID_CALL}; +use crate::tls::{ + ssl, + ssl::ConnectConfiguration, + ssl_sys::{X509_V_ERR_INVALID_CALL, X509_V_OK}, +}; use pingora_error::{Error, ErrorType::*, OrErr, Result}; use std::sync::Arc; @@ -39,29 +43,13 @@ pub async fn handshake( Err(e) => { let context = format!("TLS connect() failed: {e}, SNI: {domain}"); match e.code() { - ssl::ErrorCode::SSL => { - // Unify the return type of `verify_result` for openssl - #[cfg(not(feature = "boringssl"))] - fn verify_result(stream: SslStream) -> Result<(), i32> { - match stream.ssl().verify_result().as_raw() { - crate::tls::ssl_sys::X509_V_OK => Ok(()), - e => Err(e), - } + ssl::ErrorCode::SSL => match stream.ssl().verify_result().as_raw() { + // X509_V_ERR_INVALID_CALL in case verify result was never set + X509_V_OK | X509_V_ERR_INVALID_CALL => { + Error::e_explain(TLSHandshakeFailure, context) } - // Unify the return type of `verify_result` for boringssl - #[cfg(feature = "boringssl")] - fn verify_result(stream: SslStream) -> Result<(), i32> { - stream.ssl().verify_result().map_err(|e| e.as_raw()) - } - match verify_result(stream) { - Ok(()) => Error::e_explain(TLSHandshakeFailure, context), - // X509_V_ERR_INVALID_CALL in case verify result was never set - Err(X509_V_ERR_INVALID_CALL) => { - Error::e_explain(TLSHandshakeFailure, context) - } - _ => Error::e_explain(InvalidCert, context), - } - } + _ => Error::e_explain(InvalidCert, context), + }, /* likely network error, but still mark as TLS error */ _ => Error::e_explain(TLSHandshakeFailure, context), } From d11673885f32af569c00f7bdc5998fbcbb0a7128 Mon Sep 17 00:00:00 2001 From: afon Date: Sat, 2 Mar 2024 12:32:51 +0800 Subject: [PATCH 05/11] Unify the type for matching `verify_result` --- .bleep | 2 +- pingora-core/src/protocols/ssl/client.rs | 36 ++++++++++++++++-------- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/.bleep b/.bleep index 70921017..eaa447dc 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -f5828844181647e13067b3578ea7333c70ab671c \ No newline at end of file +7226cbe46016b51a2f76743555e734415f67923b \ No newline at end of file diff --git a/pingora-core/src/protocols/ssl/client.rs b/pingora-core/src/protocols/ssl/client.rs index abb6da6f..7ed683f2 100644 --- a/pingora-core/src/protocols/ssl/client.rs +++ b/pingora-core/src/protocols/ssl/client.rs @@ -17,11 +17,7 @@ use super::SslStream; use crate::protocols::raw_connect::ProxyDigest; use crate::protocols::{GetProxyDigest, GetTimingDigest, TimingDigest, IO}; -use crate::tls::{ - ssl, - ssl::ConnectConfiguration, - ssl_sys::{X509_V_ERR_INVALID_CALL, X509_V_OK}, -}; +use crate::tls::{ssl, ssl::ConnectConfiguration, ssl_sys::X509_V_ERR_INVALID_CALL}; use pingora_error::{Error, ErrorType::*, OrErr, Result}; use std::sync::Arc; @@ -43,13 +39,31 @@ pub async fn handshake( Err(e) => { let context = format!("TLS connect() failed: {e}, SNI: {domain}"); match e.code() { - ssl::ErrorCode::SSL => match stream.ssl().verify_result().as_raw() { - // X509_V_ERR_INVALID_CALL in case verify result was never set - X509_V_OK | X509_V_ERR_INVALID_CALL => { - Error::e_explain(TLSHandshakeFailure, context) + ssl::ErrorCode::SSL => { + // Unify the return type of `verify_result` for openssl + #[cfg(not(feature = "boringssl"))] + fn verify_result(stream: SslStream) -> Result<(), i32> { + match stream.ssl().verify_result().as_raw() { + crate::tls::ssl_sys::X509_V_OK => Ok(()), + e => Err(e), + } } - _ => Error::e_explain(InvalidCert, context), - }, + + // Unify the return type of `verify_result` for boringssl + #[cfg(feature = "boringssl")] + fn verify_result(stream: SslStream) -> Result<(), i32> { + stream.ssl().verify_result().map_err(|e| e.as_raw()) + } + + match verify_result(stream) { + Ok(()) => Error::e_explain(TLSHandshakeFailure, context), + // X509_V_ERR_INVALID_CALL in case verify result was never set + Err(X509_V_ERR_INVALID_CALL) => { + Error::e_explain(TLSHandshakeFailure, context) + } + _ => Error::e_explain(InvalidCert, context), + } + } /* likely network error, but still mark as TLS error */ _ => Error::e_explain(TLSHandshakeFailure, context), } From 5a1910e342f6e4a23cf8acabc802ec81d7c176f5 Mon Sep 17 00:00:00 2001 From: Matthew Gumport Date: Tue, 5 Mar 2024 17:04:13 -0800 Subject: [PATCH 06/11] compile and test cleanly with nightly The vast majority of these are redundant imports. --- .bleep | 2 +- pingora-cache/src/cache_control.rs | 4 +--- pingora-cache/src/eviction/lru.rs | 1 - pingora-cache/src/filters.rs | 5 +++-- pingora-cache/src/memory.rs | 4 ++-- pingora-cache/src/put.rs | 10 ++++++---- pingora-core/src/apps/prometheus_http_app.rs | 2 +- pingora-core/src/connectors/mod.rs | 2 +- pingora-core/src/protocols/http/date.rs | 4 ++-- pingora-core/src/protocols/http/v1/body.rs | 1 - pingora-core/src/protocols/http/v1/client.rs | 2 -- pingora-core/src/protocols/http/v1/server.rs | 3 +-- pingora-core/src/server/mod.rs | 1 - pingora-core/src/server/transfer_fd/mod.rs | 1 - pingora-http/src/lib.rs | 1 - pingora-load-balancing/src/selection/consistent.rs | 1 - pingora-proxy/src/proxy_h1.rs | 1 - pingora-proxy/src/proxy_purge.rs | 3 --- pingora-proxy/src/proxy_trait.rs | 4 +--- pingora-proxy/tests/test_upstream.rs | 2 +- pingora-timeout/src/fast_timeout.rs | 1 - pingora-timeout/src/lib.rs | 1 - pingora-timeout/src/timer.rs | 1 - pingora/examples/server.rs | 1 - tinyufo/src/lib.rs | 4 +--- 25 files changed, 21 insertions(+), 41 deletions(-) diff --git a/.bleep b/.bleep index eaa447dc..b295ae91 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -7226cbe46016b51a2f76743555e734415f67923b \ No newline at end of file +f414cd9a922f157165dc954757e9ba3aca30fa53 \ No newline at end of file diff --git a/pingora-cache/src/cache_control.rs b/pingora-cache/src/cache_control.rs index 6686c3ed..a8a893fc 100644 --- a/pingora-cache/src/cache_control.rs +++ b/pingora-cache/src/cache_control.rs @@ -20,8 +20,7 @@ use http::header::HeaderName; use http::HeaderValue; use indexmap::IndexMap; use once_cell::sync::Lazy; -use pingora_error::{Error, ErrorType, Result}; -use pingora_http::ResponseHeader; +use pingora_error::{Error, ErrorType}; use regex::bytes::Regex; use std::num::IntErrorKind; use std::slice; @@ -434,7 +433,6 @@ pub trait InterpretCacheControl { mod tests { use super::*; use http::header::CACHE_CONTROL; - use http::HeaderValue; use http::{request, response}; fn build_response(cc_key: HeaderName, cc_value: &str) -> response::Parts { diff --git a/pingora-cache/src/eviction/lru.rs b/pingora-cache/src/eviction/lru.rs index 35ceb770..9c00a94f 100644 --- a/pingora-cache/src/eviction/lru.rs +++ b/pingora-cache/src/eviction/lru.rs @@ -233,7 +233,6 @@ impl EvictionManager for Manager { mod test { use super::*; use crate::CacheKey; - use EvictionManager; // we use shard (N) = 1 for eviction consistency in all tests diff --git a/pingora-cache/src/filters.rs b/pingora-cache/src/filters.rs index 42556736..b84bbf72 100644 --- a/pingora-cache/src/filters.rs +++ b/pingora-cache/src/filters.rs @@ -16,12 +16,12 @@ use super::*; use crate::cache_control::{CacheControl, Cacheable, InterpretCacheControl}; -use crate::{RespCacheable, RespCacheable::*}; +use crate::RespCacheable::*; use http::{header, HeaderValue}; use httpdate::HttpDate; use log::warn; -use pingora_http::{RequestHeader, ResponseHeader}; +use pingora_http::RequestHeader; /// Decide if the request can be cacheable pub fn request_cacheable(req_header: &ReqHeader) -> bool { @@ -206,6 +206,7 @@ pub mod upstream { #[cfg(test)] mod tests { use super::*; + use crate::RespCacheable::Cacheable; use http::header::{HeaderName, CACHE_CONTROL, EXPIRES, SET_COOKIE}; use http::StatusCode; use httpdate::fmt_http_date; diff --git a/pingora-cache/src/memory.rs b/pingora-cache/src/memory.rs index 525bf238..6d0a5197 100644 --- a/pingora-cache/src/memory.rs +++ b/pingora-cache/src/memory.rs @@ -19,8 +19,8 @@ //TODO: Mark this module #[test] only use super::*; -use crate::key::{CacheHashKey, CompactCacheKey}; -use crate::storage::{HandleHit, HandleMiss, Storage}; +use crate::key::CompactCacheKey; +use crate::storage::{HandleHit, HandleMiss}; use crate::trace::SpanHandle; use async_trait::async_trait; diff --git a/pingora-cache/src/put.rs b/pingora-cache/src/put.rs index c50cc2b7..be0d510f 100644 --- a/pingora-cache/src/put.rs +++ b/pingora-cache/src/put.rs @@ -264,14 +264,12 @@ mod test { mod parse_response { use super::*; - use bytes::{Bytes, BytesMut}; + use bytes::BytesMut; use httparse::Status; use pingora_error::{ Error, ErrorType::{self, *}, - Result, }; - use pingora_http::ResponseHeader; pub const INVALID_CHUNK: ErrorType = ErrorType::new("InvalidChunk"); pub const INCOMPLETE_BODY: ErrorType = ErrorType::new("IncompleteHttpBody"); @@ -280,7 +278,7 @@ mod parse_response { const INIT_HEADER_BUF_SIZE: usize = 4096; const CHUNK_DELIMITER_SIZE: usize = 2; // \r\n - #[derive(Debug, Clone, Copy)] + #[derive(Debug, Clone, Copy, PartialEq)] enum ParseState { Init, PartialHeader, @@ -561,6 +559,10 @@ mod parse_response { let output = parser.inject_data(input); // header is not complete assert!(output.is_err()); + match parser.state { + ParseState::Invalid(httparse::Error::Version) => {} + _ => panic!("should have failed to parse"), + } } #[test] diff --git a/pingora-core/src/apps/prometheus_http_app.rs b/pingora-core/src/apps/prometheus_http_app.rs index 3508d201..36513a19 100644 --- a/pingora-core/src/apps/prometheus_http_app.rs +++ b/pingora-core/src/apps/prometheus_http_app.rs @@ -15,7 +15,7 @@ //! An HTTP application that reports Prometheus metrics. use async_trait::async_trait; -use http::{self, Response}; +use http::Response; use prometheus::{Encoder, TextEncoder}; use super::http_app::HttpServer; diff --git a/pingora-core/src/connectors/mod.rs b/pingora-core/src/connectors/mod.rs index e35df37e..9a4a236f 100644 --- a/pingora-core/src/connectors/mod.rs +++ b/pingora-core/src/connectors/mod.rs @@ -427,7 +427,7 @@ mod tests { let stream = connector.new_stream(&peer).await; let error = stream.unwrap_err(); - // XXX: some system will allow the socket to bind and connect without error, only to timeout + // XXX: some systems will allow the socket to bind and connect without error, only to timeout assert!(error.etype() == &ConnectError || error.etype() == &ConnectTimedout) } diff --git a/pingora-core/src/protocols/http/date.rs b/pingora-core/src/protocols/http/date.rs index 4b15c4e8..20e6375b 100644 --- a/pingora-core/src/protocols/http/date.rs +++ b/pingora-core/src/protocols/http/date.rs @@ -12,13 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use chrono::NaiveDateTime; +use chrono::DateTime; use http::header::HeaderValue; use std::cell::RefCell; use std::time::{Duration, SystemTime}; fn to_date_string(epoch_sec: i64) -> String { - let dt = NaiveDateTime::from_timestamp_opt(epoch_sec, 0).unwrap(); + let dt = DateTime::from_timestamp(epoch_sec, 0).unwrap(); dt.format("%a, %d %b %Y %H:%M:%S GMT").to_string() } diff --git a/pingora-core/src/protocols/http/v1/body.rs b/pingora-core/src/protocols/http/v1/body.rs index 8968f9ef..c317bb72 100644 --- a/pingora-core/src/protocols/http/v1/body.rs +++ b/pingora-core/src/protocols/http/v1/body.rs @@ -644,7 +644,6 @@ impl BodyWriter { #[cfg(test)] mod tests { use super::*; - use crate::utils::BufRef; use tokio_test::io::Builder; fn init_log() { diff --git a/pingora-core/src/protocols/http/v1/client.rs b/pingora-core/src/protocols/http/v1/client.rs index 1d970d7b..e28d2af4 100644 --- a/pingora-core/src/protocols/http/v1/client.rs +++ b/pingora-core/src/protocols/http/v1/client.rs @@ -672,8 +672,6 @@ mod tests_stream { use super::*; use crate::protocols::http::v1::body::ParseState; use crate::ErrorType; - use std::str; - use std::time::Duration; use tokio_test::io::Builder; fn init_log() { diff --git a/pingora-core/src/protocols/http/v1/server.rs b/pingora-core/src/protocols/http/v1/server.rs index 5b3a111e..c1b2d751 100644 --- a/pingora-core/src/protocols/http/v1/server.rs +++ b/pingora-core/src/protocols/http/v1/server.rs @@ -983,9 +983,8 @@ fn http_resp_header_to_buf( mod tests_stream { use super::*; use crate::protocols::http::v1::body::{BodyMode, ParseState}; - use http::{Method, StatusCode}; + use http::StatusCode; use std::str; - use std::time::Duration; use tokio_test::io::Builder; fn init_log() { diff --git a/pingora-core/src/server/mod.rs b/pingora-core/src/server/mod.rs index 42735507..fa2cea21 100644 --- a/pingora-core/src/server/mod.rs +++ b/pingora-core/src/server/mod.rs @@ -22,7 +22,6 @@ use daemon::daemonize; use log::{debug, error, info}; use pingora_runtime::Runtime; use pingora_timeout::fast_timeout; -use std::clone::Clone; use std::sync::Arc; use std::thread; use tokio::signal::unix; diff --git a/pingora-core/src/server/transfer_fd/mod.rs b/pingora-core/src/server/transfer_fd/mod.rs index ae07e33c..d7739352 100644 --- a/pingora-core/src/server/transfer_fd/mod.rs +++ b/pingora-core/src/server/transfer_fd/mod.rs @@ -343,7 +343,6 @@ where mod tests { use super::*; use log::{debug, error}; - use std::thread; fn init_log() { let _ = env_logger::builder().is_test(true).try_init(); diff --git a/pingora-http/src/lib.rs b/pingora-http/src/lib.rs index f3103081..24b648f7 100644 --- a/pingora-http/src/lib.rs +++ b/pingora-http/src/lib.rs @@ -30,7 +30,6 @@ use http::response::Builder as RespBuilder; use http::response::Parts as RespParts; use http::uri::Uri; use pingora_error::{ErrorType::*, OrErr, Result}; -use std::convert::TryInto; use std::ops::Deref; pub use http::method::Method; diff --git a/pingora-load-balancing/src/selection/consistent.rs b/pingora-load-balancing/src/selection/consistent.rs index 60c7b9ff..9c627260 100644 --- a/pingora-load-balancing/src/selection/consistent.rs +++ b/pingora-load-balancing/src/selection/consistent.rs @@ -18,7 +18,6 @@ use super::*; use pingora_core::protocols::l4::socket::SocketAddr; use pingora_ketama::{Bucket, Continuum}; use std::collections::HashMap; -use std::sync::Arc; /// Weighted Ketama consistent hashing pub struct KetamaHashing { diff --git a/pingora-proxy/src/proxy_h1.rs b/pingora-proxy/src/proxy_h1.rs index 77bc50b5..f50d3102 100644 --- a/pingora-proxy/src/proxy_h1.rs +++ b/pingora-proxy/src/proxy_h1.rs @@ -15,7 +15,6 @@ use super::*; use crate::proxy_cache::{range_filter::RangeBodyFilter, ServeFromCache}; use crate::proxy_common::*; -use http::Version; impl HttpProxy { pub(crate) async fn proxy_1to1( diff --git a/pingora-proxy/src/proxy_purge.rs b/pingora-proxy/src/proxy_purge.rs index 16796ba2..73e27a05 100644 --- a/pingora-proxy/src/proxy_purge.rs +++ b/pingora-proxy/src/proxy_purge.rs @@ -14,9 +14,6 @@ use super::*; -use once_cell::sync::Lazy; -use pingora_core::protocols::http::SERVER_NAME; - fn gen_purge_response(code: u16) -> ResponseHeader { let mut resp = ResponseHeader::build(code, Some(3)).unwrap(); resp.insert_header(header::SERVER, &SERVER_NAME[..]) diff --git a/pingora-proxy/src/proxy_trait.rs b/pingora-proxy/src/proxy_trait.rs index 3049c1dc..042dbd44 100644 --- a/pingora-proxy/src/proxy_trait.rs +++ b/pingora-proxy/src/proxy_trait.rs @@ -13,9 +13,7 @@ // limitations under the License. use super::*; -use pingora_cache::{ - key::HashBinary, CacheKey, CacheMeta, NoCacheReason, RespCacheable, RespCacheable::*, -}; +use pingora_cache::{key::HashBinary, CacheKey, CacheMeta, RespCacheable, RespCacheable::*}; /// The interface to control the HTTP proxy /// diff --git a/pingora-proxy/tests/test_upstream.rs b/pingora-proxy/tests/test_upstream.rs index 5149d8fb..b8e5a497 100644 --- a/pingora-proxy/tests/test_upstream.rs +++ b/pingora-proxy/tests/test_upstream.rs @@ -135,7 +135,7 @@ async fn test_ws_server_ends_conn() { mod test_cache { use super::*; - use tokio::time::{sleep, Duration}; + use tokio::time::sleep; #[tokio::test] async fn test_basic_caching() { diff --git a/pingora-timeout/src/fast_timeout.rs b/pingora-timeout/src/fast_timeout.rs index d8f9c252..c3b251ac 100644 --- a/pingora-timeout/src/fast_timeout.rs +++ b/pingora-timeout/src/fast_timeout.rs @@ -94,7 +94,6 @@ pub fn unpause() { #[cfg(test)] mod tests { use super::*; - use std::time::Duration; #[tokio::test] async fn test_timeout() { diff --git a/pingora-timeout/src/lib.rs b/pingora-timeout/src/lib.rs index f3a33dd9..d52dcead 100644 --- a/pingora-timeout/src/lib.rs +++ b/pingora-timeout/src/lib.rs @@ -147,7 +147,6 @@ where #[cfg(test)] mod tests { use super::*; - use std::time::Duration; #[tokio::test] async fn test_timeout() { diff --git a/pingora-timeout/src/timer.rs b/pingora-timeout/src/timer.rs index 6e25c246..6916d7d2 100644 --- a/pingora-timeout/src/timer.rs +++ b/pingora-timeout/src/timer.rs @@ -248,7 +248,6 @@ impl TimerManager { #[cfg(test)] mod tests { use super::*; - use std::sync::Arc; #[test] fn test_round() { diff --git a/pingora/examples/server.rs b/pingora/examples/server.rs index be60b3a1..b28b15eb 100644 --- a/pingora/examples/server.rs +++ b/pingora/examples/server.rs @@ -25,7 +25,6 @@ use structopt::StructOpt; use tokio::time::interval; use std::time::Duration; -use std::vec::Vec; mod app; mod service; diff --git a/tinyufo/src/lib.rs b/tinyufo/src/lib.rs index 9a9e79cb..001f4e3e 100644 --- a/tinyufo/src/lib.rs +++ b/tinyufo/src/lib.rs @@ -329,9 +329,7 @@ impl FiFoQueues { fn evict_one_from_main(&self, buckets: &Buckets) -> Option> { loop { - let Some(to_evict) = self.main.pop() else { - return None; - }; + let to_evict = self.main.pop()?; let buckets = buckets.pin(); let maybe_bucket = buckets.get(&to_evict); if let Some(bucket) = maybe_bucket.as_ref() { From b9930176e7486b0f2f1426360ca893c9c1e2a379 Mon Sep 17 00:00:00 2001 From: Kevin Guthrie Date: Wed, 6 Mar 2024 11:40:53 -0500 Subject: [PATCH 07/11] Adding bleeper config file to internal repo --- .bleep | 2 +- .gitignore | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.bleep b/.bleep index b295ae91..5c2290a6 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -f414cd9a922f157165dc954757e9ba3aca30fa53 \ No newline at end of file +a8c217af0e62780f87c43d0ede0bebf31a545c17 \ No newline at end of file diff --git a/.gitignore b/.gitignore index 1e0037e2..abc8cf51 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ dhat-heap.json .vscode .idea .cover +bleeper.user.toml \ No newline at end of file From 7eb929b686639d8f230c5542111ce5a5d553640b Mon Sep 17 00:00:00 2001 From: Andrew Hauck Date: Thu, 7 Mar 2024 22:08:34 -0800 Subject: [PATCH 08/11] Treat OS read timeouts as ReadError rather than ReadTimeout when reading http/1.1 response headers --- .bleep | 2 +- pingora-core/src/protocols/http/v1/client.rs | 40 ++++++++------------ 2 files changed, 17 insertions(+), 25 deletions(-) diff --git a/.bleep b/.bleep index 5c2290a6..66c4bb2a 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -a8c217af0e62780f87c43d0ede0bebf31a545c17 \ No newline at end of file +90d84b32f4528ede68b8351c896a101af788113d \ No newline at end of file diff --git a/pingora-core/src/protocols/http/v1/client.rs b/pingora-core/src/protocols/http/v1/client.rs index e28d2af4..13e0482d 100644 --- a/pingora-core/src/protocols/http/v1/client.rs +++ b/pingora-core/src/protocols/http/v1/client.rs @@ -185,10 +185,9 @@ impl HttpSession { let read_fut = self.underlying_stream.read_buf(&mut buf); let read_result = match self.read_timeout { - Some(t) => match timeout(t, read_fut).await { - Ok(res) => res, - Err(_) => Err(std::io::Error::from(ErrorKind::TimedOut)), - }, + Some(t) => timeout(t, read_fut) + .await + .map_err(|_| Error::explain(ReadTimedout, "while reading response headers"))?, None => read_fut.await, }; let n = match read_result { @@ -208,26 +207,19 @@ impl HttpSession { } }, Err(e) => { - return match e.kind() { - ErrorKind::TimedOut => { - Error::e_explain(ReadTimedout, "while reading response headers") - } - _ => { - let true_io_error = e.raw_os_error().is_some(); - let mut e = Error::because( - ReadError, - format!( - "while reading response headers, bytes already read: {already_read}", - ), - e, - ); - // Likely OSError, typical if a previously reused connection drops it - if true_io_error { - e.retry = RetryType::ReusedOnly; - } // else: not safe to retry TLS error - Err(e) - } - }; + let true_io_error = e.raw_os_error().is_some(); + let mut e = Error::because( + ReadError, + format!( + "while reading response headers, bytes already read: {already_read}", + ), + e, + ); + // Likely OSError, typical if a previously reused connection drops it + if true_io_error { + e.retry = RetryType::ReusedOnly; + } // else: not safe to retry TLS error + return Err(e); } }; already_read += n; From c545ef4e3d507dd428b6d540fbb5b9c1ea95d322 Mon Sep 17 00:00:00 2001 From: ewang Date: Tue, 5 Mar 2024 16:56:37 -0800 Subject: [PATCH 09/11] Add server_addr and client_addr to Session --- .bleep | 2 +- pingora-core/src/apps/mod.rs | 14 ++- pingora-core/src/connectors/http/v1.rs | 5 + pingora-core/src/connectors/http/v2.rs | 1 + pingora-core/src/connectors/l4.rs | 13 ++- pingora-core/src/protocols/digest.rs | 45 +++++++- pingora-core/src/protocols/http/client.rs | 20 +++- pingora-core/src/protocols/http/server.rs | 18 +++- pingora-core/src/protocols/http/v1/client.rs | 19 +++- pingora-core/src/protocols/http/v1/server.rs | 34 +++++- pingora-core/src/protocols/http/v2/client.rs | 20 +++- pingora-core/src/protocols/http/v2/server.rs | 37 ++++++- pingora-core/src/protocols/l4/listener.rs | 31 +++++- pingora-core/src/protocols/l4/socket.rs | 64 +++++++++++ pingora-core/src/protocols/l4/stream.rs | 38 ++++++- pingora-core/src/protocols/mod.rs | 22 +++- pingora-core/src/protocols/ssl/client.rs | 16 ++- pingora-proxy/src/subrequest.rs | 10 +- pingora-proxy/tests/test_basic.rs | 95 +++++++++++++++- pingora-proxy/tests/utils/server_utils.rs | 107 ++++++++++++++++--- 20 files changed, 568 insertions(+), 43 deletions(-) diff --git a/.bleep b/.bleep index 66c4bb2a..1ac1f3c8 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -90d84b32f4528ede68b8351c896a101af788113d \ No newline at end of file +c16c9e8bfd9334b77a6a7c1123954f41037c06c3 \ No newline at end of file diff --git a/pingora-core/src/apps/mod.rs b/pingora-core/src/apps/mod.rs index db8f11bc..6a436c7d 100644 --- a/pingora-core/src/apps/mod.rs +++ b/pingora-core/src/apps/mod.rs @@ -24,6 +24,7 @@ use std::sync::Arc; use crate::protocols::http::v2::server; use crate::protocols::http::ServerSession; +use crate::protocols::Digest; use crate::protocols::Stream; use crate::protocols::ALPN; @@ -91,6 +92,15 @@ where ) -> Option { match stream.selected_alpn_proto() { Some(ALPN::H2) => { + // create a shared connection digest + let digest = Arc::new(Digest { + ssl_digest: stream.get_ssl_digest(), + // TODO: log h2 handshake time + timing_digest: stream.get_timing_digest(), + proxy_digest: stream.get_proxy_digest(), + socket_digest: stream.get_socket_digest(), + }); + let h2_options = self.h2_options(); let h2_conn = server::handshake(stream, h2_options).await; let mut h2_conn = match h2_conn { @@ -100,10 +110,12 @@ where } Ok(c) => c, }; + loop { // this loop ends when the client decides to close the h2 conn // TODO: add a timeout? - let h2_stream = server::HttpSession::from_h2_conn(&mut h2_conn).await; + let h2_stream = + server::HttpSession::from_h2_conn(&mut h2_conn, digest.clone()).await; let h2_stream = match h2_stream { Err(e) => { // It is common for client to just disconnect TCP without properly diff --git a/pingora-core/src/connectors/http/v1.rs b/pingora-core/src/connectors/http/v1.rs index 513fed14..7958a093 100644 --- a/pingora-core/src/connectors/http/v1.rs +++ b/pingora-core/src/connectors/http/v1.rs @@ -65,6 +65,7 @@ impl Connector { #[cfg(test)] mod tests { use super::*; + use crate::protocols::l4::socket::SocketAddr; use crate::upstreams::peer::HttpPeer; use pingora_http::RequestHeader; @@ -85,6 +86,8 @@ mod tests { let peer = HttpPeer::new(("1.1.1.1", 80), false, "".into()); // make a new connection to 1.1.1.1 let (http, reused) = connector.get_http_session(&peer).await.unwrap(); + let server_addr = http.server_addr().unwrap(); + assert_eq!(*server_addr, "1.1.1.1:80".parse::().unwrap()); assert!(!reused); // this http is not even used, so not be able to reuse @@ -104,6 +107,8 @@ mod tests { let peer = HttpPeer::new(("1.1.1.1", 443), true, "one.one.one.one".into()); // make a new connection to https://1.1.1.1 let (http, reused) = connector.get_http_session(&peer).await.unwrap(); + let server_addr = http.server_addr().unwrap(); + assert_eq!(*server_addr, "1.1.1.1:443".parse::().unwrap()); assert!(!reused); // this http is not even used, so not be able to reuse diff --git a/pingora-core/src/connectors/http/v2.rs b/pingora-core/src/connectors/http/v2.rs index 389bd4e5..6f26b462 100644 --- a/pingora-core/src/connectors/http/v2.rs +++ b/pingora-core/src/connectors/http/v2.rs @@ -369,6 +369,7 @@ async fn handshake( // TODO: log h2 handshake time timing_digest: stream.get_timing_digest(), proxy_digest: stream.get_proxy_digest(), + socket_digest: stream.get_socket_digest(), }; // TODO: make these configurable let (send_req, connection) = Builder::new() diff --git a/pingora-core/src/connectors/l4.rs b/pingora-core/src/connectors/l4.rs index 6f0f5fdb..3db27718 100644 --- a/pingora-core/src/connectors/l4.rs +++ b/pingora-core/src/connectors/l4.rs @@ -16,10 +16,12 @@ use log::debug; use pingora_error::{Context, Error, ErrorType::*, OrErr, Result}; use rand::seq::SliceRandom; use std::net::SocketAddr as InetSocketAddr; +use std::os::unix::io::AsRawFd; use crate::protocols::l4::ext::{connect as tcp_connect, connect_uds, set_tcp_keepalive}; use crate::protocols::l4::socket::SocketAddr; use crate::protocols::l4::stream::Stream; +use crate::protocols::{GetSocketDigest, SocketDigest}; use crate::upstreams::peer::Peer; /// Establish a connection (l4) to the given peer using its settings and an optional bind address. @@ -32,7 +34,8 @@ where .await .err_context(|| format!("Fail to establish CONNECT proxy: {}", peer)); } - let mut stream: Stream = match peer.address() { + let peer_addr = peer.address(); + let mut stream: Stream = match peer_addr { SocketAddr::Inet(addr) => { let connect_future = tcp_connect(addr, bind_to.as_ref()); let conn_res = match peer.connection_timeout() { @@ -97,6 +100,14 @@ where } stream.set_nodelay()?; + + let digest = SocketDigest::from_raw_fd(stream.as_raw_fd()); + digest + .peer_addr + .set(Some(peer_addr.clone())) + .expect("newly created OnceCell must be empty"); + stream.set_socket_digest(digest); + Ok(stream) } diff --git a/pingora-core/src/protocols/digest.rs b/pingora-core/src/protocols/digest.rs index 13ce35c8..594dbba7 100644 --- a/pingora-core/src/protocols/digest.rs +++ b/pingora-core/src/protocols/digest.rs @@ -17,11 +17,14 @@ use std::sync::Arc; use std::time::SystemTime; +use once_cell::sync::OnceCell; + +use super::l4::socket::SocketAddr; use super::raw_connect::ProxyDigest; use super::ssl::digest::SslDigest; /// The information can be extracted from a connection -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct Digest { /// Information regarding the TLS of this connection if any pub ssl_digest: Option>, @@ -29,6 +32,8 @@ pub struct Digest { pub timing_digest: Vec>, /// information regarding the CONNECT proxy this connection uses. pub proxy_digest: Option>, + /// Information about underlying socket/fd of this connection + pub socket_digest: Option>, } /// The interface to return protocol related information @@ -53,6 +58,38 @@ impl Default for TimingDigest { } } +#[derive(Debug)] +/// The interface to return socket-related information +pub struct SocketDigest { + raw_fd: std::os::unix::io::RawFd, + /// Remote socket address + pub peer_addr: OnceCell>, + /// Local socket address + pub local_addr: OnceCell>, +} + +impl SocketDigest { + pub fn from_raw_fd(raw_fd: std::os::unix::io::RawFd) -> SocketDigest { + SocketDigest { + raw_fd, + peer_addr: OnceCell::new(), + local_addr: OnceCell::new(), + } + } + + pub fn peer_addr(&self) -> Option<&SocketAddr> { + self.peer_addr + .get_or_init(|| SocketAddr::from_raw_fd(self.raw_fd, true)) + .as_ref() + } + + pub fn local_addr(&self) -> Option<&SocketAddr> { + self.local_addr + .get_or_init(|| SocketAddr::from_raw_fd(self.raw_fd, false)) + .as_ref() + } +} + /// The interface to return timing information pub trait GetTimingDigest { /// Return the timing for each layer from the lowest layer to upper @@ -64,3 +101,9 @@ pub trait GetProxyDigest { fn get_proxy_digest(&self) -> Option>; fn set_proxy_digest(&mut self, _digest: ProxyDigest) {} } + +/// The interface to set or return socket information +pub trait GetSocketDigest { + fn get_socket_digest(&self) -> Option>; + fn set_socket_digest(&mut self, _socket_digest: SocketDigest) {} +} diff --git a/pingora-core/src/protocols/http/client.rs b/pingora-core/src/protocols/http/client.rs index 0fe6c901..83a7618f 100644 --- a/pingora-core/src/protocols/http/client.rs +++ b/pingora-core/src/protocols/http/client.rs @@ -19,7 +19,7 @@ use std::time::Duration; use super::v1::client::HttpSession as Http1Session; use super::v2::client::Http2Session; -use crate::protocols::Digest; +use crate::protocols::{Digest, SocketAddr}; /// A type for Http client session. It can be either an Http1 connection or an Http2 stream. pub enum HttpSession { @@ -151,11 +151,27 @@ impl HttpSession { /// Return the [Digest] of the connection /// /// For reused connection, the timing in the digest will reflect its initial handshakes - /// The caller should check if the connection is reused to avoid misuse the timing field + /// The caller should check if the connection is reused to avoid misuse of the timing field pub fn digest(&self) -> Option<&Digest> { match self { Self::H1(s) => Some(s.digest()), Self::H2(s) => s.digest(), } } + + /// Return the server (peer) address of the connection. + pub fn server_addr(&self) -> Option<&SocketAddr> { + match self { + Self::H1(s) => s.server_addr(), + Self::H2(s) => s.server_addr(), + } + } + + /// Return the client (local) address of the connection. + pub fn client_addr(&self) -> Option<&SocketAddr> { + match self { + Self::H1(s) => s.client_addr(), + Self::H2(s) => s.client_addr(), + } + } } diff --git a/pingora-core/src/protocols/http/server.rs b/pingora-core/src/protocols/http/server.rs index c4a0b07f..6edea75a 100644 --- a/pingora-core/src/protocols/http/server.rs +++ b/pingora-core/src/protocols/http/server.rs @@ -18,7 +18,7 @@ use super::error_resp; use super::v1::server::HttpSession as SessionV1; use super::v2::server::HttpSession as SessionV2; use super::HttpTask; -use crate::protocols::Stream; +use crate::protocols::{SocketAddr, Stream}; use bytes::Bytes; use http::header::AsHeaderName; use http::HeaderValue; @@ -330,4 +330,20 @@ impl Session { Self::H2(s) => s.body_bytes_sent(), } } + + /// Return the client (peer) address of the connnection. + pub fn client_addr(&self) -> Option<&SocketAddr> { + match self { + Self::H1(s) => s.client_addr(), + Self::H2(s) => s.client_addr(), + } + } + + /// Return the server (local) address of the connection. + pub fn server_addr(&self) -> Option<&SocketAddr> { + match self { + Self::H1(s) => s.server_addr(), + Self::H2(s) => s.server_addr(), + } + } } diff --git a/pingora-core/src/protocols/http/v1/client.rs b/pingora-core/src/protocols/http/v1/client.rs index 13e0482d..7881e597 100644 --- a/pingora-core/src/protocols/http/v1/client.rs +++ b/pingora-core/src/protocols/http/v1/client.rs @@ -28,7 +28,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use super::body::{BodyReader, BodyWriter}; use super::common::*; use crate::protocols::http::HttpTask; -use crate::protocols::{Digest, Stream, UniqueID}; +use crate::protocols::{Digest, SocketAddr, Stream, UniqueID}; use crate::utils::{BufRef, KVRef}; /// The HTTP 1.x client session @@ -65,6 +65,7 @@ impl HttpSession { ssl_digest: stream.get_ssl_digest(), timing_digest: stream.get_timing_digest(), proxy_digest: stream.get_proxy_digest(), + socket_digest: stream.get_socket_digest(), }); HttpSession { underlying_stream: stream, @@ -601,6 +602,22 @@ impl HttpSession { pub fn digest(&self) -> &Digest { &self.digest } + + /// Return the server (peer) address recorded in the connection digest. + pub fn server_addr(&self) -> Option<&SocketAddr> { + self.digest() + .socket_digest + .as_ref() + .map(|d| d.peer_addr())? + } + + /// Return the client (local) address recorded in the connection digest. + pub fn client_addr(&self) -> Option<&SocketAddr> { + self.digest() + .socket_digest + .as_ref() + .map(|d| d.local_addr())? + } } #[inline] diff --git a/pingora-core/src/protocols/http/v1/server.rs b/pingora-core/src/protocols/http/v1/server.rs index c1b2d751..0b199706 100644 --- a/pingora-core/src/protocols/http/v1/server.rs +++ b/pingora-core/src/protocols/http/v1/server.rs @@ -31,7 +31,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use super::body::{BodyReader, BodyWriter}; use super::common::*; use crate::protocols::http::{body_buffer::FixedBuffer, date, error_resp, HttpTask}; -use crate::protocols::Stream; +use crate::protocols::{Digest, SocketAddr, Stream}; use crate::utils::{BufRef, KVRef}; /// The HTTP 1.x server session @@ -68,6 +68,8 @@ pub struct HttpSession { /// Whether this session is an upgraded session. This flag is calculated when sending the /// response header to the client. upgraded: bool, + /// Digest to track underlying connection metrics + digest: Box, } impl HttpSession { @@ -75,6 +77,14 @@ impl HttpSession { /// The created session needs to call [`Self::read_request()`] first before performing /// any other operations. pub fn new(underlying_stream: Stream) -> Self { + // TODO: maybe we should put digest in the connection itself + let digest = Box::new(Digest { + ssl_digest: underlying_stream.get_ssl_digest(), + timing_digest: underlying_stream.get_timing_digest(), + proxy_digest: underlying_stream.get_proxy_digest(), + socket_digest: underlying_stream.get_socket_digest(), + }); + HttpSession { underlying_stream, buf: Bytes::new(), // zero size, with be replaced by parsed header later @@ -92,6 +102,7 @@ impl HttpSession { body_bytes_sent: 0, retry_buffer: None, upgraded: false, + digest, } } @@ -751,6 +762,27 @@ impl HttpSession { } } + /// Return the [Digest] of the connection. + pub fn digest(&self) -> &Digest { + &self.digest + } + + /// Return the client (peer) address of the underlying connnection. + pub fn client_addr(&self) -> Option<&SocketAddr> { + self.digest() + .socket_digest + .as_ref() + .map(|d| d.peer_addr())? + } + + /// Return the server (local) address of the underlying connnection. + pub fn server_addr(&self) -> Option<&SocketAddr> { + self.digest() + .socket_digest + .as_ref() + .map(|d| d.local_addr())? + } + /// Consume `self`, if the connection can be reused, the underlying stream will be returned /// to be fed to the next [`Self::new()`]. The next session can just call [`Self::read_request()`]. /// If the connection cannot be reused, the underlying stream will be closed and `None` will be diff --git a/pingora-core/src/protocols/http/v2/client.rs b/pingora-core/src/protocols/http/v2/client.rs index 48551ecf..15f86d08 100644 --- a/pingora-core/src/protocols/http/v2/client.rs +++ b/pingora-core/src/protocols/http/v2/client.rs @@ -30,7 +30,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio::sync::watch; use crate::connectors::http::v2::ConnectionRef; -use crate::protocols::Digest; +use crate::protocols::{Digest, SocketAddr}; pub const PING_TIMEDOUT: ErrorType = ErrorType::new("PingTimedout"); @@ -310,6 +310,24 @@ impl Http2Session { Some(self.conn.digest()) } + /// Return the server (peer) address recorded in the connection digest. + pub fn server_addr(&self) -> Option<&SocketAddr> { + self.conn + .digest() + .socket_digest + .as_ref() + .map(|d| d.peer_addr())? + } + + /// Return the client (local) address recorded in the connection digest. + pub fn client_addr(&self) -> Option<&SocketAddr> { + self.conn + .digest() + .socket_digest + .as_ref() + .map(|d| d.local_addr())? + } + /// the FD of the underlying connection pub fn fd(&self) -> i32 { self.conn.id() diff --git a/pingora-core/src/protocols/http/v2/server.rs b/pingora-core/src/protocols/http/v2/server.rs index 6ec75e82..a8117699 100644 --- a/pingora-core/src/protocols/http/v2/server.rs +++ b/pingora-core/src/protocols/http/v2/server.rs @@ -23,12 +23,13 @@ use http::header::HeaderName; use http::{header, Response}; use log::{debug, warn}; use pingora_http::{RequestHeader, ResponseHeader}; +use std::sync::Arc; use crate::protocols::http::body_buffer::FixedBuffer; use crate::protocols::http::date::get_cached_date; use crate::protocols::http::v1::client::http_req_header_to_wire; use crate::protocols::http::HttpTask; -use crate::protocols::Stream; +use crate::protocols::{Digest, SocketAddr, Stream}; use crate::{Error, ErrorType, OrErr, Result}; const BODY_BUF_LIMIT: usize = 1024 * 64; @@ -95,6 +96,8 @@ pub struct HttpSession { body_sent: usize, // buffered request body for retry logic retry_buffer: Option, + // digest to record underlying connection info + digest: Arc, } impl HttpSession { @@ -102,11 +105,19 @@ impl HttpSession { /// This function returns a new HTTP/2 session when the provided HTTP/2 connection, `conn`, /// establishes a new HTTP/2 stream to this server. /// + /// A [`Digest`] from the IO stream is also stored in the resulting session, since the + /// session doesn't have access to the underlying stream (and the stream itself isn't + /// accessible from the `h2::server::Connection`). + /// /// Note: in order to handle all **existing** and new HTTP/2 sessions, the server must call /// this function in a loop until the client decides to close the connection. /// /// `None` will be returned when the connection is closing so that the loop can exit. - pub async fn from_h2_conn(conn: &mut H2Connection) -> Result> { + /// + pub async fn from_h2_conn( + conn: &mut H2Connection, + digest: Arc, + ) -> Result> { // NOTE: conn.accept().await is what drives the entire connection. let res = conn.accept().await.transpose().or_err( ErrorType::H2Error, @@ -125,6 +136,7 @@ impl HttpSession { body_read: 0, body_sent: 0, retry_buffer: None, + digest, } })) } @@ -405,6 +417,21 @@ impl HttpSession { pub fn body_bytes_sent(&self) -> usize { self.body_sent } + + /// Return the [Digest] of the connection. + pub fn digest(&self) -> Option<&Digest> { + Some(&self.digest) + } + + /// Return the server (local) address recorded in the connection digest. + pub fn server_addr(&self) -> Option<&SocketAddr> { + self.digest.socket_digest.as_ref().map(|d| d.local_addr())? + } + + /// Return the client (peer) address recorded in the connection digest. + pub fn client_addr(&self) -> Option<&SocketAddr> { + self.digest.socket_digest.as_ref().map(|d| d.peer_addr())? + } } #[cfg(test)] @@ -444,8 +471,12 @@ mod test { }); let mut connection = handshake(Box::new(server), None).await.unwrap(); + let digest = Arc::new(Digest::default()); - while let Some(mut http) = HttpSession::from_h2_conn(&mut connection).await.unwrap() { + while let Some(mut http) = HttpSession::from_h2_conn(&mut connection, digest.clone()) + .await + .unwrap() + { tokio::spawn(async move { let req = http.req_header(); assert_eq!(req.method, Method::GET); diff --git a/pingora-core/src/protocols/l4/listener.rs b/pingora-core/src/protocols/l4/listener.rs index 6473fb4b..29cc9e9e 100644 --- a/pingora-core/src/protocols/l4/listener.rs +++ b/pingora-core/src/protocols/l4/listener.rs @@ -18,6 +18,7 @@ use std::io; use std::os::unix::io::AsRawFd; use tokio::net::{TcpListener, UnixListener}; +use crate::protocols::digest::{GetSocketDigest, SocketDigest}; use crate::protocols::l4::stream::Stream; /// The type for generic listener for both TCP and Unix domain socket @@ -40,7 +41,7 @@ impl From for Listener { } impl AsRawFd for Listener { - fn as_raw_fd(&self) -> std::os::unix::prelude::RawFd { + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { match &self { Self::Tcp(l) => l.as_raw_fd(), Self::Unix(l) => l.as_raw_fd(), @@ -52,8 +53,32 @@ impl Listener { /// Accept a connection from the listening endpoint pub async fn accept(&self) -> io::Result { match &self { - Self::Tcp(l) => l.accept().await.map(|(stream, _)| stream.into()), - Self::Unix(l) => l.accept().await.map(|(stream, _)| stream.into()), + Self::Tcp(l) => l.accept().await.map(|(stream, peer_addr)| { + let mut s: Stream = stream.into(); + let digest = SocketDigest::from_raw_fd(s.as_raw_fd()); + digest + .peer_addr + .set(Some(peer_addr.into())) + .expect("newly created OnceCell must be empty"); + s.set_socket_digest(digest); + // TODO: if listening on a specific bind address, we could save + // an extra syscall looking up the local_addr later if we can pass + // and init it in the socket digest here + s + }), + Self::Unix(l) => l.accept().await.map(|(stream, peer_addr)| { + let mut s: Stream = stream.into(); + let digest = SocketDigest::from_raw_fd(s.as_raw_fd()); + // note: if unnamed/abstract UDS, it will be `None` + // (see TryFrom) + let addr = peer_addr.try_into().ok(); + digest + .peer_addr + .set(addr) + .expect("newly created OnceCell must be empty"); + s.set_socket_digest(digest); + s + }), } } } diff --git a/pingora-core/src/protocols/l4/socket.rs b/pingora-core/src/protocols/l4/socket.rs index 02eab363..186fbecb 100644 --- a/pingora-core/src/protocols/l4/socket.rs +++ b/pingora-core/src/protocols/l4/socket.rs @@ -15,10 +15,12 @@ //! Generic socket type use crate::{Error, OrErr}; +use nix::sys::socket::{getpeername, getsockname, SockaddrStorage}; use std::cmp::Ordering; use std::hash::{Hash, Hasher}; use std::net::SocketAddr as StdSockAddr; use std::os::unix::net::SocketAddr as StdUnixSockAddr; +use tokio::net::unix::SocketAddr as TokioUnixSockAddr; /// [`SocketAddr`] is a storage type that contains either a Internet (IP address) /// socket address or a Unix domain socket address. @@ -53,6 +55,40 @@ impl SocketAddr { addr.set_port(port) } } + + fn from_sockaddr_storage(sock: &SockaddrStorage) -> Option { + if let Some(v4) = sock.as_sockaddr_in() { + return Some(SocketAddr::Inet(StdSockAddr::V4( + std::net::SocketAddrV4::new(v4.ip().into(), v4.port()), + ))); + } else if let Some(v6) = sock.as_sockaddr_in6() { + return Some(SocketAddr::Inet(StdSockAddr::V6( + std::net::SocketAddrV6::new(v6.ip(), v6.port(), v6.flowinfo(), v6.scope_id()), + ))); + } + + // TODO: don't set abstract / unnamed for now, + // for parity with how we treat these types in TryFrom + Some(SocketAddr::Unix( + sock.as_unix_addr() + .map(|addr| addr.path().map(StdUnixSockAddr::from_pathname))?? + .ok()?, + )) + } + + pub fn from_raw_fd(fd: std::os::unix::io::RawFd, peer_addr: bool) -> Option { + let sockaddr_storage = if peer_addr { + getpeername(fd) + } else { + getsockname(fd) + }; + match sockaddr_storage { + Ok(sockaddr) => Self::from_sockaddr_storage(&sockaddr), + // could be errors such as EBADF, i.e. fd is no longer a valid socket + // fail open in this case + Err(_e) => None, + } + } } impl std::fmt::Display for SocketAddr { @@ -167,6 +203,34 @@ impl std::net::ToSocketAddrs for SocketAddr { } } +impl From for SocketAddr { + fn from(sockaddr: StdSockAddr) -> Self { + SocketAddr::Inet(sockaddr) + } +} + +impl From for SocketAddr { + fn from(sockaddr: StdUnixSockAddr) -> Self { + SocketAddr::Unix(sockaddr) + } +} + +// TODO: ideally mio/tokio will start using the std version of the unix `SocketAddr` +// so we can avoid a fallible conversion +// https://github.com/tokio-rs/mio/issues/1527 +impl TryFrom for SocketAddr { + type Error = String; + + fn try_from(value: TokioUnixSockAddr) -> Result { + if let Some(Ok(addr)) = value.as_pathname().map(StdUnixSockAddr::from_pathname) { + Ok(addr.into()) + } else { + // may be unnamed/abstract UDS + Err(format!("could not convert {value:?} to SocketAddr")) + } + } +} + #[cfg(test)] mod test { use super::*; diff --git a/pingora-core/src/protocols/l4/stream.rs b/pingora-core/src/protocols/l4/stream.rs index ebe4f672..e2efdbfd 100644 --- a/pingora-core/src/protocols/l4/stream.rs +++ b/pingora-core/src/protocols/l4/stream.rs @@ -27,7 +27,10 @@ use tokio::io::{self, AsyncRead, AsyncWrite, AsyncWriteExt, BufStream, ReadBuf}; use tokio::net::{TcpStream, UnixStream}; use crate::protocols::raw_connect::ProxyDigest; -use crate::protocols::{GetProxyDigest, GetTimingDigest, Shutdown, Ssl, TimingDigest, UniqueID}; +use crate::protocols::{ + GetProxyDigest, GetSocketDigest, GetTimingDigest, Shutdown, SocketDigest, Ssl, TimingDigest, + UniqueID, +}; use crate::upstreams::peer::Tracer; #[derive(Debug)] @@ -105,6 +108,15 @@ impl AsyncWrite for RawStream { } } +impl AsRawFd for RawStream { + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + match self { + RawStream::Tcp(s) => s.as_raw_fd(), + RawStream::Unix(s) => s.as_raw_fd(), + } + } +} + // Large read buffering helps reducing syscalls with little trade-off // Ssl layer always does "small" reads in 16k (TLS record size) so L4 read buffer helps a lot. const BUF_READ_SIZE: usize = 64 * 1024; @@ -123,6 +135,7 @@ pub struct Stream { stream: BufStream, buffer_write: bool, proxy_digest: Option>, + socket_digest: Option>, /// When this connection is established pub established_ts: SystemTime, /// The distributed tracing object for this stream @@ -147,6 +160,7 @@ impl From for Stream { buffer_write: true, established_ts: SystemTime::now(), proxy_digest: None, + socket_digest: None, tracer: None, } } @@ -159,17 +173,21 @@ impl From for Stream { buffer_write: true, established_ts: SystemTime::now(), proxy_digest: None, + socket_digest: None, tracer: None, } } } +impl AsRawFd for Stream { + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + self.stream.get_ref().as_raw_fd() + } +} + impl UniqueID for Stream { fn id(&self) -> i32 { - match &self.stream.get_ref() { - RawStream::Tcp(s) => s.as_raw_fd(), - RawStream::Unix(s) => s.as_raw_fd(), - } + self.as_raw_fd() } } @@ -204,6 +222,16 @@ impl GetProxyDigest for Stream { } } +impl GetSocketDigest for Stream { + fn get_socket_digest(&self) -> Option> { + self.socket_digest.clone() + } + + fn set_socket_digest(&mut self, socket_digest: SocketDigest) { + self.socket_digest = Some(Arc::new(socket_digest)) + } +} + impl Drop for Stream { fn drop(&mut self) { if let Some(t) = self.tracer.as_ref() { diff --git a/pingora-core/src/protocols/mod.rs b/pingora-core/src/protocols/mod.rs index 4df6da8a..6b7a3575 100644 --- a/pingora-core/src/protocols/mod.rs +++ b/pingora-core/src/protocols/mod.rs @@ -20,7 +20,10 @@ pub mod l4; pub mod raw_connect; pub mod ssl; -pub use digest::{Digest, GetProxyDigest, GetTimingDigest, ProtoDigest, TimingDigest}; +pub use digest::{ + Digest, GetProxyDigest, GetSocketDigest, GetTimingDigest, ProtoDigest, SocketDigest, + TimingDigest, +}; pub use ssl::ALPN; use async_trait::async_trait; @@ -71,6 +74,7 @@ pub trait IO: + Ssl + GetTimingDigest + GetProxyDigest + + GetSocketDigest + Unpin + Debug + Send @@ -90,6 +94,7 @@ impl< + Ssl + GetTimingDigest + GetProxyDigest + + GetSocketDigest + Unpin + Debug + Send @@ -134,6 +139,11 @@ mod ext_io_impl { None } } + impl GetSocketDigest for Mock { + fn get_socket_digest(&self) -> Option> { + None + } + } use std::io::Cursor; @@ -157,6 +167,11 @@ mod ext_io_impl { None } } + impl GetSocketDigest for Cursor { + fn get_socket_digest(&self) -> Option> { + None + } + } use tokio::io::DuplexStream; @@ -180,6 +195,11 @@ mod ext_io_impl { None } } + impl GetSocketDigest for DuplexStream { + fn get_socket_digest(&self) -> Option> { + None + } + } } pub(crate) trait ConnFdReusable { diff --git a/pingora-core/src/protocols/ssl/client.rs b/pingora-core/src/protocols/ssl/client.rs index 7ed683f2..07811dda 100644 --- a/pingora-core/src/protocols/ssl/client.rs +++ b/pingora-core/src/protocols/ssl/client.rs @@ -16,7 +16,9 @@ use super::SslStream; use crate::protocols::raw_connect::ProxyDigest; -use crate::protocols::{GetProxyDigest, GetTimingDigest, TimingDigest, IO}; +use crate::protocols::{ + GetProxyDigest, GetSocketDigest, GetTimingDigest, SocketDigest, TimingDigest, IO, +}; use crate::tls::{ssl, ssl::ConnectConfiguration, ssl_sys::X509_V_ERR_INVALID_CALL}; use pingora_error::{Error, ErrorType::*, OrErr, Result}; @@ -90,3 +92,15 @@ where self.get_ref().get_proxy_digest() } } + +impl GetSocketDigest for SslStream +where + S: GetSocketDigest, +{ + fn get_socket_digest(&self) -> Option> { + self.get_ref().get_socket_digest() + } + fn set_socket_digest(&mut self, socket_digest: SocketDigest) { + self.get_mut().set_socket_digest(socket_digest) + } +} diff --git a/pingora-proxy/src/subrequest.rs b/pingora-proxy/src/subrequest.rs index 9490a40b..e75dcc6f 100644 --- a/pingora-proxy/src/subrequest.rs +++ b/pingora-proxy/src/subrequest.rs @@ -17,7 +17,9 @@ use core::pin::Pin; use core::task::{Context, Poll}; use pingora_cache::lock::WritePermit; use pingora_core::protocols::raw_connect::ProxyDigest; -use pingora_core::protocols::{GetProxyDigest, GetTimingDigest, Ssl, TimingDigest, UniqueID}; +use pingora_core::protocols::{ + GetProxyDigest, GetSocketDigest, GetTimingDigest, SocketDigest, Ssl, TimingDigest, UniqueID, +}; use std::io::Cursor; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite, Error, ReadBuf}; @@ -85,6 +87,12 @@ impl GetProxyDigest for DummyIO { } } +impl GetSocketDigest for DummyIO { + fn get_socket_digest(&self) -> Option> { + None + } +} + #[async_trait] impl pingora_core::protocols::Shutdown for DummyIO { async fn shutdown(&mut self) -> () {} diff --git a/pingora-proxy/tests/test_basic.rs b/pingora-proxy/tests/test_basic.rs index a4730bfc..4f11c2b7 100644 --- a/pingora-proxy/tests/test_basic.rs +++ b/pingora-proxy/tests/test_basic.rs @@ -20,6 +20,10 @@ use reqwest::{header, StatusCode}; use utils::server_utils::init; +fn is_specified_port(port: u16) -> bool { + (1..65535).contains(&port) +} + #[tokio::test] async fn test_origin_alive() { init(); @@ -36,8 +40,27 @@ async fn test_simple_proxy() { init(); let res = reqwest::get("http://127.0.0.1:6147").await.unwrap(); assert_eq!(res.status(), StatusCode::OK); + let headers = res.headers(); assert_eq!(headers[header::CONTENT_LENGTH], "13"); + assert_eq!(headers["x-server-addr"], "127.0.0.1:6147"); + let sockaddr = headers["x-client-addr"] + .to_str() + .unwrap() + .parse::() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.1"); + assert!(is_specified_port(sockaddr.port())); + + assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8000"); + let sockaddr = headers["x-upstream-client-addr"] + .to_str() + .unwrap() + .parse::() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.2"); + assert!(is_specified_port(sockaddr.port())); + let body = res.text().await.unwrap(); assert_eq!(body, "Hello World!\n"); } @@ -53,8 +76,28 @@ async fn test_h2_to_h1() { let res = client.get("https://127.0.0.1:6150").send().await.unwrap(); assert_eq!(res.status(), reqwest::StatusCode::OK); assert_eq!(res.version(), reqwest::Version::HTTP_2); + let headers = res.headers(); assert_eq!(headers[header::CONTENT_LENGTH], "13"); + assert_eq!(headers["x-server-addr"], "127.0.0.1:6150"); + + let sockaddr = headers["x-client-addr"] + .to_str() + .unwrap() + .parse::() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.1"); + assert!(is_specified_port(sockaddr.port())); + + assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8443"); + let sockaddr = headers["x-upstream-client-addr"] + .to_str() + .unwrap() + .parse::() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.2"); + assert!(is_specified_port(sockaddr.port())); + let body = res.text().await.unwrap(); assert_eq!(body, "Hello World!\n"); } @@ -75,8 +118,27 @@ async fn test_h2_to_h2() { .unwrap(); assert_eq!(res.status(), reqwest::StatusCode::OK); assert_eq!(res.version(), reqwest::Version::HTTP_2); + let headers = res.headers(); assert_eq!(headers[header::CONTENT_LENGTH], "13"); + assert_eq!(headers["x-server-addr"], "127.0.0.1:6150"); + let sockaddr = headers["x-client-addr"] + .to_str() + .unwrap() + .parse::() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.1"); + assert!(is_specified_port(sockaddr.port())); + + assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8443"); + let sockaddr = headers["x-upstream-client-addr"] + .to_str() + .unwrap() + .parse::() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.2"); + assert!(is_specified_port(sockaddr.port())); + let body = res.text().await.unwrap(); assert_eq!(body, "Hello World!\n"); } @@ -159,7 +221,21 @@ async fn test_simple_proxy_uds() { assert_eq!(res.status(), reqwest::StatusCode::OK); let (resp, body) = res.into_parts(); - assert_eq!(resp.headers[header::CONTENT_LENGTH], "13"); + + let headers = &resp.headers; + assert_eq!(headers[header::CONTENT_LENGTH], "13"); + assert_eq!(headers["x-server-addr"], "/tmp/pingora_proxy.sock"); + assert_eq!(headers["x-client-addr"], "unset"); // unnamed UDS + + assert_eq!(headers["x-upstream-server-addr"], "127.0.0.1:8000"); + let sockaddr = headers["x-upstream-client-addr"] + .to_str() + .unwrap() + .parse::() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.2"); + assert!(is_specified_port(sockaddr.port())); + let body = hyper::body::to_bytes(body).await.unwrap(); assert_eq!(body.as_ref(), b"Hello World!\n"); } @@ -168,15 +244,30 @@ async fn test_simple_proxy_uds() { async fn test_simple_proxy_uds_peer() { init(); let client = reqwest::Client::new(); + let res = client .get("http://127.0.0.1:6147") .header("x-uds-peer", "1") // force upstream peer to be UDS .send() .await .unwrap(); + assert_eq!(res.status(), StatusCode::OK); - let headers = res.headers(); + + let headers = &res.headers(); assert_eq!(headers[header::CONTENT_LENGTH], "13"); + assert_eq!(headers["x-server-addr"], "127.0.0.1:6147"); + let sockaddr = headers["x-client-addr"] + .to_str() + .unwrap() + .parse::() + .unwrap(); + assert_eq!(sockaddr.ip().to_string(), "127.0.0.1"); + assert!(is_specified_port(sockaddr.port())); + + assert_eq!(headers["x-upstream-client-addr"], "unset"); // unnamed UDS + assert_eq!(headers["x-upstream-server-addr"], "/tmp/nginx-test.sock"); + let body = res.text().await.unwrap(); assert_eq!(body, "Hello World!\n"); } diff --git a/pingora-proxy/tests/utils/server_utils.rs b/pingora-proxy/tests/utils/server_utils.rs index 32f32756..50fc8040 100644 --- a/pingora-proxy/tests/utils/server_utils.rs +++ b/pingora-proxy/tests/utils/server_utils.rs @@ -23,7 +23,7 @@ use pingora_cache::{ set_compression_dict_path, CacheMeta, CacheMetaDefaults, CachePhase, MemCache, NoCacheReason, RespCacheable, }; -use pingora_core::protocols::Digest; +use pingora_core::protocols::{l4::socket::SocketAddr, Digest}; use pingora_core::server::configuration::Opt; use pingora_core::services::Service; use pingora_core::upstreams::peer::HttpPeer; @@ -38,15 +38,72 @@ use structopt::StructOpt; pub struct ExampleProxyHttps {} #[allow(clippy::upper_case_acronyms)] +#[derive(Default)] pub struct CTX { conn_reused: bool, + upstream_client_addr: Option, + upstream_server_addr: Option, +} + +// Common logic for both ProxyHttp(s) types +fn connected_to_upstream_common( + reused: bool, + digest: Option<&Digest>, + ctx: &mut CTX, +) -> Result<()> { + ctx.conn_reused = reused; + let socket_digest = digest + .expect("upstream connector digest should be set for HTTP sessions") + .socket_digest + .as_ref() + .expect("socket digest should be set for HTTP sessions"); + ctx.upstream_client_addr = socket_digest.local_addr().cloned(); + ctx.upstream_server_addr = socket_digest.peer_addr().cloned(); + + Ok(()) +} + +fn response_filter_common( + session: &mut Session, + response: &mut ResponseHeader, + ctx: &mut CTX, +) -> Result<()> { + if ctx.conn_reused { + response.insert_header("x-conn-reuse", "1")?; + } + + let client_addr = session.client_addr(); + let server_addr = session.server_addr(); + response.insert_header( + "x-client-addr", + client_addr.map_or_else(|| "unset".into(), |a| a.to_string()), + )?; + response.insert_header( + "x-server-addr", + server_addr.map_or_else(|| "unset".into(), |a| a.to_string()), + )?; + + response.insert_header( + "x-upstream-client-addr", + ctx.upstream_client_addr + .as_ref() + .map_or_else(|| "unset".into(), |a| a.to_string()), + )?; + response.insert_header( + "x-upstream-server-addr", + ctx.upstream_server_addr + .as_ref() + .map_or_else(|| "unset".into(), |a| a.to_string()), + )?; + + Ok(()) } #[async_trait] impl ProxyHttp for ExampleProxyHttps { type CTX = CTX; fn new_ctx(&self) -> Self::CTX { - CTX { conn_reused: false } + CTX::default() } async fn upstream_peer( @@ -101,17 +158,14 @@ impl ProxyHttp for ExampleProxyHttps { async fn response_filter( &self, - _session: &mut Session, + session: &mut Session, upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX, ) -> Result<()> where Self::CTX: Send + Sync, { - if ctx.conn_reused { - upstream_response.insert_header("x-conn-reuse", "1")?; - } - Ok(()) + response_filter_common(session, upstream_response, ctx) } async fn upstream_request_filter( @@ -119,10 +173,7 @@ impl ProxyHttp for ExampleProxyHttps { session: &mut Session, req: &mut RequestHeader, _ctx: &mut Self::CTX, - ) -> Result<()> - where - Self::CTX: Send + Sync, - { + ) -> Result<()> { let host = session.get_header_bytes("host-override"); if host != b"" { req.insert_header("host", host)?; @@ -136,11 +187,10 @@ impl ProxyHttp for ExampleProxyHttps { reused: bool, _peer: &HttpPeer, _fd: std::os::unix::io::RawFd, - _digest: Option<&Digest>, + digest: Option<&Digest>, ctx: &mut CTX, ) -> Result<()> { - ctx.conn_reused = reused; - Ok(()) + connected_to_upstream_common(reused, digest, ctx) } } @@ -148,8 +198,10 @@ pub struct ExampleProxyHttp {} #[async_trait] impl ProxyHttp for ExampleProxyHttp { - type CTX = (); - fn new_ctx(&self) -> Self::CTX {} + type CTX = CTX; + fn new_ctx(&self) -> Self::CTX { + CTX::default() + } async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result { let req = session.req_header(); @@ -164,6 +216,15 @@ impl ProxyHttp for ExampleProxyHttp { Ok(false) } + async fn response_filter( + &self, + session: &mut Session, + upstream_response: &mut ResponseHeader, + ctx: &mut Self::CTX, + ) -> Result<()> { + response_filter_common(session, upstream_response, ctx) + } + async fn upstream_peer( &self, session: &mut Session, @@ -182,12 +243,24 @@ impl ProxyHttp for ExampleProxyHttp { .get("x-port") .map_or("8000", |v| v.to_str().unwrap()); let peer = Box::new(HttpPeer::new( - format!("127.0.0.1:{}", port), + format!("127.0.0.1:{port}"), false, "".to_string(), )); Ok(peer) } + + async fn connected_to_upstream( + &self, + _http_session: &mut Session, + reused: bool, + _peer: &HttpPeer, + _fd: std::os::unix::io::RawFd, + digest: Option<&Digest>, + ctx: &mut CTX, + ) -> Result<()> { + connected_to_upstream_common(reused, digest, ctx) + } } static CACHE_BACKEND: Lazy = Lazy::new(MemCache::new); From 461ddaebd0a8fdf5e2822ea7c87b3e78a33b4417 Mon Sep 17 00:00:00 2001 From: Shane Utt Date: Wed, 28 Feb 2024 15:39:59 +0000 Subject: [PATCH 10/11] chore: resolve TODOs in the pingora client example Signed-off-by: Shane Utt Replicated-from: https://github.com/cloudflare/pingora/pull/9 Includes-commit: 5833556e5f8d9ccb346ef586cb1b5f51add64357 --- .bleep | 2 +- pingora/Cargo.toml | 1 + pingora/examples/client.rs | 68 ++++++++++++++++++++++++++++---------- 3 files changed, 52 insertions(+), 19 deletions(-) diff --git a/.bleep b/.bleep index 1ac1f3c8..86a87b97 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -c16c9e8bfd9334b77a6a7c1123954f41037c06c3 \ No newline at end of file +c3ec53f03ed27ce7ce90ec76e9ae67bab2097fd3 \ No newline at end of file diff --git a/pingora/Cargo.toml b/pingora/Cargo.toml index c2691992..9d6aea32 100644 --- a/pingora/Cargo.toml +++ b/pingora/Cargo.toml @@ -40,6 +40,7 @@ log = { workspace = true } prometheus = "0.13" once_cell = { workspace = true } bytes = { workspace = true } +regex = "1" [features] default = ["openssl"] diff --git a/pingora/examples/client.rs b/pingora/examples/client.rs index c235dd19..9462aa2f 100644 --- a/pingora/examples/client.rs +++ b/pingora/examples/client.rs @@ -12,29 +12,61 @@ // See the License for the specific language governing permissions and // limitations under the License. -use pingora::connectors::http::Connector; -use pingora::upstreams::peer::HttpPeer; +use pingora::{connectors::http::Connector, prelude::*}; use pingora_http::RequestHeader; +use regex::Regex; #[tokio::main] -async fn main() { +async fn main() -> Result<()> { let connector = Connector::new(None); - let mut peer = HttpPeer::new("1.1.1.1:443", true, "one.one.one.one".into()); + // create the HTTP session + let peer_addr = "1.1.1.1:443"; + let mut peer = HttpPeer::new(peer_addr, true, "one.one.one.one".into()); peer.options.set_http_version(2, 1); - let (mut http, _reused) = connector.get_http_session(&peer).await.unwrap(); - - let mut new_request = RequestHeader::build("GET", b"/", None).unwrap(); - new_request - .insert_header("Host", "one.one.one.one") - .unwrap(); - http.write_request_header(Box::new(new_request)) - .await - .unwrap(); + let (mut http, _reused) = connector.get_http_session(&peer).await?; + + // perform a GET request + let mut new_request = RequestHeader::build("GET", b"/", None)?; + new_request.insert_header("Host", "one.one.one.one")?; + http.write_request_header(Box::new(new_request)).await?; + // Servers usually don't respond until the full request body is read. - http.finish_request_body().await.unwrap(); - http.read_response_header().await.unwrap(); - println!("{:#?}", http.response_header().unwrap()); - // TODO: continue reading the body - // TODO: return the connection back to the `connector` (or discard it) + http.finish_request_body().await?; + http.read_response_header().await?; + + // display the headers from the response + if let Some(header) = http.response_header() { + println!("{header:#?}"); + } else { + return Error::e_explain(ErrorType::InvalidHTTPHeader, "No response header"); + }; + + // collect the response body + let mut response_body = String::new(); + while let Some(chunk) = http.read_response_body().await? { + response_body.push_str(&String::from_utf8_lossy(&chunk)); + } + + // verify that the response body is valid HTML by displaying the page + let re = Regex::new(r"<title>(.*?)") + .or_err(ErrorType::InternalError, "Failed to compile regex")?; + if let Some(title) = re + .captures(&response_body) + .and_then(|caps| caps.get(1).map(|match_| match_.as_str())) + { + println!("Page Title: {title}"); + } else { + return Error::e_explain( + ErrorType::new("InvalidHTML"), + "No found in response body", + ); + } + + // gracefully release the connection + connector + .release_http_session(http, &peer, Some(std::time::Duration::from_secs(5))) + .await; + + Ok(()) } From 7a65192998ac975b54402acebd8c493d7480a03a Mon Sep 17 00:00:00 2001 From: battmdpkq <cmaker@163.com> Date: Fri, 8 Mar 2024 02:52:27 +0000 Subject: [PATCH 11/11] fix some comments Signed-off-by: battmdpkq <cmaker@163.com> Replicated-from: https://github.com/cloudflare/pingora/pull/123 Includes-commit: 943bedb2811dc9a2b9b31c2f3576aa7cd2cb8f90 --- .bleep | 2 +- pingora-cache/src/memory.rs | 2 +- pingora-load-balancing/src/health_check.rs | 2 +- pingora-proxy/src/proxy_cache.rs | 2 +- pingora-proxy/src/proxy_trait.rs | 4 ++-- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.bleep b/.bleep index 86a87b97..becb0a18 100644 --- a/.bleep +++ b/.bleep @@ -1 +1 @@ -c3ec53f03ed27ce7ce90ec76e9ae67bab2097fd3 \ No newline at end of file +deb3c5409e938ec9c7d0da9b7a2d331eabbb2cd5 \ No newline at end of file diff --git a/pingora-cache/src/memory.rs b/pingora-cache/src/memory.rs index 6d0a5197..6a665775 100644 --- a/pingora-cache/src/memory.rs +++ b/pingora-cache/src/memory.rs @@ -211,7 +211,7 @@ impl HandleHit for MemHitHandler { pub struct MemMissHandler { body: Arc<RwLock<Vec<u8>>>, bytes_written: Arc<watch::Sender<PartialState>>, - // these are used only in finish() to to data from temp to cache + // these are used only in finish() to data from temp to cache key: String, cache: Arc<RwLock<HashMap<String, CacheObject>>>, temp: Arc<RwLock<HashMap<String, TempObject>>>, diff --git a/pingora-load-balancing/src/health_check.rs b/pingora-load-balancing/src/health_check.rs index d40221d6..11d22468 100644 --- a/pingora-load-balancing/src/health_check.rs +++ b/pingora-load-balancing/src/health_check.rs @@ -34,7 +34,7 @@ pub trait HealthCheck { /// This function defines how many *consecutive* checks should flip the health of a backend. /// /// For example: with `success``: `true`: this function should return the - /// number of check need to to flip from unhealthy to healthy. + /// number of check need to flip from unhealthy to healthy. fn health_threshold(&self, success: bool) -> usize; } diff --git a/pingora-proxy/src/proxy_cache.rs b/pingora-proxy/src/proxy_cache.rs index 4546b589..f40e0835 100644 --- a/pingora-proxy/src/proxy_cache.rs +++ b/pingora-proxy/src/proxy_cache.rs @@ -85,7 +85,7 @@ impl<SV> HttpProxy<SV> { if let Some((mut meta, handler)) = res { // vary logic // because this branch can be called multiple times in a loop, and we only - // need to to update the vary once, check if variance is already set to + // need to update the vary once, check if variance is already set to // prevent unnecessary vary lookups let cache_key = session.cache.cache_key(); if let Some(variance) = cache_key.variance_bin() { diff --git a/pingora-proxy/src/proxy_trait.rs b/pingora-proxy/src/proxy_trait.rs index 042dbd44..8293a24e 100644 --- a/pingora-proxy/src/proxy_trait.rs +++ b/pingora-proxy/src/proxy_trait.rs @@ -18,7 +18,7 @@ use pingora_cache::{key::HashBinary, CacheKey, CacheMeta, RespCacheable, RespCac /// The interface to control the HTTP proxy /// /// The methods in [ProxyHttp] are filters/callbacks which will be performed on all requests at their -/// paticular stage (if applicable). +/// particular stage (if applicable). /// /// If any of the filters returns [Result::Err], the request will fail and the error will be logged. #[cfg_attr(not(doc_async_trait), async_trait)] @@ -264,7 +264,7 @@ pub trait ProxyHttp { /// /// If the error can be retried, [Self::upstream_peer()] will be called again so that the user /// can decide whether to send the request to the same upstream or another upstream that is possibly - /// avaliable. + /// available. fn fail_to_connect( &self, _session: &mut Session,