Skip to content

Commit

Permalink
reduce verbosity of output, add nonblocking add
Browse files Browse the repository at this point in the history
  • Loading branch information
gtfierro committed Oct 1, 2020
1 parent 3ff55aa commit 628d197
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 20 deletions.
8 changes: 4 additions & 4 deletions src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::reasoner::Reasoner;
use crate::error::{ReasonableError, Result};
use log::info;
use log::{info, debug};
use std::fmt;
use std::string::String;
use std::fs;
Expand Down Expand Up @@ -252,7 +252,7 @@ impl Manager {

let q = self.triple_store.prepare_query(&sparql, QueryOptions::default())?;

println!("query: {}", sparql);
debug!("query: {}", sparql);
let res = q.exec()?;
if let QueryResults::Solutions(solutions) = res {
let name_key = name.clone();
Expand Down Expand Up @@ -282,7 +282,7 @@ impl Manager {

let q = self.triple_store.prepare_query(&sparql, QueryOptions::default())?;

println!("query: {}", sparql);
debug!("query: {}", sparql);
let res = q.exec()?;
if let QueryResults::Solutions(solutions) = res {
return Ok(ViewMetadata{
Expand Down Expand Up @@ -325,7 +325,7 @@ pub fn parse_file(filename: &str) -> Result<Vec<(Node, Node, Node)>> {
GraphFormat::RdfXml
};
let data = fs::read_to_string(filename)?;
println!("format: {:?} for {}", gfmt, filename);
debug!("format: {:?} for {}", gfmt, filename);
let parser = GraphParser::from_format(gfmt);
let triples: Vec<std::result::Result<Triple, std::io::Error>> = parser.read_triples(Cursor::new(data))?.collect();//.collect::<Result<Triple>>();
Ok(triples.into_iter().filter_map(|tres| {
Expand Down
40 changes: 24 additions & 16 deletions src/sqlite.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![feature(proc_macro_hygiene, decl_macro)]
#[macro_use] extern crate rocket;
#[macro_use] extern crate serde_derive;
use log::{info, debug, error};
use std::sync::Mutex;
use std::thread;
use rocket::State;
Expand Down Expand Up @@ -158,7 +159,6 @@ impl SQLiteManager {
fn get_update_hook(&self, sender: mpsc::SyncSender<ChannelMessage>) -> Box<dyn FnMut(rusqlite::Action, &str, &str, i64) + Send> {
Box::new(move |_act, _db_name, table_name, _rowid| {
if table_name == "triples" {
// println!("got {:?} {} {} {}", act, db_name, table_name, rowid);
match sender.try_send(ChannelMessage::Refresh) {
Ok(_) => return,
Err(mpsc::TrySendError::Full(_e)) => return,
Expand All @@ -178,11 +178,10 @@ impl SQLiteManager {
}

fn get_triples(&self) -> Result<Vec<(String, String, String)>>{
println!("triggered");
debug!("triggered");
let mut stmt = self.conn.prepare("SELECT subject, predicate, object FROM triples")?;

let triples: Vec<(String, String, String)> = stmt.query_map(NO_PARAMS, |row| {
//println!("row {:?}", row.columns());
let s: String = row.get(0)?;
let p: String = row.get(1)?;
let o: String = row.get(2)?;
Expand Down Expand Up @@ -221,10 +220,10 @@ impl SQLiteManager {
fn update(&mut self) -> Result<()> {
// try to see if there are any new views
if !self.changed {
println!("triggered but no change");
debug!("triggered but no change");
return Ok(());
} else {
println!("changing row");
debug!("changing row");
}

// TODO: return true if the input changed
Expand All @@ -233,10 +232,10 @@ impl SQLiteManager {
for view in self.views.iter() {
tx.execute(&view.get_delete_tab(), NO_PARAMS)?;
//tx.execute(format!("DELETE FROM view_{};", view.name()).as_str(), NO_PARAMS)?;
println!("insert: {}", view.get_insert_sql());
debug!("insert: {}", view.get_insert_sql());
let mut stmt = tx.prepare(&view.get_insert_sql())?;
let tuples: Vec<Vec<String>> = view.contents_string()?;
println!("got {} for {}", tuples.len(), view.name());
info!("got {} for {}", tuples.len(), view.name());
for tup in tuples {
stmt.execute(&tup)?;
}
Expand All @@ -256,7 +255,12 @@ impl SQLiteManager {
let res = self.recv.recv();
match res {
Ok(ChannelMessage::ViewDef(vdef, tx)) => { self.add_view(vdef.name, &vdef.query)?; tx.send(()).unwrap(); },
Ok(ChannelMessage::TripleAdd(trips, tx)) => { self.add_triples(trips)?; tx.send(()).unwrap(); },
Ok(ChannelMessage::TripleAdd(trips, tx)) => {
self.add_triples(trips)?;
if let Some(c) = tx {
c.send(()).unwrap();
}
},
Ok(ChannelMessage::Refresh) => self.update()?,
Err(e) => return Err(ReasonableError::ChannelRecv(e)),
};
Expand All @@ -273,7 +277,7 @@ struct TableResponse {
#[derive(Debug)]
enum ChannelMessage {
ViewDef(ViewDef, mpsc::Sender<()>),
TripleAdd(Vec<JsonTriple>, mpsc::Sender<()>),
TripleAdd(Vec<JsonTriple>, Option<mpsc::Sender<()>>),
Refresh,
}

Expand All @@ -289,7 +293,6 @@ struct ViewChannel(mpsc::SyncSender<ChannelMessage>);
type DbConn = Mutex<rusqlite::Connection>;
type RdfConn = Mutex<MemoryStore>;


#[get("/view/<name>", format = "json")]
fn getview(name: String, conn: State<DbConn>, _store: State<RdfConn>, _tx: State<ViewChannel>) -> Json<TableResponse> {
let mut rows: Vec<Vec<String>> = Vec::new();
Expand All @@ -304,7 +307,7 @@ fn getview(name: String, conn: State<DbConn>, _store: State<RdfConn>, _tx: State
header = row.column_names().iter().map(|s| s.to_string()).collect();
Ok(())
}).unwrap().count();
println!("rows {}", rows.len());
info!("rows {}", rows.len());
Json(TableResponse{header, rows})
}

Expand All @@ -317,17 +320,23 @@ fn makeview(data: Json<ViewDef>, _conn: State<DbConn>, _store: State<RdfConn>, t
}

#[post("/add", data = "<data>", format = "json")]
fn addtriples(data: Json<Vec<JsonTriple>>, _conn: State<DbConn>, _store: State<RdfConn>, tx: State<ViewChannel>) -> Json<()> {
fn addtriples(data: Json<Vec<JsonTriple>>, tx: State<ViewChannel>) -> Json<()> {
let (send, recv) = mpsc::channel();
tx.0.send(ChannelMessage::TripleAdd(data.0, send)).expect("add triples");
tx.0.send(ChannelMessage::TripleAdd(data.0, Some(send))).expect("add triples");
recv.recv().unwrap();
Json(())
}

#[post("/addnb", data = "<data>", format = "json")]
fn addtriples_nonblock(data: Json<Vec<JsonTriple>>, tx: State<ViewChannel>) -> Json<()> {
tx.0.send(ChannelMessage::TripleAdd(data.0, None)).expect("add triples");
Json(())
}

#[post("/query", data = "<data>", format = "json")]
fn doquery(data: Json<String>, _conn: State<DbConn>, store: State<RdfConn>, _tx: State<ViewChannel>) -> Result<Json<Vec<Vec<String>>>> {
let sparql = format!("{}{}", qfmt, data.0);
println!("do query {}", sparql);
info!("do query {}", sparql);
let q = store.lock().expect("rdf lock").prepare_query(&sparql, QueryOptions::default())?;
let res = q.exec()?;
let mut rows: Vec<Vec<String>> = Vec::new();
Expand All @@ -340,7 +349,6 @@ fn doquery(data: Json<String>, _conn: State<DbConn>, store: State<RdfConn>, _tx:
for col in &vars {
row.push(vals.get(col.as_str()).unwrap().clone().to_string());
}
println!("row {:?}", row);
rows.push(row);
}
},
Expand Down Expand Up @@ -373,7 +381,7 @@ fn rocket(filename: &str) {
.manage(Mutex::new(conn))
.manage(Mutex::new(store))
.manage(ViewChannel(tx))
.mount("/", routes![getview, makeview, addtriples, doquery])
.mount("/", routes![getview, makeview, addtriples, addtriples_nonblock, doquery])
.launch();
});

Expand Down

0 comments on commit 628d197

Please sign in to comment.