Skip to content

Commit

Permalink
Upgrade to axum 0.7.2 and hyper 1.0.1 (#16)
Browse files Browse the repository at this point in the history
* Upgrade to axum 0.7.2 and hyper 1.0.1

* Update main to use new axum::serve(listener, router)

* Upgrade tests with use_dependencies flag on

* Update rust.yml

* Switch from hyper::Body to axum::body::Body

* A few fixes to mockito and let tokio version upgrade

* Upgrade mockito to use hyper 1.0

* Make tokio version explicit

* Upgrade tokio in tulsa lib
  • Loading branch information
tyleragreen authored Dec 13, 2023
1 parent f42b03d commit 3336dcd
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 104 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,7 @@ jobs:
run: cd tulsa && cargo test --verbose
- name: Build example-app
run: cd example-app && cargo build --verbose
- name: Test example-app
- name: Test example-app with custom libs
run: cd example-app && cargo test --verbose
- name: Test example-app with third-party libs
run: cd example-app && cargo test --features use_dependencies --verbose
8 changes: 5 additions & 3 deletions example-app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ edition = "2021"

[dependencies]
tulsa = { path = "../tulsa" }
axum = "0.6.18"
hyper = { version = "0.14.27", features = ["client"] }
axum = "0.7.2"
hyper = { version = "1.0.1", features = ["client", "server"] }
prost = "0.12"
rand = "0.8.5"
reqwest = { version = "0.11.18", features = ["json", "blocking"] }
serde = { version = "1.0.171", features = ["derive"] }
serde_json = "1.0.103"
tokio = { version = "1.29.1", features = ["macros", "rt-multi-thread"] }
tokio = { version = "1.32.0", features = ["macros", "rt-multi-thread"] }
tower = "0.4.13"
ureq = "2.7.1"
petgraph = "0.6.3" # for prost_build
Expand All @@ -24,6 +24,8 @@ heck = "0.4.1" # for prost_build
regex = "1.9.3" # for prost_build
mime = { version = "0.3.17", optional = true }
mockito = { version = "1.1.0", optional = true }
hyper-util = { version = "0.1.1", features = ["full"] }
http-body-util = "0.1.0"

[build-dependencies]
prost-build = { version = "0.12", optional = true }
Expand Down
10 changes: 2 additions & 8 deletions example-app/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,11 @@ use gtfs_realtime_rust::scheduler::{build, Mode};
# using coroutine scheduling
let interface = build(Mode::Async);
axum::Server::bind(&address)
.serve(api::app(interface).into_make_service())
.await
.unwrap();
axum::serve(listener, router).await.unwrap();
# using thread scheduling
let interface = build(Mode::Sync);
axum::Server::bind(&address)
.serve(api::app(interface).into_make_service())
.await
.unwrap();
axum::serve(listener, router).await.unwrap();
```

## Running Locally
Expand Down
114 changes: 42 additions & 72 deletions example-app/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,15 @@ mod api_tests {

use crate::fetcher::Feed;
use crate::scheduler_interface::{SchedulerInterface, TaskSender};
use tokio::net::TcpListener;
use tulsa::AsyncTask;

use super::*;
use axum::{
body::Body,
http::{self, Request, StatusCode},
};
use std::net::SocketAddr;
use std::sync::mpsc::SendError;
use std::sync::Mutex;
use tower::ServiceExt; // for `oneshot`
Expand Down Expand Up @@ -193,7 +195,7 @@ mod api_tests {
assert_eq!(response.status(), StatusCode::OK);
assert_eq!(sender.lock().unwrap().count(), 0);

let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
assert_eq!(&body[..], b"{\"status\":\"OK\"}");
}

Expand All @@ -213,7 +215,7 @@ mod api_tests {

assert_eq!(response.status(), StatusCode::BAD_REQUEST);

let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
assert_eq!(body.len(), 0);

let sender = Arc::new(Mutex::new(MockSender::new()));
Expand All @@ -230,7 +232,7 @@ mod api_tests {

assert_eq!(response.status(), StatusCode::BAD_REQUEST);

let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
assert_eq!(body.len(), 0);
}

Expand Down Expand Up @@ -303,7 +305,7 @@ mod api_tests {
assert_eq!(response.status(), StatusCode::CREATED);
assert_eq!(sender.lock().unwrap().count(), 1);

let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
let body = axum::body::to_bytes(response.into_body(), usize::MAX).await.unwrap();
let f: Feed = serde_json::from_slice(&body).unwrap();

assert_eq!(f.id, 1);
Expand All @@ -315,7 +317,6 @@ mod api_tests {

#[tokio::test]
async fn full_api_flow() {
let addr: &str = "0.0.0.0:3000";
let input = CreateFeed {
name: "Name".to_string(),
url: "http".to_string(),
Expand All @@ -331,127 +332,96 @@ mod api_tests {

let sender = Arc::new(Mutex::new(MockSender::new()));
let interface = Arc::new(SchedulerInterface::new(sender));
let address = SocketAddr::from(([0, 0, 0, 0], 3000));
tokio::spawn(async move {
axum::Server::bind(&addr.parse().unwrap())
.serve(app(interface).into_make_service())
.await
.unwrap();
let listener = TcpListener::bind(address).await.unwrap();
let router = app(interface).into_make_service();
axum::serve(listener, router).await.unwrap();
});

let client = hyper::Client::new();

let client = reqwest::Client::new();
let response = client
.request(
Request::builder()
.uri(format!("http://localhost:3000/feed"))
.body(hyper::Body::empty())
.unwrap(),
)
.get(format!("http://localhost:3000/feed"))
.send()
.await
.unwrap();

assert_eq!(response.status(), StatusCode::OK);
assert_eq!(response.status().as_u16(), StatusCode::OK.as_u16());

let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
let body = response.bytes().await.unwrap();
let f: Vec<Feed> = serde_json::from_slice(&body).unwrap();
assert_eq!(f.len(), 0);
assert_eq!(&body[..], b"[]");

let response = client
.request(
Request::builder()
.method(http::Method::POST)
.uri("http://localhost:3000/feed")
.header(http::header::CONTENT_TYPE, mime::APPLICATION_JSON.as_ref())
.body(Body::from(input))
.unwrap(),
)
.post(format!("http://localhost:3000/feed"))
.header(http::header::CONTENT_TYPE.as_str(), mime::APPLICATION_JSON.as_ref())
.json(&serde_json::json!(input))
.send()
.await
.unwrap();

assert_eq!(response.status(), StatusCode::CREATED);
assert_eq!(response.status().as_u16(), StatusCode::CREATED.as_u16());

let response = client
.request(
Request::builder()
.uri(format!("http://localhost:3000/feed"))
.body(hyper::Body::empty())
.unwrap(),
)
.get(format!("http://localhost:3000/feed"))
.send()
.await
.unwrap();

assert_eq!(response.status(), StatusCode::OK);
assert_eq!(response.status().as_u16(), StatusCode::OK.as_u16());

let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
let body = response.bytes().await.unwrap();
let f: Vec<Feed> = serde_json::from_slice(&body).unwrap();
assert_eq!(f.len(), 1);

let response = client
.request(
Request::builder()
.method(http::Method::PUT)
.uri("http://localhost:3000/feed/1")
.header(http::header::CONTENT_TYPE, mime::APPLICATION_JSON.as_ref())
.body(Body::from(input_new))
.unwrap(),
)
.put(format!("http://localhost:3000/feed/1"))
.header(http::header::CONTENT_TYPE.as_str(), mime::APPLICATION_JSON.as_ref())
.json(&serde_json::json!(input_new))
.send()
.await
.unwrap();

assert_eq!(response.status(), StatusCode::OK);
assert_eq!(response.status().as_u16(), StatusCode::OK.as_u16());

let response = client
.request(
Request::builder()
.method(http::Method::GET)
.uri("http://localhost:3000/feed/1")
.body(Body::empty())
.unwrap(),
)
.get(format!("http://localhost:3000/feed/1"))
.send()
.await
.unwrap();

assert_eq!(response.status(), StatusCode::OK);
assert_eq!(response.status().as_u16(), StatusCode::OK.as_u16());

let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
let body = response.bytes().await.unwrap();
let f: Feed = serde_json::from_slice(&body).unwrap();

assert_eq!(f.id, 1);
assert_eq!(f.name, "Name");
assert_eq!(f.url, "http");
assert_eq!(f.frequency, 20);


let response = client
.request(
Request::builder()
.method(http::Method::DELETE)
.uri("http://localhost:3000/feed/1")
.header(http::header::CONTENT_TYPE, mime::APPLICATION_JSON.as_ref())
.body(Body::empty())
.unwrap(),
)
.delete(format!("http://localhost:3000/feed/1"))
.send()
.await
.unwrap();

assert_eq!(response.status(), StatusCode::NO_CONTENT);
assert_eq!(response.status().as_u16(), StatusCode::NO_CONTENT.as_u16());

let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
let body = response.bytes().await.unwrap();
assert_eq!(body.len(), 0);

let response = client
.request(
Request::builder()
.uri(format!("http://localhost:3000/feed"))
.body(hyper::Body::empty())
.unwrap(),
)
.get(format!("http://localhost:3000/feed"))
.send()
.await
.unwrap();

assert_eq!(response.status(), StatusCode::OK);
assert_eq!(response.status().as_u16(), StatusCode::OK.as_u16());

let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
let body = response.bytes().await.unwrap();
let f: Vec<Feed> = serde_json::from_slice(&body).unwrap();
assert_eq!(f.len(), 0);
assert_eq!(&body[..], b"[]");
Expand Down
5 changes: 3 additions & 2 deletions example-app/src/deps/mockito/mock.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use hyper::StatusCode;
use hyper::{Body, Request};
use hyper::Request;
use hyper::body::Incoming;
use rand;
use std::sync::{Arc, RwLock};

Expand Down Expand Up @@ -82,7 +83,7 @@ impl Mock {
}
}

pub fn matches(&self, request: &Request<Body>) -> bool {
pub fn matches(&self, request: &Request<Incoming>) -> bool {
let method = request.method().to_string();
let path = request.uri().path().to_string();

Expand Down
21 changes: 13 additions & 8 deletions example-app/src/deps/mockito/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use hyper::server::conn::Http;
use http_body_util::Full;
use hyper::body::Bytes;
use hyper::body::Incoming;
use hyper::server::conn::http1::Builder;
use hyper::service::service_fn;
use hyper::{Body, Request, Response as HyperResponse};
use hyper::{Request, Response as HyperResponse};
use hyper_util::rt::TokioIo;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock};
use std::thread;
Expand Down Expand Up @@ -35,10 +39,11 @@ impl Server {
while let Ok((stream, _)) = listener.accept().await {
let state_c = state_b.clone();
spawn(async move {
let _ = Http::new()
let io = TokioIo::new(stream);
let _ = Builder::new()
.serve_connection(
stream,
service_fn(move |request: Request<Body>| {
io,
service_fn(move |request: Request<Incoming>| {
handle_request(request, state_c.clone())
}),
)
Expand All @@ -61,9 +66,9 @@ impl Server {
}

async fn handle_request(
request: Request<Body>,
request: Request<Incoming>,
state: Arc<RwLock<State>>,
) -> Result<HyperResponse<Body>, MockError> {
) -> Result<HyperResponse<Full<Bytes>>, MockError> {
let state_b = state.clone();
let mut state = state_b.write().unwrap();
let mut matching: Vec<&mut Mock> = vec![];
Expand All @@ -77,7 +82,7 @@ async fn handle_request(

if let Some(mock) = mock {
mock.inner.num_called += 1;
let response = HyperResponse::new(Body::from(mock.inner.response.body.clone()));
let response = HyperResponse::new(Full::new(Bytes::from(mock.inner.response.body.clone())));
Ok(response)
} else {
panic!("No matching mock found");
Expand Down
8 changes: 4 additions & 4 deletions example-app/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio::runtime::Builder;

use gtfs_realtime_rust::api;
Expand All @@ -22,9 +23,8 @@ fn main() {
.unwrap();

runtime.block_on(async {
axum::Server::bind(&address)
.serve(api::app(interface).into_make_service())
.await
.unwrap();
let listener = TcpListener::bind(address).await.unwrap();
let router = api::app(interface).into_make_service();
axum::serve(listener, router).await.unwrap();
});
}
11 changes: 6 additions & 5 deletions example-app/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
mod tests {
use reqwest::blocking::Client;
use serde_json::json;
use tokio::net::TcpListener;
use std::net::SocketAddr;
use std::thread;
use std::time::Duration;
use tokio::runtime::Builder;
Expand All @@ -15,12 +17,11 @@ mod tests {
thread::spawn(move || {
let runtime = Builder::new_multi_thread().enable_io().build().unwrap();

let address: &str = "0.0.0.0:3000";
let address = SocketAddr::from(([0, 0, 0, 0], 3000));
runtime.block_on(async {
axum::Server::bind(&address.parse().unwrap())
.serve(api::app(interface).into_make_service())
.await
.unwrap();
let listener = TcpListener::bind(address).await.unwrap();
let router = api::app(interface).into_make_service();
axum::serve(listener, router).await.unwrap();
});
});

Expand Down
2 changes: 1 addition & 1 deletion tulsa/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { version = "1.29.1", features = ["macros", "time", "rt-multi-thread"] }
tokio = { version = "1.32.0", features = ["macros", "time", "rt-multi-thread"] }

0 comments on commit 3336dcd

Please sign in to comment.