
[bench] Add code to run multiple command lines and export the result in a csv file #1641

Merged: 17 commits, Jul 3, 2024
6 changes: 6 additions & 0 deletions onnxscript/tools/benchmark/__init__.py
@@ -5,13 +5,19 @@
from onnxscript.tools.benchmark.benchmark_helpers import (
    common_export,
    get_parsed_args,
    make_configs,
    make_dataframe_from_benchmark_data,
    multi_run,
    run_inference,
    run_onnx_inference,
)

__all__ = [
    "get_parsed_args",
    "common_export",
    "make_configs",
    "multi_run",
    "make_dataframe_from_benchmark_data",
    "run_inference",
    "run_onnx_inference",
]
26 changes: 26 additions & 0 deletions onnxscript/tools/benchmark/benchmark_helpers.py
@@ -5,6 +5,7 @@
from __future__ import annotations

import argparse
import itertools
import multiprocessing
import os
import platform
@@ -697,3 +698,28 @@ def run_onnx_inference(
print(f"[run_inference] measure done in {time.perf_counter() - begin}")

return stats


def multi_run(kwargs: dict[str, Any]) -> bool:
    """Checks whether multiple comma-separated values were passed for at least one argument."""
    return any(isinstance(v, str) and "," in v for v in kwargs.values())


def make_configs(kwargs: dict[str, Any]) -> list[dict[str, Any]]:
    """Creates all the configurations based on the command line arguments."""
    print(kwargs)
    args = []
    for k, v in kwargs.items():
        if isinstance(v, str):
            # A comma-separated string contributes one candidate value per item.
            args.append([(k, s) for s in v.split(",")])
        else:
            args.append([(k, v)])
    # Cross-product of the candidate values: one configuration per combination.
    configs = list(itertools.product(*args))
    return [dict(c) for c in configs]
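A quick usage sketch for the two helpers above (not part of the diff; the values mirror the unit test further down):

    kwargs = {"model": "llama,phi", "device": "cpu,cuda", "warmup": 5}
    assert multi_run(kwargs)  # True: "model" and "device" carry comma-separated values
    configs = make_configs(kwargs)
    assert len(configs) == 4  # 2 models x 2 devices, e.g. {"model": "llama", "device": "cpu", "warmup": 5}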


def make_dataframe_from_benchmark_data(data: list[dict]) -> Any:
    """Creates a dataframe from the received data."""
    import pandas

    return pandas.DataFrame(data)
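This is the hook for the CSV export mentioned in the PR title; a minimal sketch (the row contents and file name are illustrative):

    rows = [{"model": "llama", "device": "cpu", "time": 0.12}]
    df = make_dataframe_from_benchmark_data(rows)
    df.to_csv("results.csv", index=False)  # pandas writes the benchmark table as CSV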
53 changes: 53 additions & 0 deletions onnxscript/tools/benchmark/benchmark_helpers_test.py
@@ -0,0 +1,53 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import unittest

import onnxscript.tools.benchmark.benchmark_helpers as bh


class BenchmarkHelperTest(unittest.TestCase):
    def test_make_configs(self):
        value = {
            "warmup": 5,
            "model": "llama,phi",
            "device": "cpu,cuda",
            "config": "medium",
            "dump_folder": "",
        }
        self.assertTrue(bh.multi_run(value))
        configs = bh.make_configs(value)
        expected = [
            {
                "warmup": 5,
                "model": "llama",
                "device": "cpu",
                "config": "medium",
                "dump_folder": "",
            },
            {
                "warmup": 5,
                "model": "llama",
                "device": "cuda",
                "config": "medium",
                "dump_folder": "",
            },
            {
                "warmup": 5,
                "model": "phi",
                "device": "cpu",
                "config": "medium",
                "dump_folder": "",
            },
            {
                "warmup": 5,
                "model": "phi",
                "device": "cuda",
                "config": "medium",
                "dump_folder": "",
            },
        ]
        self.assertEqual(expected, configs)


if __name__ == "__main__":
    unittest.main(verbosity=2)
140 changes: 140 additions & 0 deletions onnxscript/tools/benchmark/benchmark_run.py
@@ -0,0 +1,140 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# pylint: disable=consider-using-with,import-outside-toplevel
from __future__ import annotations

import multiprocessing
import os
import platform
import re
import subprocess
import sys


class BenchmarkError(RuntimeError):
    pass


def get_machine() -> dict[str, str | int | float | tuple[int, int]]:
    """Returns the machine specification."""
    cpu: dict[str, str | int | float | tuple[int, int]] = dict(
        machine=str(platform.machine()),
        processor=str(platform.processor()),
        version=str(sys.version),
        cpu=int(multiprocessing.cpu_count()),
        executable=str(sys.executable),
    )
    try:
        import torch.cuda
    except ImportError:
        return cpu

    cpu["has_cuda"] = bool(torch.cuda.is_available())
    if cpu["has_cuda"]:
        cpu["capability"] = torch.cuda.get_device_capability(0)
        cpu["device_name"] = str(torch.cuda.get_device_name(0))
    return cpu


def _cmd_line(script_name: str, **kwargs: dict[str, str | int | float]) -> list[str]:
    args = [sys.executable, "-m", script_name]
    for k, v in kwargs.items():
        args.append(f"--{k}")
        args.append(str(v))
    return args
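For instance (a sketch, with an illustrative module name), each keyword argument becomes a ``--key value`` pair after ``python -m <script>``:

    cmd = _cmd_line("some.benchmark.script", model="llama", device="cpu")
    # -> [sys.executable, "-m", "some.benchmark.script", "--model", "llama", "--device", "cpu"]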


def _extract_metrics(text: str) -> dict[str, str]:
    reg = re.compile(":(.*?),(.*.?);")
    res = reg.findall(text)
    if len(res) == 0:
        return {}
    return dict(res)
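The regular expression picks up the ``:<metric>,<value>;`` pairs documented in run_benchmark below; since ``.`` does not match newlines, each metric is expected on its own line. A minimal sketch (metric names are illustrative):

    out = "[run] warmup done\n:time,0.45;\n:memory,128;"
    _extract_metrics(out)  # -> {"time": "0.45", "memory": "128"}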


def _make_prefix(script_name: str, index: int) -> str:
    name = os.path.splitext(script_name)[0]
    return f"{name}_dort_c{index}_"


def run_benchmark(
    script_name: str,
    configs: list[dict[str, str | int | float]],
    verbose: int = 0,
    stop_if_exception: bool = True,
    dort_dump: bool = False,
) -> list[dict[str, str | int | float | tuple[int, int]]]:
    """
    Runs a script multiple times and extracts information from the output
    following the pattern ``:<metric>,<value>;``.

    :param script_name: python script to run
    :param configs: list of configurations to execute
    :param verbose: use tqdm to follow the progress
    :param stop_if_exception: stop if one experiment fails, otherwise continue
    :param dort_dump: dump onnx file if dort is used
    :return: collected metrics, one dictionary per configuration
    """
    if verbose:
        try:
            from tqdm import tqdm

            loop = tqdm(configs)
        except ImportError:
            loop = configs
    else:
        loop = configs

    data: list[dict[str, str | int | float | tuple[int, int]]] = []
    for i, config in enumerate(loop):
        cmd = _cmd_line(script_name, **config)

        if dort_dump:
            os.environ["ONNXRT_DUMP_PATH"] = _make_prefix(script_name, i)
        else:
            os.environ["ONNXRT_DUMP_PATH"] = ""
        if verbose > 3:
            print(f"[run_benchmark] cmd={cmd if isinstance(cmd, str) else ' '.join(cmd)}")

        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        try:
            res = p.communicate(timeout=30)
            out, err = res
            serr = err.decode("utf-8", errors="ignore")
        except subprocess.TimeoutExpired as e:
            p.kill()
            res = p.communicate()
            out, err = res
            serr = f"{e}\n:timeout,1;{err.decode('utf-8', errors='ignore')}"
        sout = out.decode("utf-8", errors="ignore")

        if "ONNXRuntimeError" in serr or "ONNXRuntimeError" in sout:
            if stop_if_exception:  # pylint: disable=no-else-raise
                raise RuntimeError(
                    f"Unable to continue with config {config} due to the "
                    f"following error\n{serr}"
                    f"\n----OUTPUT--\n{sout}"
                )

        metrics = _extract_metrics(sout)
        if len(metrics) == 0:
            if stop_if_exception:
                raise BenchmarkError(
                    f"Unable (2) to continue with config {config}, no metric was "
                    f"collected.\n--ERROR--\n{serr}\n--OUTPUT--\n{sout}"
                )
            else:
                metrics = {}
        metrics.update(config)
        metrics["ERROR"] = serr
        metrics["OUTPUT"] = sout
        metrics["CMD"] = f"[{' '.join(cmd)}]"
        data.append(metrics)  # type: ignore[arg-type]
        if verbose > 5:
            print("--------------- ERROR")
            print(serr)
        if verbose >= 10:
            print("--------------- OUTPUT")
            print(sout)

    return data
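End to end, the flow this PR enables looks like the following sketch (the benchmark module and file names are illustrative; the target script is assumed to print ``:<metric>,<value>;`` lines):

    from onnxscript.tools.benchmark.benchmark_helpers import (
        make_configs,
        make_dataframe_from_benchmark_data,
    )
    from onnxscript.tools.benchmark.benchmark_run import run_benchmark

    # Expand comma-separated arguments into one configuration per combination.
    configs = make_configs({"model": "llama,phi", "device": "cpu,cuda", "warmup": 5})
    # Run the script once per configuration, collecting the printed metrics.
    data = run_benchmark("some.benchmark.script", configs, verbose=1, stop_if_exception=False)
    # Export the results as the CSV file from the PR title.
    make_dataframe_from_benchmark_data(data).to_csv("benchmark.csv", index=False)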