Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored Redis RPC I/O failure #208

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 25 additions & 24 deletions common/sai_client/sai_redis_client/sai_redis_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,10 @@ def operate(self, obj, attrs, op):
if self.asic_channel is None:
self.__assert_syncd_running()

# Clean-up Redis RPC I/O pipe
self.r.delete("GETRESPONSE_KEY_VALUE_OP_QUEUE")

tout = 0.01
attempts = self.attempts
while len(self.r.lrange("GETRESPONSE_KEY_VALUE_OP_QUEUE", 0, -1)) > 0 and attempts > 0:
time.sleep(0.01)
attempts -= 1

if attempts == 0:
return []
status = self.r.lrange("GETRESPONSE_KEY_VALUE_OP_QUEUE", 0, -1)
assert len(status) == 0, "Redis RPC I/O failure!"

# Remove spaces from the key string.
# Required by sai_deserialize_route_entry() in sonic-sairedis.
Expand All @@ -107,22 +101,25 @@ def operate(self, obj, attrs, op):
self.r.lpush("ASIC_STATE_KEY_VALUE_OP_QUEUE", obj, attrs, op)
self.r.publish(self.asic_channel, "G")

status = []
attempts = self.attempts

# Wait upto 3 mins for switch init
if obj.startswith("SAI_OBJECT_TYPE_SWITCH") and op == "Screate":
# Wait upto 3 mins for switch init
tout = 0.5
attempts = 240
else:
tout = 0.01
attempts = self.attempts

# Get response
status = self.r.lrange("GETRESPONSE_KEY_VALUE_OP_QUEUE", 0, -1)
while len(status) < 3 and attempts > 0:
assert self.__check_syncd_running(), "FATAL - SyncD has exited or crashed!"
time.sleep(tout)
attempts -= 1
status = self.r.lrange("GETRESPONSE_KEY_VALUE_OP_QUEUE", 0, -1)

self.r.delete("GETRESPONSE_KEY_VALUE_OP_QUEUE")

assert len(status) == 3, "SAI \"{}\" operation failure!".format(op)
assert len(status) == 3, f"SAI \"{op[1:]}\" operation failure!"
return status

def create(self, obj, attrs, do_assert=True):
Expand Down Expand Up @@ -543,18 +540,22 @@ def vid_to_rid(self, vid):
assert rid.startswith("oid:"), f"Invalid RID format {vid}"
return rid

def __check_syncd_running(self):
if self.asic_db == 1:
numsub = self.r.execute_command('PUBSUB', 'NUMSUB', 'ASIC_STATE_CHANNEL')
if numsub[1] >= 1:
# SONiC 202111 or older detected
return "ASIC_STATE_CHANNEL"
numsub = self.r.execute_command('PUBSUB', 'NUMSUB', f'ASIC_STATE_CHANNEL@{self.asic_db}')
if numsub[1] >= 1:
# SONiC 202205 or newer detected
return f"ASIC_STATE_CHANNEL@{self.asic_db}"
return None

def __assert_syncd_running(self, tout=30):
for i in range(tout + 1):
if self.asic_db == 1:
numsub = self.r.execute_command('PUBSUB', 'NUMSUB', 'ASIC_STATE_CHANNEL')
if numsub[1] >= 1:
# SONiC 202111 or older detected
self.asic_channel = "ASIC_STATE_CHANNEL"
return
numsub = self.r.execute_command('PUBSUB', 'NUMSUB', f'ASIC_STATE_CHANNEL@{self.asic_db}')
if numsub[1] >= 1:
# SONiC 202205 or newer detected
self.asic_channel = f"ASIC_STATE_CHANNEL@{self.asic_db}"
self.asic_channel = self.__check_syncd_running()
if self.asic_channel:
return
if i < tout:
time.sleep(1)
Expand Down