Skip to content

Commit

Permalink
Centralized on new serializer based message building & reading
Browse files Browse the repository at this point in the history
  • Loading branch information
criminosis committed Oct 19, 2024
1 parent dd898ed commit 28abd0a
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 319 deletions.
70 changes: 26 additions & 44 deletions gremlin-client/src/aio/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use futures::future::{BoxFuture, FutureExt};
use mobc::{Connection, Pool};
use serde::Serialize;
use std::collections::{HashMap, VecDeque};
use uuid::Uuid;

pub type SessionedClient = GremlinClient;

Expand All @@ -21,19 +22,14 @@ impl SessionedClient {
if let Some(session_name) = self.session.take() {
let mut args = HashMap::new();
args.insert(String::from("session"), GValue::from(session_name.clone()));
let args = self.options.serializer.write(&GValue::from(args))?;

let processor = "session".to_string();

let message = match self.options.serializer {
IoProtocol::GraphSONV2 => message_with_args_v2(String::from("close"), processor, args),
IoProtocol::GraphSONV3 => message_with_args(String::from("close"), processor, args),
IoProtocol::GraphBinaryV1 => todo!("Need to add the handling logic for writing to a processor op")
};
let (id, message) = self
.options
.serializer
.build_message("close", "session", args, None)?;

let conn = self.pool.get().await?;

self.send_message_new(conn, message).await
self.send_message_new(conn, id, message).await
} else {
Err(GremlinError::Generic("No session to close".to_string()))
}
Expand Down Expand Up @@ -132,39 +128,29 @@ impl GremlinClient {
args.insert(String::from("session"), GValue::from(session_name.clone()));
}

let args = self.options.serializer.write(&GValue::from(args))?;

let processor = if self.session.is_some() {
"session".to_string()
"session"
} else {
String::default()
""
};

let message = match self.options.serializer {
IoProtocol::GraphSONV2 => message_with_args_v2(String::from("eval"), processor, args),
IoProtocol::GraphSONV3 => message_with_args(String::from("eval"), processor, args),
IoProtocol::GraphBinaryV1 => todo!("Need to add the handling logic for writing to a processor op")
};
let (id, message) = self
.options
.serializer
.build_message("eval", processor, args, None)?;

let conn = self.pool.get().await?;

self.send_message_new(conn, message).await
self.send_message_new(conn, id, message).await
}

pub(crate) fn send_message_new<'a, T: Serialize>(
&'a self,
mut conn: Connection<GremlinConnectionManager>,
msg: Message<T>,
id: Uuid,
binary: Vec<u8>,
) -> BoxFuture<'a, GremlinResult<GResultSet>> {
let id = msg.id().clone();
let message = self.build_message(msg).unwrap();

async move {
let content_type = self.options.serializer.content_type();
let payload = String::from("") + content_type + &message;
let mut binary = payload.into_bytes();
binary.insert(0, content_type.len() as u8);

let (response, receiver) = conn.send(id, binary).await?;

let (response, results) = match response.status.code {
Expand All @@ -187,15 +173,14 @@ impl GremlinClient {
GValue::String(encode(&format!("\0{}\0{}", c.username, c.password))),
);

let args = self.options.serializer.write(&GValue::from(args))?;
let message = message_with_args_and_uuid(
String::from("authentication"),
String::from("traversal"),
response.request_id,
let (id, message) = self.options.serializer.build_message(
"authentication",
"traversal",
args,
);
Some(response.request_id),
)?;

return self.send_message_new(conn, message).await;
return self.send_message_new(conn, id, message).await;
}
None => Err(GremlinError::Request((
response.status.code,
Expand Down Expand Up @@ -231,16 +216,13 @@ impl GremlinClient {

args.insert(String::from("aliases"), GValue::from(aliases));

let args = self.options.serializer.write(&GValue::from(args))?;

let message = message_with_args(String::from("bytecode"), String::from("traversal"), args);
let (id, message) = self
.options
.serializer
.build_message("bytecode", "traversal", args, None)?;

let conn = self.pool.get().await?;

self.send_message_new(conn, message).await
}

fn build_message<T: Serialize>(&self, msg: Message<T>) -> GremlinResult<String> {
serde_json::to_string(&msg).map_err(GremlinError::from)
self.send_message_new(conn, id, message).await
}
}
37 changes: 3 additions & 34 deletions gremlin-client/src/aio/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,8 @@ impl Manager for GremlinConnectionManager {
String::from("language"),
GValue::String(String::from("gremlin-groovy")),
);
let args = self.options.serializer.write(&GValue::from(args))?;

let message = match self.options.serializer {
IoProtocol::GraphSONV2 => message_with_args_v2(String::from("eval"), String::default(), args),
IoProtocol::GraphSONV3 => message_with_args(String::from("eval"), String::default(), args),
IoProtocol::GraphBinaryV1 => todo!("Need to add the handling logic for writing to a processor op")
};

let id = message.id().clone();
let msg = serde_json::to_string(&message).map_err(GremlinError::from)?;

let content_type = self.options.serializer.content_type();

let payload = String::from("") + content_type + &msg;
let mut binary = payload.into_bytes();
binary.insert(0, content_type.len() as u8);
let (id, message) = self.options.serializer.build_message("eval", "", args, None)?;

let (response, _receiver) = conn.send(id, binary).await?;

Expand All @@ -71,25 +57,8 @@ impl Manager for GremlinConnectionManager {
GValue::String(encode(&format!("\0{}\0{}", c.username, c.password))),
);

let args = self.options.serializer.write(&GValue::from(args))?;
let message = message_with_args_and_uuid(
String::from("authentication"),
String::from("traversal"),
response.request_id,
args,
);

let id = message.id().clone();
let msg = serde_json::to_string(&message).map_err(GremlinError::from)?;

let content_type = self.options.serializer.content_type();
let payload = String::from("") + content_type + &msg;

let mut binary = payload.into_bytes();
binary.insert(0, content_type.len() as u8);

let (response, _receiver) = conn.send(id, binary).await?;

let (id, message) = self.options.serializer.build_message("authentication", "traversal", args, Some(response.request_id))?;
let (response, _receiver) = conn.send(id, message).await?;
match response.status.code {
200 | 206 => Ok(conn),
204 => Ok(conn),
Expand Down
103 changes: 24 additions & 79 deletions gremlin-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,15 @@ impl SessionedClient {
if let Some(session_name) = self.session.take() {
let mut args = HashMap::new();
args.insert(String::from("session"), GValue::from(session_name.clone()));
let args = self.options.serializer.write(&GValue::from(args))?;

let processor = "session".to_string();

let message = match self.options.serializer {
IoProtocol::GraphSONV2 => {
message_with_args_v2(String::from("close"), processor, args)
}
IoProtocol::GraphSONV3 => message_with_args(String::from("close"), processor, args),
IoProtocol::GraphBinaryV1 => {
todo!("Need to add the handling logic for writing to a processor op")
}
};
let (_, message) = self
.options
.serializer
.build_message("close", "session", args, None)?;

let conn = self.pool.get()?;

todo!()
// self.send_message(conn, message)
self.send_message(conn, message)
} else {
Err(GremlinError::Generic("No session to close".to_string()))
}
Expand Down Expand Up @@ -133,26 +124,20 @@ impl GremlinClient {
args.insert(String::from("session"), GValue::from(session_name.clone()));
}

let args = self.options.serializer.write(&GValue::from(args))?;

let processor = if self.session.is_some() {
"session".to_string()
"session"
} else {
String::default()
""
};

let message = match self.options.serializer {
IoProtocol::GraphSONV2 => message_with_args_v2(String::from("eval"), processor, args),
IoProtocol::GraphSONV3 => message_with_args(String::from("eval"), processor, args),
IoProtocol::GraphBinaryV1 => {
todo!("Need to add the handling logic for writing to a processor op")
}
};
let (_, message) = self
.options
.serializer
.build_message("eval", processor, args, None)?;

let conn = self.pool.get()?;

// self.send_message(conn, message)
todo!()
self.send_message(conn, message)
}

pub(crate) fn send_message(
Expand All @@ -167,14 +152,7 @@ impl GremlinClient {
Ok(GResultSet::new(self.clone(), results, response, conn))
}

pub fn generate_message(
&self,
bytecode: &Bytecode,
) -> GremlinResult<Message<serde_json::Value>> {
let mut args = HashMap::new();

args.insert(String::from("gremlin"), GValue::Bytecode(bytecode.clone()));

pub(crate) fn submit_traversal(&self, bytecode: &Bytecode) -> GremlinResult<GResultSet> {
let aliases = self
.alias
.clone()
Expand All @@ -186,39 +164,14 @@ impl GremlinClient {
})
.unwrap_or_else(HashMap::new);

let mut args = HashMap::new();
args.insert(String::from("gremlin"), GValue::Bytecode(bytecode.clone()));
args.insert(String::from("aliases"), GValue::from(aliases));

let args = self.options.serializer.write(&GValue::from(args))?;

Ok(message_with_args(
String::from("bytecode"),
String::from("traversal"),
args,
))
}

pub(crate) fn submit_traversal(&self, bytecode: &Bytecode) -> GremlinResult<GResultSet> {
let aliases = self
.alias
.clone()
.or_else(|| Some(String::from("g")))
.map(|s| {
let mut map = HashMap::new();
map.insert(String::from("g"), GValue::String(s));
map
})
.unwrap_or_else(HashMap::new);

let message = self
let (_,message) = self
.options
.serializer
.build_traversal_message(aliases, bytecode)?;

if true {
let message: Vec<i8> = message.into_iter().map(|byte| byte as i8).collect();
panic!("{:?}", message);
}

.build_message("bytecode", "traversal", args, None)?;
let conn = self.pool.get()?;

self.send_message(conn, message)
Expand Down Expand Up @@ -252,18 +205,15 @@ impl GremlinClient {
GValue::String(encode(&format!("\0{}\0{}", c.username, c.password))),
);

let args = self.options.serializer.write(&GValue::from(args))?;
let message = message_with_args_and_uuid(
String::from("authentication"),
String::from("traversal"),
response.request_id,
let (_, message) = self.options.serializer.build_message(
"authentication",
"traversal",
args,
);

todo!()
// self.write_message(conn, message)?;
Some(response.request_id),
)?;
conn.send(message)?;

// self.read_response(conn)
self.read_response(conn)
}
None => Err(GremlinError::Request((
response.status.code,
Expand All @@ -276,9 +226,4 @@ impl GremlinClient {
))),
}
}

fn build_message<T: Serialize>(&self, msg: Message<T>) -> GremlinResult<String> {
//TODO this should be gone by the end
serde_json::to_string(&msg).map_err(GremlinError::from)
}
}
Loading

0 comments on commit 28abd0a

Please sign in to comment.