Skip to content

Commit

Permalink
Serialized message binary parity to java driver for test traversal
Browse files Browse the repository at this point in the history
  • Loading branch information
criminosis committed Oct 18, 2024
1 parent e50a63b commit dd898ed
Show file tree
Hide file tree
Showing 10 changed files with 712 additions and 65 deletions.
2 changes: 2 additions & 0 deletions gremlin-client/src/aio/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ impl SessionedClient {
let message = match self.options.serializer {
IoProtocol::GraphSONV2 => message_with_args_v2(String::from("close"), processor, args),
IoProtocol::GraphSONV3 => message_with_args(String::from("close"), processor, args),
IoProtocol::GraphBinaryV1 => todo!("Need to add the handling logic for writing to a processor op")
};

let conn = self.pool.get().await?;
Expand Down Expand Up @@ -142,6 +143,7 @@ impl GremlinClient {
let message = match self.options.serializer {
IoProtocol::GraphSONV2 => message_with_args_v2(String::from("eval"), processor, args),
IoProtocol::GraphSONV3 => message_with_args(String::from("eval"), processor, args),
IoProtocol::GraphBinaryV1 => todo!("Need to add the handling logic for writing to a processor op")
};

let conn = self.pool.get().await?;
Expand Down
1 change: 1 addition & 0 deletions gremlin-client/src/aio/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl Manager for GremlinConnectionManager {
let message = match self.options.serializer {
IoProtocol::GraphSONV2 => message_with_args_v2(String::from("eval"), String::default(), args),
IoProtocol::GraphSONV3 => message_with_args(String::from("eval"), String::default(), args),
IoProtocol::GraphBinaryV1 => todo!("Need to add the handling logic for writing to a processor op")
};

let id = message.id().clone();
Expand Down
70 changes: 42 additions & 28 deletions gremlin-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@ impl SessionedClient {
let processor = "session".to_string();

let message = match self.options.serializer {
IoProtocol::GraphSONV2 => message_with_args_v2(String::from("close"), processor, args),
IoProtocol::GraphSONV2 => {
message_with_args_v2(String::from("close"), processor, args)
}
IoProtocol::GraphSONV3 => message_with_args(String::from("close"), processor, args),
IoProtocol::GraphBinaryV1 => {
todo!("Need to add the handling logic for writing to a processor op")
}
};

let conn = self.pool.get()?;

self.send_message(conn, message)
todo!()
// self.send_message(conn, message)
} else {
Err(GremlinError::Generic("No session to close".to_string()))
}
Expand Down Expand Up @@ -138,37 +144,23 @@ impl GremlinClient {
let message = match self.options.serializer {
IoProtocol::GraphSONV2 => message_with_args_v2(String::from("eval"), processor, args),
IoProtocol::GraphSONV3 => message_with_args(String::from("eval"), processor, args),
IoProtocol::GraphBinaryV1 => {
todo!("Need to add the handling logic for writing to a processor op")
}
};

let conn = self.pool.get()?;

self.send_message(conn, message)
// self.send_message(conn, message)
todo!()
}

pub(crate) fn write_message<T: Serialize>(
&self,
conn: &mut r2d2::PooledConnection<GremlinConnectionManager>,
msg: Message<T>,
) -> GremlinResult<()> {
let message = self.build_message(msg)?;

let content_type = self.options.serializer.content_type();
let payload = String::from("") + content_type + &message;

let mut binary = payload.into_bytes();
binary.insert(0, content_type.len() as u8);

conn.send(binary)?;

Ok(())
}

pub(crate) fn send_message<T: Serialize>(
pub(crate) fn send_message(
&self,
mut conn: r2d2::PooledConnection<GremlinConnectionManager>,
msg: Message<T>,
msg: Vec<u8>,
) -> GremlinResult<GResultSet> {
self.write_message(&mut conn, msg)?;
conn.send(msg)?;

let (response, results) = self.read_response(&mut conn)?;

Expand Down Expand Up @@ -206,7 +198,26 @@ impl GremlinClient {
}

pub(crate) fn submit_traversal(&self, bytecode: &Bytecode) -> GremlinResult<GResultSet> {
let message = self.generate_message(bytecode)?;
let aliases = self
.alias
.clone()
.or_else(|| Some(String::from("g")))
.map(|s| {
let mut map = HashMap::new();
map.insert(String::from("g"), GValue::String(s));
map
})
.unwrap_or_else(HashMap::new);

let message = self
.options
.serializer
.build_traversal_message(aliases, bytecode)?;

if true {
let message: Vec<i8> = message.into_iter().map(|byte| byte as i8).collect();
panic!("{:?}", message);
}

let conn = self.pool.get()?;

Expand All @@ -218,7 +229,7 @@ impl GremlinClient {
conn: &mut r2d2::PooledConnection<GremlinConnectionManager>,
) -> GremlinResult<(Response, VecDeque<GValue>)> {
let result = conn.recv()?;
let response: Response = serde_json::from_slice(&result)?;
let response = self.options.deserializer.read_response(&result)?;

match response.status.code {
200 | 206 => {
Expand Down Expand Up @@ -249,9 +260,10 @@ impl GremlinClient {
args,
);

self.write_message(conn, message)?;
todo!()
// self.write_message(conn, message)?;

self.read_response(conn)
// self.read_response(conn)
}
None => Err(GremlinError::Request((
response.status.code,
Expand All @@ -264,7 +276,9 @@ impl GremlinClient {
))),
}
}

fn build_message<T: Serialize>(&self, msg: Message<T>) -> GremlinResult<String> {
//TODO this should be gone by the end
serde_json::to_string(&msg).map_err(GremlinError::from)
}
}
Loading

0 comments on commit dd898ed

Please sign in to comment.