Skip to content

Commit

Permalink
Added FDB flush UTs
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Kokhan <[email protected]>
  • Loading branch information
andriy-kokhan committed Sep 20, 2023
1 parent e6d10a4 commit e56dd83
Showing 1 changed file with 73 additions and 0 deletions.
73 changes: 73 additions & 0 deletions tests/ut/test_fdb_ut.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,76 @@ def test_duplicated_remove(self, npu):
status = npu.remove_fdb(npu.default_vlan_oid, TestFdbEntry.mac, do_assert=False)
assert status == "SAI_STATUS_ITEM_NOT_FOUND"


class TestFlushFdbEntries:
dynamic_entries = ["00:00:00:00:00:11", "00:00:00:00:00:22"]
static_entries = ["00:00:00:00:00:33", "00:00:00:00:00:44"]

def flush_fdb_entries(self, npu, entries_type):
for mac in TestFlushFdbEntries.dynamic_entries:
npu.create_fdb(npu.default_vlan_oid, mac, npu.dot1q_bp_oids[0], f"SAI_FDB_ENTRY_TYPE_DYNAMIC")

for mac in TestFlushFdbEntries.static_entries:
npu.create_fdb(npu.default_vlan_oid, mac, npu.dot1q_bp_oids[0], f"SAI_FDB_ENTRY_TYPE_STATIC")

npu.flush_fdb_entries(npu.switch_oid, [
"SAI_FDB_FLUSH_ATTR_BV_ID", npu.default_vlan_oid,
"SAI_FDB_FLUSH_ATTR_ENTRY_TYPE", f"SAI_FDB_FLUSH_ENTRY_TYPE_{entries_type}"
])

def test_flush_dynamic(self, npu):
self.flush_fdb_entries(npu, "DYNAMIC")

flushed = []
not_flushed = []

for mac in TestFlushFdbEntries.dynamic_entries:
status = npu.remove_fdb(npu.default_vlan_oid, mac, do_assert=False)
if status == "SAI_STATUS_SUCCESS":
not_flushed.append(mac)
for mac in TestFlushFdbEntries.static_entries:
status = npu.remove_fdb(npu.default_vlan_oid, mac, do_assert=False)
if status != "SAI_STATUS_SUCCESS":
flushed.append(mac)

msg = "" if len(not_flushed) == 0 else f"Dynamic FDB entries {not_flushed} have not been flushed. "
msg += "" if len(flushed) == 0 else f"Static FDB entries {flushed} have been flushed."
assert not msg, msg

def test_flush_static(self, npu):
self.flush_fdb_entries(npu, "STATIC")

flushed = []
not_flushed = []

for mac in TestFlushFdbEntries.dynamic_entries:
status = npu.remove_fdb(npu.default_vlan_oid, mac, do_assert=False)
if status != "SAI_STATUS_SUCCESS":
flushed.append(mac)
for mac in TestFlushFdbEntries.static_entries:
status = npu.remove_fdb(npu.default_vlan_oid, mac, do_assert=False)
if status == "SAI_STATUS_SUCCESS":
not_flushed.append(mac)

msg = "" if len(not_flushed) == 0 else f"Static FDB entries {not_flushed} have not been flushed. "
msg += "" if len(flushed) == 0 else f"Dynamic FDB entries {flushed} have been flushed."
assert not msg, msg

def test_flush_all(self, npu):
self.flush_fdb_entries(npu, "ALL")

not_flushed_dynamic = []
not_flushed_static = []

for mac in TestFlushFdbEntries.dynamic_entries:
status = npu.remove_fdb(npu.default_vlan_oid, mac, do_assert=False)
if status == "SAI_STATUS_SUCCESS":
not_flushed_dynamic.append(mac)
for mac in TestFlushFdbEntries.static_entries:
status = npu.remove_fdb(npu.default_vlan_oid, mac, do_assert=False)
if status == "SAI_STATUS_SUCCESS":
not_flushed_static.append(mac)

msg = "" if len(not_flushed_static) == 0 else f"Static FDB entries {not_flushed_static} have not been flushed. "
msg += "" if len(not_flushed_dynamic) == 0 else f"Dynamic FDB entries {not_flushed_dynamic} have not been flushed."
assert not msg, msg

0 comments on commit e56dd83

Please sign in to comment.