[bench] Add code to run multiple command lines and export the result in a csv file (#1641)

Signed-off-by: Xavier Dupre <[email protected]>
xadupre authored Jul 3, 2024
1 parent ee29e71 commit c38d6f8
Showing 7 changed files with 425 additions and 119 deletions.
6 changes: 6 additions & 0 deletions onnxscript/tools/benchmark/__init__.py
@@ -5,13 +5,19 @@
from onnxscript.tools.benchmark.benchmark_helpers import (
    common_export,
    get_parsed_args,
    make_configs,
    make_dataframe_from_benchmark_data,
    multi_run,
    run_inference,
    run_onnx_inference,
)

__all__ = [
    "get_parsed_args",
    "common_export",
    "make_configs",
    "multi_run",
    "make_dataframe_from_benchmark_data",
    "run_inference",
    "run_onnx_inference",
]
86 changes: 85 additions & 1 deletion onnxscript/tools/benchmark/benchmark_helpers.py
@@ -5,6 +5,7 @@
from __future__ import annotations

import argparse
import itertools
import multiprocessing
import os
import platform
@@ -195,6 +196,52 @@ def run_benchmark(
    return data


def measure_discrepancies(
    expected: list[tuple[Any, ...]],
    outputs: list[tuple[Any, ...]],
) -> tuple[float, float]:
    """
    Computes the discrepancies.

    Args:
        expected: list of outputs coming from a torch model
        outputs: list of outputs coming from an onnx model

    Returns:
        maximum absolute error, maximum relative error
    """

    def _flatten(outputs):
        flat = []
        for tensor in outputs:
            if isinstance(tensor, tuple):
                flat.extend(_flatten(tensor))
            else:
                flat.append(tensor)
        return tuple(flat)

    abs_errs = []
    rel_errs = []
    for torch_outputs_mixed_types, onnx_outputs in zip(expected, outputs):
        torch_outputs = _flatten(torch_outputs_mixed_types)
        assert len(torch_outputs) == len(
            onnx_outputs
        ), f"Length mismatch {len(torch_outputs)} != {len(onnx_outputs)}"
        for torch_tensor, onnx_tensor in zip(torch_outputs, onnx_outputs):
            assert (
                torch_tensor.dtype == onnx_tensor.dtype
            ), f"Type mismatch {torch_tensor.dtype} != {onnx_tensor.dtype}"
            assert (
                torch_tensor.shape == onnx_tensor.shape
            ), f"Shape mismatch {torch_tensor.shape} != {onnx_tensor.shape}"
            diff = torch_tensor - onnx_tensor
            abs_err = float(diff.abs().max())
            rel_err = float((diff.abs() / torch_tensor).max())
            abs_errs.append(abs_err)
            rel_errs.append(rel_err)
    return max(abs_errs), max(rel_errs)
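
For illustration, a minimal sketch of how measure_discrepancies compares nested torch outputs against the flat onnx outputs; the tensors below are made up for the example:

    import torch
    from onnxscript.tools.benchmark.benchmark_helpers import measure_discrepancies

    # Illustrative outputs: one batch of expected torch outputs (possibly nested
    # tuples) and the matching onnx outputs.
    expected = [(torch.tensor([1.0, 2.0]), (torch.tensor([3.0]),))]
    got = [(torch.tensor([1.0, 2.0]), torch.tensor([3.001]))]
    abs_err, rel_err = measure_discrepancies(expected, got)
    print(abs_err, rel_err)  # maximum absolute and relative error over all tensors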


def common_export(
    model: Any,
    inputs: Sequence[Any],
@@ -620,6 +667,7 @@ def run_onnx_inference(
    repeat: int = 5,
    verbose: int = 0,
    ort_optimize: bool = True,
    torch_model: Any | None = None,
) -> dict[str, Any]:
    """
    Runs the same inference multiple times with onnxruntime.
@@ -631,6 +679,7 @@
        repeat: number of iterations to repeat
        verbose: verbosity
        ort_optimize: enable or disable onnxruntime optimizations
        torch_model: if provided, measures the discrepancies between the torch and onnx outputs
    Returns:
        statistics
@@ -667,16 +716,26 @@
print(f"[run_inference] created session in {end}")
print(f"[run_inference] start {warmup} warmup iterations")

if torch_model:
expected = [
torch_model(*example_inputs[i % len(example_inputs)]) for i in range(warmup)
]

got = []
iterations = []
begin = time.perf_counter()
for i in range(warmup):
t0 = time.perf_counter()
wrapped_session.run_dlpack(*example_inputs[i % len(example_inputs)])
got.append(wrapped_session.run_dlpack(*example_inputs[i % len(example_inputs)]))
iterations.append(time.perf_counter() - t0)
end = time.perf_counter() - begin
stats["warmup"] = warmup
stats["warmup_time"] = end / warmup
stats["warmup_iter"] = iterations
if torch_model:
abs_err, rel_err = measure_discrepancies(expected, got)
stats["discrepancies_abs"] = abs_err
stats["discrepancies_rel"] = rel_err

if verbose:
print(f"[run_inference] warmup done in {time.perf_counter() - begin}")
Expand All @@ -697,3 +756,28 @@ def run_onnx_inference(
print(f"[run_inference] measure done in {time.perf_counter() - begin}")

return stats


def multi_run(kwargs: dict[str, Any]) -> bool:
    """Checks if multiple values were sent for one argument."""
    return any(isinstance(v, str) and "," in v for v in kwargs.values())


def make_configs(kwargs: dict[str, Any]) -> list[dict[str, Any]]:
    """Creates all the configurations based on the command line arguments."""
    print(kwargs)
    args = []
    for k, v in kwargs.items():
        if isinstance(v, str):
            args.append([(k, s) for s in v.split(",")])
        else:
            args.append([(k, v)])
    configs = list(itertools.product(*args))
    return [dict(c) for c in configs]


def make_dataframe_from_benchmark_data(data: list[dict]) -> Any:
    """Creates a dataframe from the received data."""
    import pandas

    return pandas.DataFrame(data)
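
As a quick usage sketch (the argument values are illustrative), these helpers expand comma-separated command-line values into a grid of configurations and tabulate the results:

    from onnxscript.tools.benchmark.benchmark_helpers import (
        make_configs,
        make_dataframe_from_benchmark_data,
        multi_run,
    )

    # Illustrative arguments: two values for "model" and "device".
    kwargs = {"model": "llama,phi", "device": "cpu,cuda", "warmup": 5}
    assert multi_run(kwargs)        # at least one argument carries several values
    configs = make_configs(kwargs)  # 2 models x 2 devices -> 4 configurations
    # Each configuration would normally be benchmarked; here we only tabulate them.
    print(make_dataframe_from_benchmark_data(configs))
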
53 changes: 53 additions & 0 deletions onnxscript/tools/benchmark/benchmark_helpers_test.py
@@ -0,0 +1,53 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import unittest

import onnxscript.tools.benchmark.benchmark_helpers as bh


class BenchmarkHelperTest(unittest.TestCase):
    def test_make_configs(self):
        value = {
            "warmup": 5,
            "model": "llama,phi",
            "device": "cpu,cuda",
            "config": "medium",
            "dump_folder": "",
        }
        self.assertTrue(bh.multi_run(value))
        configs = bh.make_configs(value)
        expected = [
            {
                "warmup": 5,
                "model": "llama",
                "device": "cpu",
                "config": "medium",
                "dump_folder": "",
            },
            {
                "warmup": 5,
                "model": "llama",
                "device": "cuda",
                "config": "medium",
                "dump_folder": "",
            },
            {
                "warmup": 5,
                "model": "phi",
                "device": "cpu",
                "config": "medium",
                "dump_folder": "",
            },
            {
                "warmup": 5,
                "model": "phi",
                "device": "cuda",
                "config": "medium",
                "dump_folder": "",
            },
        ]
        self.assertEqual(expected, configs)


if __name__ == "__main__":
    unittest.main(verbosity=2)
140 changes: 140 additions & 0 deletions onnxscript/tools/benchmark/benchmark_run.py
@@ -0,0 +1,140 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# pylint: disable=consider-using-with,import-outside-toplevel
from __future__ import annotations

import multiprocessing
import os
import platform
import re
import subprocess
import sys


class BenchmarkError(RuntimeError):
    pass


def get_machine() -> dict[str, str | int | float | tuple[int, int]]:
"""Returns the machine specification."""
config: dict[str, str | int | float | tuple[int, int]] = dict(
machine=str(platform.machine()),
processor=str(platform.processor()),
version=str(sys.version),
config=int(multiprocessing.cpu_count()),
executable=str(sys.executable),
)
try:
import torch.cuda
except ImportError:
return config

config["has_cuda"] = bool(torch.cuda.is_available())
if config["has_cuda"]:
config["capability"] = torch.cuda.get_device_capability(0)
config["device_name"] = str(torch.cuda.get_device_name(0))
return config


def _cmd_line(script_name: str, **kwargs: dict[str, str | int | float]) -> list[str]:
    args = [sys.executable, "-m", script_name]
    for k, v in kwargs.items():
        args.append(f"--{k}")
        args.append(str(v))
    return args


def _extract_metrics(text: str) -> dict[str, str]:
    reg = re.compile(":(.*?),(.*.?);")
    res = reg.findall(text)
    if len(res) == 0:
        return {}
    return dict(res)
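
For reference, a small sketch of the ``:<metric>,<value>;`` convention these helpers rely on; the module name and metric names below are made up:

    from onnxscript.tools.benchmark.benchmark_run import _cmd_line, _extract_metrics

    # Builds something like: <python> -m some.benchmark.script --device cpu --warmup 5
    # ("some.benchmark.script" is only a placeholder module name).
    print(_cmd_line("some.benchmark.script", device="cpu", warmup=5))

    # A benchmarked script is expected to print one ":<metric>,<value>;" line per metric.
    stdout = "running\n:time_export,1.23;\n:discrepancies_abs,0.0001;\ndone"
    print(_extract_metrics(stdout))  # {'time_export': '1.23', 'discrepancies_abs': '0.0001'}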


def _make_prefix(script_name: str, index: int) -> str:
    name = os.path.splitext(script_name)[0]
    return f"{name}_dort_c{index}_"


def run_benchmark(
    script_name: str,
    configs: list[dict[str, str | int | float]],
    verbose: int = 0,
    stop_if_exception: bool = True,
    dort_dump: bool = False,
) -> list[dict[str, str | int | float | tuple[int, int]]]:
    """
    Runs a script multiple times and extracts information from the output
    following the pattern ``:<metric>,<value>;``.

    :param script_name: python script to run
    :param configs: list of executions to do
    :param stop_if_exception: stop if one experiment failed, otherwise continue
    :param verbose: use tqdm to follow the progress
    :param dort_dump: dump onnx file if dort is used
    :return: values
    """
    if verbose:
        try:
            from tqdm import tqdm

            loop = tqdm(configs)
        except ImportError:
            loop = configs
    else:
        loop = configs

    data: list[dict[str, str | int | float | tuple[int, int]]] = []
    for i, config in enumerate(loop):
        cmd = _cmd_line(script_name, **config)

        if dort_dump:
            os.environ["ONNXRT_DUMP_PATH"] = _make_prefix(script_name, i)
        else:
            os.environ["ONNXRT_DUMP_PATH"] = ""
        if verbose > 3:
            print(f"[run_benchmark] cmd={cmd if isinstance(cmd, str) else ' '.join(cmd)}")

        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        try:
            res = p.communicate(timeout=30)
            out, err = res
            serr = err.decode("utf-8", errors="ignore")
        except subprocess.TimeoutExpired as e:
            p.kill()
            res = p.communicate()
            out, err = res
            serr = f"{e}\n:timeout,1;{err.decode('utf-8', errors='ignore')}"
        sout = out.decode("utf-8", errors="ignore")

        if "ONNXRuntimeError" in serr or "ONNXRuntimeError" in sout:
            if stop_if_exception:  # pylint: disable=no-else-raise
                raise RuntimeError(
                    f"Unable to continue with config {config} due to the "
                    f"following error\n{serr}"
                    f"\n----OUTPUT--\n{sout}"
                )

        metrics = _extract_metrics(sout)
        if len(metrics) == 0:
            if stop_if_exception:  # pylint: disable=no-else-raise
                raise BenchmarkError(
                    f"Unable (2) to continue with config {config}, no metric was "
                    f"collected.\n--ERROR--\n{serr}\n--OUTPUT--\n{sout}"
                )
            else:
                metrics = {}
        metrics.update(config)
        metrics["ERROR"] = serr
        metrics["OUTPUT"] = sout
        metrics["CMD"] = f"[{' '.join(cmd)}]"
        data.append(metrics)  # type: ignore[arg-type]
        if verbose > 5:
            print("--------------- ERROR")
            print(serr)
        if verbose >= 10:
            print("--------------- OUTPUT")
            print(sout)

    return data
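
Putting it together, a minimal end-to-end sketch of how these pieces could export benchmark results to a csv file, assuming a hypothetical module "some.benchmark.script" that prints its metrics as ``:<metric>,<value>;`` lines:

    from onnxscript.tools.benchmark.benchmark_helpers import (
        make_configs,
        make_dataframe_from_benchmark_data,
        multi_run,
    )
    from onnxscript.tools.benchmark.benchmark_run import run_benchmark

    # Illustrative arguments: comma-separated values expand into several runs.
    kwargs = {"model": "llama,phi", "device": "cpu"}
    configs = make_configs(kwargs) if multi_run(kwargs) else [kwargs]
    # "some.benchmark.script" is a placeholder for the script being benchmarked.
    data = run_benchmark("some.benchmark.script", configs, verbose=1)
    df = make_dataframe_from_benchmark_data(data)
    df.to_csv("benchmark.csv", index=False)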
