Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronize internally merged commits (2024-03-15) #140

Merged
merged 11 commits into from
Mar 15, 2024
2 changes: 1 addition & 1 deletion .bleep
Original file line number Diff line number Diff line change
@@ -1 +1 @@
20f290d8adc57ed489bcdd1b950833d0eb6bd8ed
deb3c5409e938ec9c7d0da9b7a2d331eabbb2cd5
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ dhat-heap.json
.vscode
.idea
.cover
bleeper.user.toml
2 changes: 1 addition & 1 deletion pingora-boringssl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//! The BoringSSL API compatibility layer.
//!
//! This crate aims at making [boring] APIs exchangeable with [openssl-rs](https://docs.rs/openssl/latest/openssl/).
//! In other words, this crate and `pingora-openssl` expose identical rust APIs.
//! In other words, this crate and [`pingora-openssl`](https://docs.rs/pingora-openssl) expose identical rust APIs.

#![warn(clippy::all)]

Expand Down
4 changes: 1 addition & 3 deletions pingora-cache/src/cache_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ use http::header::HeaderName;
use http::HeaderValue;
use indexmap::IndexMap;
use once_cell::sync::Lazy;
use pingora_error::{Error, ErrorType, Result};
use pingora_http::ResponseHeader;
use pingora_error::{Error, ErrorType};
use regex::bytes::Regex;
use std::num::IntErrorKind;
use std::slice;
Expand Down Expand Up @@ -434,7 +433,6 @@ pub trait InterpretCacheControl {
mod tests {
use super::*;
use http::header::CACHE_CONTROL;
use http::HeaderValue;
use http::{request, response};

fn build_response(cc_key: HeaderName, cc_value: &str) -> response::Parts {
Expand Down
1 change: 0 additions & 1 deletion pingora-cache/src/eviction/lru.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ impl<const N: usize> EvictionManager for Manager<N> {
mod test {
use super::*;
use crate::CacheKey;
use EvictionManager;

// we use shard (N) = 1 for eviction consistency in all tests

Expand Down
5 changes: 3 additions & 2 deletions pingora-cache/src/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

use super::*;
use crate::cache_control::{CacheControl, Cacheable, InterpretCacheControl};
use crate::{RespCacheable, RespCacheable::*};
use crate::RespCacheable::*;

use http::{header, HeaderValue};
use httpdate::HttpDate;
use log::warn;
use pingora_http::{RequestHeader, ResponseHeader};
use pingora_http::RequestHeader;

/// Decide if the request can be cacheable
pub fn request_cacheable(req_header: &ReqHeader) -> bool {
Expand Down Expand Up @@ -206,6 +206,7 @@ pub mod upstream {
#[cfg(test)]
mod tests {
use super::*;
use crate::RespCacheable::Cacheable;
use http::header::{HeaderName, CACHE_CONTROL, EXPIRES, SET_COOKIE};
use http::StatusCode;
use httpdate::fmt_http_date;
Expand Down
6 changes: 3 additions & 3 deletions pingora-cache/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
//TODO: Mark this module #[test] only

use super::*;
use crate::key::{CacheHashKey, CompactCacheKey};
use crate::storage::{HandleHit, HandleMiss, Storage};
use crate::key::CompactCacheKey;
use crate::storage::{HandleHit, HandleMiss};
use crate::trace::SpanHandle;

use async_trait::async_trait;
Expand Down Expand Up @@ -211,7 +211,7 @@ impl HandleHit for MemHitHandler {
pub struct MemMissHandler {
body: Arc<RwLock<Vec<u8>>>,
bytes_written: Arc<watch::Sender<PartialState>>,
// these are used only in finish() to to data from temp to cache
// these are used only in finish() to data from temp to cache
key: String,
cache: Arc<RwLock<HashMap<String, CacheObject>>>,
temp: Arc<RwLock<HashMap<String, TempObject>>>,
Expand Down
10 changes: 6 additions & 4 deletions pingora-cache/src/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,12 @@ mod test {

mod parse_response {
use super::*;
use bytes::{Bytes, BytesMut};
use bytes::BytesMut;
use httparse::Status;
use pingora_error::{
Error,
ErrorType::{self, *},
Result,
};
use pingora_http::ResponseHeader;

pub const INVALID_CHUNK: ErrorType = ErrorType::new("InvalidChunk");
pub const INCOMPLETE_BODY: ErrorType = ErrorType::new("IncompleteHttpBody");
Expand All @@ -280,7 +278,7 @@ mod parse_response {
const INIT_HEADER_BUF_SIZE: usize = 4096;
const CHUNK_DELIMITER_SIZE: usize = 2; // \r\n

#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum ParseState {
Init,
PartialHeader,
Expand Down Expand Up @@ -561,6 +559,10 @@ mod parse_response {
let output = parser.inject_data(input);
// header is not complete
assert!(output.is_err());
match parser.state {
ParseState::Invalid(httparse::Error::Version) => {}
_ => panic!("should have failed to parse"),
}
}

#[test]
Expand Down
14 changes: 13 additions & 1 deletion pingora-core/src/apps/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::sync::Arc;

use crate::protocols::http::v2::server;
use crate::protocols::http::ServerSession;
use crate::protocols::Digest;
use crate::protocols::Stream;
use crate::protocols::ALPN;

Expand Down Expand Up @@ -91,6 +92,15 @@ where
) -> Option<Stream> {
match stream.selected_alpn_proto() {
Some(ALPN::H2) => {
// create a shared connection digest
let digest = Arc::new(Digest {
ssl_digest: stream.get_ssl_digest(),
// TODO: log h2 handshake time
timing_digest: stream.get_timing_digest(),
proxy_digest: stream.get_proxy_digest(),
socket_digest: stream.get_socket_digest(),
});

let h2_options = self.h2_options();
let h2_conn = server::handshake(stream, h2_options).await;
let mut h2_conn = match h2_conn {
Expand All @@ -100,10 +110,12 @@ where
}
Ok(c) => c,
};

loop {
// this loop ends when the client decides to close the h2 conn
// TODO: add a timeout?
let h2_stream = server::HttpSession::from_h2_conn(&mut h2_conn).await;
let h2_stream =
server::HttpSession::from_h2_conn(&mut h2_conn, digest.clone()).await;
let h2_stream = match h2_stream {
Err(e) => {
// It is common for client to just disconnect TCP without properly
Expand Down
2 changes: 1 addition & 1 deletion pingora-core/src/apps/prometheus_http_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//! An HTTP application that reports Prometheus metrics.

use async_trait::async_trait;
use http::{self, Response};
use http::Response;
use prometheus::{Encoder, TextEncoder};

use super::http_app::HttpServer;
Expand Down
5 changes: 5 additions & 0 deletions pingora-core/src/connectors/http/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl Connector {
#[cfg(test)]
mod tests {
use super::*;
use crate::protocols::l4::socket::SocketAddr;
use crate::upstreams::peer::HttpPeer;
use pingora_http::RequestHeader;

Expand All @@ -85,6 +86,8 @@ mod tests {
let peer = HttpPeer::new(("1.1.1.1", 80), false, "".into());
// make a new connection to 1.1.1.1
let (http, reused) = connector.get_http_session(&peer).await.unwrap();
let server_addr = http.server_addr().unwrap();
assert_eq!(*server_addr, "1.1.1.1:80".parse::<SocketAddr>().unwrap());
assert!(!reused);

// this http is not even used, so not be able to reuse
Expand All @@ -104,6 +107,8 @@ mod tests {
let peer = HttpPeer::new(("1.1.1.1", 443), true, "one.one.one.one".into());
// make a new connection to https://1.1.1.1
let (http, reused) = connector.get_http_session(&peer).await.unwrap();
let server_addr = http.server_addr().unwrap();
assert_eq!(*server_addr, "1.1.1.1:443".parse::<SocketAddr>().unwrap());
assert!(!reused);

// this http is not even used, so not be able to reuse
Expand Down
1 change: 1 addition & 0 deletions pingora-core/src/connectors/http/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ async fn handshake(
// TODO: log h2 handshake time
timing_digest: stream.get_timing_digest(),
proxy_digest: stream.get_proxy_digest(),
socket_digest: stream.get_socket_digest(),
};
// TODO: make these configurable
let (send_req, connection) = Builder::new()
Expand Down
13 changes: 12 additions & 1 deletion pingora-core/src/connectors/l4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ use log::debug;
use pingora_error::{Context, Error, ErrorType::*, OrErr, Result};
use rand::seq::SliceRandom;
use std::net::SocketAddr as InetSocketAddr;
use std::os::unix::io::AsRawFd;

use crate::protocols::l4::ext::{connect as tcp_connect, connect_uds, set_tcp_keepalive};
use crate::protocols::l4::socket::SocketAddr;
use crate::protocols::l4::stream::Stream;
use crate::protocols::{GetSocketDigest, SocketDigest};
use crate::upstreams::peer::Peer;

/// Establish a connection (l4) to the given peer using its settings and an optional bind address.
Expand All @@ -32,7 +34,8 @@ where
.await
.err_context(|| format!("Fail to establish CONNECT proxy: {}", peer));
}
let mut stream: Stream = match peer.address() {
let peer_addr = peer.address();
let mut stream: Stream = match peer_addr {
SocketAddr::Inet(addr) => {
let connect_future = tcp_connect(addr, bind_to.as_ref());
let conn_res = match peer.connection_timeout() {
Expand Down Expand Up @@ -97,6 +100,14 @@ where
}

stream.set_nodelay()?;

let digest = SocketDigest::from_raw_fd(stream.as_raw_fd());
digest
.peer_addr
.set(Some(peer_addr.clone()))
.expect("newly created OnceCell must be empty");
stream.set_socket_digest(digest);

Ok(stream)
}

Expand Down
4 changes: 2 additions & 2 deletions pingora-core/src/connectors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,9 @@ fn test_reusable_stream(stream: &mut Stream) -> bool {
#[cfg(test)]
mod tests {
use pingora_error::ErrorType;
use pingora_openssl::ssl::SslMethod;

use super::*;
use crate::tls::ssl::SslMethod;
use crate::upstreams::peer::BasicPeer;

// 192.0.2.1 is effectively a black hole
Expand Down Expand Up @@ -427,7 +427,7 @@ mod tests {

let stream = connector.new_stream(&peer).await;
let error = stream.unwrap_err();
// XXX: some system will allow the socket to bind and connect without error, only to timeout
// XXX: some systems will allow the socket to bind and connect without error, only to timeout
assert!(error.etype() == &ConnectError || error.etype() == &ConnectTimedout)
}

Expand Down
2 changes: 1 addition & 1 deletion pingora-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
//! # Usage
//! This crate provides low level service and protocol implementation and abstraction.
//!
//! If looking to build a (reverse) proxy, see `pingora-proxy` crate.
//! If looking to build a (reverse) proxy, see [`pingora-proxy`](https://docs.rs/pingora-proxy) crate.
//!
//! # Optional features
//! `boringssl`: Switch the internal TLS library from OpenSSL to BoringSSL.
Expand Down
45 changes: 44 additions & 1 deletion pingora-core/src/protocols/digest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@
use std::sync::Arc;
use std::time::SystemTime;

use once_cell::sync::OnceCell;

use super::l4::socket::SocketAddr;
use super::raw_connect::ProxyDigest;
use super::ssl::digest::SslDigest;

/// The information can be extracted from a connection
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub struct Digest {
/// Information regarding the TLS of this connection if any
pub ssl_digest: Option<Arc<SslDigest>>,
/// Timing information
pub timing_digest: Vec<Option<TimingDigest>>,
/// information regarding the CONNECT proxy this connection uses.
pub proxy_digest: Option<Arc<ProxyDigest>>,
/// Information about underlying socket/fd of this connection
pub socket_digest: Option<Arc<SocketDigest>>,
}

/// The interface to return protocol related information
Expand All @@ -53,6 +58,38 @@ impl Default for TimingDigest {
}
}

#[derive(Debug)]
/// The interface to return socket-related information
pub struct SocketDigest {
raw_fd: std::os::unix::io::RawFd,
/// Remote socket address
pub peer_addr: OnceCell<Option<SocketAddr>>,
/// Local socket address
pub local_addr: OnceCell<Option<SocketAddr>>,
}

impl SocketDigest {
pub fn from_raw_fd(raw_fd: std::os::unix::io::RawFd) -> SocketDigest {
SocketDigest {
raw_fd,
peer_addr: OnceCell::new(),
local_addr: OnceCell::new(),
}
}

pub fn peer_addr(&self) -> Option<&SocketAddr> {
self.peer_addr
.get_or_init(|| SocketAddr::from_raw_fd(self.raw_fd, true))
.as_ref()
}

pub fn local_addr(&self) -> Option<&SocketAddr> {
self.local_addr
.get_or_init(|| SocketAddr::from_raw_fd(self.raw_fd, false))
.as_ref()
}
}

/// The interface to return timing information
pub trait GetTimingDigest {
/// Return the timing for each layer from the lowest layer to upper
Expand All @@ -64,3 +101,9 @@ pub trait GetProxyDigest {
fn get_proxy_digest(&self) -> Option<Arc<ProxyDigest>>;
fn set_proxy_digest(&mut self, _digest: ProxyDigest) {}
}

/// The interface to set or return socket information
pub trait GetSocketDigest {
fn get_socket_digest(&self) -> Option<Arc<SocketDigest>>;
fn set_socket_digest(&mut self, _socket_digest: SocketDigest) {}
}
20 changes: 18 additions & 2 deletions pingora-core/src/protocols/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::time::Duration;

use super::v1::client::HttpSession as Http1Session;
use super::v2::client::Http2Session;
use crate::protocols::Digest;
use crate::protocols::{Digest, SocketAddr};

/// A type for Http client session. It can be either an Http1 connection or an Http2 stream.
pub enum HttpSession {
Expand Down Expand Up @@ -151,11 +151,27 @@ impl HttpSession {
/// Return the [Digest] of the connection
///
/// For reused connection, the timing in the digest will reflect its initial handshakes
/// The caller should check if the connection is reused to avoid misuse the timing field
/// The caller should check if the connection is reused to avoid misuse of the timing field
pub fn digest(&self) -> Option<&Digest> {
match self {
Self::H1(s) => Some(s.digest()),
Self::H2(s) => s.digest(),
}
}

/// Return the server (peer) address of the connection.
pub fn server_addr(&self) -> Option<&SocketAddr> {
match self {
Self::H1(s) => s.server_addr(),
Self::H2(s) => s.server_addr(),
}
}

/// Return the client (local) address of the connection.
pub fn client_addr(&self) -> Option<&SocketAddr> {
match self {
Self::H1(s) => s.client_addr(),
Self::H2(s) => s.client_addr(),
}
}
}
Loading
Loading