diff --git a/perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py b/perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py
index c163a882d8..6a815f846e 100644
--- a/perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py
+++ b/perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py
@@ -22,12 +22,14 @@
 import pprint
 import subprocess
 import sys
+from typing import List, Tuple

 # local library imports
 sys.path.append("../")
 import dlio_workload
 from utils.utils import get_memory, get_cpu, unix_to_timestamp, standard_timestamp, is_mash_installed, get_memory_from_monitoring_api, get_cpu_from_monitoring_api, timestamp_to_epoch
-from utils.parse_logs_common import ensureDir, download_gcs_objects, parseLogParserArguments, SUPPORTED_SCENARIOS
+from utils.parse_logs_common import ensureDir, download_gcs_objects, parseLogParserArguments, SUPPORTED_SCENARIOS, default_service_account_key_file
+from utils.gsheet import append_data_to_gsheet, url

 _LOCAL_LOGS_LOCATION = "../../bin/dlio-logs/logs"

@@ -51,6 +53,31 @@
     "gcsfuse_mount_options": "",
 }

+mash_installed = False
+
+_HEADER = (
+    "File Size",
+    "File #",
+    "Total Size (GB)",
+    "Batch Size",
+    "Scenario",
+    "Epoch",
+    "Duration (s)",
+    "GPU Utilization (%)",
+    "Throughput (sample/s)",
+    "Throughput (MB/s)",
+    "Throughput over Local SSD (%)",
+    "GCSFuse Lowest Memory (MB)",
+    "GCSFuse Highest Memory (MB)",
+    "GCSFuse Lowest CPU (core)",
+    "GCSFuse Highest CPU (core)",
+    "Pod-name",
+    "Start",
+    "End",
+    "GcsfuseMountOptions",
+    "InstanceID",
+)
+

 def downloadDlioOutputs(dlioWorkloads: set, instanceId: str) -> int:
   """Downloads instanceId-specific dlio outputs for each dlioWorkload locally.

@@ -70,11 +97,9 @@ def downloadDlioOutputs(dlioWorkloads: set, instanceId: str) -> int:
   for dlioWorkload in dlioWorkloads:
     srcObjects = f"gs://{dlioWorkload.bucket}/logs/{instanceId}"
     print(f"Downloading DLIO logs from the {srcObjects} ...")
-    returncode, errorStr = download_gcs_objects(
-        srcObjects, _LOCAL_LOGS_LOCATION
-    )
+    returncode = download_gcs_objects(srcObjects, _LOCAL_LOGS_LOCATION)
    if returncode < 0:
-      print(f"Failed to download DLIO logs from {srcObjects}: {errorStr}")
+      print(f"Failed to download DLIO logs from {srcObjects}: {returncode}")
       return returncode
   return 0
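`download_gcs_objects` now returns only an exit status (see the `utils/parse_logs_common.py` hunk later in this patch). A `subprocess`-style status is positive for an ordinary command failure and negative only when the process is killed by a signal, so `if returncode < 0:` catches only the latter. A minimal sketch of a stricter check, assuming the returned value really is such a status:

```python
# Sketch only: treat any non-zero status from the gcloud copy as a failure.
returncode = download_gcs_objects(srcObjects, _LOCAL_LOGS_LOCATION)
if returncode != 0:
  print(f"Failed to download DLIO logs from {srcObjects}: exit code {returncode}")
  return returncode
```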
@@ -232,68 +257,129 @@ def fetch_cpu_memory_data():
   return output


-def writeRecordsToCsvOutputFile(output: dict, output_file_path: str):
-  with open(output_file_path, "a") as output_file:
-    # Write a new header row.
-    output_file.write(
-        "File Size,File #,Total Size (GB),Batch Size,Scenario,Epoch,Duration"
-        " (s),GPU Utilization (%),Throughput (sample/s),Throughput"
-        " (MB/s),Throughput over Local SSD (%),GCSFuse Lowest Memory"
-        " (MB),GCSFuse Highest Memory (MB),GCSFuse Lowest CPU (core),GCSFuse"
-        " Highest CPU (core),Pod,Start,End,GcsfuseMountOptions,InstanceID\n"
-    )
-
-    for key in output:
-      record_set = output[key]
-      total_size = int(
-          int(record_set["mean_file_size"])
-          * int(record_set["num_files_train"])
-          / (1024**3)
-      )
-
-      for scenario in SUPPORTED_SCENARIOS:
-        if scenario not in record_set["records"]:
-          print(f"{scenario} not in output so skipping")
-          continue
-
-        for i in range(len(record_set["records"]["local-ssd"])):
-          r = record_set["records"][scenario][i]
-
-          try:
-            if "local-ssd" in record_set["records"] and (
-                len(record_set["records"]["local-ssd"])
-                == len(record_set["records"][scenario])
-            ):
-              r["throughput_over_local_ssd"] = round(
-                  r["train_throughput_mb_per_second"]
-                  / record_set["records"]["local-ssd"][i][
-                      "train_throughput_mb_per_second"
-                  ]
-                  * 100,
-                  2,
-              )
-            else:
-              r["throughput_over_local_ssd"] = "NA"
-
-          except ZeroDivisionError:
-            print("Got ZeroDivisionError. Ignoring it.")
-            r["throughput_over_local_ssd"] = 0
-
-          except Exception as e:
-            print(
-                "Error: failed to parse/write record-set for"
-                f" scenario: {scenario}, i: {i}, record: {r}, exception: {e}"
-            )
-            continue
-
-          output_file.write(
-              f"{record_set['mean_file_size']},{record_set['num_files_train']},{total_size},{record_set['batch_size']},{scenario},"
-          )
-          output_file.write(
-              f"{r['epoch']},{r['duration']},{r['train_au_percentage']},{r['train_throughput_samples_per_second']},{r['train_throughput_mb_per_second']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']},\"{r['gcsfuse_mount_options']}\",{args.instance_id}\n"
-          )
-
-  output_file.close()
+def writeOutput(
+    output: dict,
+    args: dict,
+):
+  rows = []
+
+  for key in output:
+    record_set = output[key]
+    total_size = int(
+        int(record_set["mean_file_size"])
+        * int(record_set["num_files_train"])
+        / (1024**3)
+    )
+
+    for scenario in SUPPORTED_SCENARIOS:
+      if scenario not in record_set["records"]:
+        print(f"{scenario} not in output so skipping")
+        continue
+
+      for i in range(len(record_set["records"][scenario])):
+        r = record_set["records"][scenario][i]
+
+        try:
+          if "local-ssd" in record_set["records"] and (
+              len(record_set["records"]["local-ssd"])
+              == len(record_set["records"][scenario])
+          ):
+            r["throughput_over_local_ssd"] = round(
+                r["train_throughput_mb_per_second"]
+                / record_set["records"]["local-ssd"][i][
+                    "train_throughput_mb_per_second"
+                ]
+                * 100,
+                2,
+            )
+          else:
+            r["throughput_over_local_ssd"] = "NA"
+
+        except ZeroDivisionError:
+          print("Got ZeroDivisionError. Ignoring it.")
+          r["throughput_over_local_ssd"] = 0
+
+        except Exception as e:
+          print(
+              "Error: failed to parse/write record-set for"
+              f" scenario: {scenario}, i: {i}, record: {r}, exception: {e}"
+          )
+          continue
+
+        new_row = (
+            record_set["mean_file_size"],
+            record_set["num_files_train"],
+            total_size,
+            record_set["batch_size"],
+            scenario,
+            r["epoch"],
+            r["duration"],
+            r["train_au_percentage"],
+            r["train_throughput_samples_per_second"],
+            r["train_throughput_mb_per_second"],
+            r["throughput_over_local_ssd"],
+            r["lowest_memory"],
+            r["highest_memory"],
+            r["lowest_cpu"],
+            r["highest_cpu"],
+            r["pod_name"],
+            r["start"],
+            r["end"],
+            f'"{r["gcsfuse_mount_options"].strip()}"',  # need to wrap in quotes to encapsulate commas in the value.
+            args.instance_id,
+        )
+        rows.append(new_row)
+
+  def exportToCsvFile(output_file_path: str, header: str, rows: List):
+    if output_file_path and output_file_path.strip():
+      ensureDir(os.path.dirname(output_file_path))
+      with open(output_file_path, "a") as output_file_fwr:
+        # Write a new header.
+        output_file_fwr.write(f"{','.join(header)}\n")
+        for row in rows:
+          output_file_fwr.write(f"{','.join([f'{val}' for val in row])}\n")
+        output_file_fwr.close()
+      print(
+          "\nSuccessfully published outputs of DLIO test runs to"
+          f" {output_file_path} !!!"
+      )
+
+  exportToCsvFile(output_file_path=args.output_file, header=_HEADER, rows=rows)
+
+  def exportToGsheet(
+      header: str,
+      rows: List,
+      output_gsheet_id: str,
+      output_worksheet_name: str,
+      output_gsheet_keyfile: str,
+  ):
+    if (
+        output_gsheet_id
+        and output_gsheet_id.strip()
+        and output_worksheet_name
+        and output_worksheet_name.strip()
+    ):
+      append_data_to_gsheet(
+          data={"header": header, "values": rows},
+          worksheet=output_worksheet_name,
+          gsheet_id=output_gsheet_id,
+          serviceAccountKeyFile=output_gsheet_keyfile,
+          # default_service_account_key_file(
+          #     args.project_id
+          # ),
+      )
+      print(
+          "\nSuccessfully published outputs of DLIO test runs at worksheet"
+          f" '{args.output_worksheet_name}' in {url(args.output_gsheet_id)}"
+      )
+
+  exportToGsheet(
+      output_gsheet_id=args.output_gsheet_id,
+      output_worksheet_name=args.output_worksheet_name,
+      output_gsheet_keyfile=args.output_gsheet_keyfile,
+      header=_HEADER,
+      rows=rows,
+  )


 if __name__ == "__main__":
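The manual `','.join(...)` plus hand-quoting of `gcsfuse_mount_options` works because that is the only field expected to contain commas. If more free-form fields are ever added, Python's `csv` module handles quoting and escaping for every column; a minimal sketch (not part of this patch), assuming the same `_HEADER` and `rows`:

```python
import csv

# Sketch only: csv.writer applies RFC-4180 quoting to any field that needs it,
# so values containing commas or quotes stay intact without manual wrapping.
with open(output_file_path, "a", newline="") as f:
  writer = csv.writer(f)
  writer.writerow(_HEADER)
  writer.writerows(rows)
```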
@@ -303,20 +389,13 @@ def writeRecordsToCsvOutputFile(output: dict, output_file_path: str):
   dlioWorkloads = dlio_workload.ParseTestConfigForDlioWorkloads(
       args.workload_config
   )
-  downloadDlioOutputs(dlioWorkloads, args.instance_id)
+  ret = downloadDlioOutputs(dlioWorkloads, args.instance_id)
+  if ret != 0:
+    print(f"failed to download dlio outputs: {ret}")

   mash_installed = is_mash_installed()
   if not mash_installed:
     print("Mash is not installed, will skip parsing CPU and memory usage.")

   output = createOutputScenariosFromDownloadedFiles(args)
-
-  output_file_path = args.output_file
-  # Create the parent directory of output_file_path if doesn't
-  # exist already.
-  ensureDir(os.path.dirname(output_file_path))
-  writeRecordsToCsvOutputFile(output, output_file_path)
-  print(
-      "\n\nSuccessfully published outputs of DLIO test runs to"
-      f" {output_file_path} !!!\n\n"
-  )
+  writeOutput(output, args)
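`append_data_to_gsheet` and `url` come from the `utils/gsheet` helper, which is not part of this patch, so its exact behavior is assumed here. Conceptually it appends the header and rows to a worksheet through the Google Sheets API; a rough, self-contained sketch of that idea using `google-api-python-client` (the package the run script now installs), with the real helper's signature and options possibly differing:

```python
from google.oauth2 import service_account
from googleapiclient.discovery import build

def append_rows_to_worksheet(gsheet_id, worksheet, header, rows, keyfile):
  # Sketch only: authorize with the service-account keyfile the sheet is shared with.
  creds = service_account.Credentials.from_service_account_file(
      keyfile, scopes=["https://www.googleapis.com/auth/spreadsheets"]
  )
  sheets = build("sheets", "v4", credentials=creds)
  # values.append adds the rows after the last non-empty row of the worksheet.
  sheets.spreadsheets().values().append(
      spreadsheetId=gsheet_id,
      range=f"{worksheet}!A1",
      valueInputOption="RAW",
      insertDataOption="INSERT_ROWS",
      body={"values": [list(header)] + [list(r) for r in rows]},
  ).execute()
```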
diff --git a/perfmetrics/scripts/testing_on_gke/examples/fio/parse_logs.py b/perfmetrics/scripts/testing_on_gke/examples/fio/parse_logs.py
index 40c360810e..fd1164ccec 100644
--- a/perfmetrics/scripts/testing_on_gke/examples/fio/parse_logs.py
+++ b/perfmetrics/scripts/testing_on_gke/examples/fio/parse_logs.py
@@ -22,12 +22,14 @@
 import pprint
 import subprocess
 import sys
+from typing import List, Tuple

 # local library imports
 sys.path.append("../")
 import fio_workload
 from utils.utils import get_memory, get_cpu, unix_to_timestamp, is_mash_installed, get_memory_from_monitoring_api, get_cpu_from_monitoring_api
-from utils.parse_logs_common import ensureDir, download_gcs_objects, parseLogParserArguments, SUPPORTED_SCENARIOS
+from utils.parse_logs_common import ensureDir, download_gcs_objects, parseLogParserArguments, SUPPORTED_SCENARIOS, default_service_account_key_file
+from utils.gsheet import append_data_to_gsheet, url

 _LOCAL_LOGS_LOCATION = "../../bin/fio-logs"

@@ -53,6 +55,31 @@
     "numThreads": 0,
 }

+mash_installed = False
+
+_HEADER = (
+    "File Size",
+    "Read Type",
+    "Scenario",
+    "Epoch",
+    "Duration(s)",
+    "Throughput (MB/s)",
+    "IOPS",
+    "Throughput over Local SSD (%)",
+    "GCSFuse Lowest Memory (MB)",
+    "GCSFuse Highest Memory (MB)",
+    "GCSFuse Lowest CPU (core)",
+    "GCSFuse Highest CPU (core)",
+    "Pod-name",
+    "Start",
+    "End",
+    "GcsfuseMountOptions",
+    "BlockSize",
+    "FilesPerThread",
+    "NumThreads",
+    "InstanceID",
+)
+

 def downloadFioOutputs(fioWorkloads: set, instanceId: str) -> int:
   """Downloads instanceId-specific fio outputs for each fioWorkload locally.

@@ -77,9 +104,9 @@ def downloadFioOutputs(fioWorkloads: set, instanceId: str) -> int:
     srcObjects = f"gs://{fioWorkload.bucket}/fio-output/{instanceId}/*"
     print(f"Downloading FIO outputs from {srcObjects} ...")
-    returncode, errorStr = download_gcs_objects(srcObjects, dstDir)
+    returncode = download_gcs_objects(srcObjects, dstDir)
     if returncode < 0:
-      print(f"Failed to download FIO outputs from {srcObjects}: {errorStr}")
+      print(f"Failed to download FIO outputs from {srcObjects}: {returncode}")
       return returncode
   return 0
@@ -216,40 +243,42 @@ def createOutputScenariosFromDownloadedFiles(args: dict) -> dict:
           )
           r["end"] = unix_to_timestamp(per_epoch_output_data["timestamp_ms"])

-          if r["scenario"] != "local-ssd":
-            if mash_installed:
-              r["lowest_memory"], r["highest_memory"] = get_memory(
-                  r["pod_name"],
-                  r["start"],
-                  r["end"],
-                  project_number=args.project_number,
-              )
-              r["lowest_cpu"], r["highest_cpu"] = get_cpu(
-                  r["pod_name"],
-                  r["start"],
-                  r["end"],
-                  project_number=args.project_number,
-              )
-            else:
-              r["lowest_memory"], r["highest_memory"] = (
-                  get_memory_from_monitoring_api(
-                      pod_name=r["pod_name"],
-                      start_epoch=r["start_epoch"],
-                      end_epoch=r["end_epoch"],
-                      project_id=args.project_id,
-                      cluster_name=args.cluster_name,
-                      namespace_name=args.namespace_name,
-                  )
-              )
-              r["lowest_cpu"], r["highest_cpu"] = get_cpu_from_monitoring_api(
-                  pod_name=r["pod_name"],
-                  start_epoch=r["start_epoch"],
-                  end_epoch=r["end_epoch"],
-                  project_id=args.project_id,
-                  cluster_name=args.cluster_name,
-                  namespace_name=args.namespace_name,
-              )
-            pass
+          def fetch_cpu_memory_data():
+            if r["scenario"] != "local-ssd":
+              if mash_installed:
+                r["lowest_memory"], r["highest_memory"] = get_memory(
+                    r["pod_name"],
+                    r["start"],
+                    r["end"],
+                    project_number=args.project_number,
+                )
+                r["lowest_cpu"], r["highest_cpu"] = get_cpu(
+                    r["pod_name"],
+                    r["start"],
+                    r["end"],
+                    project_number=args.project_number,
+                )
+              else:
+                r["lowest_memory"], r["highest_memory"] = (
+                    get_memory_from_monitoring_api(
+                        pod_name=r["pod_name"],
+                        start_epoch=r["start_epoch"],
+                        end_epoch=r["end_epoch"],
+                        project_id=args.project_id,
+                        cluster_name=args.cluster_name,
+                        namespace_name=args.namespace_name,
+                    )
+                )
+                r["lowest_cpu"], r["highest_cpu"] = get_cpu_from_monitoring_api(
+                    pod_name=r["pod_name"],
+                    start_epoch=r["start_epoch"],
+                    end_epoch=r["end_epoch"],
+                    project_id=args.project_id,
+                    cluster_name=args.cluster_name,
+                    namespace_name=args.namespace_name,
+                )
+
+          fetch_cpu_memory_data()

           r["gcsfuse_mount_options"] = gcsfuse_mount_options
           r["blockSize"] = bs
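Wrapping the monitoring calls in a nested `fetch_cpu_memory_data()` isolates them in one place; one plausible follow-up (not in this patch) is to keep a single monitoring-API failure from aborting the whole parse, for example:

```python
# Sketch only: tolerate a per-record monitoring hiccup instead of failing the run.
try:
  fetch_cpu_memory_data()
except Exception as e:
  print(f"warning: could not fetch CPU/memory for pod {r['pod_name']}: {e}")
  r["lowest_memory"] = r["highest_memory"] = "NA"
  r["lowest_cpu"] = r["highest_cpu"] = "NA"
```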
@@ -270,57 +299,120 @@ def createOutputScenariosFromDownloadedFiles(args: dict) -> dict:
   return output


-def writeRecordsToCsvOutputFile(output: dict, output_file_path: str):
-  with open(output_file_path, "a") as output_file_fwr:
-    # Write a new header.
-    output_file_fwr.write(
-        "File Size,Read Type,Scenario,Epoch,Duration"
-        " (s),Throughput (MB/s),IOPS,Throughput over Local SSD (%),GCSFuse"
-        " Lowest"
-        " Memory (MB),GCSFuse Highest Memory (MB),GCSFuse Lowest CPU"
-        " (core),GCSFuse Highest CPU"
-        " (core),Pod,Start,End,GcsfuseMoutOptions,BlockSize,FilesPerThread,NumThreads,InstanceID\n"
-    )
-
-    for key in output:
-      record_set = output[key]
-
-      for scenario in record_set["records"]:
-        if scenario not in SUPPORTED_SCENARIOS:
-          print(f"Unknown scenario: {scenario}. Ignoring it...")
-          continue
-
-        for i in range(len(record_set["records"][scenario])):
-          r = record_set["records"][scenario][i]
-
-          try:
-            if ("local-ssd" in record_set["records"]) and (
-                len(record_set["records"]["local-ssd"])
-                == len(record_set["records"][scenario])
-            ):
-              r["throughput_over_local_ssd"] = round(
-                  r["throughput_mb_per_second"]
-                  / record_set["records"]["local-ssd"][i][
-                      "throughput_mb_per_second"
-                  ]
-                  * 100,
-                  2,
-              )
-            else:
-              r["throughput_over_local_ssd"] = "NA"
-
-          except Exception as e:
-            print(
-                "Error: failed to parse/write record-set for"
-                f" scenario: {scenario}, i: {i}, record: {r}, exception: {e}"
-            )
-            continue
-
-          output_file_fwr.write(
-              f"{record_set['mean_file_size']},{record_set['read_type']},{scenario},{r['epoch']},{r['duration']},{r['throughput_mb_per_second']},{r['IOPS']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']},\"{r['gcsfuse_mount_options']}\",{r['blockSize']},{r['filesPerThread']},{r['numThreads']},{args.instance_id}\n"
-          )
-
-  output_file_fwr.close()
+def writeOutput(
+    output: dict,
+    args: dict,
+):
+  rows = []
+
+  for key in output:
+    record_set = output[key]
+
+    for scenario in record_set["records"]:
+      if scenario not in SUPPORTED_SCENARIOS:
+        print(f"Unknown scenario: {scenario}. Ignoring it...")
+        continue
+
+      for i in range(len(record_set["records"][scenario])):
+        r = record_set["records"][scenario][i]
+
+        try:
+          if ("local-ssd" in record_set["records"]) and (
+              len(record_set["records"]["local-ssd"])
+              == len(record_set["records"][scenario])
+          ):
+            r["throughput_over_local_ssd"] = round(
+                r["throughput_mb_per_second"]
+                / record_set["records"]["local-ssd"][i][
+                    "throughput_mb_per_second"
+                ]
+                * 100,
+                2,
+            )
+          else:
+            r["throughput_over_local_ssd"] = "NA"
+
+        except Exception as e:
+          print(
+              "Error: failed to parse/write record-set for"
+              f" scenario: {scenario}, i: {i}, record: {r}, exception: {e}"
+          )
+          continue
+
+        new_row = (
+            record_set["mean_file_size"],
+            record_set["read_type"],
+            scenario,
+            r["epoch"],
+            r["duration"],
+            r["throughput_mb_per_second"],
+            r["IOPS"],
+            r["throughput_over_local_ssd"],
+            r["lowest_memory"],
+            r["highest_memory"],
+            r["lowest_cpu"],
+            r["highest_cpu"],
+            r["pod_name"],
+            r["start"],
+            r["end"],
+            f'"{r["gcsfuse_mount_options"].strip()}"',  # need to wrap in quotes to encapsulate commas in the value.
+            r["blockSize"],
+            r["filesPerThread"],
+            r["numThreads"],
+            args.instance_id,
+        )
+        rows.append(new_row)
+
+  def exportToCsvFile(output_file_path: str, header: str, rows: List):
+    if output_file_path and output_file_path.strip():
+      ensureDir(os.path.dirname(output_file_path))
+      with open(output_file_path, "a") as output_file_fwr:
+        # Write a new header.
+        output_file_fwr.write(f"{','.join(header)}\n")
+        for row in rows:
+          output_file_fwr.write(f"{','.join([f'{val}' for val in row])}\n")
+        output_file_fwr.close()
+      print(
+          "\nSuccessfully published outputs of FIO test runs to"
+          f" {output_file_path} !!!"
+      )
+
+  exportToCsvFile(output_file_path=args.output_file, header=_HEADER, rows=rows)
+
+  def exportToGsheet(
+      header: str,
+      rows: List,
+      output_gsheet_id: str,
+      output_worksheet_name: str,
+      output_gsheet_keyfile: str,
+  ):
+    if (
+        output_gsheet_id
+        and output_gsheet_id.strip()
+        and output_worksheet_name
+        and output_worksheet_name.strip()
+    ):
+      append_data_to_gsheet(
+          data={"header": header, "values": rows},
+          worksheet=output_worksheet_name,
+          gsheet_id=output_gsheet_id,
+          serviceAccountKeyFile=output_gsheet_keyfile,
+          # default_service_account_key_file(
+          #     args.project_id
+          # ),
+      )
+      print(
+          "\nSuccessfully published outputs of FIO test runs at worksheet"
+          f" '{args.output_worksheet_name}' in {url(args.output_gsheet_id)}"
+      )
+
+  exportToGsheet(
+      output_gsheet_id=args.output_gsheet_id,
+      output_worksheet_name=args.output_worksheet_name,
+      output_gsheet_keyfile=args.output_gsheet_keyfile,
+      header=_HEADER,
+      rows=rows,
+  )


 if __name__ == "__main__":
+ ) + + exportToCsvFile(output_file_path=args.output_file, header=_HEADER, rows=rows) + + def exportToGsheet( + header: str, + rows: List, + output_gsheet_id: str, + output_worksheet_name: str, + output_gsheet_keyfile: str, + ): + if ( + output_gsheet_id + and output_gsheet_id.strip() + and output_worksheet_name + and output_worksheet_name.strip() + ): + append_data_to_gsheet( + data={"header": header, "values": rows}, + worksheet=output_worksheet_name, + gsheet_id=output_gsheet_id, + serviceAccountKeyFile=output_gsheet_keyfile, + # default_service_account_key_file( + # args.project_id + # ), + ) + print( + "\nSuccessfully published outputs of FIO test runs at worksheet" + f" '{args.output_worksheet_name}' in {url(args.output_gsheet_id)}" + ) + + exportToGsheet( + output_gsheet_id=args.output_gsheet_id, + output_worksheet_name=args.output_worksheet_name, + output_gsheet_keyfile=args.output_gsheet_keyfile, + header=_HEADER, + rows=rows, + ) if __name__ == "__main__": @@ -330,20 +422,13 @@ def writeRecordsToCsvOutputFile(output: dict, output_file_path: str): fioWorkloads = fio_workload.ParseTestConfigForFioWorkloads( args.workload_config ) - downloadFioOutputs(fioWorkloads, args.instance_id) + ret = downloadFioOutputs(fioWorkloads, args.instance_id) + if ret != 0: + print(f"failed to download fio outputs: {ret}") mash_installed = is_mash_installed() if not mash_installed: print("Mash is not installed, will skip parsing CPU and memory usage.") output = createOutputScenariosFromDownloadedFiles(args) - - output_file_path = args.output_file - # Create the parent directory of output_file_path if doesn't - # exist already. - ensureDir(os.path.dirname(output_file_path)) - writeRecordsToCsvOutputFile(output, output_file_path) - print( - "\n\nSuccessfully published outputs of FIO test runs to" - f" {output_file_path} !!!\n\n" - ) + writeOutput(output, args) diff --git a/perfmetrics/scripts/testing_on_gke/examples/run-gke-tests.sh b/perfmetrics/scripts/testing_on_gke/examples/run-gke-tests.sh index dac65ac0ad..8f61853e4f 100755 --- a/perfmetrics/scripts/testing_on_gke/examples/run-gke-tests.sh +++ b/perfmetrics/scripts/testing_on_gke/examples/run-gke-tests.sh @@ -113,6 +113,9 @@ readonly DEFAULT_INSTANCE_ID=${USER}-$(date +%Y%m%d-%H%M%S) readonly DEFAULT_POD_WAIT_TIME_IN_SECONDS=300 # 1 week readonly DEFAULT_POD_TIMEOUT_IN_SECONDS=604800 +# readonly DEFAULT_OUTPUT_GSHEET='1UghIdsyarrV1HVNc6lugFZS1jJRumhdiWnPgoEC8Fe4' +# readonly DEFAULT_OUTPUT_GSHEET_KEYFILE=gs:// + function printHelp() { echo "Usage guide: " @@ -142,6 +145,8 @@ function printHelp() { echo "instance_id=" echo "output_dir=" + echo "output_gsheet_id= in https://docs.google.com/spreadsheets/d/>" + echo "output_gsheet_keyfile=" echo "" echo "" echo "" @@ -243,6 +248,14 @@ else export output_dir="${gke_testing_dir}"/examples fi +# if output_gsheet_id is defined by this point, +if test -n "${output_gsheet_id}"; then + # and output_gsheet_keyfile is not defined, then try the pre-defind keyfile at gs://gcsfuse-aiml-test-outputs/creds/${project_id}.json . 
@@ -270,6 +283,8 @@ function printRunParameters() {
   echo "instance_id=\"${instance_id}\""
   echo "workload_config=\"${workload_config}\""
   echo "output_dir=\"${output_dir}\""
+  echo "output_gsheet_id=\"${output_gsheet_id}\""
+  echo "output_gsheet_keyfile=\"${output_gsheet_keyfile}\""
   echo ""
   echo ""
   echo ""
@@ -347,6 +362,8 @@ function installDependencies() {
   which jq || sudo apt-get install -y jq
   # # Ensure sudoless docker is installed.
   # docker ps >/dev/null || (sudo addgroup docker && sudo usermod -aG docker $USER && newgrp docker)
+  # Install python modules for gsheet.
+  python3 -m pip install google-api-python-client
 }

 # Make sure you have access to the necessary GCP resources. The easiest way to enable it is to use <username>@google.com as active auth.
@@ -681,14 +698,14 @@ function waitTillAllPodsComplete() {

 function fetchAndParseFioOutputs() {
   printf "\nFetching and parsing fio outputs ...\n\n"
   cd "${gke_testing_dir}"/examples/fio
-  python3 parse_logs.py --project-number=${project_number} --workload-config "${workload_config}" --instance-id ${instance_id} --output-file "${output_dir}"/fio/output.csv --project-id=${project_id} --cluster-name=${cluster_name} --namespace-name=${appnamespace}
+  python3 parse_logs.py --project-number=${project_number} --workload-config "${workload_config}" --instance-id ${instance_id} --output-file "${output_dir}"/fio/output.csv --project-id=${project_id} --cluster-name=${cluster_name} --namespace-name=${appnamespace} --output-gsheet-id=${output_gsheet_id} --output-worksheet-name=fio --output-gsheet-keyfile=${output_gsheet_keyfile}
   cd -
 }

 function fetchAndParseDlioOutputs() {
   printf "\nFetching and parsing dlio outputs ...\n\n"
   cd "${gke_testing_dir}"/examples/dlio
-  python3 parse_logs.py --project-number=${project_number} --workload-config "${workload_config}" --instance-id ${instance_id} --output-file "${output_dir}"/dlio/output.csv --project-id=${project_id} --cluster-name=${cluster_name} --namespace-name=${appnamespace}
+  python3 parse_logs.py --project-number=${project_number} --workload-config "${workload_config}" --instance-id ${instance_id} --output-file "${output_dir}"/dlio/output.csv --project-id=${project_id} --cluster-name=${cluster_name} --namespace-name=${appnamespace} --output-gsheet-id=${output_gsheet_id} --output-worksheet-name=dlio --output-gsheet-keyfile=${output_gsheet_keyfile}
   cd -
 }
diff --git a/perfmetrics/scripts/testing_on_gke/examples/utils/parse_logs_common.py b/perfmetrics/scripts/testing_on_gke/examples/utils/parse_logs_common.py
index fef1a138a2..7a28328531 100644
--- a/perfmetrics/scripts/testing_on_gke/examples/utils/parse_logs_common.py
+++ b/perfmetrics/scripts/testing_on_gke/examples/utils/parse_logs_common.py
@@ -36,9 +36,10 @@ def ensureDir(dirpath: str):
     pass


-def download_gcs_objects(src: str, dst: str) -> Tuple[int, str]:
-  result = subprocess.run(
-      [
+def download_gcs_objects(src: str, dst: str) -> int:
+  print(f"Downloading files from {src} to {os.path.abspath(dst)} ...")
+  returncode = run_command(
+      " ".join([
           "gcloud",
           "-q",  # ignore prompts
           "storage",
@@ -47,13 +48,9 @@ def download_gcs_objects(src: str, dst: str) -> Tuple[int, str]:
           "cp",
           "-r",
           "--no-user-output-enabled",  # do not print names of objects being copied
           src,
           dst,
-      ],
-      capture_output=False,
-      text=True,
+      ])
   )
-  if result.returncode < 0:
-    return (result.returncode, f"error: {result.stderr}")
-  return result.returncode, ""
+  return returncode


 def parseLogParserArguments() -> object:
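`run_command` is not defined or imported in the hunks shown here, so its contract is an assumption; presumably it runs the given command string and returns the process exit status. Joining the arguments with plain spaces is fine for these fixed tokens, but would break on paths containing spaces; `shlex` handles that safely if it ever matters:

```python
import shlex
import subprocess

def run_command(command: str) -> int:
  """Sketch of the assumed helper: run the command and return its exit status."""
  return subprocess.run(shlex.split(command), text=True).returncode

# Hypothetical example: shlex.join quotes each argument, so a destination
# containing spaces survives the round-trip through a single command string.
src = "gs://my-bucket/fio-output/run-1/*"
dst = "/tmp/fio logs"
print(run_command(shlex.join(["gcloud", "-q", "storage", "cp", "-r", src, dst])))
```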
@@ -115,4 +112,46 @@ def parseLogParserArguments() -> object:
       help="File path of the output metrics (in CSV format)",
       default="output.csv",
   )
+  parser.add_argument(
+      "--output-gsheet-id",
+      metavar="ID of a googlesheet for exporting output to.",
+      help=(
+          "ID of the google sheet to export output metrics to. This is the id"
+          " in https://docs.google.com/spreadsheets/d/<id> ."
+      ),
+      required=True,
+      type=str,
+  )
+  parser.add_argument(
+      "--output-worksheet-name",
+      metavar=(
+          "Name of a worksheet (page) in the googlesheet specified by"
+          " --output-gsheet-id"
+      ),
+      help="Name of the worksheet (page) to append the output metrics to.",
+      required=True,
+      type=str,
+  )
+  parser.add_argument(
+      "--output-gsheet-keyfile",
+      metavar=(
+          "Path of a GCS keyfile for read/write access to output google sheet."
+      ),
+      help=(
+          "For this to work, the google-sheet should be shared with the"
+          " client_email/service-account of the keyfile."
+      ),
+      required=True,
+      type=str,
+  )
+  return parser.parse_args()
+
+
+def default_service_account_key_file(project_id: str) -> str:
+  if project_id == "gcs-fuse-test":
+    return "/usr/local/google/home/gargnitin/work/cloud/storage/client/gcsfuse/src/gcsfuse/perfmetrics/scripts/testing_on_gke/examples/20240919-gcs-fuse-test-bc1a2c0aac45.json"
+  elif project_id == "gcs-fuse-test-ml":
+    return "/usr/local/google/home/gargnitin/work/cloud/storage/client/gcsfuse/src/gcsfuse/perfmetrics/scripts/testing_on_gke/examples/20240919-gcs-fuse-test-ml-d6e0247b2cf1.json"
+  else:
+    raise Exception(f"Unknown project-id: {project_id}")
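The commented-out call in `exportToGsheet` and the new `default_service_account_key_file` helper suggest the keyfile argument may eventually become optional with a per-project default. A hypothetical sketch of that wiring (not part of this patch), if `--output-gsheet-keyfile` were made non-required:

```python
# Sketch only: fall back to the project's default keyfile when none is passed.
keyfile = (
    args.output_gsheet_keyfile
    if args.output_gsheet_keyfile and args.output_gsheet_keyfile.strip()
    else default_service_account_key_file(args.project_id)
)
```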