From 8da4c5e95b49a9df68c8b976096cecbd14693c78 Mon Sep 17 00:00:00 2001 From: Andriy Kokhan Date: Sun, 1 Oct 2023 15:21:00 +0300 Subject: [PATCH] Refactored Redis RPC I/O failure processing (#208) Signed-off-by: Andriy Kokhan Signed-off-by: selldinesh --- .../sai_redis_client/sai_redis_client.py | 49 ++++++++++--------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/common/sai_client/sai_redis_client/sai_redis_client.py b/common/sai_client/sai_redis_client/sai_redis_client.py index ebfbb795..2782302e 100644 --- a/common/sai_client/sai_redis_client/sai_redis_client.py +++ b/common/sai_client/sai_redis_client/sai_redis_client.py @@ -86,16 +86,10 @@ def operate(self, obj, attrs, op): if self.asic_channel is None: self.__assert_syncd_running() + # Clean-up Redis RPC I/O pipe self.r.delete("GETRESPONSE_KEY_VALUE_OP_QUEUE") - - tout = 0.01 - attempts = self.attempts - while len(self.r.lrange("GETRESPONSE_KEY_VALUE_OP_QUEUE", 0, -1)) > 0 and attempts > 0: - time.sleep(0.01) - attempts -= 1 - - if attempts == 0: - return [] + status = self.r.lrange("GETRESPONSE_KEY_VALUE_OP_QUEUE", 0, -1) + assert len(status) == 0, "Redis RPC I/O failure!" # Remove spaces from the key string. # Required by sai_deserialize_route_entry() in sonic-sairedis. @@ -107,22 +101,25 @@ def operate(self, obj, attrs, op): self.r.lpush("ASIC_STATE_KEY_VALUE_OP_QUEUE", obj, attrs, op) self.r.publish(self.asic_channel, "G") - status = [] - attempts = self.attempts - - # Wait upto 3 mins for switch init if obj.startswith("SAI_OBJECT_TYPE_SWITCH") and op == "Screate": + # Wait upto 3 mins for switch init tout = 0.5 attempts = 240 + else: + tout = 0.01 + attempts = self.attempts + # Get response + status = self.r.lrange("GETRESPONSE_KEY_VALUE_OP_QUEUE", 0, -1) while len(status) < 3 and attempts > 0: + assert self.__check_syncd_running(), "FATAL - SyncD has exited or crashed!" time.sleep(tout) attempts -= 1 status = self.r.lrange("GETRESPONSE_KEY_VALUE_OP_QUEUE", 0, -1) self.r.delete("GETRESPONSE_KEY_VALUE_OP_QUEUE") - assert len(status) == 3, "SAI \"{}\" operation failure!".format(op) + assert len(status) == 3, f"SAI \"{op[1:]}\" operation failure!" return status def create(self, obj, attrs, do_assert=True): @@ -543,18 +540,22 @@ def vid_to_rid(self, vid): assert rid.startswith("oid:"), f"Invalid RID format {vid}" return rid + def __check_syncd_running(self): + if self.asic_db == 1: + numsub = self.r.execute_command('PUBSUB', 'NUMSUB', 'ASIC_STATE_CHANNEL') + if numsub[1] >= 1: + # SONiC 202111 or older detected + return "ASIC_STATE_CHANNEL" + numsub = self.r.execute_command('PUBSUB', 'NUMSUB', f'ASIC_STATE_CHANNEL@{self.asic_db}') + if numsub[1] >= 1: + # SONiC 202205 or newer detected + return f"ASIC_STATE_CHANNEL@{self.asic_db}" + return None + def __assert_syncd_running(self, tout=30): for i in range(tout + 1): - if self.asic_db == 1: - numsub = self.r.execute_command('PUBSUB', 'NUMSUB', 'ASIC_STATE_CHANNEL') - if numsub[1] >= 1: - # SONiC 202111 or older detected - self.asic_channel = "ASIC_STATE_CHANNEL" - return - numsub = self.r.execute_command('PUBSUB', 'NUMSUB', f'ASIC_STATE_CHANNEL@{self.asic_db}') - if numsub[1] >= 1: - # SONiC 202205 or newer detected - self.asic_channel = f"ASIC_STATE_CHANNEL@{self.asic_db}" + self.asic_channel = self.__check_syncd_running() + if self.asic_channel: return if i < tout: time.sleep(1)