Skip to content

Commit

Permalink
MergeV, MergeE, & Option Steps (#214)
Browse files Browse the repository at this point in the history
* First pass

* Added JanusGraph in memory mode to docker compose enviornment

* Added mergeV as a start step

* Relaxed property step keys to Into<GValue> from &str
support converting a TraversalBuilder into GValue via Bytecode

* Added JanusGraph custom vertex id tests

* Implemented From<HashMap<GKey, GValue>> for MergeVertexStep to support literal maps being defined for mergeV steps

* Implemented Option step for mergeV

* Added healthcheck for JG & wait with timeout for docker compose up in GH Action

* Use Docker Compose v2 via "docker compose" vs v1's "docker-compose" in order to leverage v2's wait flag

* Combine merge v custom id test cases

* Better handle tests being reran

* Added merge_v_tests feature for tests

* Formatting

* FIxed if condition

* Corrected cargo.toml merge_test feature

* Changed GH Action if statement formatting

* Increased docker compose timeout time for healthy service check

* Fixed imports for merge_test module. Moved merge tests into their own module

* Use drop vertices test utility function

* Drop vertices for test_merge_v_options

* Added mergeV step to anonymous traversals

* Implemented mergeE step

* Added mergeV and mergeE to Bytecode WRITE_OPERATORS

* Implemented travsal test based on reference doc combo mergeV and mergeE

* Support literal options for choose step and added test

* Rewrote side effect and expose via GraphTraversal

* Implemented support for Columns in By Step

* Expose properties() step in an anonymouse traversal

* Also update property_many, property_with_cardinality, and property_many_with_cardinality to take a Key that impls Into<GValue> instead of just &str

* If a request responds with a websocket error mark the conneciton as invalid

* Map additional tungstenite errors to GremlinError::WebSocketAsync

* Arc tungstenite::Error into WebSocketAsync error to maintain Async error enum back to caller

* Expose healthcheck interval setting on async connection pool

* Formatting

* Map mobc pool errors to type that would invalidate connection

* Exploratory logging

* Added uuid to connection instance logging

* Update mobc and make its idle connection behavior the same as the rd2d sync pool

* 0.8 mobc does not treat async-std feature as mutually exclusive from tokio

* Include tokio/sync for mobc compilation in async-std-runtime feature

* Implemented None step

* Implemented iterator() method on returned remote stream to consume stream for only Null terminated traversals

* Trial connection multiplexing for non-credential configured clients

* Formatting

* Removed internal channel bounding

* Revert "Removed internal channel bounding"

This reverts commit 7500820.

* Revert "Formatting"

This reverts commit 8082d3b.

* Revert "Trial connection multiplexing for non-credential configured clients"

This reverts commit 5db2b58.

* Revert "Added uuid to connection instance logging"

This reverts commit a342d8c.

* Revert "Exploratory logging"

This reverts commit 03c24fe.

* Formatting

* Switched command to docker compose in coverage GH Action Workflow

* Corrected non-async tungstenite Error to using #[from]

* Added running cargo test with no async feature enabled
  • Loading branch information
criminosis authored Oct 16, 2024
1 parent d7ce0e4 commit d2505d3
Show file tree
Hide file tree
Showing 38 changed files with 1,408 additions and 92 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
- uses: actions/checkout@v2
- name: Starting Gremlin Servers
run: |
docker-compose -f ./docker-compose/docker-compose.yaml up -d
docker compose -f ./docker-compose/docker-compose.yaml up -d
env:
GREMLIN_SERVER: ${{ matrix.gremlin-server }}

Expand Down
29 changes: 28 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
- uses: actions/checkout@v2
- name: Starting Gremlin Servers
run: |
docker-compose -f ./docker-compose/docker-compose.yaml up -d
docker compose -f ./docker-compose/docker-compose.yaml up -d --wait --wait-timeout 90
env:
GREMLIN_SERVER: ${{ matrix.gremlin-server }}

Expand All @@ -41,13 +41,40 @@ jobs:
with:
command: fmt
args: --all -- --check
- name: Run cargo test with blocking client
if: matrix.gremlin-server == '3.5.7'
uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path gremlin-client/Cargo.toml
- name: Run cargo test with tokio
if: matrix.gremlin-server == '3.5.7'
uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path gremlin-client/Cargo.toml --features=tokio-runtime
- name: Run cargo test with async-std
if: matrix.gremlin-server == '3.5.7'
uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path gremlin-client/Cargo.toml --features=async-std-runtime
# MergeV as a step doesn't exist in 3.5.x, so selectively run those tests
- name: Run cargo test with blocking client
if: matrix.gremlin-server != '3.5.7'
uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path gremlin-client/Cargo.toml --features=merge_tests
- name: Run cargo test with tokio
if: matrix.gremlin-server != '3.5.7'
uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path gremlin-client/Cargo.toml --features=tokio-runtime,merge_tests
- name: Run cargo test with async-std
if: matrix.gremlin-server != '3.5.7'
uses: actions-rs/cargo@v1
with:
command: test
args: --manifest-path gremlin-client/Cargo.toml --features=async-std-runtime,merge_tests
13 changes: 13 additions & 0 deletions docker-compose/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,16 @@ services:
command : ["conf/gremlin-server-credentials.yaml"]
ports:
- "8183:8182"
janusgraph:
image: janusgraph/janusgraph:latest
environment:
- janusgraph.graph.set-vertex-id=true
- janusgraph.graph.allow-custom-vid-types=true
- JANUS_PROPS_TEMPLATE=inmemory
ports:
- "8184:8182"
healthcheck:
test: ["CMD", "bin/gremlin.sh", "-e", "scripts/remote-connect.groovy"]
interval: 10s
timeout: 30s
retries: 3
7 changes: 4 additions & 3 deletions gremlin-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ readme = "README.md"
[features]

default = []
merge_tests = []



async_gremlin = ["futures","mobc","async-tungstenite","async-trait","url","pin-project-lite"]

async_std = ["async-std-runtime"]
tokio-runtime = ["async_gremlin","tokio","mobc/tokio","async-tungstenite/tokio-runtime","async-tungstenite/tokio-native-tls","tokio-native-tls","tokio-stream"]
async-std-runtime = ["async_gremlin","async-std","async-tungstenite/async-std-runtime","async-tungstenite/async-tls","mobc/async-std","async-tls","rustls","webpki"]
tokio-runtime = ["async_gremlin","tokio","async-tungstenite/tokio-runtime","async-tungstenite/tokio-native-tls","tokio-native-tls","tokio-stream"]
async-std-runtime = ["async_gremlin","async-std","async-tungstenite/async-std-runtime","async-tungstenite/async-tls","tokio/sync", "mobc/async-std","async-tls","rustls","webpki"]

derive = ["gremlin-derive"]

Expand Down Expand Up @@ -57,7 +58,7 @@ thiserror = "1.0.20"



mobc = {version = "0.7", optional = true, default-features=false, features = ["unstable"] }
mobc = {version = "0.8", optional = true }
url = {version = "2.1.0", optional = true}
futures = { version = "0.3.1", optional = true}
pin-project-lite = { version = "0.2", optional = true}
Expand Down
3 changes: 3 additions & 0 deletions gremlin-client/src/aio/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ impl GremlinClient {
let pool = Pool::builder()
.get_timeout(opts.pool_get_connection_timeout)
.max_open(pool_size as u64)
.health_check_interval(opts.pool_healthcheck_interval)
//Makes max idle connections equal to max open, matching the behavior of the sync pool r2d2
.max_idle(0)
.build(manager);

Ok(GremlinClient {
Expand Down
20 changes: 18 additions & 2 deletions gremlin-client/src/aio/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod tokio_use {
pub use tokio_native_tls::TlsStream;
}

use futures::TryFutureExt;
#[cfg(feature = "tokio-runtime")]
use tokio_use::*;

Expand Down Expand Up @@ -144,6 +145,7 @@ impl Conn {
tls::connector(&opts),
websocket_config,
)
.map_err(|e| Arc::new(e))
.await?
};
#[cfg(feature = "tokio-runtime")]
Expand All @@ -153,6 +155,7 @@ impl Conn {
tls::connector(&opts),
websocket_config,
)
.map_err(|e| Arc::new(e))
.await?
};

Expand Down Expand Up @@ -190,6 +193,18 @@ impl Conn {
.await
.expect("It should contain the response")
.map(|r| (r, receiver))
.map_err(|e| {
//If there's been an websocket layer error, mark the connection as invalid
match e {
GremlinError::WebSocket(_)
| GremlinError::WebSocketAsync(_)
| GremlinError::WebSocketPoolAsync(_) => {
self.valid = false;
}
_ => {}
}
e
})
}

pub fn is_valid(&self) -> bool {
Expand Down Expand Up @@ -222,7 +237,7 @@ fn sender_loop(
if let Err(e) = sink.send(Message::Binary(msg.2)).await {
let mut sender = guard.remove(&msg.1).unwrap();
sender
.send(Err(GremlinError::from(e)))
.send(Err(GremlinError::from(Arc::new(e))))
.await
.expect("Failed to send error");
}
Expand Down Expand Up @@ -257,8 +272,9 @@ fn receiver_loop(
match stream.next().await {
Some(Err(error)) => {
let mut guard = requests.lock().await;
let error = Arc::new(error);
for s in guard.values_mut() {
match s.send(Err(GremlinError::from(&error))).await {
match s.send(Err(error.clone().into())).await {
Ok(_r) => {}
Err(_e) => {}
}
Expand Down
15 changes: 0 additions & 15 deletions gremlin-client/src/aio/error.rs

This file was deleted.

1 change: 0 additions & 1 deletion gremlin-client/src/aio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ pub(crate) mod connection;
pub(crate) mod pool;
mod result;

mod error;
pub(crate) mod process;
pub use client::GremlinClient;
pub use process::traversal::AsyncTerminator;
Expand Down
12 changes: 12 additions & 0 deletions gremlin-client/src/aio/process/traversal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::GremlinResult;
use core::task::Context;
use core::task::Poll;
use futures::Stream;
use futures::StreamExt;
use std::marker::PhantomData;
use std::pin::Pin;

Expand All @@ -29,6 +30,17 @@ impl<T> RemoteTraversalStream<T> {
}
}
}

impl RemoteTraversalStream<crate::structure::Null> {
pub async fn iterate(&mut self) -> GremlinResult<()> {
while let Some(response) = self.next().await {
//consume the entire stream, returning any errors
response?;
}
Ok(())
}
}

impl<T: FromGValue> Stream for RemoteTraversalStream<T> {
type Item = GremlinResult<T>;

Expand Down
14 changes: 13 additions & 1 deletion gremlin-client/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{net::TcpStream, time::Duration};
use std::{net::TcpStream, sync::Arc, time::Duration};

Check warning on line 1 in gremlin-client/src/connection.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.6.5)

unused import: `sync::Arc`

Check warning on line 1 in gremlin-client/src/connection.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.6.5)

unused import: `sync::Arc`

Check warning on line 1 in gremlin-client/src/connection.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.6.5)

unused import: `sync::Arc`

Check warning on line 1 in gremlin-client/src/connection.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.5.7)

unused import: `sync::Arc`

Check warning on line 1 in gremlin-client/src/connection.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.5.7)

unused import: `sync::Arc`

Check warning on line 1 in gremlin-client/src/connection.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.5.7)

unused import: `sync::Arc`

use crate::{GraphSON, GremlinError, GremlinResult};
use native_tls::TlsConnector;
Expand Down Expand Up @@ -120,6 +120,16 @@ impl ConnectionOptionsBuilder {
self
}

/// Only applicable to async client. By default a connection is checked on each return to the pool (None)
/// This allows setting an interval of how often it is checked on return.
pub fn pool_healthcheck_interval(
mut self,
pool_healthcheck_interval: Option<Duration>,
) -> Self {
self.0.pool_healthcheck_interval = pool_healthcheck_interval;
self
}

/// Both the sync and async pool providers use a default of 30 seconds,
/// Async pool interprets `None` as no timeout. Sync pool maps `None` to the default value
pub fn pool_connection_timeout(mut self, pool_connection_timeout: Option<Duration>) -> Self {
Expand Down Expand Up @@ -170,6 +180,7 @@ pub struct ConnectionOptions {
pub(crate) host: String,
pub(crate) port: u16,
pub(crate) pool_size: u32,
pub(crate) pool_healthcheck_interval: Option<Duration>,
pub(crate) pool_get_connection_timeout: Option<Duration>,
pub(crate) credentials: Option<Credentials>,
pub(crate) ssl: bool,
Expand Down Expand Up @@ -254,6 +265,7 @@ impl Default for ConnectionOptions {
port: 8182,
pool_size: 10,
pool_get_connection_timeout: Some(Duration::from_secs(30)),
pool_healthcheck_interval: None,
credentials: None,
ssl: false,
tls_options: None,
Expand Down
16 changes: 15 additions & 1 deletion gremlin-client/src/conversion.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
process::traversal::Bytecode,
structure::{TextP, P as Predicate},
structure::{Null, TextP, P as Predicate},
Edge, GKey, GValue, GremlinError, GremlinResult, IntermediateRepr, List, Map, Metric, Path,
Property, Token, TraversalExplanation, TraversalMetrics, Vertex, VertexProperty, GID,
};
Expand Down Expand Up @@ -134,9 +134,23 @@ impl_from_gvalue!(IntermediateRepr, GValue::IntermediateRepr);
impl_from_gvalue!(chrono::DateTime<chrono::Utc>, GValue::Date);
impl_from_gvalue!(Traverser, GValue::Traverser);

impl FromGValue for Null {
fn from_gvalue(v: GValue) -> GremlinResult<Self> {
match v {
GValue::Null => Ok(crate::structure::Null {}),
_ => Err(GremlinError::Cast(format!(
"Cannot convert {:?} to {}",
v,
stringify!($t)
))),
}
}
}

impl FromGValue for GKey {
fn from_gvalue(v: GValue) -> GremlinResult<GKey> {
match v {
GValue::Direction(d) => Ok(GKey::Direction(d)),
GValue::String(s) => Ok(GKey::String(s)),
GValue::Token(s) => Ok(GKey::String(s.value().clone())),
GValue::Vertex(s) => Ok(GKey::Vertex(s)),
Expand Down
28 changes: 8 additions & 20 deletions gremlin-client/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

Check warning on line 1 in gremlin-client/src/error.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.6.5)

unused import: `std::sync::Arc`

Check warning on line 1 in gremlin-client/src/error.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, stable, 3.5.7)

unused import: `std::sync::Arc`

use crate::structure::GValue;

use thiserror::Error;
Expand All @@ -12,7 +14,7 @@ pub enum GremlinError {
Generic(String),

#[error(transparent)]
WebSocket(tungstenite::error::Error),
WebSocket(#[from] tungstenite::Error),

#[error(transparent)]
Pool(#[from] r2d2::Error),
Expand All @@ -34,7 +36,10 @@ pub enum GremlinError {

#[cfg(feature = "async_gremlin")]
#[error(transparent)]
WebSocketAsync(#[from] async_tungstenite::tungstenite::Error),
WebSocketAsync(#[from] Arc<async_tungstenite::tungstenite::Error>),
#[cfg(feature = "async_gremlin")]
#[error(transparent)]
WebSocketPoolAsync(#[from] Arc<mobc::Error<GremlinError>>),
#[cfg(feature = "async_gremlin")]
#[error(transparent)]
ChannelSend(#[from] futures::channel::mpsc::SendError),
Expand All @@ -47,24 +52,7 @@ impl From<mobc::Error<GremlinError>> for GremlinError {
fn from(e: mobc::Error<GremlinError>) -> GremlinError {
match e {
mobc::Error::Inner(e) => e,
mobc::Error::BadConn => {
GremlinError::Generic(String::from("Async pool bad connection"))
}
mobc::Error::Timeout => GremlinError::Generic(String::from("Async pool timeout")),
other => GremlinError::WebSocketPoolAsync(Arc::new(other)),
}
}
}

#[cfg(not(feature = "async_gremlin"))]
impl From<tungstenite::error::Error> for GremlinError {
fn from(e: tungstenite::error::Error) -> GremlinError {
let error = match e {
tungstenite::error::Error::AlreadyClosed => tungstenite::error::Error::AlreadyClosed,
tungstenite::error::Error::ConnectionClosed => {
tungstenite::error::Error::ConnectionClosed
}
_ => return GremlinError::Generic(format!("Error from ws {}", e)),
};
GremlinError::WebSocket(error)
}
}
35 changes: 33 additions & 2 deletions gremlin-client/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod serializer_v3;

use crate::conversion::ToGValue;
use crate::process::traversal::{Order, Scope};
use crate::structure::{Cardinality, GValue, T};
use crate::structure::{Cardinality, Direction, GValue, Merge, T};
use serde_json::{json, Map, Value};
use std::string::ToString;

Expand Down Expand Up @@ -216,7 +216,38 @@ impl GraphSON {
"@value" : v
}))
}

(_, GValue::Merge(merge)) => {
let merge_option = match merge {
Merge::OnCreate => "onCreate",
Merge::OnMatch => "onMatch",
Merge::OutV => "outV",
Merge::InV => "inV",
};
Ok(json!({
"@type" : "g:Merge",
"@value" : merge_option
}))
}
(_, GValue::Direction(direction)) => {
let direction = match direction {
Direction::Out | Direction::From => "OUT",
Direction::In | Direction::To => "IN",
};
Ok(json!({
"@type" : "g:Direction",
"@value" : direction,
}))
}
(_, GValue::Column(column)) => {
let column = match column {
crate::structure::Column::Keys => "keys",
crate::structure::Column::Values => "values",
};
Ok(json!({
"@type" : "g:Column",
"@value" : column,
}))
}
(_, _) => panic!("Type {:?} not supported.", value),
}
}
Expand Down
Loading

0 comments on commit d2505d3

Please sign in to comment.