Skip to content

Commit

Permalink
Modernize run-stf-test script.
Browse files Browse the repository at this point in the history
Signed-off-by: fruffy <[email protected]>
  • Loading branch information
fruffy committed Oct 25, 2024
1 parent eb7a897 commit 07e9ace
Show file tree
Hide file tree
Showing 4 changed files with 301 additions and 303 deletions.
37 changes: 18 additions & 19 deletions backends/bmv2/bmv2stf.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,8 @@ def p_meter_rate_one(self, p):
# able to invoke the BMV2 simulator, create a CLI file
# and test packets in pcap files.
class RunBMV2(object):
def __init__(self, folder, options, jsonfile):
self.clifile = folder + "/cli.txt"
def __init__(self, folder: Path, options, jsonfile: Path) -> None:
self.clifile = folder.joinpath("cli.txt")
self.jsonfile = jsonfile
self.stffile = None
self.folder = folder
Expand All @@ -503,8 +503,7 @@ def __init__(self, folder, options, jsonfile):
self.actions = []
self.switchLogFile = "switch.log" # .txt is added by BMv2
self.readJson()
self.cmd_line_args = getattr(options, "switchOptions", ())
self.target_specific_cmd_line_args = getattr(options, "switchTargetSpecificOptions", ())
self.target_specific_cmd_line_args = getattr(options, "switch_target_specific_options", ())

def readJson(self):
with open(self.jsonfile) as jf:
Expand All @@ -516,8 +515,8 @@ def readJson(self):
for t in self.json["pipelines"][1]["tables"]:
self.tables.append(BMV2Table(t))

def filename(self, interface, direction):
return self.folder + "/" + self.pcapPrefix + str(interface) + "_" + direction + ".pcap"
def filename(self, interface: str, direction: str) -> Path:
return self.folder.joinpath(f"{self.pcapPrefix}{interface}_{direction}.pcap")

def interface_of_filename(self, f):
return int(os.path.basename(f).rstrip(".pcap").lstrip(self.pcapPrefix).rsplit("_", 1)[0])
Expand All @@ -529,7 +528,7 @@ def do_cli_command(self, cmd):
self.packetDelay = 1

def execute_stf_command(self, stf_entry):
if self.options.verbose and stf_entry:
if stf_entry:
testutils.log.info("STF Command: %s", stf_entry)
cmd = stf_entry[0]
if cmd == "":
Expand Down Expand Up @@ -686,7 +685,7 @@ def interfaceArgs(self):
result.append("-i " + str(interface) + "@" + self.pcapPrefix + str(interface))
return result

def generate_model_inputs(self, stf_map):
def generate_model_inputs(self, stf_map: testutils.Dict[str, str]) -> int:
for entry in stf_map:
cmd = entry[0]
if cmd in ["packet", "expect"]:
Expand All @@ -711,7 +710,7 @@ def check_switch_server_ready(self, proc, thriftPort):
if result == 0:
return True

def run(self, stf_map):
def run(self, stf_map) -> int:
testutils.log.info("Running model")
wait = 0 # Time to wait before model starts running

Expand Down Expand Up @@ -750,17 +749,15 @@ def run(self, stf_map):
+ self.interfaceArgs()
+ [self.jsonfile]
)
if self.cmd_line_args:
runswitch += self.cmd_line_args
if self.target_specific_cmd_line_args:
runswitch += [
"--",
] + self.target_specific_cmd_line_args
testutils.log.info("Running %s", " ".join(runswitch))
testutils.log.info("Running %s", runswitch)
sw = subprocess.Popen(runswitch, cwd=self.folder)

def openInterface(ifname):
fp = self.interfaces[interface] = scapy_util.RawPcapWriter(ifname, linktype=0)
fp = self.interfaces[interface] = scapy_util.RawPcapWriter(str(ifname), linktype=0)
fp._write_header(None)

# Try to open input interfaces. Each time, we set a 2 second
Expand Down Expand Up @@ -845,16 +842,18 @@ def openInterface(ifname):
testutils.log.info("Execution completed")
return rv

def showLog(self):
with open(self.folder + "/" + self.switchLogFile + ".txt") as a:
def showLog(self) -> None:
"""Show the log file"""
folder = self.folder.joinpath(self.switchLogFile).with_suffix(".txt")
with folder.open() as a:
log = a.read()
testutils.log.info("Log file:\n%s", log)

def checkOutputs(self):
def checkOutputs(self) -> int:
"""Checks if the output of the filter matches expectations"""
testutils.log.info("Comparing outputs")
direction = "out"
for file in glob(self.filename("*", direction)):
for file in glob(str(self.filename("*", direction))):
testutils.log.info("Checking file %s", file)
interface = self.interface_of_filename(file)
if os.stat(file).st_size == 0:
Expand Down Expand Up @@ -907,8 +906,8 @@ def checkOutputs(self):
testutils.log.info("All went well.")
return testutils.SUCCESS

def parse_stf_file(self, testfile):
with open(testfile) as raw_stf:
def parse_stf_file(self, testfile: Path) -> tuple[dict[str, str], int]:
with testfile.open() as raw_stf:
parser = Bmv2StfParser()
stf_str = raw_stf.read()
return parser.parse(stf_str)
Loading

0 comments on commit 07e9ace

Please sign in to comment.