diff --git a/gremlin-client/src/aio/client.rs b/gremlin-client/src/aio/client.rs index 582c9ee0..50e5040c 100644 --- a/gremlin-client/src/aio/client.rs +++ b/gremlin-client/src/aio/client.rs @@ -28,6 +28,7 @@ impl SessionedClient { let message = match self.options.serializer { IoProtocol::GraphSONV2 => message_with_args_v2(String::from("close"), processor, args), IoProtocol::GraphSONV3 => message_with_args(String::from("close"), processor, args), + IoProtocol::GraphBinaryV1 => todo!("Need to add the handling logic for writing to a processor op") }; let conn = self.pool.get().await?; @@ -142,6 +143,7 @@ impl GremlinClient { let message = match self.options.serializer { IoProtocol::GraphSONV2 => message_with_args_v2(String::from("eval"), processor, args), IoProtocol::GraphSONV3 => message_with_args(String::from("eval"), processor, args), + IoProtocol::GraphBinaryV1 => todo!("Need to add the handling logic for writing to a processor op") }; let conn = self.pool.get().await?; diff --git a/gremlin-client/src/aio/pool.rs b/gremlin-client/src/aio/pool.rs index 6569e3a5..f1b7b0ec 100644 --- a/gremlin-client/src/aio/pool.rs +++ b/gremlin-client/src/aio/pool.rs @@ -45,6 +45,7 @@ impl Manager for GremlinConnectionManager { let message = match self.options.serializer { IoProtocol::GraphSONV2 => message_with_args_v2(String::from("eval"), String::default(), args), IoProtocol::GraphSONV3 => message_with_args(String::from("eval"), String::default(), args), + IoProtocol::GraphBinaryV1 => todo!("Need to add the handling logic for writing to a processor op") }; let id = message.id().clone(); diff --git a/gremlin-client/src/client.rs b/gremlin-client/src/client.rs index 6cf8bbce..c5fa220d 100644 --- a/gremlin-client/src/client.rs +++ b/gremlin-client/src/client.rs @@ -24,13 +24,19 @@ impl SessionedClient { let processor = "session".to_string(); let message = match self.options.serializer { - IoProtocol::GraphSONV2 => message_with_args_v2(String::from("close"), processor, args), + IoProtocol::GraphSONV2 => { + message_with_args_v2(String::from("close"), processor, args) + } IoProtocol::GraphSONV3 => message_with_args(String::from("close"), processor, args), + IoProtocol::GraphBinaryV1 => { + todo!("Need to add the handling logic for writing to a processor op") + } }; let conn = self.pool.get()?; - self.send_message(conn, message) + todo!() + // self.send_message(conn, message) } else { Err(GremlinError::Generic("No session to close".to_string())) } @@ -138,37 +144,23 @@ impl GremlinClient { let message = match self.options.serializer { IoProtocol::GraphSONV2 => message_with_args_v2(String::from("eval"), processor, args), IoProtocol::GraphSONV3 => message_with_args(String::from("eval"), processor, args), + IoProtocol::GraphBinaryV1 => { + todo!("Need to add the handling logic for writing to a processor op") + } }; let conn = self.pool.get()?; - self.send_message(conn, message) + // self.send_message(conn, message) + todo!() } - pub(crate) fn write_message( - &self, - conn: &mut r2d2::PooledConnection, - msg: Message, - ) -> GremlinResult<()> { - let message = self.build_message(msg)?; - - let content_type = self.options.serializer.content_type(); - let payload = String::from("") + content_type + &message; - - let mut binary = payload.into_bytes(); - binary.insert(0, content_type.len() as u8); - - conn.send(binary)?; - - Ok(()) - } - - pub(crate) fn send_message( + pub(crate) fn send_message( &self, mut conn: r2d2::PooledConnection, - msg: Message, + msg: Vec, ) -> GremlinResult { - self.write_message(&mut conn, msg)?; + conn.send(msg)?; let (response, results) = self.read_response(&mut conn)?; @@ -206,7 +198,26 @@ impl GremlinClient { } pub(crate) fn submit_traversal(&self, bytecode: &Bytecode) -> GremlinResult { - let message = self.generate_message(bytecode)?; + let aliases = self + .alias + .clone() + .or_else(|| Some(String::from("g"))) + .map(|s| { + let mut map = HashMap::new(); + map.insert(String::from("g"), GValue::String(s)); + map + }) + .unwrap_or_else(HashMap::new); + + let message = self + .options + .serializer + .build_traversal_message(aliases, bytecode)?; + + if true { + let message: Vec = message.into_iter().map(|byte| byte as i8).collect(); + panic!("{:?}", message); + } let conn = self.pool.get()?; @@ -218,7 +229,7 @@ impl GremlinClient { conn: &mut r2d2::PooledConnection, ) -> GremlinResult<(Response, VecDeque)> { let result = conn.recv()?; - let response: Response = serde_json::from_slice(&result)?; + let response = self.options.deserializer.read_response(&result)?; match response.status.code { 200 | 206 => { @@ -249,9 +260,10 @@ impl GremlinClient { args, ); - self.write_message(conn, message)?; + todo!() + // self.write_message(conn, message)?; - self.read_response(conn) + // self.read_response(conn) } None => Err(GremlinError::Request(( response.status.code, @@ -264,7 +276,9 @@ impl GremlinClient { ))), } } + fn build_message(&self, msg: Message) -> GremlinResult { + //TODO this should be gone by the end serde_json::to_string(&msg).map_err(GremlinError::from) } } diff --git a/gremlin-client/src/io/graph_binary_v1.rs b/gremlin-client/src/io/graph_binary_v1.rs index e69de29b..b46c574d 100644 --- a/gremlin-client/src/io/graph_binary_v1.rs +++ b/gremlin-client/src/io/graph_binary_v1.rs @@ -0,0 +1,467 @@ +use std::{ + collections::HashMap, + convert::TryInto, + fmt::{self, Display}, + io::Read, +}; + +use serde::{de, ser, Serialize}; +use uuid::Uuid; + +use crate::{process::traversal::Instruction, GKey, GValue, GremlinError, GremlinResult}; + +struct RequestV1 { + version: u8, + request_id: Uuid, + op: String, + args: HashMap, +} + +//https://tinkerpop.apache.org/docs/3.7.2/dev/io/#_data_type_codes +//Each type has a "fully qualified" serialized form usually: {type_code}{type_info}{value_flag}{value} +//{type_code} is a single unsigned byte representing the type number. +//{type_info} is an optional sequence of bytes providing additional information of the type represented. This is specially useful for representing complex and custom types. +//{value_flag} is a single byte providing information about the value. Flags have the following meaning: +// 0x01 The value is null. When this flag is set, no bytes for {value} will be provided. +//{value} is a sequence of bytes which content is determined by the type. +//All encodings are big-endian. + +//However there are occassion when just "the value" is written without the fully qualified form, for example the 4 bytes of a integer without the type_code +//this is usually done in scenarios when the type in unambiguous by schema. + +//Generally this is written such that serializing a value wrapped by GValue is taken to mean to write the fully qualified representation +//and serializing just "the value" is done directly upon the underlying value type + +fn write_usize_as_i32_be_bytes(val: usize, buf: &mut Vec) -> GremlinResult<()> { + let val_i32 = TryInto::::try_into(val) + .map_err(|_| GremlinError::Cast(format!("Invalid usize bytes exceed i32")))?; + GraphBinaryV1Serde::to_be_bytes(val_i32, buf) +} + +impl GraphBinaryV1Serde for &GValue { + fn to_be_bytes(self, buf: &mut Vec) -> GremlinResult<()> { + match self { + GValue::Null => { + //Type code of 0xfe: Unspecified null object + buf.push(0xfe); + //Then the null {value_flag} set and no sequence of bytes. + buf.push(0x01); + } + // GValue::Vertex(vertex) => todo!(), + // GValue::Edge(edge) => todo!(), + // GValue::VertexProperty(vertex_property) => todo!(), + // GValue::Property(property) => todo!(), + // GValue::Uuid(uuid) => todo!(), + GValue::Int32(value) => { + //Type code of 0x01 + buf.push(0x01); + //Empty value flag + buf.push(0x00); + //then value bytes + GraphBinaryV1Serde::to_be_bytes(*value, buf)?; + } + // GValue::Int64(_) => todo!(), + // GValue::Float(_) => todo!(), + // GValue::Double(_) => todo!(), + // GValue::Date(date_time) => todo!(), + // GValue::List(list) => todo!(), + // GValue::Set(set) => todo!(), + GValue::Map(map) => { + //Type code of 0x0a: Map + buf.push(0x0a); + // //Empty value flag + buf.push(0x00); + + //{length} is an Int describing the length of the map. + write_usize_as_i32_be_bytes(map.len(), buf)?; + + //{item_0}…​{item_n} are the items of the map. {item_i} is sequence of 2 fully qualified typed values one representing the key + // and the following representing the value, each composed of {type_code}{type_info}{value_flag}{value}. + for (k, v) in map.iter() { + k.to_be_bytes(buf)?; + v.to_be_bytes(buf)?; + } + } + // GValue::Token(token) => todo!(), + GValue::String(value) => { + //Type code of 0x03: String + buf.push(0x03); + //Empty value flag + buf.push(0x00); + //Format: {length}{text_value} + // {length} is an Int describing the byte length of the text. Length is a positive number or zero to represent the empty string. + // {text_value} is a sequence of bytes representing the string value in UTF8 encoding. + GraphBinaryV1Serde::to_be_bytes(value.as_str(), buf)?; + } + // GValue::Path(path) => todo!(), + // GValue::TraversalMetrics(traversal_metrics) => todo!(), + // GValue::Metric(metric) => todo!(), + // GValue::TraversalExplanation(traversal_explanation) => todo!(), + // GValue::IntermediateRepr(intermediate_repr) => todo!(), + // GValue::P(p) => todo!(), + // GValue::T(t) => todo!(), + GValue::Bytecode(code) => { + //Type code of 0x15: Bytecode + buf.push(0x15); + //Empty value flag + buf.push(0x00); + //then value bytes + // {steps_length}{step_0}…​{step_n}{sources_length}{source_0}…​{source_n} + //{steps_length} is an Int value describing the amount of steps. + //{step_i} is composed of {name}{values_length}{value_0}…​{value_n}, where: + // {name} is a String. + // {values_length} is an Int describing the amount values. + // {value_i} is a fully qualified typed value composed of {type_code}{type_info}{value_flag}{value} describing the step argument. + + fn write_instructions(instructions: &Vec, buf: &mut Vec) -> GremlinResult<()>{ + write_usize_as_i32_be_bytes(instructions.len(), buf)?; + for instruction in instructions { + GraphBinaryV1Serde::to_be_bytes(instruction.operator().as_str(), buf)?; + write_usize_as_i32_be_bytes(instruction.args().len(), buf)?; + instruction.args().iter().try_for_each(|arg| arg.to_be_bytes(buf))?; + } + Ok(()) + } + write_instructions(code.steps(), buf)?; + write_instructions(code.sources(), buf)?; + } + // GValue::Traverser(traverser) => todo!(), + GValue::Scope(scope) => { + //Type code of 0x1f: Scope + buf.push(0x1f); + //Empty value flag + buf.push(0x00); + + //Format: a fully qualified single String representing the enum value. + match scope { + crate::process::traversal::Scope::Global => (&GValue::from(String::from("global"))).to_be_bytes(buf)?, + crate::process::traversal::Scope::Local => (&GValue::from(String::from("local"))).to_be_bytes(buf)?, + } + } + // GValue::Order(order) => todo!(), + // GValue::Bool(_) => todo!(), + // GValue::TextP(text_p) => todo!(), + // GValue::Pop(pop) => todo!(), + + // GValue::Cardinality(cardinality) => todo!(), + // GValue::Merge(merge) => todo!(), + // GValue::Direction(direction) => todo!(), + // GValue::Column(column) => todo!(), + other => unimplemented!("TODO {other:?}"), + } + Ok(()) + } + + // fn from_be_bytes<'a, S: Iterator>(bytes: &mut S) -> GremlinResult { + // todo!() + // } +} + +impl GraphBinaryV1Serde for &GKey { + fn to_be_bytes(self, buf: &mut Vec) -> GremlinResult<()> { + match self { + GKey::T(t) => todo!(), + GKey::String(str) => (&GValue::from(str.clone())).to_be_bytes(buf), + GKey::Token(token) => todo!(), + GKey::Vertex(vertex) => todo!(), + GKey::Edge(edge) => todo!(), + GKey::Direction(direction) => todo!(), + } + } + + // fn from_be_bytes<'a, S: Iterator>(bytes: &mut S) -> GremlinResult { + // todo!() + // } +} + +impl, V: Into> GraphBinaryV1Serde for HashMap { + //This represents a complicated meeting point. The request message has a non-qualified emission of a map + //for the arguments, but the contained elements needs to be fully qualified + //Ideally this would just be K: GraphBinaryV1Serde & V: GraphBinaryV1Serde + //but that exposes as a type declaration emission things like HashMap which will then + //invoke the value bytes only impl for String and not doing the qualified and then be rejected by the server + fn to_be_bytes(self, buf: &mut Vec) -> GremlinResult<()> { + write_usize_as_i32_be_bytes(self.len(), buf)?; + for (k, v) in self { + //TODO we could just move this logic into mod.rs since it's a detail about the nature of + //how the request message is implemented, and not the generialized notion of how to write a HashMap + //That'd also duck the issue of passing through GKey here + let k: GKey = k.into(); + let v: GValue = v.into(); + k.to_be_bytes(buf)?; + v.to_be_bytes(buf)?; + } + Ok(()) + } + + // fn from_be_bytes<'a, S: Iterator>(bytes: &mut S) -> GremlinResult { + // todo!() + // } +} + +fn deserialize<'a, T: Iterator>(mut value: T) -> GremlinResult { + let data_code = value + .next() + .ok_or_else(|| GremlinError::Cast(format!("Invalid bytes no data code byte")))?; + match data_code { + // GValue::Null => { + // buf.reserve_exact(2); + // //Type code of 0xfe: Unspecified null object + // buf.push(0xfe); + // //Then the null {value_flag} set and no sequence of bytes. + // buf.push(0x01); + // } + // GValue::Vertex(vertex) => todo!(), + // GValue::Edge(edge) => todo!(), + // GValue::VertexProperty(vertex_property) => todo!(), + // GValue::Property(property) => todo!(), + // GValue::Uuid(uuid) => todo!(), + 0x01 => { + //Type code of 0x01: Integer + //Check null flag + match value.next() { + Some(0x00) => { + //We've got a value to parse + // GraphBinaryV1Serde::from_be_bytes(&mut value).map(GValue::Int32) + todo!() + } + Some(0x01) => Ok(GValue::Null), + _ => Err(GremlinError::Cast(format!("Invalid bytes into i32"))), + } + } + // GValue::Int64(_) => todo!(), + // GValue::Float(_) => todo!(), + // GValue::Double(_) => todo!(), + // GValue::Date(date_time) => todo!(), + // GValue::List(list) => todo!(), + // GValue::Set(set) => todo!(), + // GValue::Map(map) => todo!(), + // GValue::Token(token) => todo!(), + 0x03 => { + //Type code of 0x03: String + match value.next() { + Some(0x00) => { + //We've got a value to parse + // GraphBinaryV1Serde::from_be_bytes(&mut value).map(GValue::String) + todo!() + } + Some(0x01) => Ok(GValue::Null), + _ => Err(GremlinError::Cast(format!("Invalid bytes into String"))), + } + + // GValue::String(value) => { + // + // //Empty value flag + // //Format: {length}{text_value} + // // {length} is an Int describing the byte length of the text. Length is a positive number or zero to represent the empty string. + // // {text_value} is a sequence of bytes representing the string value in UTF8 encoding. + } + // GValue::Path(path) => todo!(), + // GValue::TraversalMetrics(traversal_metrics) => todo!(), + // GValue::Metric(metric) => todo!(), + // GValue::TraversalExplanation(traversal_explanation) => todo!(), + // GValue::IntermediateRepr(intermediate_repr) => todo!(), + // GValue::P(p) => todo!(), + // GValue::T(t) => todo!(), + // GValue::Bytecode(code) => { + // //Type code of 0x15: Bytecode + // buf.push(0x15); + // //Empty value flag + // buf.push(0x00); + // //then value bytes + // // {steps_length}{step_0}…​{step_n}{sources_length}{source_0}…​{source_n} + // //{steps_length} is an Int value describing the amount of steps. + // let steps_length: i32 = code.steps().len().try_into().expect("Number of steps should fit in i32"); + // buf.extend_from_slice(serialize(&GValue::Int32(steps_length))); + + // //{step_i} is composed of {name}{values_length}{value_0}…​{value_n}, where: + // // {name} is a String. + // // {values_length} is an Int describing the amount values. + // // {value_i} is a fully qualified typed value composed of {type_code}{type_info}{value_flag}{value} describing the step argument. + + // let steps: GremlinResult> = code + // .steps() + // .iter() + // .map(|m| { + // let mut instruction = vec![]; + // instruction.push(Value::String(m.operator().clone())); + + // let arguments: GremlinResult> = + // m.args().iter().map(|a| self.write(a)).collect(); + + // instruction.extend(arguments?); + // Ok(Value::Array(instruction)) + // }) + // .collect(); + + // let sources: GremlinResult> = code + // .sources() + // .iter() + // .map(|m| { + // let mut instruction = vec![]; + // instruction.push(Value::String(m.operator().clone())); + + // let arguments: GremlinResult> = + // m.args().iter().map(|a| self.write(a)).collect(); + + // instruction.extend(arguments?); + // Ok(Value::Array(instruction)) + // }) + // .collect(); + // } + // GValue::Traverser(traverser) => todo!(), + // GValue::Scope(scope) => todo!(), + // GValue::Order(order) => todo!(), + // GValue::Bool(_) => todo!(), + // GValue::TextP(text_p) => todo!(), + // GValue::Pop(pop) => todo!(), + // GValue::Cardinality(cardinality) => todo!(), + // GValue::Merge(merge) => todo!(), + // GValue::Direction(direction) => todo!(), + // GValue::Column(column) => todo!(), + _ => unimplemented!("TODO"), + } +} + +pub trait GraphBinaryV1Serde: Sized { + fn to_be_bytes(self, buf: &mut Vec) -> GremlinResult<()>; + // fn to_fully_qualified_be_bytes(&self, buf: &mut Vec) -> GremlinResult<()>; + // fn from_be_bytes<'a, S: Iterator>(bytes: &mut S) -> GremlinResult; + // fn from_fully_qualified_be_bytes<'a, S: Iterator>(bytes: &mut S) -> GremlinResult; + + //TODO implement a to_fully_qualified_be_bytes method & from_fully_qualified_be_bytes instead of serialize/deserialize methods + // maybe this doesn't make sense, since it would require us to peek the first byte anyways and then match to the that impl to seek over the byte again +} + +impl GraphBinaryV1Serde for &str { + fn to_be_bytes(self, buf: &mut Vec) -> GremlinResult<()> { + let length: i32 = self + .len() + .try_into() + .map_err(|_| GremlinError::Cast(format!("String length exceeds i32")))?; + GraphBinaryV1Serde::to_be_bytes(length, buf)?; + buf.extend_from_slice(self.as_bytes()); + Ok(()) + } + + // fn from_be_bytes<'a, S: Iterator>(bytes: &mut S) -> GremlinResult { + // let string_bytes_length: i32 = GraphBinaryV1Serde::from_be_bytes(bytes) + // .map_err((|_| GremlinError::Cast(format!("Invalid bytes for string length"))))?; + // let string_bytes_length = string_bytes_length + // .try_into() + // .map_err((|_| GremlinError::Cast(format!("String length did not fit into usize"))))?; + // let string_value_bytes: Vec = bytes.take(string_bytes_length).cloned().collect(); + // if string_value_bytes.len() < string_bytes_length { + // return Err(GremlinError::Cast(format!( + // "Missing bytes for string value" + // ))); + // } + // String::from_utf8(string_value_bytes) + // .map_err((|_| GremlinError::Cast(format!("Invalid bytes for string value")))) + // } +} + +impl GraphBinaryV1Serde for i32 { + fn to_be_bytes(self, buf: &mut Vec) -> GremlinResult<()> { + buf.extend_from_slice(&self.to_be_bytes()); + Ok(()) + } + + // fn from_be_bytes<'a, S: Iterator>(bytes: &mut S) -> GremlinResult { + // bytes + // .take(4) + // .cloned() + // .collect::>() + // .try_into() + // .map_err(|_| GremlinError::Cast(format!("Invalid bytes into i32"))) + // .map(i32::from_be_bytes) + // } +} + +impl GraphBinaryV1Serde for Uuid { + fn to_be_bytes(self, buf: &mut Vec) -> GremlinResult<()> { + buf.extend_from_slice(self.as_bytes().as_slice()); + Ok(()) + } + + // fn from_be_bytes<'a, S: Iterator>(bytes: &mut S) -> GremlinResult { + // bytes + // .take(16) + // .cloned() + // .collect::>() + // .try_into() + // .map_err(|_| GremlinError::Cast(format!("Invalid bytes into Uuid"))) + // .map(Uuid::from_bytes) + // } +} + +// fn deserialize_i32(bytes: &[u8]) -> GremlinResult { +// bytes +// .try_into() +// .map(i32::from_be_bytes) +// .map_err(|_| GremlinError::Cast(format!("Invalid bytes into i32"))) +// } + +// fn serialize_i32(value: i32) -> [u8; 4] { +// value.to_be_bytes() +// } + +#[cfg(test)] +mod tests { + use rstest::rstest; + + use super::*; + + // All encodings are big-endian. + + // Quick examples, using hexadecimal notation to represent each byte: + + // 01 00 00 00 00 01: a 32-bit integer number, that represents the decimal number 1. It’s composed by the type_code 0x01, and empty flag value 0x00 and four bytes to describe the value. + + // 01 00 00 00 00 ff: a 32-bit integer, representing the number 256. + + // 01 01: a null value for a 32-bit integer. It’s composed by the type_code 0x01, and a null flag value 0x01. + + // 02 00 00 00 00 00 00 00 00 01: a 64-bit integer number 1. It’s composed by the type_code 0x02, empty flags and eight bytes to describe the value. + + //Seems like generalized flow should be, be given a slice: + //Read the first byte + //then match on it + + // {type_code}{type_info}{value_flag}{value} + + // {type_code} is a single unsigned byte representing the type number. + + // {type_info} is an optional sequence of bytes providing additional information of the type represented. This is specially useful for representing complex and custom types. + + // {value_flag} is a single byte providing information about the value. Flags have the following meaning: + + // 0x01 The value is null. When this flag is set, no bytes for {value} will be provided. + + // {value} is a sequence of bytes which content is determined by the type. + + #[rstest] + //Non-Null i32 Integer (01 00) + #[case::int_1(&[0x01, 0x00, 0x00, 0x00, 0x00, 0x01], GValue::Int32(1))] + #[case::int_256(&[0x01, 0x00, 0x00, 0x00, 0x01, 0x00], GValue::Int32(256))] + #[case::int_257(&[0x01, 0x00, 0x00, 0x00, 0x01, 0x01], GValue::Int32(257))] + #[case::int_neg_1(&[0x01, 0x00, 0xFF, 0xFF, 0xFF, 0xFF], GValue::Int32(-1))] + #[case::int_neg_2(&[0x01, 0x00, 0xFF, 0xFF, 0xFF, 0xFE], GValue::Int32(-2))] + //Non-Null Strings (03 00) + #[case::str_abc(&[0x03, 0x00, 0x00, 0x00, 0x00, 0x03, 0x61, 0x62, 0x63], GValue::String("abc".into()))] + #[case::str_abcd(&[0x03, 0x00, 0x00, 0x00, 0x00, 0x04, 0x61, 0x62, 0x63, 0x64], GValue::String("abcd".into()))] + #[case::empty_str(&[0x03, 0x00, 0x00, 0x00, 0x00, 0x00], GValue::String("".into()))] + fn serde(#[case] expected_serialized: &[u8], #[case] expected: GValue) { + let mut serialized = Vec::new(); + (&expected).to_be_bytes(&mut serialized).expect("Shouldn't fail parsing"); + assert_eq!(serialized, expected_serialized); + let deserialized = deserialize(serialized.iter()).expect("Shouldn't fail parsing"); + assert_eq!(deserialized, expected); + } + + #[rstest] + #[case::too_few_bytes( &[0x01, 0x00, 0x00, 0x00, 0x00])] + fn serde_int32_invalid_bytes(#[case] bytes: &[u8]) { + deserialize(bytes.iter()).expect_err("Should have failed due invalid bytes"); + } +} diff --git a/gremlin-client/src/io/mod.rs b/gremlin-client/src/io/mod.rs index 79b3cc27..0a355d75 100644 --- a/gremlin-client/src/io/mod.rs +++ b/gremlin-client/src/io/mod.rs @@ -1,24 +1,31 @@ #[macro_use] mod macros; +mod graph_binary_v1; mod serializer_v2; mod serializer_v3; -mod graph_binary_v1; use crate::conversion::ToGValue; -use crate::process::traversal::{Order, Scope}; +use crate::message::{Response, RequestIdV2}; +use crate::process::traversal::{Bytecode, Order, Scope}; use crate::structure::{Cardinality, Direction, GValue, Merge, T}; use serde_json::{json, Map, Value}; +use uuid::Uuid; +use std::collections::HashMap; +use std::convert::TryInto; +use std::f64::consts::E; use std::string::ToString; -use crate::{GremlinError, GremlinResult}; +use crate::{GKey, GremlinError, GremlinResult, Message, io::graph_binary_v1::GraphBinaryV1Serde}; #[derive(Debug, Clone)] pub enum IoProtocol { GraphSONV2, GraphSONV3, + GraphBinaryV1, } impl IoProtocol { + //TODO maybe we could remove pub from read/write? pub fn read(&self, value: &Value) -> GremlinResult> { if let Value::Null = value { return Ok(None); @@ -26,10 +33,164 @@ impl IoProtocol { match self { IoProtocol::GraphSONV2 => serializer_v2::deserializer_v2(value).map(Some), IoProtocol::GraphSONV3 => serializer_v3::deserializer_v3(value).map(Some), + IoProtocol::GraphBinaryV1 => todo!(), } } pub fn write(&self, value: &GValue) -> GremlinResult { + match self { + IoProtocol::GraphSONV2 | IoProtocol::GraphSONV3 => self.write_graphson(value), + IoProtocol::GraphBinaryV1 => todo!(), + } + } + + pub fn read_response(&self, response: &[u8]) -> GremlinResult{ + match self { + IoProtocol::GraphSONV2 | IoProtocol::GraphSONV3 => serde_json::from_slice(&response).map_err(GremlinError::from), + IoProtocol::GraphBinaryV1 => todo!() + } + } + + pub fn build_eval_message(&self, args: HashMap) -> GremlinResult>{ + let op = String::from("eval"); + let processor = String::default(); + let content_type = self.content_type(); + + match self { + IoProtocol::GraphSONV2 | IoProtocol::GraphSONV3 => { + let args = self.write(&GValue::from(args))?; + let message = match self { + IoProtocol::GraphSONV2 => Message::V2 { + request_id: RequestIdV2 { + id_type: "g:UUID".to_string(), + value: Uuid::new_v4(), + }, + op, + processor, + args, + }, + IoProtocol::GraphSONV3 => { + Message::V3 { request_id: Uuid::new_v4(), op, processor, args} + } + _ => panic!("Invalid branch") + }; + + let msg = serde_json::to_string(&message).map_err(GremlinError::from)?; + let payload = String::from("") + content_type + &msg; + let mut binary = payload.into_bytes(); + binary.insert(0, content_type.len() as u8); + Ok(binary) + } + IoProtocol::GraphBinaryV1 => { + let mut message_bytes: Vec = Vec::new(); + //Need to write header first, its length is a Byte not a Int + let header = String::from(content_type); + let header_length: u8 = header.len().try_into().expect("Header length should fit in u8"); + message_bytes.push(header_length); + message_bytes.extend_from_slice(header.as_bytes()); + + //Version byte + message_bytes.push(0x81); + + //Request Id + Uuid::new_v4().to_be_bytes(&mut message_bytes)?; + + //Op + op.to_be_bytes(&mut message_bytes)?; + + //Processor + processor.to_be_bytes(&mut message_bytes)?; + + //Args + (&GValue::from(args)).to_be_bytes(&mut message_bytes)?; + Ok(message_bytes) + } + } + } + + pub fn build_traversal_message(&self, aliases: HashMap, bytecode: &Bytecode) -> GremlinResult> { + let mut args = HashMap::new(); + args.insert(String::from("gremlin"), GValue::Bytecode(bytecode.clone())); + args.insert(String::from("aliases"), GValue::from(aliases)); + let content_type = self.content_type(); + + match self { + IoProtocol::GraphSONV2 | IoProtocol::GraphSONV3 => { + let args = GValue::from(args); + //TODO this should be calling something more congruent with the graphbinary side + let args = self.write(&args)?; + let message =serde_json::to_string(&Message::V3 { + request_id: Uuid::new_v4(), + op: String::from("bytecode"), + processor: String::from("traversal"), + args, + }).map_err(GremlinError::from)?; + + let payload = String::from("") + content_type + &message; + let mut binary = payload.into_bytes(); + binary.insert(0, content_type.len() as u8); + Ok(binary) + } + IoProtocol::GraphBinaryV1 => { + let mut message_bytes: Vec = Vec::new(); + //Need to write header first, its length is a Byte not a Int + let header = String::from(content_type); + let header_length: u8 = header.len().try_into().expect("Header length should fit in u8"); + message_bytes.push(header_length); + message_bytes.extend_from_slice(header.as_bytes()); + + //Version byte + message_bytes.push(0x81); + + //Request Id + Uuid::new_v4().to_be_bytes(&mut message_bytes)?; + + //Op + String::from("bytecode").to_be_bytes(&mut message_bytes)?; + + //Processor + String::from("traversal").to_be_bytes(&mut message_bytes)?; + + //Args + args.to_be_bytes(&mut message_bytes)?; + Ok(message_bytes) + } + } + } + + //TODO we can probably generalize this + // pub fn generate_traversal_message( + // &self, + // aliases: HashMap, + // bytecode: &Bytecode, + // ) -> GremlinResult> { + // let mut args = HashMap::new(); + + // args.insert(String::from("gremlin"), GValue::Bytecode(bytecode.clone())); + + // // let aliases = self + // // .alias + // // .clone() + // // .or_else(|| Some(String::from("g"))) + // // .map(|s| { + // // let mut map = HashMap::new(); + // // map.insert(String::from("g"), GValue::String(s)); + // // map + // // }) + // // .unwrap_or_else(HashMap::new); + + // args.insert(String::from("aliases"), GValue::from(aliases)); + + // let args = self.write(&GValue::from(args))?; + + // Ok(message_with_args( + // String::from("bytecode"), + // String::from("traversal"), + // args, + // )) + // } + + fn write_graphson(&self, value: &GValue) -> GremlinResult { match (self, value) { (_, GValue::Double(d)) => Ok(json!({ "@type" : "g:Double", @@ -257,6 +418,7 @@ impl IoProtocol { match self { IoProtocol::GraphSONV2 => "application/vnd.gremlin-v2.0+json", IoProtocol::GraphSONV3 => "application/vnd.gremlin-v3.0+json", + IoProtocol::GraphBinaryV1 => "application/vnd.graphbinary-v1.0", } } } diff --git a/gremlin-client/src/message.rs b/gremlin-client/src/message.rs index bd2530b3..8fc8a728 100644 --- a/gremlin-client/src/message.rs +++ b/gremlin-client/src/message.rs @@ -7,10 +7,10 @@ use uuid::Uuid; #[serde(rename_all = "camelCase")] pub struct RequestIdV2 { #[serde(rename = "@type")] - id_type: String, + pub id_type: String, #[serde(rename = "@value")] - value: Uuid, + pub value: Uuid, } #[derive(Serialize)] diff --git a/gremlin-client/src/pool.rs b/gremlin-client/src/pool.rs index 9af619a6..7f22ffe1 100644 --- a/gremlin-client/src/pool.rs +++ b/gremlin-client/src/pool.rs @@ -40,25 +40,12 @@ impl ManageConnection for GremlinConnectionManager { String::from("language"), GValue::String(String::from("gremlin-groovy")), ); - let args = self.options.serializer.write(&GValue::from(args))?; - - let message = match self.options.serializer { - IoProtocol::GraphSONV2 => message_with_args_v2(String::from("eval"), String::default(), args), - IoProtocol::GraphSONV3 => message_with_args(String::from("eval"), String::default(), args), - }; - - let msg = serde_json::to_string(&message).map_err(GremlinError::from)?; - - let content_type = self.options.serializer.content_type(); - let payload = String::from("") + content_type + &msg; - - let mut binary = payload.into_bytes(); - binary.insert(0, content_type.len() as u8); - - conn.send(binary)?; + + let message = self.options.serializer.build_eval_message(args)?; + conn.send(message)?; let result = conn.recv()?; - let response: Response = serde_json::from_slice(&result)?; + let response = self.options.deserializer.read_response(&result)?; match response.status.code { 200 | 206 => Ok(()), @@ -91,20 +78,21 @@ impl ManageConnection for GremlinConnectionManager { conn.send(binary)?; let result = conn.recv()?; - let response: Response = serde_json::from_slice(&result)?; - - match response.status.code { - 200 | 206 => Ok(()), - 204 => Ok(()), - 401 => Ok(()), - // 401 is actually a username/password incorrect error, but if not - // not returned as okay, the pool loops infinitely trying - // to authenticate. - _ => Err(GremlinError::Request(( - response.status.code, - response.status.message, - ))), - } + todo!() + // let response: Response = serde_json::from_slice(&result)?; + + // match response.status.code { + // 200 | 206 => Ok(()), + // 204 => Ok(()), + // 401 => Ok(()), + // // 401 is actually a username/password incorrect error, but if not + // // not returned as okay, the pool loops infinitely trying + // // to authenticate. + // _ => Err(GremlinError::Request(( + // response.status.code, + // response.status.message, + // ))), + // } } None => Err(GremlinError::Request(( response.status.code, diff --git a/gremlin-client/src/process/traversal/mod.rs b/gremlin-client/src/process/traversal/mod.rs index 4001c2d2..efed376b 100644 --- a/gremlin-client/src/process/traversal/mod.rs +++ b/gremlin-client/src/process/traversal/mod.rs @@ -18,7 +18,7 @@ pub use order::Order; pub use remote::{traversal, SyncTerminator, Terminator}; pub use builder::TraversalBuilder; -pub use bytecode::{Bytecode, WRITE_OPERATORS}; +pub use bytecode::{Bytecode, WRITE_OPERATORS, Instruction}; pub use graph_traversal::GraphTraversal; pub use graph_traversal_source::GraphTraversalSource; pub use scope::Scope; diff --git a/gremlin-client/tests/common.rs b/gremlin-client/tests/common.rs index 5f2c9f95..c0dae324 100644 --- a/gremlin-client/tests/common.rs +++ b/gremlin-client/tests/common.rs @@ -25,6 +25,7 @@ pub mod io { let port = match serializer { IoProtocol::GraphSONV2 => 8182, IoProtocol::GraphSONV3 => 8182, + IoProtocol::GraphBinaryV1 => 8182, }; GremlinClient::connect( ConnectionOptions::builder() @@ -130,6 +131,7 @@ pub mod aio { let port = match serializer { IoProtocol::GraphSONV2 => 8182, IoProtocol::GraphSONV3 => 8182, + IoProtocol::GraphBinaryV1 => 8182, }; GremlinClient::connect( ConnectionOptions::builder() diff --git a/gremlin-client/tests/integration_traversal_graph_binary.rs b/gremlin-client/tests/integration_traversal_graph_binary.rs new file mode 100644 index 00000000..08425537 --- /dev/null +++ b/gremlin-client/tests/integration_traversal_graph_binary.rs @@ -0,0 +1,11 @@ +use common::io::graph_serializer; +use gremlin_client::{process::traversal::{traversal, Scope}, GValue, IoProtocol}; + +mod common; + +#[test] +fn demo() { + let g = traversal().with_remote(graph_serializer(IoProtocol::GraphBinaryV1)); + let y = g.inject(1).sum(Scope::Global).next().unwrap(); + panic!("Got {:?}", y); +}