Skip to content

Commit

Permalink
actor for processing requests
Browse files Browse the repository at this point in the history
  • Loading branch information
guscost committed Sep 10, 2018
1 parent 80d1e79 commit 1ff1d2f
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 221 deletions.
1 change: 1 addition & 0 deletions example/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ futures = "0.1.24"
serde = "1.0.76"
serde_derive = "1.0.76"
serde_json = "1.0.26"
bytes = "0.4.10"
chrono = "0.4.6"
url = "1.7.1"
uuid = "0.6.5"
2 changes: 1 addition & 1 deletion example/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ deploy: build/deploy build/cleanup
docker run -it --rm \
--network cosworth-example \
-e DATABASE_URL=postgres://root@cosworth-example-database/cosworth_example \
-p 8000:8080 cosworth-example/deploy:latest
-p 8080:8080 cosworth-example/deploy:latest

# Debug targets
debug/network:
Expand Down
83 changes: 0 additions & 83 deletions example/src/db.rs

This file was deleted.

31 changes: 31 additions & 0 deletions example/src/helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

use bytes::Bytes;


/// important parts of an HTTP request
pub struct RawRequest {
pub cookies: HashMap<String, String>,
pub headers: HashMap<String, String>,
pub body: Bytes,
}

/// important parts of an HTTP response
pub struct RawResponse {
pub status: u16,
pub cookies: HashMap<String, String>,
pub headers: HashMap<String, String>,
pub body: Bytes,
}


/// timestamp snowflake ID generator
/// TODO: use database instead of app server
pub fn get_millis() -> u64 {
let start = SystemTime::now();
let since_the_epoch = start.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
return since_the_epoch.as_secs() * 1000 +
since_the_epoch.subsec_nanos() as u64 / 1_000_000 << 22;
}
203 changes: 93 additions & 110 deletions example/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
extern crate bytes;
extern crate actix;
extern crate actix_web;
extern crate env_logger;
Expand All @@ -16,149 +17,131 @@ extern crate cosworth;

// std
use std::env;
use std::collections::HashMap;

// diesel
use diesel::prelude::*;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::Pool;

// actix-web
use futures::{Future, Stream};
use futures::Future;
use actix::prelude::*;
use actix_web::{
http, middleware, pred, server, App, AsyncResponder, Error,
HttpMessage, HttpRequest, HttpResponse
http, middleware, pred, server, App, AsyncResponder, Error,
HttpMessage, HttpRequest, HttpResponse
};

// cosworth
use cosworth::response::json;

// example project module
mod db;
mod schema;
// example project modules
mod helpers;
mod models;
mod processor;
mod schema;

use db::{CreateTodo, DbExecutor};
use models::todo::{TodoJson};
use helpers::RawRequest;


/// state with connection pool(s)
struct AppState {
db_pool: Pool<ConnectionManager<PgConnection>>,
db_addr: Addr<DbExecutor>,
raw_db_pool: Pool<ConnectionManager<PgConnection>>,
processors: Addr<processor::Processor>,
}

/// async POST handler
fn create(req: &HttpRequest<AppState>) -> Box<Future<Item = HttpResponse, Error = Error>> {
let state = req.state();
return Box::new(req.body()
let req = req.clone();
return req.body()
.from_err()
.and_then(move |body| {
return req.state().processors
.send(processor::CreateTodo {request: RawRequest {
cookies: HashMap::new(),
headers: HashMap::new(),
body: body
}})
.from_err()
.and_then(|body| {
match serde_json::from_slice::<TodoJson>(&body) {
Ok(obj) => {
let response = Box::new(state.db_addr
.send(CreateTodo {
id: None,
name: obj.name,
done: None,
})
.from_err()
.and_then(|res| match res {
Ok(obj) => Ok(HttpResponse::Ok().json(TodoJson {
id: Some(obj.id as u64),
name: obj.name,
done: Some(obj.done)
})),
Err(_) => Ok(HttpResponse::InternalServerError().into()),
}));
return response;
}
Err(e) => {
let response = HttpResponse::BadRequest()
.header("Content-Type", "text/javascript")
.body(format!("{{\"error\": \"{}\"}}", e));
return Box::new(futures::future::err(req));
// let result = futures::future::from_err(response);
// return Box::new(result.and_then(|x| x).map_err());
// // e.and_then(|x| {

// // })
}
}
.and_then(|res| match res {
Ok(obj) => Ok(HttpResponse::Ok().body(obj.body)),
Err(_) => Ok(HttpResponse::InternalServerError().into()),
});
})
.responder();
}

// basic index handler
fn index(req: &HttpRequest<AppState>) -> Result<HttpResponse, Error> {
let query_id: String = req.match_info().query("id")?;
let query_name = req.match_info().query("name")?;

// get some data from the real database
use schema::todos::dsl::*;
use models::todo::*;
let connection = req.state().db_pool.get().expect("Error loading connection");
let db_results = todos.filter(done.eq(false))
.limit(5)
.load::<Todo>(&connection)
.expect("Error loading todos");

let mut results: Vec<TodoJson> = db_results.iter().map(|r| {
TodoJson { id: Some(r.id as u64), name: r.name.clone(), done: Some(r.done) }
}).collect();

// return possible responses
match query_id.parse::<u64>() {
Ok(n) => {
let todo = models::todo::TodoJson {
id: Some(n),
name: query_name,
done: Some(false)
};
results.push(todo);
return Ok(json(&req, results, http::StatusCode::OK)?);
},
Err(_e) => {
return Ok(req.build_response(http::StatusCode::BAD_REQUEST)
.content_type("text/plain")
.body(hello!()));
}
let query_id: String = req.match_info().query("id")?;
let query_name = req.match_info().query("name")?;

// get some data from the real database
use schema::todos::dsl::*;
use models::todo::*;
let connection = req.state().raw_db_pool.get().expect("Error loading connection");
let db_results = todos.filter(done.eq(false))
.limit(5)
.load::<Todo>(&connection)
.expect("Error loading todos");

let mut results: Vec<TodoJson> = db_results.iter().map(|r| {
TodoJson { id: Some(r.id as u64), name: r.name.clone(), done: Some(r.done) }
}).collect();

// return possible responses
match query_id.parse::<u64>() {
Ok(n) => {
let todo = models::todo::TodoJson {
id: Some(n),
name: query_name,
done: Some(false)
};
results.push(todo);
return Ok(json(&req, results, http::StatusCode::OK)?);
},
Err(_e) => {
return Ok(req.build_response(http::StatusCode::BAD_REQUEST)
.content_type("text/plain")
.body(hello!()));
}
}
}

// app setup
fn main() {
println!("{}", hello!());

// DB connection pool
let db_url = env::var("DATABASE_URL").expect("DATABASE_URL not found.");
let db_manager = ConnectionManager::<PgConnection>::new(db_url);
let db_pool = Pool::builder().build(db_manager).expect("Failed to create pool.");

// heh heh heh
let db_pool_1 = db_pool.clone();

// actix stuff
::std::env::set_var("RUST_LOG", "actix_web=info");
env_logger::init();
let sys = actix::System::new("cosworth-example");
let addr = SyncArbiter::start(3, move || DbExecutor(db_pool_1.clone()));

server::new(move || {
App::with_state(AppState{db_pool: db_pool.clone(), db_addr: addr.clone()})
.middleware(middleware::Logger::default())
//.resource("/create/{name}", |r| r.method(http::Method::POST).with(create))
.resource("/create", |r| {
r.route()
.filter(pred::Post())
.filter(pred::Header("content-type", "application/json"))
.f(create)
})
.resource("/{id}/{name}", |r| {
r.route()
.filter(pred::Get())
.f(index)
})
})
.bind("0.0.0.0:8080").unwrap()
.run();
println!("{}", hello!());

// DB connection pool
let db_url = env::var("DATABASE_URL").expect("DATABASE_URL not found.");
let db_manager = ConnectionManager::<PgConnection>::new(db_url);
let db_pool = Pool::builder().build(db_manager).expect("Failed to create pool.");

// try using a raw db pool instance (not async)
let raw_db_pool = db_pool.clone();

// actix stuff
::std::env::set_var("RUST_LOG", "actix_web=info");
env_logger::init();
let _sys = actix::System::new("cosworth-example");
let addr = SyncArbiter::start(3, move || processor::Processor(db_pool.clone()));

server::new(move || {
App::with_state(AppState{raw_db_pool: raw_db_pool.clone(), processors: addr.clone()})
.middleware(middleware::Logger::default())
//.resource("/create/{name}", |r| r.method(http::Method::POST).with(create))
.resource("/create", |r| {
r.route()
.filter(pred::Post())
.filter(pred::Header("content-type", "application/json"))
.f(create)
})
.resource("/{id}/{name}", |r| {
r.route()
.filter(pred::Get())
.f(index)
})
})
.bind("0.0.0.0:8080").unwrap()
.run();
}
Loading

0 comments on commit 1ff1d2f

Please sign in to comment.