Upgrade hyper to 1.x #3

Open: wants to merge 3 commits into master
5 changes: 3 additions & 2 deletions Cargo.toml
@@ -1,4 +1,5 @@
[workspace]
resolver = "2"
members = [
"s3",
"aws-region",
@@ -10,7 +11,7 @@ license = "MIT"

[workspace.dependencies]
thiserror = "1"
env_logger = "0.10"
env_logger = "0.11"

[profile.release]
lto = "fat"
lto = "fat"
23 changes: 13 additions & 10 deletions deny.toml
@@ -1,29 +1,32 @@
all-features = false
no-default-features = false
[graph]
all-features = true

[advisories]
version = 2
db-urls = ["https://github.com/rustsec/advisory-db"]
vulnerability = "deny"
unmaintained = "warn"
yanked = "warn"
notice = "warn"
ignore = [
# "RUSTSEC-0000-0000"
]

[licenses]
unlicensed = "deny"
version = 2
allow = [
"MIT",
"ISC",
"OpenSSL",
"Apache-2.0",
"BSD-3-Clause",
"Unicode-DFS-2016", # used by unicode-ident
"CC0-1.0",
"MPL-2.0",
]
copyleft = "deny"
allow-osi-fsf-free = "neither"
default = "deny"

[[licenses.clarify]]
name = "ring"
expression = "MIT AND ISC AND OpenSSL"
license-files = [
{ path = "LICENSE", hash = 0xbd0eed23 }
]

[bans]
skip = [
22 changes: 11 additions & 11 deletions s3/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "zitane-s3-async"
version = "0.37.0"
version = "0.38.0"
authors = ["Drazen Urch", "Aalekh Patel <[email protected]>", "Marco Quinten <[email protected]>"]
description = "Rust library for working with AWS S3 and compatible object storage APIs"
repository = "https://github.com/ZitaneLabs/rust-s3-async"
@@ -33,25 +33,25 @@ path = "../examples/gcs-tokio.rs"
async-trait = "0.1"
zitane-aws-creds = { version = "0.36", default-features = false }
zitane-aws-region = "0.26"
base64 = "0.21"
base64 = "0.22"
cfg-if = "1"
time = { version = "^0.3.6", features = ["formatting", "macros"] }
futures = "^0.3"
time = { version = "0.3", features = ["formatting", "macros"] }
futures = "0.3"
hex = "0.4"
hmac = "0.12"
http = "0.2"
hyper = { version = "^0.14", default-features = false, features = [
http = "1.1"
hyper = { version = "1", default-features = false, features = [
"client",
"http1",
"http2",
"stream",
] }
hyper-tls = { version = "0.5.0", default-features = false }
tracing = { version="0.1" }
hyper-util = { version = "0.1", features = ["full"] }
http-body-util = "0.1"
tracing = { version = "0.1" }
md5 = "0.7"
percent-encoding = "2"
serde = { version = "1", features = ["derive"]}
quick-xml = { version = "0.29", features = ["serialize"] }
quick-xml = { version = "0.31", features = ["serialize"] }
sha2 = "0.10"
thiserror.workspace = true
tokio = { version = "1", features = [
Expand All @@ -62,7 +62,7 @@ tokio-native-tls = { version = "0.3" }
tokio-stream = { version = "0.1" }
url = "2"
bytes = { version = "1" }
strum_macros = "0.25"
strum_macros = "0.26"

[features]
default = ["zitane-aws-creds/native-tls"]
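With hyper 1.x the pieces that used to ship in hyper itself are split out: hyper-util provides the Tokio adapters and http-body-util provides the body types and combinators, which is why those two crates are added here and hyper-tls 0.5 is dropped. A minimal sketch of a request built only from the dependencies above (fetch, host, and path are illustrative names; plain HTTP on port 80 is assumed to keep it short):

use http_body_util::{BodyExt, Empty};
use hyper_util::rt::TokioIo;
use tokio::net::TcpStream;

async fn fetch(host: &str, path: &str) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
    // Open a plain TCP connection and adapt it to hyper's IO traits via hyper-util.
    let stream = TcpStream::connect((host, 80u16)).await?;
    let (mut sender, conn) = hyper::client::conn::http1::handshake(TokioIo::new(stream)).await?;
    // The connection future must be polled for the request to make progress.
    tokio::spawn(async move {
        let _ = conn.await;
    });

    let req = http::Request::builder()
        .uri(path)
        .header(hyper::header::HOST, host)
        .body(Empty::<bytes::Bytes>::new())?;
    let res = sender.send_request(req).await?;

    // http-body-util's collect() replaces hyper 0.14's hyper::body::to_bytes().
    let body = res.collect().await?.to_bytes();
    Ok(String::from_utf8(body.to_vec())?)
}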
2 changes: 1 addition & 1 deletion s3/src/bucket/get.rs
@@ -205,7 +205,7 @@ impl Bucket {
/// let mut async_output_file = tokio::fs::File::create("async_output_file").await.expect("Unable to create file");
///
/// while let Some(chunk) = response_data_stream.bytes().next().await {
/// async_output_file.write_all(&chunk.unwrap()).await?;
/// // async_output_file.write_all(&chunk.unwrap()).await?;
/// }
///
/// #
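The write line is commented out here because the stream items change type with hyper 1.x: each item is now a body frame rather than a plain byte chunk (see the StreamItem alias change in request_trait.rs further down). A hedged sketch of how the loop could be written against the new type, keeping the surrounding example's names; it is not part of this diff:

while let Some(frame) = response_data_stream.bytes().next().await {
    // Frame::into_data yields the payload for data frames and Err for trailer frames.
    if let Ok(data) = frame.unwrap().into_data() {
        async_output_file.write_all(&data).await?;
    }
}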
2 changes: 2 additions & 0 deletions s3/src/error.rs
@@ -18,6 +18,8 @@ pub enum S3Error {
HmacInvalidLength(#[from] sha2::digest::InvalidLength),
#[error("url parse: {0}")]
UrlParse(#[from] url::ParseError),
#[error("uri parse: {0}")]
UriParse(#[from] http::uri::InvalidUri),
#[error("io: {0}")]
Io(#[from] std::io::Error),
#[error("http: {0}")]
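The new variant follows the pattern of the existing UrlParse one: thiserror's #[from] derives a From<http::uri::InvalidUri> impl, so the ? operator in the request code below can convert a failed Uri parse directly into an S3Error. A tiny illustration (the function name is hypothetical):

fn parse_uri(input: &str) -> Result<http::Uri, S3Error> {
    // InvalidUri converts into S3Error::UriParse through the derived From impl.
    Ok(input.parse::<http::Uri>()?)
}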
16 changes: 9 additions & 7 deletions s3/src/request/request_trait.rs
@@ -1,6 +1,7 @@
use base64::engine::general_purpose;
use base64::Engine;
use hmac::Mac;
use http::Uri;
use std::collections::HashMap;
use time::format_description::well_known::Rfc2822;
use time::OffsetDateTime;
@@ -87,7 +88,7 @@ impl fmt::Display for ResponseData {
use std::pin::Pin;

pub type DataStream = Pin<Box<dyn futures::Stream<Item = StreamItem> + Send>>;
pub type StreamItem = Result<bytes::Bytes, crate::error::S3Error>;
pub type StreamItem = Result<hyper::body::Frame<bytes::Bytes>, hyper::Error>;

pub struct ResponseDataStream {
pub bytes: DataStream,
@@ -232,13 +233,14 @@ pub trait Request {
expiry: u32,
custom_headers: Option<&HeaderMap>,
custom_queries: Option<&HashMap<String, String>>,
) -> Result<Url, S3Error> {
) -> Result<Uri, S3Error> {
let bucket = self.bucket();
let token = if let Some(security_token) = bucket.security_token()? {
Some(security_token)
} else {
bucket.session_token()?
};

let url = Url::parse(&format!(
"{}{}{}",
self.url()?,
@@ -253,18 +255,18 @@ pub trait Request {
&signing::flatten_queries(custom_queries)?,
))?;

Ok(url)
Ok(url.as_str().parse::<Uri>()?)
}

fn url(&self) -> Result<Url, S3Error> {
fn url(&self) -> Result<Uri, S3Error> {
let mut url_str = self.bucket().url();

if let Command::ListBuckets { .. } = self.command() {
return Ok(Url::parse(&url_str)?);
return Ok(url_str.parse()?);
}

if let Command::CreateBucket { .. } = self.command() {
return Ok(Url::parse(&url_str)?);
return Ok(url_str.parse()?);
}

let path = if self.path().starts_with('/') {
@@ -373,7 +375,7 @@ pub trait Request {
_ => {}
}

Ok(url)
Ok(url.as_str().parse::<Uri>()?)
}

fn canonical_request(&self, headers: &HeaderMap) -> Result<String, S3Error> {
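Presigned URLs are still assembled with url::Url (the query-string flattening relies on it) and only converted to http::Uri at the very end, so a parse failure surfaces through the new UriParse error variant. A quick round-trip sketch with illustrative values:

fn url_to_uri_example() -> Result<(), Box<dyn std::error::Error>> {
    let url = url::Url::parse("https://example-bucket.s3.amazonaws.com/key?versionId=abc123")?;
    let uri: http::Uri = url.as_str().parse()?;
    assert_eq!(uri.scheme_str(), Some("https"));
    assert_eq!(uri.path(), "/key");
    assert_eq!(uri.query(), Some("versionId=abc123"));
    Ok(())
}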
83 changes: 53 additions & 30 deletions s3/src/request/tokio_backend.rs
@@ -1,24 +1,24 @@
extern crate base64;
extern crate md5;

use bytes::Buf;
use bytes::Bytes;
use futures::TryStreamExt;
use hyper::{Body, Client};
use hyper_tls::HttpsConnector;
use http_body_util::BodyExt;
use http_body_util::BodyStream;
use http_body_util::Full;
use hyper_util::rt::TokioIo;
use std::collections::HashMap;
use std::io::Read;
use time::OffsetDateTime;
use tokio::{io::AsyncWriteExt as _, net::TcpStream};

use super::request_trait::{Request, ResponseData};
use super::request_trait::ResponseData;
use crate::bucket::Bucket;
use crate::command::Command;
use crate::command::HttpMethod;
use crate::error::S3Error;

use tokio_stream::StreamExt;

pub use crate::request::tokio_backend::HyperRequest as RequestImpl;
pub use tokio::io::AsyncRead;
pub use tokio::io::{AsyncWrite, AsyncWriteExt};
pub use tokio_stream::Stream;

use tracing::{event, span, Level};
@@ -34,18 +34,31 @@ pub struct HyperRequest<'a> {
}

#[async_trait::async_trait]
impl<'a> Request for HyperRequest<'a> {
type Response = http::Response<Body>;
impl<'a> crate::request::Request for HyperRequest<'a> {
type Response = http::Response<hyper::body::Incoming>;
type HeaderMap = http::header::HeaderMap;

async fn response(&self) -> Result<http::Response<Body>, S3Error> {
// Build headers
async fn response(&self) -> Result<http::Response<hyper::body::Incoming>, S3Error> {
let headers = match self.headers() {
Ok(headers) => headers,
Err(e) => return Err(e),
};
let https_connector = HttpsConnector::new();
let client = Client::builder().build::<_, hyper::Body>(https_connector);

let url = self.url()?;
let host = url.host().unwrap();
let port = url.port_u16().unwrap_or(80);
let address = format!("{}:{}", host, port);
let stream = TcpStream::connect(&address).await?;

let io = TokioIo::new(stream);
let (mut sender, connection) = hyper::client::conn::http1::handshake(io).await?;

// Poll the connection
tokio::task::spawn(async move {
if let Err(err) = connection.await {
println!("Connection failed: {:?}", err);
}
});

let method = match self.command.http_verb() {
HttpMethod::Delete => http::Method::DELETE,
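The handshake above drives HTTP/1 over a bare TcpStream; an https endpoint would need the stream wrapped in TLS before it is handed to TokioIo. A hedged sketch of such a helper using tokio-native-tls, which is already a dependency (the function name and error type are illustrative; this is not part of the diff):

use tokio_native_tls::{native_tls, TlsConnector};

type BoxError = Box<dyn std::error::Error + Send + Sync>;

async fn connect_tls(
    host: &str,
    port: u16,
) -> Result<hyper::client::conn::http1::SendRequest<http_body_util::Full<bytes::Bytes>>, BoxError> {
    let tcp = tokio::net::TcpStream::connect((host, port)).await?;
    let tls = TlsConnector::from(native_tls::TlsConnector::new()?);
    // Wrap the TCP stream in TLS, then adapt it for hyper exactly like the plain case.
    let stream = tls.connect(host, tcp).await?;
    let (sender, conn) =
        hyper::client::conn::http1::handshake(hyper_util::rt::TokioIo::new(stream)).await?;
    tokio::task::spawn(async move {
        let _ = conn.await; // keep polling the connection until it closes
    });
    Ok(sender)
}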
@@ -56,16 +69,21 @@ impl<'a> Request for HyperRequest<'a> {
};

let request = {
let authority = url.authority().unwrap().clone();

let mut request = http::Request::builder()
.header(hyper::header::HOST, authority.as_str())
.method(method)
.uri(self.url()?.as_str());
.uri(url.path());

for (header, value) in headers.iter() {
request = request.header(header, value);
}

request.body(Body::from(self.request_body()))?
let bytes = Bytes::from(self.request_body());
request.body(Full::new(bytes))?
};

let span = span!(
Level::DEBUG,
"rust-s3-async",
@@ -80,14 +98,15 @@ impl<'a> Request for HyperRequest<'a> {
year = self.datetime.year()
);
let _enter = span.enter();
let response = client.request(request).await?;
let response = sender.send_request(request).await?;

event!(Level::DEBUG, status_code = response.status().as_u16(),);

if !response.status().is_success() {
let status = response.status().as_u16();
let text =
String::from_utf8(hyper::body::to_bytes(response.into_body()).await?.into())?;
let body = response.collect().await?.aggregate();
let mut text = String::default();
body.reader().read_to_string(&mut text)?;
return Err(S3Error::HttpFailWithBody(status, text));
}

@@ -117,7 +136,9 @@ impl<'a> Request for HyperRequest<'a> {
Bytes::from("")
}
} else {
hyper::body::to_bytes(response.into_body()).await?
let reader = response.collect().await?.aggregate().reader();
let bytes = reader.bytes().collect::<Result<Vec<_>, _>>().unwrap();
Bytes::from(bytes)
};
Ok(ResponseData::new(body_vec, status_code, response_headers))
}
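Both body reads above go through aggregate() and std::io::Read; http-body-util's Collected also exposes to_bytes(), which gives the same result without the reader detour. A sketch of that variant, assuming a response of type http::Response<hyper::body::Incoming> (the helper name is made up):

use http_body_util::BodyExt;

async fn body_to_string(
    response: http::Response<hyper::body::Incoming>,
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
    // Collect the whole body, then copy it into an owned String.
    let bytes = response.collect().await?.to_bytes();
    Ok(String::from_utf8(bytes.to_vec())?)
}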
@@ -129,10 +150,13 @@ impl<'a> Request for HyperRequest<'a> {
let response = self.response().await?;

let status_code = response.status();
let mut stream = response.into_body().into_stream();
let mut stream = response.into_body();

while let Some(item) = stream.next().await {
writer.write_all(&item?).await?;
while let Some(item) = stream.frame().await {
let item = item?;
if let Some(chunk) = item.data_ref() {
writer.write_all(chunk).await?;
}
}

Ok(status_code.as_u16())
@@ -141,10 +165,9 @@ impl<'a> Request for HyperRequest<'a> {
async fn response_data_to_stream(&self) -> Result<ResponseDataStream, S3Error> {
let response = self.response().await?;
let status_code = response.status();
let stream = response.into_body().into_stream().map_err(S3Error::Hyper);

let body_stream = BodyStream::new(response);
Ok(ResponseDataStream {
bytes: Box::pin(stream),
bytes: Box::pin(body_stream),
status_code: status_code.as_u16(),
})
}
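BodyStream adapts the hyper body into a futures Stream whose items are Result<Frame<Bytes>, hyper::Error>, which is exactly the new StreamItem alias in request_trait.rs; callers still have to pull the data out of each frame. A small consumer sketch, assuming ResponseDataStream is in scope (the function name is illustrative and not part of this diff):

use futures::StreamExt;

async fn collect_stream(mut body: ResponseDataStream) -> Result<Vec<u8>, hyper::Error> {
    let mut out = Vec::new();
    while let Some(frame) = body.bytes.next().await {
        // Keep data frames, drop trailers.
        if let Ok(data) = frame?.into_data() {
            out.extend_from_slice(&data);
        }
    }
    Ok(out)
}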
@@ -213,7 +236,7 @@ mod tests {
let path = "/my-first/path";
let request = HyperRequest::new(&bucket, path, Command::GetObject).unwrap();

assert_eq!(request.url().unwrap().scheme(), "https");
assert_eq!(request.url().unwrap().scheme_str(), Some("https"));

let headers = request.headers().unwrap();
let host = headers.get(HOST).unwrap();
@@ -230,7 +253,7 @@ mod tests {
let path = "/my-first/path";
let request = HyperRequest::new(&bucket, path, Command::GetObject).unwrap();

assert_eq!(request.url().unwrap().scheme(), "https");
assert_eq!(request.url().unwrap().scheme_str(), Some("https"));

let headers = request.headers().unwrap();
let host = headers.get(HOST).unwrap();
@@ -245,7 +268,7 @@ mod tests {
let path = "/my-second/path";
let request = HyperRequest::new(&bucket, path, Command::GetObject).unwrap();

assert_eq!(request.url().unwrap().scheme(), "http");
assert_eq!(request.url().unwrap().scheme_str(), Some("http"));

let headers = request.headers().unwrap();
let host = headers.get(HOST).unwrap();
@@ -261,7 +284,7 @@ mod tests {
let path = "/my-second/path";
let request = HyperRequest::new(&bucket, path, Command::GetObject).unwrap();

assert_eq!(request.url().unwrap().scheme(), "http");
assert_eq!(request.url().unwrap().scheme_str(), Some("http"));

let headers = request.headers().unwrap();
let host = headers.get(HOST).unwrap();