-*- mode: Org; fill-column: 80; coding: utf-8; -*-
Asynchronous Data Retrieval (or distributed database)
3 days
CREATE TABLE data_1 (
    id INT PRIMARY KEY,
    name VARCHAR(255)
);
INSERT INTO data_1 (id, name) VALUES (1, 'Test 1'), (2, 'Test 2');
SELECT * FROM data_1;
Task: build a RESTful access point (AP) to 3 databases that hold one table distributed by the ID field. The AP should get records from all databases and return the sorted result.
- ID distribution: 1) 1-10, 31-40; 2) 11-20, 41-50; 3) 21-30, 51-60.
- An error from any source should be ignored and interpreted as missing data.
- A timeout (2 seconds) is an error.
- Test all logic; use mocks for the DB connections.
- Query all sources and return the data sorted by ID.
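The required behaviour can be sketched with asyncio alone. This is a minimal sketch, not the implemented fapi.py: it assumes each source is an async callable returning (id, name) rows, and the names fetch_or_empty and fetch_all_sorted are hypothetical. All sources are queried concurrently, an exception or a timeout yields an empty list, and the combined rows are sorted by ID; the three mocks stand in for DB connections.
#+begin_src python
import asyncio
from typing import Awaitable, Callable, Iterable

Row = tuple[int, str]                          # (id, name)
Fetcher = Callable[[], Awaitable[list[Row]]]


async def fetch_or_empty(fetch: Fetcher, timeout: float) -> list[Row]:
    """Query one source; any error, including a timeout, means 'no data'."""
    try:
        return await asyncio.wait_for(fetch(), timeout)
    except Exception:                          # asyncio.TimeoutError is an Exception
        return []


async def fetch_all_sorted(fetchers: Iterable[Fetcher],
                           timeout: float = 2.0) -> list[Row]:
    """Query all sources concurrently and return their rows sorted by ID."""
    parts = await asyncio.gather(*(fetch_or_empty(f, timeout) for f in fetchers))
    return sorted((row for part in parts for row in part), key=lambda r: r[0])


# Mocked sources: one healthy, one failing, one too slow
async def db_ok() -> list[Row]:
    return [(31, "Name31"), (1, "Name1")]


async def db_broken() -> list[Row]:
    raise ConnectionError("connection refused")


async def db_slow() -> list[Row]:
    await asyncio.sleep(10)
    return [(21, "Name21")]


print(asyncio.run(fetch_all_sorted([db_ok, db_broken, db_slow], timeout=0.1)))
# [(1, 'Name1'), (31, 'Name31')]
#+end_src
The demo passes a 0.1-second timeout only to run quickly; the default of 2.0 seconds matches the task.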
Suggestion:
- Reactive paradigm: declarative, e.g. a = b + c, where a is updated whenever b or c changes. It is based on streams and employs reactive pipelines, in which data flows through a series of transformations and operators.
- Implement:
  - with SQLAlchemy,
  - with asyncio,
- Cover as much as possible with tests.
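The a = b + c idea can be illustrated with a minimal push-based sketch in plain Python. Cell and computed are hypothetical names, not the API of a reactive library such as RxPY: each derived value subscribes to its inputs and is recomputed whenever one of them is reassigned.
#+begin_src python
from typing import Any, Callable


class Cell:
    """Holds a value and notifies subscribers whenever it is reassigned."""

    def __init__(self, value: Any):
        self._value = value
        self._subscribers: list[Callable[[], None]] = []

    @property
    def value(self) -> Any:
        return self._value

    @value.setter
    def value(self, new: Any) -> None:
        self._value = new
        for notify in self._subscribers:
            notify()

    def subscribe(self, callback: Callable[[], None]) -> None:
        self._subscribers.append(callback)


def computed(func: Callable[..., Any], *inputs: Cell) -> Cell:
    """Derived cell: recomputed from its inputs when any of them changes."""
    out = Cell(func(*(cell.value for cell in inputs)))

    def recompute() -> None:
        out.value = func(*(cell.value for cell in inputs))

    for cell in inputs:
        cell.subscribe(recompute)
    return out


b, c = Cell(1), Cell(2)
a = computed(lambda x, y: x + y, b, c)   # a = b + c
b.value = 10
print(a.value)  # 12
#+end_src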
We will not write tests before the implementation, because we are implementing this task for the first time.
We will use Redis, just because, and because it is open source.
We will try to use the mechanisms Redis already provides.
Redis has a “Distributed Lock Manager” pattern for locks and built-in sharding by “hash slot”: every key maps to one of 16384 slots, and the slots are assigned to cluster nodes. We cannot directly choose which node a key goes to.
But we can make related keys land in the same slot, and therefore on the same node, with a hash tag: only the part inside {...} is hashed.
- user-profile:{1234} -> CRC16("1234") mod 16384
- user-session:{1234} -> CRC16("1234") mod 16384, i.e. the same slot as above
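The slot rule from the Redis Cluster specification can be checked offline: the slot is CRC16 (XMODEM variant) of the key, or only of the hash tag when the key contains a non-empty {...} section, modulo 16384. A pure-Python sketch (crc16_xmodem and key_hash_slot are illustrative names); its result can be compared with redis-cli CLUSTER KEYSLOT <key>.
#+begin_src python
def crc16_xmodem(data: bytes) -> int:
    """CRC16-CCITT, XMODEM variant: polynomial 0x1021, initial value 0."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def key_hash_slot(key: str) -> int:
    """Hash slot of a key, honouring the {hash tag} rule."""
    raw = key.encode()
    start = raw.find(b"{")
    if start != -1:
        end = raw.find(b"}", start + 1)
        # Only a non-empty tag between the first "{" and the next "}" counts
        if end > start + 1:
            raw = raw[start + 1:end]
    return crc16_xmodem(raw) % 16384


print(key_hash_slot("user-profile:{1234}") == key_hash_slot("user-session:{1234}"))  # True
print(key_hash_slot("123456789"))  # 12739 (0x31C3, the CRC16/XMODEM check value)
#+end_src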
We will use https://github.com/joanvila/aioredlock, which implements locking; all we need is to add our patch to treat “an error from a source as missing data”.
- This is an unofficial implementation of Redlock, the algorithm officially proposed for a Distributed Lock Manager: https://redis.io/docs/latest/develop/use/patterns/distributed-locks/
https://peps.python.org/pep-0249/
1. It gets the current time in milliseconds.
2. It tries to acquire the lock in all N instances sequentially, using the same key name and random value in all of them. During step 2, when setting the lock in each instance, the client uses a timeout which is small compared to the total lock auto-release time. For example, if the auto-release time is 10 seconds, the timeout could be in the ~5-50 milliseconds range. This prevents the client from remaining blocked for a long time trying to talk with a Redis node which is down: if an instance is not available, we should try to talk with the next instance ASAP.
3. The client computes how much time elapsed in order to acquire the lock, by subtracting from the current time the timestamp obtained in step 1. If and only if the client was able to acquire the lock in the majority of the instances (at least 3 of the N = 5 instances used in the Redis documentation), and the total time elapsed to acquire the lock is less than the lock validity time, the lock is considered to be acquired.
4. If the lock was acquired, its validity time is considered to be the initial validity time minus the time elapsed, as computed in step 3.
5. If the client failed to acquire the lock for some reason (either it was not able to lock N/2+1 instances or the validity time is negative), it will try to unlock all the instances (even the instances it believed it was not able to lock).
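The five steps can be sketched as a simplified, synchronous model of the quorum logic only; it is not aioredlock's implementation. LockInstance is a hypothetical per-node interface (against real Redis, try_lock would be SET key token NX PX ttl with a short socket timeout, and unlock a compare-and-delete script), and UpNode/DownNode are in-memory mocks.
#+begin_src python
import secrets
import time
from typing import Optional, Protocol, Sequence


class LockInstance(Protocol):
    """Hypothetical per-node interface, not a real library API."""

    def try_lock(self, key: str, token: str, ttl_ms: int) -> bool: ...

    def unlock(self, key: str, token: str) -> None: ...


def redlock_acquire(instances: Sequence[LockInstance], key: str,
                    ttl_ms: int) -> Optional[tuple[str, float]]:
    """Return (token, validity_ms) on success, or None on failure."""
    token = secrets.token_hex(16)             # same random value on every node
    start = time.monotonic()                  # step 1
    locked = 0
    for inst in instances:                    # step 2: try each node in turn
        try:
            if inst.try_lock(key, token, ttl_ms):
                locked += 1
        except Exception:
            pass                              # unreachable node = not locked
    elapsed_ms = (time.monotonic() - start) * 1000    # step 3
    validity_ms = ttl_ms - elapsed_ms                 # step 4
    if locked >= len(instances) // 2 + 1 and validity_ms > 0:
        return token, validity_ms
    for inst in instances:                    # step 5: release on every node
        try:
            inst.unlock(key, token)
        except Exception:
            pass
    return None


class UpNode:
    """In-memory mock of a reachable node."""

    def __init__(self):
        self.locks = {}

    def try_lock(self, key, token, ttl_ms):
        if key in self.locks:
            return False
        self.locks[key] = token
        return True

    def unlock(self, key, token):
        if self.locks.get(key) == token:
            del self.locks[key]


class DownNode:
    """Mock of a node that cannot be reached."""

    def try_lock(self, key, token, ttl_ms):
        raise ConnectionError("node is down")

    def unlock(self, key, token):
        raise ConnectionError("node is down")


print(redlock_acquire([UpNode(), UpNode(), UpNode(), DownNode(), DownNode()], "res", 10_000) is not None)  # True
print(redlock_acquire([UpNode(), UpNode(), DownNode(), DownNode(), DownNode()], "res", 10_000))  # None
#+end_src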
I have a SHARDS dictionary with ranges. Help me write a Python function that looks up a key in the fastest way. First, list the available containers and storage types that are best suited for range lookups.
SHARDS={1:((1, 10), (31, 40)),
2:((11, 20), (41, 50)),
3:((21, 30), (51, 60))}
Data Structures:
- Sorted List: For efficient range lookups, a sorted list can be used to store the ranges. This allows for binary search, which reduces the lookup time from O(n) to O(log n).
- Bisect Module: Python's bisect module provides functions to perform binary search on sorted lists, which is ideal for range lookups.
Algorithm:
- Binary Search: Use binary search to find the appropriate range in the sorted list.
...
I have a SHARDS dictionary with ranges. Help me write a Python function that looks up a key in the fastest way. First, list the available containers and storage types that are best suited for range lookups.
SHARDS={1:((1, 10), (31, 40)),
2:((11, 20), (41, 50)),
3:((21, 30), (51, 60))}
I asked another LLM; it gave me a wrong answer that used bisect.bisect_right for this, which failed with an error because the function cannot compare an int with a tuple.
Here is a Python function that uses a sorted list to look up keys in the SHARDS dictionary:
def find_shard_id(shards, value):
    # Flatten the ranges into a list of tuples (start, end, shard_id)
    ranges = [(start, end, shard_id)
              for shard_id, ranges_list in shards.items()
              for start, end in ranges_list]
    # Sort the ranges by their start values
    ranges.sort(key=lambda x: x[0])
    # Perform a binary search on the sorted list
    left, right = 0, len(ranges) - 1
    while left <= right:
        mid = (left + right) // 2
        if ranges[mid][0] <= value <= ranges[mid][1]:
            return ranges[mid][2]
        elif value < ranges[mid][0]:
            right = mid - 1
        else:
            left = mid + 1
    # If no matching range is found, return None
    return None
SHARDS = {
    1: ((1, 10), (31, 40)),
    2: ((11, 20), (41, 50)),
    3: ((21, 30), (51, 60))
}
print(find_shard_id(SHARDS, 25))  # Output: 3
print(find_shard_id(SHARDS, 7))   # Output: 1
print(find_shard_id(SHARDS, 61))  # Output: None
3
1
None
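The bisect error comes from searching a list of tuples with a bare int. A sketch that avoids it by bisecting a parallel list of range starts, built once instead of on every call (ShardIndex is a hypothetical helper name; ranges are assumed inclusive and non-overlapping). On Python 3.10+ the bisect functions also accept key=, e.g. bisect_right(ranges, value, key=lambda r: r[0]), which avoids the extra list.
#+begin_src python
import bisect
from typing import Mapping, Optional, Sequence

SHARDS = {1: ((1, 10), (31, 40)),
          2: ((11, 20), (41, 50)),
          3: ((21, 30), (51, 60))}


class ShardIndex:
    """Range index built once; each lookup is a single O(log n) bisect."""

    def __init__(self, shards: Mapping[int, Sequence[tuple[int, int]]]):
        self._ranges = sorted(
            (start, end, shard_id)
            for shard_id, pairs in shards.items()
            for start, end in pairs
        )
        # Plain list of ints, so bisect only ever compares int with int
        self._starts = [start for start, _, _ in self._ranges]

    def find(self, key: int) -> Optional[int]:
        # Position of the last range whose start is <= key
        i = bisect.bisect_right(self._starts, key) - 1
        if i >= 0:
            _, end, shard_id = self._ranges[i]
            if key <= end:
                return shard_id
        return None


index = ShardIndex(SHARDS)
print(index.find(25), index.find(7), index.find(61))  # 3 1 None
#+end_src
For an ID space this small (1-60), a precomputed dict {id: shard} would give O(1) lookups at the cost of one entry per ID.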
execution path:
- self.execute_command("KEYS", pattern, **kwargs) - commands/core.py
- self._execute_command(node, *args, **kwargs) - redis/cluster.py
- get_connection(redis_node, *args, **kwargs)
- redis_node.connection_pool.get_connection(
- connection.connect() - redis/connection.py, lines 282 and 1109
By default, socket_timeout is None and retry_on_timeout is False in the Connection class.
RedisCluster(startup_nodes) uses ClusterNode objects in its constructor. A ClusterNode uses a Redis client with default options, but its redis_connection argument accepts a custom, preconfigured Redis instance. ClusterNodes are kept in NodesManager and fetched with get_node_from_slot.
- file:/usr/lib/python3.12/site-packages/redis/cluster.py::1257
Help for the Connection class:
To specify a retry policy for specific errors, first set
`retry_on_error` to a list of the error/s to retry on, then set
`retry` to a valid `Retry` object.
To retry on TimeoutError, `retry_on_timeout` can also be set to `True`.
ConnectionPool
- self._available_connections - already connected
- self.connection_kwargs - Connection parameters.
Why does RedisCluster -> Redis.ConnectionPool lose the arguments?
Let’s explore the construction:
- Redis - at construction, creates ConnectionPool(**kwargs)
- .connection_pool.connection_kwargs - kept
- ClusterNode with self.redis_connection
- .redis_connection.connection_pool.connection_kwargs - kept
- RedisCluster.nodes_manager = NodesManager(startup_nodes=startup_nodes)
- file:/usr/lib/python3.12/site-packages/redis/cluster.py::1304
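- NodesManager: self.startup_nodes[node.name] = node for each startup node
- self.connection_kwargs = kwargs, used as parameters for new connections.
- self.initialize() - here self.startup_nodes is recreated.
So nodes are kept in NodesManager, and the main step is initialize():
- for every node in self.startup_nodes, its redis_connection is checked and the "CLUSTER SLOTS" command is executed.
Solution: the cause was the default dynamic_startup_nodes=True parameter of RedisCluster, which, as we read the code, replaces the startup nodes (with our preconfigured redis_connection) by the nodes discovered in initialize(). We pass dynamic_startup_nodes=False.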
redis.exceptions.ConnectionError: Error 111 connecting to 127.0.0.1:0. Connection refused.
The problem is that self._retries in Retry is set to 0 by default, because the Connection is not constructed with our Redis parameters.
Path: Redis -> ClusterNode -> RedisCluster -> ConnectionPool -> Connection
- RedisCluster.nodes_manager(NodeManager).startup_node(ClusterNode).redis_connection(Redis).self.connection_pool
RedisCluster.execute_command tries up to 3 times (self.cluster_error_retry_attempts), looping over the nodes in self.nodes_manager.nodes_cache. When the attempts run out, the exception is no longer suppressed.
First, RedisCluster connects to the first node and retrieves the other nodes' ports.
To run KEYS on every node from the CLI, we can use:
redis-cli --cluster call localhost:30001 KEYS "*"
Why do we get an exception in:
print(rc.keys(target_nodes=RedisCluster.ALL_NODES))
Two questions: where does the port change to 0, and how can we mitigate it and recheck?
The node with port 0 is the last node in self.nodes_manager.nodes_cache.values().
CLUSTER SLOTS (deprecated) returns host '' and port 0 for a disconnected node. This node is then added to nodes_cache.
During the KEYS attempts, the request node is changed whenever an attempt fails, so the port-0 node is eventually tried.
To fix this, we can patch NodesManager.initialize, or RedisCluster.get_random_node() and RedisCluster.get_nodes().
We patch get_random_node and get_nodes with a monkey patch:
import random
from redis.cluster import RedisCluster
def my_get_nodes(self):
    # skip nodes with port 0: CLUSTER SLOTS reports a disconnected node as ('', 0)
    return [v for v in self.nodes_manager.nodes_cache.values() if v.port != 0]
def my_get_random_node(self):
    # same filter as my_get_nodes, then pick one live node at random
    return random.choice(my_get_nodes(self))
RedisCluster.get_nodes = my_get_nodes
RedisCluster.get_random_node = my_get_random_node
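Since the task asks for mocked DB connections in tests, the port-0 filter can be checked without a running cluster. A minimal sketch with types.SimpleNamespace stand-ins that expose only nodes_manager.nodes_cache (make_fake_cluster and the test name are hypothetical); the two functions repeat the patch so the block runs on its own.
#+begin_src python
import random
from types import SimpleNamespace


def my_get_nodes(self):
    # Skip nodes that CLUSTER SLOTS reported as ('', 0), i.e. disconnected
    return [v for v in self.nodes_manager.nodes_cache.values() if v.port != 0]


def my_get_random_node(self):
    return random.choice(my_get_nodes(self))


def make_fake_cluster(*ports):
    """Hypothetical stand-in exposing only nodes_manager.nodes_cache."""
    cache = {f"127.0.0.1:{p}": SimpleNamespace(port=p) for p in ports}
    return SimpleNamespace(nodes_manager=SimpleNamespace(nodes_cache=cache))


def test_port_zero_node_is_skipped():
    cluster = make_fake_cluster(30001, 30002, 0)
    assert [n.port for n in my_get_nodes(cluster)] == [30001, 30002]
    for _ in range(20):
        assert my_get_random_node(cluster).port != 0


test_port_zero_node_is_skipped()
print("ok")
#+end_src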
Immediate: ConnectionRefusedError: [Errno 111] Connection refused
Connection path:
- self.execute_command("SET", *pieces, **options) RedisClusterCommands
- self._execute_command(node, *args, **kwargs) RedisCluster
- self.nodes_manager.get_node_from_slot(
- get_connection(redis_node, *args, **kwargs) RedisCluster
- redis_node.connection_pool.get_connection() RedisCluster
- Connection.connect() Connection
- self.retry.call_with_retry() Retry
- self._connect() Connection
Connection path “Redis init”:
- Redis(
- self.connection_pool.get_connection("_") - Redis
- connection.connect() - AbstractConnection
“localhost” minor error during “Redis init”:
redis.exceptions.ConnectionError: Error 97 connecting to localhost:30004. Address family not supported by protocol.
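The NoBackoff class uses a delay of 0 before the next retry when one of self._supported_errors is raised. ConstantBackoff(2) waits 2 seconds between retries; a backoff is a pause between attempts rather than a timeout, so the 2-second limit itself comes from socket_connect_timeout.
Other strategies are available as well, e.g. redis.backoff.EqualJitterBackoff.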
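A simplified model of a retry loop, written from our reading of redis-py's Retry.call_with_retry, shows how a Retry(backoff, retries) behaves: at most 1 + retries attempts, with the backoff slept only between them. It is not the library code; call_with_retry here and its parameters are hypothetical, and sleep is injected so the pauses can be recorded instead of waited for.
#+begin_src python
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(do: Callable[[], T], retries: int, backoff_s: float,
                    retry_on: tuple, sleep: Callable[[float], None]) -> T:
    """Simplified model: up to 1 + retries attempts, constant pause between them."""
    failures = 0
    while True:
        try:
            return do()
        except retry_on:
            failures += 1
            if failures > retries:
                raise              # attempts used up: re-raise the last error
            if backoff_s > 0:
                sleep(backoff_s)   # backoff = wait BETWEEN attempts, not a timeout


def always_refused():
    raise ConnectionRefusedError("Connection refused")


for retries in (0, 2):
    slept = []                     # record pauses instead of really sleeping
    try:
        call_with_retry(always_refused, retries, 2.0,
                        (ConnectionRefusedError,), slept.append)
    except ConnectionRefusedError:
        pass
    print(retries, slept)          # prints "0 []", then "2 [2.0, 2.0]"
#+end_src
With retries=0, as in Retry(ConstantBackoff(2), 0), the 2-second backoff is never applied.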
The Connection and Redis objects did not change.
ConnectionPool.make_connection shows that self.connection_kwargs is changed at first creation, but after that ConnectionPool.connection_kwargs stays the same.
RedisCluster._execute_command accepts a ClusterNode that has the wrong connection_kwargs.
RedisCluster.execute_command calls RedisCluster._determine_nodes to get this ClusterNode.
ClusterNodes are created in NodesManager._get_or_create_cluster_node.
NodesManager._get_or_create_cluster_node is called from NodesManager.initialize and uses the tmp_nodes_cache variable.
NodesManager recreates each ClusterNode from the result of the "CLUSTER SLOTS" command on startup_nodes, so that it can assign a new ClusterNode.server_type.
To fix this, we should reuse the redis_connection of the matching node in NodesManager.startup_nodes.
from redis.cluster import ClusterNode, NodesManager, get_node_name
def my_get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
    node_name = get_node_name(host, port)
    # check if a startup node exists, to reuse its preconfigured redis_connection
    startup_node = self.startup_nodes.get(node_name)
    redis_connection = startup_node.redis_connection if startup_node else None
    # check if we already have this node in the tmp_nodes_cache
    target_node = tmp_nodes_cache.get(node_name)
    if target_node is None:
        # before creating a new cluster node, check if the cluster node already
        # exists in the current nodes cache and has a valid connection so we can
        # reuse it
        target_node = self.nodes_cache.get(node_name)
        if target_node is None or target_node.redis_connection is None:
            # create new cluster node for this cluster with the startup node's
            # Redis client (None keeps the library's default behaviour)
            target_node = ClusterNode(host, port, role,
                                      redis_connection=redis_connection)
        if target_node.server_type != role:
            target_node.server_type = role
    return target_node
# the method belongs to NodesManager (it uses self.startup_nodes and
# self.nodes_cache), so it is patched there
NodesManager._get_or_create_cluster_node = my_get_or_create_cluster_node
So we configure the Connection timeout in:
- the Redis arguments;
- the RedisCluster kwargs, which are used for ClusterNodes that were known to be disconnected. These keys must be added to REDIS_ALLOWED_KEYS so that they propagate through NodesManager to the Connection.
RedisCluster arguments are filtered in the constructor by cleanup_kwargs; here is the fix:
import redis.cluster
# let these keyword arguments through RedisCluster's cleanup_kwargs filter
redis.cluster.REDIS_ALLOWED_KEYS += (
    "socket_connect_timeout",
    "socket_timeout",
    "retry_on_timeout",
    "retry",
    "retry_on_error",
    "single_connection_client")
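RedisCluster.execute_command queries all nodes in the nodes cache; we suppress the exception and return an empty result [] if a ConnectionError occurs, or a TimeoutError, since a timeout also counts as an error.
from redis.cluster import RedisCluster
from redis.exceptions import ConnectionError as RedisConnectionError
from redis.exceptions import TimeoutError as RedisTimeoutError
orig_ec = RedisCluster._execute_command
def my_execute_command(self, target_node, *args, **kwargs):
    try:
        return orig_ec(self, target_node, *args, **kwargs)
    except (RedisConnectionError, RedisTimeoutError):
        # an unreachable or too slow node is treated as missing data
        return []
RedisCluster._execute_command = my_execute_command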
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
. ~/create-cluster.sh start
. ~/create-cluster.sh create
ps aux | grep redis
kill -s 9 4735
redis-cli -p 30001 set test1 value
ps aux | grep redis
/usr/bin/redis-cli -p 30001 cluster nodes 2>&1 | head -30
echo
/usr/bin/redis-cli -p 30002 cluster nodes 2>&1 | head -30
echo
/usr/bin/redis-cli -p 30003 cluster nodes 2>&1 | head -30
Add master node:
redis-cli --cluster add-node 127.0.0.1:30001 127.0.0.1:30004
A master cannot be added without resharding.
We need at least 3 master nodes running.
We have set NODES=5 to always have at least 3 master nodes.
. ~/create-cluster.sh stop
. ~/create-cluster.sh clean
. ~/create-cluster.sh clean-logs
. ~/create-cluster.sh start
. ~/create-cluster.sh create -f
/usr/bin/redis-cli -p 30001 cluster nodes 2>&1 | head -30
Get keys from all nodes:
redis-cli --cluster call localhost:30001 KEYS "*" 2>&1
from redis.cluster import RedisCluster as Redis
from redis.cluster import ClusterNode
def find_shard_id(shards, value):
    # Flatten the ranges into a list of tuples (start, end, shard_id)
    ranges = [(start, end, shard_id)
              for shard_id, ranges_list in shards.items()
              for start, end in ranges_list]
    # Sort the ranges by their start values
    ranges.sort(key=lambda x: x[0])
    # Perform a binary search on the sorted list
    left, right = 0, len(ranges) - 1
    while left <= right:
        mid = (left + right) // 2
        if ranges[mid][0] <= value <= ranges[mid][1]:
            return ranges[mid][2]
        elif value < ranges[mid][0]:
            right = mid - 1
        else:
            left = mid + 1
    # If no matching range is found, return None
    return None
SHARDS={1:((1, 10), (31, 40)),
2:((11, 20), (41, 50)),
3:((21, 30), (51, 60))}
nodes = [ClusterNode('localhost', pport) for pport in range(30001,30003)]
rc = Redis(startup_nodes=nodes, decode_responses=True,
socket_connect_timeout=2) # 2 seconds timeout
print(rc.ping(target_nodes=Redis.RANDOM))
# -- set:
sh = 1
id = 1
name = 'Test 1'
print(rc.set(f"data_1:{id}:{{{sh}}}", "Name1"))
sh = 2
id = 12
name = 'Test 2'
print(rc.set(f"data_1:{id}:{{{sh}}}", "Name2"))
# -- get one:
id = 1
sh = find_shard_id(SHARDS, id)
print(rc.get(f"data_1:{id}:{{{sh}}}"))
# -- get all:
print(rc.keys(target_nodes=Redis.ALL_NODES))
# -- get range (TODO):
# Sorted set at each shard + "get all"
True
True
True
Name1
['data_1:12:{2}', 'data_1:1:{1}']
redis-cli -p 30001 CLUSTER KEYSLOT 'data_1:1:{1}'
Master[3] : 30004
By default, Redis Cluster nodes stop accepting queries if they detect that at least one hash slot is not covered. So we set the cluster-require-full-coverage option to no.
In create-cluster.sh::14 we added the following line and recreated the cluster (the options are appended to the redis-server command line, so they need the -- prefix):
ADDITIONAL_OPTIONS="--cluster-require-full-coverage no"
For the timeout we set (both options can also be combined in one ADDITIONAL_OPTIONS string):
ADDITIONAL_OPTIONS="--repl-timeout 60"
Starting the cluster, inserting, killing a node, fetching:
. ~/create-cluster.sh stop
. ~/create-cluster.sh clean
. ~/create-cluster.sh clean-logs
. ~/create-cluster.sh start
. ~/create-cluster.sh create -f
sleep 1
redis-cli -p 30004 set 'data_1:1:{1}' 'Name1'
redis-cli -p 30004 get 'data_1:1:{1}'
ps aux | grep 30004 | grep redis | tr -s ' ' | cut -f 2 -d ' ' | xargs kill -s 9
from redis.cluster import RedisCluster
from redis import Redis
from redis.cluster import ClusterNode
from redis.connection import Connection, ConnectionPool
from redis.retry import Retry
from redis.backoff import NoBackoff, ConstantBackoff
import redis.cluster
# ---- fix for ClusterNode->Redis->connection_kwargs
from redis.cluster import NodesManager, get_node_name
def my_get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
    node_name = get_node_name(host, port)
    # check if a startup node exists, to reuse its preconfigured redis_connection
    startup_node = self.startup_nodes.get(node_name)
    redis_connection = startup_node.redis_connection if startup_node else None
    # check if we already have this node in the tmp_nodes_cache
    target_node = tmp_nodes_cache.get(node_name)
    if target_node is None:
        # before creating a new cluster node, check if the cluster node already
        # exists in the current nodes cache and has a valid connection so we can
        # reuse it
        target_node = self.nodes_cache.get(node_name)
        if target_node is None or target_node.redis_connection is None:
            # create new cluster node for this cluster with the startup node's
            # Redis client (None keeps the library's default behaviour)
            target_node = ClusterNode(host, port, role,
                                      redis_connection=redis_connection)
        if target_node.server_type != role:
            target_node.server_type = role
    return target_node
# the method belongs to NodesManager, so it is patched there
NodesManager._get_or_create_cluster_node = my_get_or_create_cluster_node
# ---- fix for case when cluster knows that node is down.
redis.cluster.REDIS_ALLOWED_KEYS += (
"socket_connect_timeout",
"socket_timeout",
"retry_on_timeout",
"retry",
"retry_on_error",
"single_connection_client")
nodes = []
for pport in (30001, 30002):
    r = Redis(
        # 'localhost', # there is a bug here
        '127.0.0.1',
        pport,
        socket_connect_timeout=1,
        socket_timeout=1, retry_on_timeout=False,
        retry=Retry(ConstantBackoff(0), 0),
        retry_on_error=[ConnectionRefusedError],
        single_connection_client=True
    )
    cn = ClusterNode(
        'localhost', pport,
        redis_connection=r)
    nodes.append(cn)
rc = RedisCluster(startup_nodes=nodes, decode_responses=True,
# fix for case when cluster knows that node is down:
socket_connect_timeout=2,
dynamic_startup_nodes=False,
socket_timeout=1, retry_on_timeout=False,
retry=Retry(ConstantBackoff(0), 0),
retry_on_error=[ConnectionRefusedError],
single_connection_client=True)
import time
start_time = time.time()
# print(rc.get(f"data_1:{id}:{{{sh}}}"))
print(rc.get('test1{'))
end_time = time.time()
print(f"Command executed in {end_time - start_time:.2f} seconds")
Error: “redis.exceptions.ClusterDownError: The cluster is down”
Several fixes were made.
Let's test that, if a server is not reachable within 2.0 seconds, we treat its keys as missing. For that, we create a 3-node cluster, add two keys to two nodes, and kill one of the nodes holding a key.
After that, we use the unroutable IP 10.255.255.1 to emulate a socket timeout.
. ~/create-cluster.sh stop
. ~/create-cluster.sh clean
. ~/create-cluster.sh clean-logs
. ~/create-cluster.sh start
. ~/create-cluster.sh create -f
sleep 1
redis-cli -p 30001 set 'data_1:1:{11}' 'Name1'
redis-cli -p 30003 set 'data_1:3:{66}' 'Name3'
# kill one node
ps aux | grep 30003 | grep redis | tr -s ' ' | cut -f 2 -d ' ' | xargs kill -s 9
redis-cli --cluster call localhost:30001 KEYS "*"
from redis.cluster import RedisCluster
from redis import Redis
from redis.cluster import ClusterNode
import redis.cluster
from redis.connection import Connection, ConnectionPool
from redis.retry import Retry
from redis.backoff import NoBackoff, ConstantBackoff
# -- fix 127.0.0.1:0
import random
def my_get_nodes(self):
    # with filter if port = 0, for CLUSTER SLOTS with disconnected node
    return [v for v in self.nodes_manager.nodes_cache.values() if v.port != 0]
def my_get_random_node(self):
    # same filter, then pick one live node at random
    return random.choice(my_get_nodes(self))
RedisCluster.get_nodes = my_get_nodes
RedisCluster.get_random_node = my_get_random_node
# --- fix Problem: a command fails if one of the master nodes doesn't reply.
from redis.exceptions import ConnectionError, TimeoutError
orig_ec = RedisCluster._execute_command
def my_execute_command(self, target_node, *args, **kwargs):
    try:
        return orig_ec(self, target_node, *args, **kwargs)
    except (ConnectionError, TimeoutError):
        # an unreachable or too slow node is treated as missing data
        return []
RedisCluster._execute_command = my_execute_command
# --- test timeout: print how long obtaining a connection takes
import time
orig_getc = ConnectionPool.get_connection
def getc(self, command_name: str, *keys, **options):
    "Wrap ConnectionPool.get_connection and print how long it took"
    start_time = time.time()
    try:
        # forward keys/options unpacked, exactly as the caller passed them
        return orig_getc(self, command_name, *keys, **options)
    except Exception as e:
        print(e)
        raise
    finally:
        end_time = time.time()
        print(f"Getc executed in {end_time - start_time:.2f} seconds")
ConnectionPool.get_connection = getc
import redis.cluster
redis.cluster.REDIS_ALLOWED_KEYS += (
"socket_connect_timeout",
"socket_timeout",
"retry_on_timeout",
"retry",
"retry_on_error",
"single_connection_client")
nodes = []
for pport in (30001, 30002, 30003):
    if pport < 30002:
        cn = ClusterNode(
            # 'localhost',
            '10.255.255.1',
            pport
        )
    else:
        cn = ClusterNode(
            'localhost',
            # '10.255.255.1',
            pport
        )
    nodes.append(cn)
rc = RedisCluster(
startup_nodes=nodes, # decode_responses=True,
socket_connect_timeout=2,
dynamic_startup_nodes=False,
socket_timeout=1, retry_on_timeout=False,
retry=Retry(ConstantBackoff(2), 0),
# retry_on_error=[ConnectionRefusedError],
# single_connection_client=True
)
print("startup_nodes:")
for name in rc.nodes_manager.startup_nodes:
    print(name)
print()
print("nodes_cache:")
for name in rc.nodes_manager.nodes_cache:
    print(name)
print()
print(rc.keys(target_nodes=RedisCluster.ALL_NODES))
Timeout connecting to server
Getc executed in 2.00 seconds
Getc executed in 0.00 seconds
Getc executed in 0.00 seconds
Getc executed in 0.00 seconds
startup_nodes:
10.255.255.1:30001
127.0.0.1:30002
127.0.0.1:30003
nodes_cache:
127.0.0.1:30001
127.0.0.1:30002
127.0.0.1:30003
Getc executed in 0.00 seconds
Getc executed in 0.00 seconds
Error 111 connecting to 127.0.0.1:30003. Connection refused.
Getc executed in 0.00 seconds
Timeout connecting to server
Getc executed in 2.00 seconds
Getc executed in 0.00 seconds
Getc executed in 0.00 seconds
[b'data_1:1:{11}']
The timeout is applied in Connection.connect, so we use socket_connect_timeout=2 for the 2-second limit. For testing, we monkey-patch ConnectionPool.get_connection to print how long obtaining a connection takes.
To use the automatic cluster-aware key lookup, we need to store the ID field in the key itself; this limits us to the String data type. For example:
python a2-script.py hset data_1:1:{0-10} "Name1"
For sorting there are two commands: ZRANGEBYSCORE and SORT. ZRANGEBYSCORE works on a single key, hence on a single node. SORT: its BY option is denied in cluster mode.
That is why we cannot sort on the Redis server and must sort on the client.
The only way to get all keys is to send KEYS * or SCAN 0 MATCH * to every node.
To select all data we need two steps: 1) get all keys; 2) get the value of every key.
The task has no requirement for a single-key lookup, so we can keep the records in three sorted sets, “data_1”, “data_2” and “data_3”, using the ID as the score. We will use two commands:
- get all keys, i.e. “data_1”, “data_2”, “data_3”;
- send 0 to 3 requests, one per existing data_* set.
This lets Redis sort each set by score; we then only need to combine data_1, data_2 and data_3. Because the shard ID ranges interleave (data_1 holds both 1-10 and 31-40), the sets must be merged by score rather than simply concatenated in key order.
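The per-set results can be combined on the client with a k-way merge by score. A minimal sketch with heapq.merge; merge_by_score is a hypothetical helper, and the sample lists are illustrative data shaped like ZRANGE ... WITHSCORES output, not output from the cluster.
#+begin_src python
import heapq
from typing import Iterable

Pair = tuple[str, float]          # (member, score), as from ZRANGE ... WITHSCORES


def merge_by_score(per_set: Iterable[list[Pair]]) -> list[Pair]:
    """K-way merge of lists that are each already sorted by score."""
    return list(heapq.merge(*per_set, key=lambda pair: pair[1]))


# Each list is sorted, but their ID ranges interleave
data_1 = [("Name1", 1.0), ("Name2", 2.0), ("Name31", 31.0)]
data_2 = [("Name11", 11.0), ("Name12", 12.0), ("Name41", 41.0)]
data_3: list[Pair] = []           # e.g. its node was down -> missing data

print([name for name, _ in data_1 + data_2 + data_3])
# ['Name1', 'Name2', 'Name31', 'Name11', 'Name12', 'Name41']  (concatenated: wrong)
print([name for name, _ in merge_by_score([data_1, data_2, data_3])])
# ['Name1', 'Name2', 'Name11', 'Name12', 'Name31', 'Name41']
#+end_src
heapq.merge consumes its inputs lazily, so n rows across k sets cost O(n log k). With redis-py, members come back as bytes unless decode_responses=True.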
. ~/create-cluster.sh stop
. ~/create-cluster.sh clean
. ~/create-cluster.sh clean-logs
. ~/create-cluster.sh start
. ~/create-cluster.sh create -f
sleep 1
redis-cli --cluster call localhost:30001 ZADD data_1 1 "Name1"
redis-cli --cluster call localhost:30001 ZADD data_2 12 "Name12"
redis-cli --cluster call localhost:30001 ZADD data_1 2 "Name2"
redis-cli --cluster call localhost:30001 ZADD data_2 11 "Name11"
redis-cli --cluster call 127.0.0.1:30001 KEYS "*"
Get all data sorted with CLI:
redis-cli --cluster call 127.0.0.1:30001 KEYS "*" | cut -d ' ' -f 2 | tail -n +2 | sort | grep -v "^$"| xargs -I '{}' redis-cli --cluster call 127.0.0.1:30001 ZRANGE '{}' 0 inf BYSCORE WITHSCORES | sed 's/127.0.0.1:3000[0-9]: //' | grep -v '127.0.0.1:3000[0-9]' | grep -v 'Calling ZRANGE' | grep -v "^$"
The drawback of this approach is that we lose cluster-aware value retrieval by ID, if that is required in the future.
. ~/create-cluster.sh stop
. ~/create-cluster.sh clean
. ~/create-cluster.sh clean-logs
. ~/create-cluster.sh start
. ~/create-cluster.sh create -f
sleep 1
redis-cli --cluster call localhost:30001 ZADD data_1 1 "Name1"
redis-cli --cluster call localhost:30001 ZADD data_2 12 "Name12"
redis-cli --cluster call localhost:30001 ZADD data_1 2 "Name2"
redis-cli --cluster call localhost:30001 ZADD data_2 11 "Name11"
redis-cli --cluster call 127.0.0.1:30001 KEYS "*"
#+begin_src python :results output :exports both
from redis.cluster import RedisCluster
from redis import Redis
from redis.cluster import ClusterNode
import redis.cluster
from redis.connection import Connection, ConnectionPool
from redis.retry import Retry
from redis.backoff import NoBackoff, ConstantBackoff
def find_shard_id(shards, value):
    # Flatten the ranges into a list of tuples (start, end, shard_id)
    ranges = [(start, end, shard_id)
              for shard_id, ranges_list in shards.items()
              for start, end in ranges_list]
    # Sort the ranges by their start values
    ranges.sort(key=lambda x: x[0])
    # Perform a binary search on the sorted list
    left, right = 0, len(ranges) - 1
    while left <= right:
        mid = (left + right) // 2
        if ranges[mid][0] <= value <= ranges[mid][1]:
            return ranges[mid][2]
        elif value < ranges[mid][0]:
            right = mid - 1
        else:
            left = mid + 1
    # If no matching range is found, return None
    return None
SHARDS={1:((1, 10), (31, 40)),
2:((11, 20), (41, 50)),
3:((21, 30), (51, 60))}
# -- fix 127.0.0.1:0
import random
def my_get_nodes(self):
    # with filter if port = 0, for CLUSTER SLOTS with disconnected node
    return [v for v in self.nodes_manager.nodes_cache.values() if v.port != 0]
def my_get_random_node(self):
    # same filter, then pick one live node at random
    return random.choice(my_get_nodes(self))
RedisCluster.get_nodes = my_get_nodes
RedisCluster.get_random_node = my_get_random_node
# --- fix Problem: a command fails if one of the master nodes doesn't reply.
from redis.exceptions import ConnectionError, TimeoutError
orig_ec = RedisCluster._execute_command
def my_execute_command(self, target_node, *args, **kwargs):
    try:
        return orig_ec(self, target_node, *args, **kwargs)
    except (ConnectionError, TimeoutError):
        # an unreachable or too slow node is treated as missing data
        return []
RedisCluster._execute_command = my_execute_command
# NodesManager.initialize = myinitialize
import redis.cluster
redis.cluster.REDIS_ALLOWED_KEYS += (
"socket_connect_timeout",
"socket_timeout",
"retry_on_timeout",
"retry",
"retry_on_error",
"single_connection_client")
rs1 = []
nodes = []
for pport in (30001, 30002, 30003, 30004, 30005):
    cn = ClusterNode(
        'localhost',
        # '10.255.255.1',
        pport,
        # redis_connection=r
    )
    # print("r", cn.redis_connection.connection_pool.connection_kwargs)
    nodes.append(cn)
# -------------------- MAIN ------------------
rc = RedisCluster(
startup_nodes=nodes, decode_responses=False,
socket_connect_timeout=2,
dynamic_startup_nodes=False,
socket_timeout=1, retry_on_timeout=False,
retry=Retry(ConstantBackoff(2), 0),
# retry_on_error=[ConnectionRefusedError],
# single_connection_client=True
)
rs2 = []
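# # -- Insert items to SortedSets
# for id in [1, 2, 11, 12]:
#     sh = find_shard_id(SHARDS, id)
#     rc.zadd(f"data_{sh}", {f"Name{id}": id})
#     # print(sh, id, )
# KEYS "*"
keys = rc.keys(target_nodes=RedisCluster.ALL_NODES)
# Each sorted set is ordered by score (the ID), but the shard ID ranges
# interleave, so merge the per-set results by score instead of concatenating.
import heapq
per_set = [rc.zrange(k, 0, -1, withscores=True) for k in sorted(keys)]
res = list(heapq.merge(*per_set, key=lambda x: x[1]))
for name, score in res:
    print(name.decode("utf-8"))
    print(int(score))
#+end_src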
- Cluster constructor: ./create-cluster.sh
- Main FastAPI application: ./fapi.py
- Tests: ./test_fapi.py
  - pytest -s test_fapi.py # to run
- Helper script for calling cluster-aware commands: ./redis-script.py
To run the cluster:
. ~/create-cluster.sh stop
. ~/create-cluster.sh clean
. ~/create-cluster.sh clean-logs
. ~/create-cluster.sh start
. ~/create-cluster.sh create -f
sleep 1
redis-cli --cluster call localhost:30001 ZADD data_1 1 "Name1"
redis-cli --cluster call localhost:30001 ZADD data_2 12 "Name12"
redis-cli --cluster call localhost:30001 ZADD data_1 2 "Name2"
redis-cli --cluster call localhost:30001 ZADD data_2 11 "Name11"
redis-cli --cluster call 127.0.0.1:30001 KEYS "*"
To run the access point:
$ uvicorn fapi:app
Requirements:
- Python 3.11+
pip install redis  # the redis-py client is published on PyPI as "redis"
pip install fastapi
pip install pytest
pip install uvicorn
Software architecture is a representation of a system in which there is a mapping of functionality onto hardware and software components.
- RESTful access point
  - FastAPI app run under the uvicorn ASGI web server.
- Database and sharding implementation
  - Redis running in cluster mode.
- Timings: 2 seconds
  - socket_timeout and patches for the redis-py implementation
- Asynchrony
  - web: FastAPI implementation; database: redis-py provides an alternative asyncio implementation in https://github.com/redis/redis-py/tree/master/redis/asyncio (switching would take some time)
Implemented: a single access point with FastAPI, a 2-second timeout, and a cluster with a table sharded by ID. Redis has been used to implement database access and data sharding.
The fastest debugging technique we found was the built-in Python pdb module with a .pdbrc file.
What we learned: mocking for FastAPI, building a Redis Cluster architecture, and redis-py programming.
A Redis script for calling cluster-aware commands was written for testing.
Objectives of the original task that have been achieved:
- FastAPI application
- Pytest
Objectives of the original task that were not implemented:
- async dbapi/adapter
- SQLAlchemy 2.0
- Asyncio
- Docker