Skip to content

Commit

Permalink
first working rwlock implementation with redis.
Browse files Browse the repository at this point in the history
implements #14
  • Loading branch information
Heiss committed Oct 28, 2023
1 parent a5e5d50 commit 514e28f
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 105 deletions.
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ name = "micro_types"
version = "0.2.1"
edition = "2021"
readme = "README.md"
license = "MIT"
license-file = "LICENSE.md"
description = "Types for distributed systems"
repository = "https://github.com/rust-micro/types"
homepage = "https://github.com/rust-micro/types"
keywords = ["micro", "distributed", "type", "redis"]
categories = ["microservice", "type", "database"]
categories = ["network-programming", "data-structures", "database"]
documentation = "https://docs.rs/micro_types"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
98 changes: 47 additions & 51 deletions src/redis/rwlock/constants.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
/// The read lock script.
///
/// Checks if the writer list besides the key is empty.
/// If it is, the uuid is added to the reader list and true is returned.
/// Checks if the writer list besides the key is empty or the lock is set.
/// If there are no writers, the uuid will be set as a reader and returns true.
/// Returns false otherwise.
///
/// Takes 2 arguments:
/// The timeout will be used for the reader lock. You need to retry to get the lock again if you want to keep it.
/// But if a writer comes in scope, the reader lock will be dropped after the timeout and you have to wait.
///
/// Takes 3 arguments:
/// 1. The key to lock
/// 2. The uuid of the lock
/// 3. The timeout in seconds
pub const READER_LOCK: &str = r#"
local writer_len = redis.call("LLEN", ARGV[1] .. ":writer")
if writer_len == 0 then
redis.call("RPUSH", ARGV[1] .. ":reader", ARGV[2])
return true
if redis.call("exists", ARGV[1] .. ":lock") == 1 then
return 0
end
return false
local res = redis.call("scan", 0, "match", ARGV[1] .. ":writer_waiting_list:*")
if next(res[2]) == nil then
redis.call("set", ARGV[1] .. ":reader_locks:" .. ARGV[2], 1, "ex", ARGV[3])
return 1
end
return 0
"#;

/// The read lock drop script.
Expand All @@ -24,31 +32,31 @@ return false
/// 1. The key to lock
/// 2. The uuid of the lock
pub const READER_LOCK_DROP: &str = r#"
local reader_len = redis.call("LLEN", ARGV[1] .. ":reader")
if reader_len > 0 then
redis.call("LREM", ARGV[1] .. ":reader", 1, ARGV[2])
return true
end
return false
redis.call("del", ARGV[1] .. ":reader_locks:" .. ARGV[2])
return 1
"#;

/// The writer lock script.
///
/// Checks if the reader and writer list besides the key are empty.
/// If they are, the uuid is added to the writer list and true is returned.
/// Checks if the reader list besides the key is empty.
/// Also add the uuid to the writer waiting list.
/// If there are no readers, the uuid will be set as the lock and returns true.
/// Returns false otherwise.
///
/// Takes 2 arguments:
/// The timeout will also be used for the waiting ticket, so if you wait too long, your intention will be dropped and reader can be acquired.
/// So be sure to request the lock again fast enough.
///
/// Takes 3 arguments:
/// 1. The key to lock
/// 2. The uuid of the lock
/// 3. The timeout in seconds for waiting
pub const WRITER_LOCK: &str = r#"
local reader_len = redis.call("LLEN", ARGV[1] .. ":reader")
local writer_len = redis.call("LLEN", ARGV[1] .. ":writer")
if reader_len == 0 and writer_len == 0 then
redis.call("RPUSH", ARGV[1] .. ":writer", ARGV[2])
return true
redis.call("setex", ARGV[1] .. ":writer_waiting_list:" .. ARGV[2], ARGV[3], 1)
if redis.call("exists", ARGV[1] .. ":lock") == 1 then
return 0
end
return false
return redis.call("set", ARGV[1] .. ":lock", ARGV[2], "nx", "ex", ARGV[3])
"#;

/// The writer lock drop script.
Expand All @@ -59,12 +67,11 @@ return false
/// 1. The key to lock
/// 2. The uuid of the lock
pub const WRITER_LOCK_DROP: &str = r#"
local writer_len = redis.call("LLEN", ARGV[1] .. ":writer")
if writer_len > 0 then
redis.call("LREM", ARGV[1] .. ":writer", 1, ARGV[2])
return true
redis.call("del", ARGV[1] .. ":writer_waiting_list:" .. ARGV[2])
if redis.call("get", ARGV[1] .. ":lock") == ARGV[2] then
redis.call("del", ARGV[1] .. ":lock")
end
return false
return 1
"#;

/// The uuid script.
Expand All @@ -74,48 +81,37 @@ return false
/// Takes 1 argument:
/// 1. The key to lock
pub const UUID_SCRIPT: &str = r#"
redis.call("INCR", ARGV[1] .. ":uuid")
return redis.call("GET", ARGV[1] .. ":uuid")
return redis.call("INCR", ARGV[1] .. ":lock_counter")
"#;

/// The read script.
///
/// Reads the value from the key, only if the uuid is in reader list or if it is the single entry in the writer list.
/// Reads the value from the key, only if the uuid is in reader list or if the lock is equal to uuid.
///
/// Takes 2 argument:
/// 1. The key to read
/// 2. The uuid of the lock
pub const READ_SCRIPT: &str = r#"
local function contains(table, val)
for i=1,#table do
if table[i] == val then
return true
end
end
return false
pub const LOAD_SCRIPT: &str = r#"
if redis.call("get", ARGV[1] .. ":lock") == ARGV[2] then
return redis.call("get", ARGV[1])
end
local readers = redis.call("LRANGE", ARGV[1] .. ":reader" , 0, -1)
local writers = redis.call("LRANGE", ARGV[1] .. ":writer" , 0, -1)
if contains(readers, ARGV[2]) or (#writers == 1 and writers[1] == ARGV[2]) then
return redis.call("GET", ARGV[1])
if redis.call("exists", ARGV[1] .. ":reader_locks:" .. ARGV[2]) then
return redis.call("get", ARGV[1])
end
"#;

/// The store script.
///
/// Stores the value to the key, only if the uuid is in writer list and the list is only one.
/// Stores the value to the key, only if the uuid is in lock.
///
/// Takes 3 arguments:
/// 1. The key to store
/// 2. The uuid of the lock
/// 3. The value to store
pub const STORE_SCRIPT: &str = r#"
local writers = redis.call("LRANGE", ARGV[1] .. ":writer" , 0, -1)
if #writers == 1 and writers[1] == ARGV[2] then
redis.call("SET", ARGV[1], ARGV[3])
return true
if redis.call("get", ARGV[1] .. ":lock") == ARGV[2] then
redis.call("set", ARGV[1], ARGV[3])
return 1
end
return false
return 0
"#;
2 changes: 2 additions & 0 deletions src/redis/rwlock/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ pub enum RwLockError {
StillReader,
#[error("The lock could not be dropped.")]
LockNotDroppable,
#[error("The lock is expired. Failed UUID: {0} ")]
LockExpired(usize),
}
70 changes: 29 additions & 41 deletions src/redis/rwlock/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::RwLockReadGuard;
use super::RwLockWriteGuard;
use crate::redis::rwlock::constants::{READER_LOCK, UUID_SCRIPT, WRITER_LOCK};
use crate::redis::{Generic, LockError};
use redis::Connection;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::ops::{Deref, DerefMut};
Expand Down Expand Up @@ -39,8 +40,10 @@ use std::ops::{Deref, DerefMut};
/// // only one writer lock can be held, however
/// {
/// let mut write1 = lock.write().unwrap();
/// write1.store(2);
/// write1.store(2).unwrap();
/// assert_eq!(*write1, 2);
/// // the next line is not allowed, because it deadlocks.
/// //let mut _ = lock.write().unwrap();
/// } // write lock is dropped here
///
/// // look, you can read it again
Expand Down Expand Up @@ -72,18 +75,15 @@ use std::ops::{Deref, DerefMut};
/// ```
pub struct RwLock<T> {
pub(crate) data: Generic<T>,
writer_wanted: bool,
pub(crate) conn: Option<redis::Connection>,

Check warning on line 78 in src/redis/rwlock/lock.rs

View workflow job for this annotation

GitHub Actions / clippy

field `conn` is never read

warning: field `conn` is never read --> src/redis/rwlock/lock.rs:78:16 | 76 | pub struct RwLock<T> { | ------ field in this struct 77 | pub(crate) data: Generic<T>, 78 | pub(crate) conn: Option<redis::Connection>, | ^^^^ | = note: `#[warn(dead_code)]` on by default
}

impl<T> RwLock<T>
where
T: Serialize + DeserializeOwned,
{
pub fn new(data: Generic<T>) -> Self {
Self {
data,
writer_wanted: false,
}
Self { data, conn: None }
}

/// Creates a new RwLock Reader.
Expand All @@ -92,18 +92,9 @@ where
/// If there is a writer lock, this function blocks until the writer lock is dropped.
/// Also if there is a writer locks waiting to be acquired, this function blocks until the writer lock is acquired and dropped.
pub fn read(&self) -> Result<RwLockReadGuard<T>, LockError> {
let uuid = self.generate_uuid();
loop {
// small performance improvement, because if there is a local writer lock wanted, we don't need to check the remote writer lock
if self.writer_wanted {
continue;
}
let res = self.execute_script(&uuid, READER_LOCK);
if res {
break;
}
}
Ok(RwLockReadGuard::new(self, uuid))
let mut conn = self.client.clone().get_connection().unwrap();
let uuid = self.acquire_via_script(READER_LOCK, &mut conn);
Ok(RwLockReadGuard::new(self, uuid, conn))
}

/// Creates a new RwLock Writer.
Expand All @@ -112,32 +103,30 @@ where
/// If there is a reader lock, this function blocks until the reader lock is dropped.
/// The acquiring writer lock has priority over any waiting reader lock.
pub fn write(&mut self) -> Result<RwLockWriteGuard<T>, LockError> {
self.writer_wanted = true;
let uuid = self.generate_uuid();
loop {
let res = self.execute_script(&uuid, WRITER_LOCK);
if res {
break;
}
}
self.writer_wanted = false;
Ok(RwLockWriteGuard::new(self, uuid))
let mut conn = self.client.clone().get_connection().unwrap();
let uuid = self.acquire_via_script(WRITER_LOCK, &mut conn);
Ok(RwLockWriteGuard::new(self, uuid, conn))
}

fn execute_script(&self, uuid: &usize, script: &str) -> bool {
let mut conn = self.data.client.get_connection().unwrap();
redis::Script::new(script)
.arg(&self.data.key)
.arg(uuid)
.invoke(&mut conn)
.unwrap()
fn acquire_via_script(&self, script: &str, conn: &mut Connection) -> usize {
let uuid = self.generate_uuid(conn);
let mut res = false;

while !res {
res = redis::Script::new(script)
.arg(&self.data.key)
.arg(uuid)
.arg(2)
.invoke(conn)
.unwrap();
}
uuid
}

pub(crate) fn generate_uuid(&self) -> usize {
let mut conn = self.data.client.get_connection().unwrap();
pub(crate) fn generate_uuid(&self, conn: &mut Connection) -> usize {
redis::Script::new(UUID_SCRIPT)
.arg(&self.data.key)
.invoke(&mut conn)
.invoke(conn)
.unwrap()
}
}
Expand Down Expand Up @@ -177,7 +166,7 @@ mod tests {
{
// only one writer lock can be held, however
let mut write = lock.write().unwrap();
write.store(2);
write.store(2).unwrap();
assert_eq!(*write, 2);
}
// look, you can read it again
Expand All @@ -193,8 +182,8 @@ mod tests {
{
let _ = ManuallyDrop::new(lock.read().unwrap());
}
eprintln!("1");
// This should not deadlocked forever
// FIXME: This test blocks Pull request, because if a reader lock gets not dropped correctly. The whole systems blocks indefinitely.
{
let _ = lock.write().unwrap();
}
Expand All @@ -203,7 +192,6 @@ mod tests {
let _ = ManuallyDrop::new(lock.write().unwrap());
}
// This should not deadlocked forever
// FIXME: This tests the same as above, but for writer locks.
{
let _ = lock.read().unwrap();
}
Expand Down
1 change: 0 additions & 1 deletion src/redis/rwlock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,3 @@ pub use error::RwLockError;
pub use lock::RwLock;
pub use reader::RwLockReadGuard;
pub use writer::RwLockWriteGuard;

39 changes: 34 additions & 5 deletions src/redis/rwlock/reader.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::lock::RwLock;
use crate::redis::rwlock::constants::READER_LOCK_DROP;
use crate::redis::rwlock::constants::{LOAD_SCRIPT, READER_LOCK_DROP};
use crate::redis::Generic;
use serde::de::DeserializeOwned;
use serde::Serialize;
Expand All @@ -8,14 +8,44 @@ use std::ops::Deref;
pub struct RwLockReadGuard<'a, T> {
lock: &'a RwLock<T>,
uuid: usize,
conn: redis::Connection,
cache: Option<T>,
}

impl<'a, T> RwLockReadGuard<'a, T>
where
T: Serialize + DeserializeOwned,
{
pub(crate) fn new(lock: &'a RwLock<T>, uuid: usize) -> Self {
Self { lock, uuid }
pub(crate) fn new(lock: &'a RwLock<T>, uuid: usize, conn: redis::Connection) -> Self {
Self {
lock,
uuid,
conn,
cache: None,
}
}

/// Loads the value from Redis.
/// This function blocks until the value is loaded.
/// Shadows the load operation of the guarded value.
pub fn acquire(&mut self) -> &T {
self.cache = self.try_get();
self.cache.as_ref().unwrap()
}

fn try_get(&mut self) -> Option<T> {
let script = redis::Script::new(LOAD_SCRIPT);
let result: Option<String> = script
.arg(&self.lock.data.key)
.arg(self.uuid)
.invoke(&mut self.conn)
.expect("Failed to load value. You should not see this!");
let result = result?;

if result == "nil" {
return None;
}
Some(serde_json::from_str(&result).expect("Failed to deserialize value"))
}
}

Expand All @@ -29,8 +59,7 @@ impl<'a, T> Deref for RwLockReadGuard<'a, T> {

impl<T> Drop for RwLockReadGuard<'_, T> {
fn drop(&mut self) {
let client = self.lock.data.client.clone();
let mut conn = client.get_connection().unwrap();
let mut conn = self.client.get_connection().unwrap();
let _: () = redis::Script::new(READER_LOCK_DROP)
.arg(&self.lock.data.key)
.arg(self.uuid)
Expand Down
Loading

0 comments on commit 514e28f

Please sign in to comment.