From 9c2713a9ca8c08daa4461c07cee2d6f8a5170997 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Simonis?=
Date: Thu, 8 Aug 2024 11:33:04 +0200
Subject: [PATCH] Overhauled gatherstats

---
 tools/mapping-tester/gatherstats.py | 153 +++++++++++++++++-----------
 1 file changed, 94 insertions(+), 59 deletions(-)

diff --git a/tools/mapping-tester/gatherstats.py b/tools/mapping-tester/gatherstats.py
index b99e4fab..44741245 100755
--- a/tools/mapping-tester/gatherstats.py
+++ b/tools/mapping-tester/gatherstats.py
@@ -5,6 +5,8 @@
 import glob
 import json
 import os
+import subprocess
+from concurrent.futures import ThreadPoolExecutor
 
 
 def parseArguments(args):
@@ -25,8 +27,16 @@ def parseArguments(args):
     return parser.parse_args(args)
 
 
+def run_checked(args):
+    r = subprocess.run(args, text=True, capture_output=True)
+    if r.returncode != 0:
+        print("Command " + " ".join(map(str, args)))
+        print(f"Returncode {r.returncode}")
+        print(r.stderr)
+    r.check_returncode()
+
+
 def statsFromTimings(dir):
-    stats = {}
     assert os.path.isdir(dir)
     assert (
         os.system("command -v precice-profiling > /dev/null") == 0
@@ -34,39 +44,46 @@
     event_dir = os.path.join(dir, "precice-profiling")
     json_file = os.path.join(dir, "profiling.json")
     timings_file = os.path.join(dir, "timings.csv")
-    os.system("precice-profiling merge --output {} {}".format(json_file, event_dir))
-    os.system(
-        "precice-profiling analyze --output {} B {}".format(timings_file, json_file)
-    )
-    file = timings_file
-    if os.path.isfile(file):
-        try:
-            timings = {}
-            with open(file, "r") as csvfile:
-                timings = csv.reader(csvfile)
-                for row in timings:
-                    if row[0] == "_GLOBAL":
-                        stats["globalTime"] = row[-1]
-                    if row[0] == "initialize":
-                        stats["initializeTime"] = row[-1]
-                    if row[0].startswith("initialize/map") and row[0].endswith(
-                        "computeMapping.FromA-MeshToB-Mesh"
-                    ):
-                        computeMappingName = row[0]
-                        stats["computeMappingTime"] = row[-1]
-                    if row[0].startswith("advance/map") and row[0].endswith(
-                        "mapData.FromA-MeshToB-Mesh"
-                    ):
-                        mapDataName = row[0]
-                        stats["mapDataTime"] = row[-1]
-        except BaseException:
-            pass
-    return stats
+
+    try:
+        subprocess.run(
+            ["precice-profiling", "merge", "--output", json_file, event_dir],
+            check=True,
+            capture_output=True,
+        )
+        subprocess.run(
+            ["precice-profiling", "analyze", "--output", timings_file, "B", json_file],
+            check=True,
+            capture_output=True,
+        )
+        file = timings_file
+        stats = {}
+        timings = {}
+        with open(file, "r") as csvfile:
+            timings = csv.reader(csvfile)
+            for row in timings:
+                if row[0] == "_GLOBAL":
+                    stats["globalTime"] = row[-1]
+                if row[0] == "initialize":
+                    stats["initializeTime"] = row[-1]
+                if row[0].startswith("initialize/map") and row[0].endswith(
+                    "computeMapping.FromA-MeshToB-Mesh"
+                ):
+                    computeMappingName = row[0]
+                    stats["computeMappingTime"] = row[-1]
+                if row[0].startswith("advance/map") and row[0].endswith(
+                    "mapData.FromA-MeshToB-Mesh"
+                ):
+                    mapDataName = row[0]
+                    stats["mapDataTime"] = row[-1]
+        return stats
+    except Exception:
+        return {}
 
 
 def memoryStats(dir):
-    stats = {}
     assert os.path.isdir(dir)
+    stats = {}
     for P in "A", "B":
         memfile = os.path.join(dir, f"memory-{P}.log")
         total = 0
@@ -81,41 +98,59 @@
     return stats
 
 
+def mappingStats(dir):
+    globber = os.path.join(dir, "*.stats.json")
+    statFiles = list(glob.iglob(globber))
+    if len(statFiles) == 0:
+        return {}
+
+    statFile = statFiles[0]
+    assert os.path.exists(statFile)
+    with open(statFile, "r") as jsonfile:
+        return dict(json.load(jsonfile))
+
+
+def gatherCaseStats(casedir):
+    assert os.path.exists(casedir)
+    parts = os.path.normpath(casedir).split(os.sep)
+    assert len(parts) >= 5
+    mapping, constraint, meshes, ranks, run = parts[-5:]
+    meshA, meshB = meshes.split("-")
+    ranksA, ranksB = ranks.split("-")
+
+    stats = {
+        "run": int(run),
+        "mapping": mapping,
+        "constraint": constraint,
+        "mesh A": meshA,
+        "mesh B": meshB,
+        "ranks A": ranksA,
+        "ranks B": ranksB,
+    }
+    stats.update(statsFromTimings(casedir))
+    stats.update(memoryStats(casedir))
+    stats.update(mappingStats(casedir))
+    return stats
+
+
 def main(argv):
     args = parseArguments(argv[1:])
-    globber = os.path.join(args.outdir, "**", "*.stats.json")
-    statFiles = [
-        os.path.relpath(path, args.outdir)
-        for path in glob.iglob(globber, recursive=True)
-    ]
+    globber = os.path.join(args.outdir, "**", "done")
+    cases = [os.path.dirname(path) for path in glob.iglob(globber, recursive=True)]
     allstats = []
-    fields = []
-    for file in statFiles:
-        print("Found: " + file)
-        casedir = os.path.join(args.outdir, os.path.dirname(file))
-        parts = os.path.normpath(file).split(os.sep)
-        assert len(parts) >= 5
-        mapping, constraint, meshes, ranks, _ = parts[-5:]
-        meshA, meshB = meshes.split("-")
-        ranksA, ranksB = ranks.split("-")
-
-        with open(os.path.join(args.outdir, file), "r") as jsonfile:
-            stats = json.load(jsonfile)
-        stats["mapping"] = mapping
-        stats["constraint"] = constraint
-        stats["mesh A"] = meshA
-        stats["mesh B"] = meshB
-        stats["ranks A"] = ranksA
-        stats["ranks B"] = ranksB
-        stats.update(statsFromTimings(casedir))
-        stats.update(memoryStats(casedir))
-        allstats.append(stats)
-        if not fields:
-            fields += stats.keys()
+
+    def wrapper(case):
+        print("Found: " + os.path.relpath(case, args.outdir))
+        return gatherCaseStats(case)
+
+    with ThreadPoolExecutor() as pool:
+        for stat in pool.map(wrapper, cases):
+            allstats.append(stat)
+
+    fields = {key for s in allstats for key in s.keys()}
     assert fields
-    writer = csv.DictWriter(args.file, fieldnames=fields)
+    writer = csv.DictWriter(args.file, fieldnames=sorted(fields))
     writer.writeheader()
     writer.writerows(allstats)
     return 0
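
Note on the subprocess pattern adopted above: check=True turns a nonzero exit
code into a CalledProcessError instead of being silently ignored as with
os.system, and capture_output=True keeps the tool's stderr available for
diagnostics. A minimal standalone sketch of the same pattern; the file and
directory names here are placeholders, not the tester's real paths:

import subprocess

try:
    # check=True raises CalledProcessError on a nonzero exit code;
    # capture_output=True collects stdout/stderr instead of printing them.
    subprocess.run(
        ["precice-profiling", "merge", "--output", "profiling.json", "events-dir"],
        check=True,
        capture_output=True,
        text=True,
    )
except subprocess.CalledProcessError as e:
    # The captured stderr explains why the tool failed.
    print(f"Command failed with returncode {e.returncode}")
    print(e.stderr)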
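gatherCaseStats assumes the last five path components of a case directory are
mapping/constraint/meshA-meshB/ranksA-ranksB/run. A sketch with a hypothetical
path; the component values below are invented for illustration, not taken from
the mapping tester:

import os

# Hypothetical case directory produced by the mapping tester.
casedir = "out/nn/consistent/coarse-fine/2-4/1"
parts = os.path.normpath(casedir).split(os.sep)
mapping, constraint, meshes, ranks, run = parts[-5:]
meshA, meshB = meshes.split("-")   # -> "coarse", "fine"
ranksA, ranksB = ranks.split("-")  # -> "2", "4"
print(mapping, constraint, meshA, meshB, ranksA, ranksB, int(run))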
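On the design choice in main: because statsFromTimings and mappingStats now
return empty dicts for failed cases, rows no longer share a single key set, so
the CSV header is computed as the union of keys over all rows. A self-contained
sketch of that pattern with dummy data (gatherCaseStats is stubbed out here):

import csv
import sys
from concurrent.futures import ThreadPoolExecutor

def gatherCaseStats(case):
    # Stand-in for the real per-case gatherer: rows may carry different keys,
    # e.g. when a case produced no timings.
    stats = {"case": case}
    if case % 2 == 0:
        stats["globalTime"] = case * 10
    return stats

cases = [1, 2, 3, 4]
with ThreadPoolExecutor() as pool:
    # pool.map preserves the input order of the cases.
    allstats = list(pool.map(gatherCaseStats, cases))

# Union of keys across all rows, so partially failed cases still fit the CSV;
# DictWriter fills missing values with its restval default (an empty string).
fields = {key for s in allstats for key in s}
writer = csv.DictWriter(sys.stdout, fieldnames=sorted(fields))
writer.writeheader()
writer.writerows(allstats)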