Skip to content

Commit

Permalink
Improve and support single
Browse files Browse the repository at this point in the history
  • Loading branch information
aamalev committed Oct 14, 2023
1 parent 4860604 commit 8a5a821
Show file tree
Hide file tree
Showing 12 changed files with 980 additions and 180 deletions.
15 changes: 10 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "redis-rs"
version = "0.3.0"
version = "0.4.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand All @@ -9,7 +9,12 @@ name = "redis_rs"
crate-type = ["cdylib"]

[dependencies]
deadpool-redis-cluster = { git = "https://github.com/bikeshedder/deadpool", version = "0.1.0" }
pyo3 = { version = "0.17.3", features = ["extension-module"] }
pyo3-asyncio = { version = "0.17.0", features = ["tokio-runtime"] }
redis_cluster_async = "0.7.0"
async-trait = "0.1.73"
redis = { version = "0.23.3", features = ["tokio-comp", "cluster-async"] }
pyo3 = { version = "0.19.2", features = ["extension-module"] }
pyo3-asyncio = { version = "0.19.0", features = ["tokio-runtime"] }
tokio = "1.32.0"
bb8 = "0.8.1"
bb8-redis = "0.13.1"
bb8-redis-cluster = "0.1.1"
deadpool-redis-cluster = "0.1.0"
44 changes: 29 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,36 @@ Python wrapper for redis-rs and redis_cluster_async
import asyncio
import redis_rs

x = redis_rs.create_client([
"redis://redis-node001",
"redis://redis-node002",
], 1)

async def main():
print(await x.execute(b"hset", "fooh", "a", b"asdfg"))
print(await x.fetch_int("hset", "fooh", "b", 11234567890))
print(await x.fetch_int("hget", "fooh", "b"))
print(await x.fetch_str("hget", "fooh", "a"))
print(await x.fetch_dict("hgetall", "fooh"))
print(await x.execute("cluster", "nodes"))
print(await x.fetch_bytes("get", "foo"))
print(await x.fetch_int("get", "foo"))
print(await x.execute("hgetall", "fooh"))
print(await x.execute("zadd", "fooz", 1.5678, "b"))
print(await x.fetch_scores("zrange", "fooz", 0, -1, "WITHSCORES"))
async with redis_rs.create_client(
"redis://redis-node001",
"redis://redis-node002",
max_size=1,
) as x:
print(await x.execute(b"HSET", "fooh", "a", b"asdfg"))
print(await x.fetch_int("HSET", "fooh", "b", 11234567890))
print(await x.fetch_int("HGET", "fooh", "b"))
print(await x.fetch_str("HGET", "fooh", "a"))
print(await x.fetch_dict("HGETALL", "fooh"), encoding="utf-8")
print(await x.execute("CLUSTER", "NODES"))
print(await x.fetch_bytes("GET", "foo"))
print(await x.fetch_int("GET", "foo"))
print(await x.execute("HGETALL", "fooh"))
print(await x.execute("ZADD", "fooz", 1.5678, "b"))
print(await x.fetch_scores("ZRANGE", "fooz", 0, -1, "WITHSCORES"))
print(x.status())

stream = "redis-rs"
print("x.xadd", await x.xadd(stream, "*", {"a": "1234", "d": 4567}))
print("x.xadd", await x.xadd(stream, items={"a": "1234", "d": 4567}))
print("x.xadd", await x.xadd(stream, {"a": "1234", "d": 4567}))
print("x.xadd", await x.xadd(stream, "*", "a", "1234", "d", 4567))
print("x.xadd", await x.xadd(stream, "a", "1234", "d", 4567))
print("xadd", await x.fetch_str("XADD", stream, "*", "a", "1234", "d", 4567))
print("xread", await x.execute("XREAD", "STREAMS", stream, 0))
print("xread", await x.fetch_dict("XREAD", "STREAMS", stream, 0, encoding="int"))
print("x.xread", await x.xread({stream: 0}, encoding="int"))


asyncio.run(main())
Expand Down
60 changes: 60 additions & 0 deletions src/bb8_cluster.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::collections::HashMap;

use crate::{
error,
pool::{Connection, Pool},
types,
};
use async_trait::async_trait;
use bb8_redis_cluster::RedisConnectionManager;
use redis::aio::ConnectionLike;

pub struct BB8Cluster {
pool: bb8::Pool<RedisConnectionManager>,
}

impl BB8Cluster {
pub async fn new(initial_nodes: Vec<String>, max_size: u32) -> Self {
let manager = RedisConnectionManager::new(initial_nodes).unwrap();
let pool = bb8::Pool::builder()
.max_size(max_size)
.build(manager)
.await
.unwrap();
Self { pool }
}
}

#[async_trait]
impl Pool for BB8Cluster {
async fn get_connection(&self) -> Result<Connection, error::RedisError> {
let c = self.pool.get().await?;
Ok(Connection {
c: Box::new(c.to_owned()),
})
}

async fn execute(
&self,
cmd: &str,
args: Vec<types::Arg>,
) -> Result<redis::Value, error::RedisError> {
let mut conn = self.pool.get().await?;
let value = conn.req_packed_command(redis::cmd(cmd).arg(&args)).await?;
Ok(value)
}

fn status(&self) -> HashMap<&str, redis::Value> {
let mut result = HashMap::new();
result.insert("closed", redis::Value::Int(0));
result.insert("impl", redis::Value::Data("bb8_cluster".into()));
result.insert("cluster", redis::Value::Int(1));
let state = self.pool.state();
result.insert("connections", redis::Value::Int(state.connections.into()));
result.insert(
"idle_connections",
redis::Value::Int(state.idle_connections.into()),
);
result
}
}
61 changes: 61 additions & 0 deletions src/bb8_single.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::collections::HashMap;

use crate::{
error,
pool::{Connection, Pool},
types,
};
use async_trait::async_trait;
use bb8_redis::RedisMultiplexedConnectionManager;
use redis::aio::ConnectionLike;

pub struct BB8Pool {
pool: bb8::Pool<RedisMultiplexedConnectionManager>,
}

impl BB8Pool {
pub async fn new(initial_nodes: Vec<String>, max_size: u32) -> Self {
let addr = initial_nodes.get(0).unwrap().to_string();
let manager = RedisMultiplexedConnectionManager::new(addr).unwrap();
let pool = bb8::Pool::builder()
.max_size(max_size)
.build(manager)
.await
.unwrap();
Self { pool }
}
}

#[async_trait]
impl Pool for BB8Pool {
async fn get_connection(&self) -> Result<Connection, error::RedisError> {
let c = self.pool.get().await?;
Ok(Connection {
c: Box::new(c.to_owned()),
})
}

async fn execute(
&self,
cmd: &str,
args: Vec<types::Arg>,
) -> Result<redis::Value, error::RedisError> {
let mut conn = self.pool.get().await?;
let value = conn.req_packed_command(redis::cmd(cmd).arg(&args)).await?;
Ok(value)
}

fn status(&self) -> HashMap<&str, redis::Value> {
let mut result = HashMap::new();
result.insert("impl", redis::Value::Data("bb8_redis".into()));
result.insert("closed", redis::Value::Int(0));
result.insert("cluster", redis::Value::Int(0));
let state = self.pool.state();
result.insert("connections", redis::Value::Int(state.connections.into()));
result.insert(
"idle_connections",
redis::Value::Int(state.idle_connections.into()),
);
result
}
}
Loading

0 comments on commit 8a5a821

Please sign in to comment.