Support exporting outputs to gsheet
add generic utility to append to a gsheet

Add utility for gsheet

improve gsheet utility

export fio output to gsheet

encapsulate cpu/memory calculation in fio

disable repeat operations for quick testing

add dlio output export to gsheet

fix a bug in dlio output parsing

fix a column-name in fio csv output

Revert "disable repeat operations for quick testing"

This reverts commit 04bf834.

add log of successful addition to gsheet

clean-up code changes

added some error-checking

wip

wip

fix calls to download_gcs_objects

support key-file on gcs in gsheet

put back cpu/memory metrics

fix couple of logs

fix couple of help messages

put back accidentally deleted command
gargnitingoogle committed Oct 3, 2024
1 parent 2dde58b commit 773695f
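
The commit list above references a generic helper for appending rows to a Google Sheet; the diff below imports append_data_to_gsheet and url from utils.gsheet, but that file is not part of this excerpt. The following is a minimal sketch, assuming a gspread-based implementation, of what such a helper could look like; the keyword names mirror the call site in parse_logs.py, and everything else is illustrative.

# Minimal sketch (assumed, gspread-based) of utils/gsheet.py; not taken from this diff.
import gspread
from google.oauth2.service_account import Credentials

_SCOPES = ["https://www.googleapis.com/auth/spreadsheets"]


def append_data_to_gsheet(
    data: dict, worksheet: str, gsheet_id: str, serviceAccountKeyFile: str
) -> None:
  """Appends data["values"] (a list of rows) to the named worksheet."""
  creds = Credentials.from_service_account_file(
      serviceAccountKeyFile, scopes=_SCOPES
  )
  ws = gspread.authorize(creds).open_by_key(gsheet_id).worksheet(worksheet)
  if not ws.get_all_values():
    # Write the header once, while the worksheet is still empty.
    ws.append_row(list(data["header"]))
  ws.append_rows([list(row) for row in data["values"]])


def url(gsheet_id: str) -> str:
  """Human-readable link to the spreadsheet, used in success logs."""
  return f"https://docs.google.com/spreadsheets/d/{gsheet_id}"
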
Showing 4 changed files with 389 additions and 173 deletions.
219 changes: 149 additions & 70 deletions perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py
@@ -22,12 +22,14 @@
import pprint
import subprocess
import sys
from typing import List, Tuple

# local library imports
sys.path.append("../")
import dlio_workload
from utils.utils import get_memory, get_cpu, unix_to_timestamp, standard_timestamp, is_mash_installed, get_memory_from_monitoring_api, get_cpu_from_monitoring_api, timestamp_to_epoch
from utils.parse_logs_common import ensure_directory_exists, download_gcs_objects, parse_arguments, SUPPORTED_SCENARIOS
from utils.parse_logs_common import ensure_directory_exists, download_gcs_objects, parse_arguments, SUPPORTED_SCENARIOS, default_service_account_key_file
from utils.gsheet import append_data_to_gsheet, url

_LOCAL_LOGS_LOCATION = "../../bin/dlio-logs/logs"

@@ -51,6 +53,31 @@
"gcsfuse_mount_options": "",
}

mash_installed = False

_HEADER = (
"File Size",
"File #",
"Total Size (GB)",
"Batch Size",
"Scenario",
"Epoch",
"Duration (s)",
"GPU Utilization (%)",
"Throughput (sample/s)",
"Throughput (MB/s)",
"Throughput over Local SSD (%)",
"GCSFuse Lowest Memory (MB)",
"GCSFuse Highest Memory (MB)",
"GCSFuse Lowest CPU (core)",
"GCSFuse Highest CPU (core)",
"Pod-name",
"Start",
"End",
"GcsfuseMountOptions",
"InstanceID",
)


def downloadDlioOutputs(dlioWorkloads: set, instanceId: str) -> int:
"""Downloads instanceId-specific dlio outputs for each dlioWorkload locally.
@@ -70,11 +97,9 @@ def downloadDlioOutputs(dlioWorkloads: set, instanceId: str) -> int:
for dlioWorkload in dlioWorkloads:
srcObjects = f"gs://{dlioWorkload.bucket}/logs/{instanceId}"
print(f"Downloading DLIO logs from the {srcObjects} ...")
returncode, errorStr = download_gcs_objects(
srcObjects, _LOCAL_LOGS_LOCATION
)
returncode = download_gcs_objects(srcObjects, _LOCAL_LOGS_LOCATION)
if returncode < 0:
print(f"Failed to download DLIO logs from {srcObjects}: {errorStr}")
print(f"Failed to download DLIO logs from {srcObjects}: {returncode}")
return returncode
return 0
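
# Sketch (assumed; utils/parse_logs_common.py is not part of this excerpt) of
# download_gcs_objects as it is called above: it recursively copies the given
# GCS prefix to the local logs directory and returns a single returncode,
# 0 on success, matching the simplified call in the new code.
import subprocess

def download_gcs_objects(src: str, dst: str) -> int:
  result = subprocess.run(
      ["gcloud", "storage", "cp", "-r", src, dst],
      capture_output=True,
      text=True,
  )
  if result.returncode != 0:
    print(result.stderr)
  return result.returncode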

@@ -231,68 +256,129 @@ def fetch_cpu_memory_data():
return output


def writeRecordsToCsvOutputFile(output: dict, output_file_path: str):
with open(output_file_path, "a") as output_file:
# Write a new header row.
output_file.write(
"File Size,File #,Total Size (GB),Batch Size,Scenario,Epoch,Duration"
" (s),GPU Utilization (%),Throughput (sample/s),Throughput"
" (MB/s),Throughput over Local SSD (%),GCSFuse Lowest Memory"
" (MB),GCSFuse Highest Memory (MB),GCSFuse Lowest CPU (core),GCSFuse"
" Highest CPU (core),Pod,Start,End,GcsfuseMountOptions,InstanceID\n"
)

for key in output:
record_set = output[key]
total_size = int(
int(record_set["mean_file_size"])
* int(record_set["num_files_train"])
/ (1024**3)
)
def writeOutput(
output: dict,
args: dict,
):
rows = []

for scenario in SUPPORTED_SCENARIOS:
if scenario not in record_set["records"]:
print(f"{scenario} not in output so skipping")
continue
for key in output:
record_set = output[key]
total_size = int(
int(record_set["mean_file_size"])
* int(record_set["num_files_train"])
/ (1024**3)
)

for i in range(len(record_set["records"]["local-ssd"])):
r = record_set["records"][scenario][i]

try:
if "local-ssd" in record_set["records"] and (
len(record_set["records"]["local-ssd"])
== len(record_set["records"][scenario])
):
r["throughput_over_local_ssd"] = round(
r["train_throughput_mb_per_second"]
/ record_set["records"]["local-ssd"][i][
"train_throughput_mb_per_second"
]
* 100,
2,
)
else:
r["throughput_over_local_ssd"] = "NA"
for scenario in SUPPORTED_SCENARIOS:
if scenario not in record_set["records"]:
print(f"{scenario} not in output so skipping")
continue

except ZeroDivisionError:
print("Got ZeroDivisionError. Ignoring it.")
r["throughput_over_local_ssd"] = 0
for i in range(len(record_set["records"][scenario])):
r = record_set["records"][scenario][i]

except Exception as e:
print(
"Error: failed to parse/write record-set for"
f" scenario: {scenario}, i: {i}, record: {r}, exception: {e}"
try:
if "local-ssd" in record_set["records"] and (
len(record_set["records"]["local-ssd"])
== len(record_set["records"][scenario])
):
r["throughput_over_local_ssd"] = round(
r["train_throughput_mb_per_second"]
/ record_set["records"]["local-ssd"][i][
"train_throughput_mb_per_second"
]
* 100,
2,
)
continue
else:
r["throughput_over_local_ssd"] = "NA"

output_file.write(
f"{record_set['mean_file_size']},{record_set['num_files_train']},{total_size},{record_set['batch_size']},{scenario},"
)
output_file.write(
f"{r['epoch']},{r['duration']},{r['train_au_percentage']},{r['train_throughput_samples_per_second']},{r['train_throughput_mb_per_second']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']},\"{r['gcsfuse_mount_options']}\",{args.instance_id}\n"
except ZeroDivisionError:
print("Got ZeroDivisionError. Ignoring it.")
r["throughput_over_local_ssd"] = 0

except Exception as e:
print(
"Error: failed to parse/write record-set for"
f" scenario: {scenario}, i: {i}, record: {r}, exception: {e}"
)
continue

output_file.close()
new_row = (
record_set["mean_file_size"],
record_set["num_files_train"],
total_size,
record_set["batch_size"],
scenario,
r["epoch"],
r["duration"],
r["train_au_percentage"],
r["train_throughput_samples_per_second"],
r["train_throughput_mb_per_second"],
r["throughput_over_local_ssd"],
r["lowest_memory"],
r["highest_memory"],
r["lowest_cpu"],
r["highest_cpu"],
r["pod_name"],
r["start"],
r["end"],
f'"{r["gcsfuse_mount_options"].strip()}"', # need to wrap in quotes to encapsulate commas in the value.
args.instance_id,
)
rows.append(new_row)

def exportToCsvFile(output_file_path: str, header: str, rows: List):
if output_file_path and output_file_path.strip():
ensure_directory_exists(os.path.dirname(output_file_path))
with open(output_file_path, "a") as output_file_fwr:
# Write a new header.
output_file_fwr.write(f"{','.join(header)}\n")
for row in rows:
output_file_fwr.write(f"{','.join([f'{val}' for val in row])}\n")
output_file_fwr.close()
print(
"\nSuccessfully published outputs of FIO test runs to"
f" {output_file_path} !!!"
)

exportToCsvFile(output_file_path=args.output_file, header=_HEADER, rows=rows)

def exportToGsheet(
header: str,
rows: List,
output_gsheet_id: str,
output_worksheet_name: str,
output_gsheet_keyfile: str,
):
if (
output_gsheet_id
and output_gsheet_id.strip()
and output_worksheet_name
and output_worksheet_name.strip()
):
append_data_to_gsheet(
data={"header": header, "values": rows},
worksheet=output_worksheet_name,
gsheet_id=output_gsheet_id,
serviceAccountKeyFile=output_gsheet_keyfile,
# default_service_account_key_file(
# args.project_id
# ),
)
print(
"\nSuccessfully published outputs of FIO test runs at worksheet"
f" '{args.output_worksheet_name}' in {url(args.output_gsheet_id)}"
)

exportToGsheet(
output_gsheet_id=args.output_gsheet_id,
output_worksheet_name=args.output_worksheet_name,
output_gsheet_keyfile=args.output_gsheet_keyfile,
header=_HEADER,
rows=rows,
)
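
# The commit list above includes "support key-file on gcs in gsheet". One
# possible (assumed) way for the gsheet utility to accept a gs:// path in
# serviceAccountKeyFile is to copy the object to a local temp file before
# authenticating; this helper is illustrative and not taken from the diff.
import os
import subprocess
import tempfile

def _localize_key_file(key_file: str) -> str:
  """Returns a local path for key_file, downloading it first if it is on GCS."""
  if not key_file.startswith("gs://"):
    return key_file
  local_path = os.path.join(tempfile.mkdtemp(), os.path.basename(key_file))
  subprocess.run(["gcloud", "storage", "cp", key_file, local_path], check=True)
  return local_path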


if __name__ == "__main__":
@@ -302,20 +388,13 @@ def writeRecordsToCsvOutputFile(output: dict, output_file_path: str):
dlioWorkloads = dlio_workload.ParseTestConfigForDlioWorkloads(
args.workload_config
)
downloadDlioOutputs(dlioWorkloads, args.instance_id)
ret = downloadDlioOutputs(dlioWorkloads, args.instance_id)
if ret != 0:
print(f"failed to download dlio outputs: {ret}")

mash_installed = is_mash_installed()
if not mash_installed:
print("Mash is not installed, will skip parsing CPU and memory usage.")

output = createOutputScenariosFromDownloadedFiles(args)

output_file_path = args.output_file
# Create the parent directory of output_file_path if doesn't
# exist already.
ensure_directory_exists(os.path.dirname(output_file_path))
writeRecordsToCsvOutputFile(output, output_file_path)
print(
"\n\nSuccessfully published outputs of DLIO test runs to"
f" {output_file_path} !!!\n\n"
)
writeOutput(output, args)
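
# Sketch (assumed flag spellings; parse_arguments lives in
# utils/parse_logs_common.py, which is not shown here) of the arguments the new
# export path relies on. Only the attribute names, e.g. output_gsheet_id,
# output_worksheet_name and output_gsheet_keyfile, appear in the diff above;
# the flag strings themselves are illustrative.
import argparse

def parse_arguments() -> argparse.Namespace:
  parser = argparse.ArgumentParser(
      description="Parse DLIO run logs and export results to CSV and/or a gsheet."
  )
  parser.add_argument("--workload-config", dest="workload_config", required=True)
  parser.add_argument("--instance-id", dest="instance_id", required=True)
  parser.add_argument("--project-id", dest="project_id", default="")
  parser.add_argument("--output-file", dest="output_file", default="")
  parser.add_argument("--output-gsheet-id", dest="output_gsheet_id", default="")
  parser.add_argument(
      "--output-worksheet-name", dest="output_worksheet_name", default=""
  )
  # May be a local path or, per the commit list, an object on GCS.
  parser.add_argument(
      "--output-gsheet-keyfile", dest="output_gsheet_keyfile", default=""
  )
  return parser.parse_args()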