diff --git a/tests/ut/test_fdb_ut.py b/tests/ut/test_fdb_ut.py index 46b4ecd3..af9a4305 100644 --- a/tests/ut/test_fdb_ut.py +++ b/tests/ut/test_fdb_ut.py @@ -89,3 +89,76 @@ def test_duplicated_remove(self, npu): status = npu.remove_fdb(npu.default_vlan_oid, TestFdbEntry.mac, do_assert=False) assert status == "SAI_STATUS_ITEM_NOT_FOUND" + +class TestFlushFdbEntries: + dynamic_entries = ["00:00:00:00:00:11", "00:00:00:00:00:22"] + static_entries = ["00:00:00:00:00:33", "00:00:00:00:00:44"] + + def flush_fdb_entries(self, npu, entries_type): + for mac in TestFlushFdbEntries.dynamic_entries: + npu.create_fdb(npu.default_vlan_oid, mac, npu.dot1q_bp_oids[0], f"SAI_FDB_ENTRY_TYPE_DYNAMIC") + + for mac in TestFlushFdbEntries.static_entries: + npu.create_fdb(npu.default_vlan_oid, mac, npu.dot1q_bp_oids[0], f"SAI_FDB_ENTRY_TYPE_STATIC") + + npu.flush_fdb_entries(npu.switch_oid, [ + "SAI_FDB_FLUSH_ATTR_BV_ID", npu.default_vlan_oid, + "SAI_FDB_FLUSH_ATTR_ENTRY_TYPE", f"SAI_FDB_FLUSH_ENTRY_TYPE_{entries_type}" + ]) + + def test_flush_dynamic(self, npu): + self.flush_fdb_entries(npu, "DYNAMIC") + + flushed = [] + not_flushed = [] + + for mac in TestFlushFdbEntries.dynamic_entries: + status = npu.remove_fdb(npu.default_vlan_oid, mac, do_assert=False) + if status == "SAI_STATUS_SUCCESS": + not_flushed.append(mac) + for mac in TestFlushFdbEntries.static_entries: + status = npu.remove_fdb(npu.default_vlan_oid, mac, do_assert=False) + if status != "SAI_STATUS_SUCCESS": + flushed.append(mac) + + msg = "" if len(not_flushed) == 0 else f"Dynamic FDB entries {not_flushed} have not been flushed. " + msg += "" if len(flushed) == 0 else f"Static FDB entries {flushed} have been flushed." + assert not msg, msg + + def test_flush_static(self, npu): + self.flush_fdb_entries(npu, "STATIC") + + flushed = [] + not_flushed = [] + + for mac in TestFlushFdbEntries.dynamic_entries: + status = npu.remove_fdb(npu.default_vlan_oid, mac, do_assert=False) + if status != "SAI_STATUS_SUCCESS": + flushed.append(mac) + for mac in TestFlushFdbEntries.static_entries: + status = npu.remove_fdb(npu.default_vlan_oid, mac, do_assert=False) + if status == "SAI_STATUS_SUCCESS": + not_flushed.append(mac) + + msg = "" if len(not_flushed) == 0 else f"Static FDB entries {not_flushed} have not been flushed. " + msg += "" if len(flushed) == 0 else f"Dynamic FDB entries {flushed} have been flushed." + assert not msg, msg + + def test_flush_all(self, npu): + self.flush_fdb_entries(npu, "ALL") + + not_flushed_dynamic = [] + not_flushed_static = [] + + for mac in TestFlushFdbEntries.dynamic_entries: + status = npu.remove_fdb(npu.default_vlan_oid, mac, do_assert=False) + if status == "SAI_STATUS_SUCCESS": + not_flushed_dynamic.append(mac) + for mac in TestFlushFdbEntries.static_entries: + status = npu.remove_fdb(npu.default_vlan_oid, mac, do_assert=False) + if status == "SAI_STATUS_SUCCESS": + not_flushed_static.append(mac) + + msg = "" if len(not_flushed_static) == 0 else f"Static FDB entries {not_flushed_static} have not been flushed. " + msg += "" if len(not_flushed_dynamic) == 0 else f"Dynamic FDB entries {not_flushed_dynamic} have not been flushed." + assert not msg, msg