Skip to content

Commit

Permalink
Refactored Redis RPC I/O failure processing (opencomputeproject#208)
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Kokhan <[email protected]>
Signed-off-by: selldinesh <[email protected]>
  • Loading branch information
andriy-kokhan authored and selldinesh committed Oct 16, 2023
1 parent a9bef24 commit 8da4c5e
Showing 1 changed file with 25 additions and 24 deletions.
49 changes: 25 additions & 24 deletions common/sai_client/sai_redis_client/sai_redis_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,10 @@ def operate(self, obj, attrs, op):
if self.asic_channel is None:
self.__assert_syncd_running()

# Clean-up Redis RPC I/O pipe
self.r.delete("GETRESPONSE_KEY_VALUE_OP_QUEUE")

tout = 0.01
attempts = self.attempts
while len(self.r.lrange("GETRESPONSE_KEY_VALUE_OP_QUEUE", 0, -1)) > 0 and attempts > 0:
time.sleep(0.01)
attempts -= 1

if attempts == 0:
return []
status = self.r.lrange("GETRESPONSE_KEY_VALUE_OP_QUEUE", 0, -1)
assert len(status) == 0, "Redis RPC I/O failure!"

# Remove spaces from the key string.
# Required by sai_deserialize_route_entry() in sonic-sairedis.
Expand All @@ -107,22 +101,25 @@ def operate(self, obj, attrs, op):
self.r.lpush("ASIC_STATE_KEY_VALUE_OP_QUEUE", obj, attrs, op)
self.r.publish(self.asic_channel, "G")

status = []
attempts = self.attempts

# Wait upto 3 mins for switch init
if obj.startswith("SAI_OBJECT_TYPE_SWITCH") and op == "Screate":
# Wait upto 3 mins for switch init
tout = 0.5
attempts = 240
else:
tout = 0.01
attempts = self.attempts

# Get response
status = self.r.lrange("GETRESPONSE_KEY_VALUE_OP_QUEUE", 0, -1)
while len(status) < 3 and attempts > 0:
assert self.__check_syncd_running(), "FATAL - SyncD has exited or crashed!"
time.sleep(tout)
attempts -= 1
status = self.r.lrange("GETRESPONSE_KEY_VALUE_OP_QUEUE", 0, -1)

self.r.delete("GETRESPONSE_KEY_VALUE_OP_QUEUE")

assert len(status) == 3, "SAI \"{}\" operation failure!".format(op)
assert len(status) == 3, f"SAI \"{op[1:]}\" operation failure!"
return status

def create(self, obj, attrs, do_assert=True):
Expand Down Expand Up @@ -543,18 +540,22 @@ def vid_to_rid(self, vid):
assert rid.startswith("oid:"), f"Invalid RID format {vid}"
return rid

def __check_syncd_running(self):
if self.asic_db == 1:
numsub = self.r.execute_command('PUBSUB', 'NUMSUB', 'ASIC_STATE_CHANNEL')
if numsub[1] >= 1:
# SONiC 202111 or older detected
return "ASIC_STATE_CHANNEL"
numsub = self.r.execute_command('PUBSUB', 'NUMSUB', f'ASIC_STATE_CHANNEL@{self.asic_db}')
if numsub[1] >= 1:
# SONiC 202205 or newer detected
return f"ASIC_STATE_CHANNEL@{self.asic_db}"
return None

def __assert_syncd_running(self, tout=30):
for i in range(tout + 1):
if self.asic_db == 1:
numsub = self.r.execute_command('PUBSUB', 'NUMSUB', 'ASIC_STATE_CHANNEL')
if numsub[1] >= 1:
# SONiC 202111 or older detected
self.asic_channel = "ASIC_STATE_CHANNEL"
return
numsub = self.r.execute_command('PUBSUB', 'NUMSUB', f'ASIC_STATE_CHANNEL@{self.asic_db}')
if numsub[1] >= 1:
# SONiC 202205 or newer detected
self.asic_channel = f"ASIC_STATE_CHANNEL@{self.asic_db}"
self.asic_channel = self.__check_syncd_running()
if self.asic_channel:
return
if i < tout:
time.sleep(1)
Expand Down

0 comments on commit 8da4c5e

Please sign in to comment.