From 848bc2a71626470146395938f821af9cd970a316 Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Fri, 30 Aug 2024 16:05:05 +1200 Subject: [PATCH] [ML] Framework for testing effect of various MKL settings Checkpointing current status for visibility. --- 3rd_party/3rd_party.cmake | 2 +- bin/pytorch_inference/CCommandParser.cc | 2 + bin/pytorch_inference/CCommandParser.h | 1 + bin/pytorch_inference/CMakeLists.txt | 21 +++ bin/pytorch_inference/Main.cc | 18 ++ bin/pytorch_inference/evaluate.py | 21 ++- bin/pytorch_inference/main.py | 201 ++++++++++++++++++++++ bin/pytorch_inference/signal9.py | 213 ++++++++++++++++++++++++ 8 files changed, 470 insertions(+), 9 deletions(-) create mode 100755 bin/pytorch_inference/main.py create mode 100644 bin/pytorch_inference/signal9.py diff --git a/3rd_party/3rd_party.cmake b/3rd_party/3rd_party.cmake index dd0ee72928..f832fe514c 100644 --- a/3rd_party/3rd_party.cmake +++ b/3rd_party/3rd_party.cmake @@ -71,7 +71,7 @@ elseif ("${HOST_SYSTEM_NAME}" STREQUAL "linux") set(MKL_LOCATION "/usr/local/gcc103/lib") set(MKL_EXTENSION ".so.2") set(MKL_PREFIX "libmkl_") - set(MKL_LIBRARIES "avx2" "avx512" "core" "def" "gnu_thread" "intel_lp64" "mc3" "vml_avx2" "vml_avx512" "vml_cmpt" "vml_def" "vml_mc3") + set(MKL_LIBRARIES "avx2" "avx512" "core" "def" "gnu_thread" "intel_lp64" "mc3" "sequential" "vml_avx2" "vml_avx512" "vml_cmpt" "vml_def" "vml_mc3") endif() set(BOOST_EXTENSION mt-${BOOST_ARCH}-1_83.so.1.83.0) set(BOOST_LIBRARIES "atomic" "chrono" "date_time" "filesystem" "iostreams" "log" "log_setup" "program_options" "regex" "system" "thread" "unit_test_framework") diff --git a/bin/pytorch_inference/CCommandParser.cc b/bin/pytorch_inference/CCommandParser.cc index a5a323dc49..a4287bdfd8 100644 --- a/bin/pytorch_inference/CCommandParser.cc +++ b/bin/pytorch_inference/CCommandParser.cc @@ -143,6 +143,7 @@ CCommandParser::validateControlMessageJson(const json::object& doc, } case E_ClearCache: case E_ProcessStats: + case E_FreeMklBuffers: // No extra arguments needed break; case E_Unknown: @@ -276,6 +277,7 @@ CCommandParser::SControlMessage CCommandParser::jsonToControlMessage(const json: doc.at(REQUEST_ID).as_string()}; case E_ClearCache: case E_ProcessStats: + case E_FreeMklBuffers: return {controlMessageType, 0, doc.at(REQUEST_ID).as_string()}; case E_Unknown: break; diff --git a/bin/pytorch_inference/CCommandParser.h b/bin/pytorch_inference/CCommandParser.h index 4cbc4dd4ac..1dc695c16f 100644 --- a/bin/pytorch_inference/CCommandParser.h +++ b/bin/pytorch_inference/CCommandParser.h @@ -126,6 +126,7 @@ class CCommandParser { E_NumberOfAllocations, E_ClearCache, E_ProcessStats, + E_FreeMklBuffers, E_Unknown }; diff --git a/bin/pytorch_inference/CMakeLists.txt b/bin/pytorch_inference/CMakeLists.txt index 62a4f3defd..b80e2eed91 100644 --- a/bin/pytorch_inference/CMakeLists.txt +++ b/bin/pytorch_inference/CMakeLists.txt @@ -21,6 +21,23 @@ set(ML_LINK_LIBRARIES ${C10_LIB} ) +if (LINK_TCMALLOC) + message(AUTHOR_WARNING "Linking libtcmalloc. Build is not for production release.") + list(APPEND ML_LINK_LIBRARIES tcmalloc) +endif () + +if (LINK_PROFILER) + message(AUTHOR_WARNING "Linking libprofiler. 
Build is not for production release.") + list(APPEND ML_LINK_LIBRARIES profiler) +endif () + +set(MKL_ARCH intel64) +set(MKL_THREADING sequential) +set(MKL_INTERFACE lp64) +find_package(MKL CONFIG REQUIRED PATHS $ENV{MKLROOT}) +message(STATUS "Imported oneMKL targets: ${MKL_IMPORTED_TARGETS}") + + ml_add_executable(pytorch_inference CBufferedIStreamAdapter.cc CCmdLineParser.cc @@ -29,4 +46,8 @@ ml_add_executable(pytorch_inference CThreadSettings.cc ) +target_compile_options(pytorch_inference PUBLIC $) +target_include_directories(pytorch_inference PUBLIC $) +target_link_libraries(pytorch_inference PUBLIC $) + ml_codesign(pytorch_inference) diff --git a/bin/pytorch_inference/Main.cc b/bin/pytorch_inference/Main.cc index 662810a48c..2386a907a9 100644 --- a/bin/pytorch_inference/Main.cc +++ b/bin/pytorch_inference/Main.cc @@ -32,6 +32,7 @@ #include #include +#include #include #include @@ -135,6 +136,11 @@ void handleControlMessage(const ml::torch::CCommandParser::SControlMessage& cont ml::core::CProcessStats::residentSetSize(), ml::core::CProcessStats::maxResidentSetSize()); break; + case ml::torch::CCommandParser::E_FreeMklBuffers: + LOG_INFO(<< "Freeing MKL buffers on request."); + mkl_free_buffers(); + resultWriter.writeSimpleAck(controlMessage.s_RequestId); + break; case ml::torch::CCommandParser::E_Unknown: std::string message{"Attempt to handle unknown control message"}; LOG_ERROR(<< message); @@ -307,6 +313,18 @@ int main(int argc, char** argv) { LOG_DEBUG(<< "Using a single allocation"); } + MKLVersion mkl_version; + mkl_get_version(&mkl_version); + + LOG_INFO(<< "Using oneMKL " << mkl_version.MajorVersion << "." << mkl_version.UpdateVersion); + + int fastMmDisabled = mkl_disable_fast_mm(); + if (fastMmDisabled != 1) { + LOG_ERROR(<< "Disabling MKL fast MM failed."); + } else { + LOG_INFO(<< "Disabled MKL fast MM."); + } + commandParser.ioLoop( [&module_, &resultWriter](ml::torch::CCommandParser::CRequestCacheInterface& cache, ml::torch::CCommandParser::SRequest request) -> bool { diff --git a/bin/pytorch_inference/evaluate.py b/bin/pytorch_inference/evaluate.py index 7e60ec13a1..20c40e3dcc 100644 --- a/bin/pytorch_inference/evaluate.py +++ b/bin/pytorch_inference/evaluate.py @@ -147,7 +147,9 @@ def path_to_app(): def launch_pytorch_app(args): - command = [path_to_app(), + os.environ["HEAPPROFILE"] = "/tmp/heapprof" + command = [#'/heaptrack/build/bin/heaptrack', + path_to_app(), '--restore=' + args.restore_file, '--input=' + args.input_file, '--output=' + args.output_file, @@ -385,6 +387,10 @@ def threading_benchmark(args): print(f"{result['inference_threads']},{result['num_allocations']},{result['run_time_ms']},{result['avg_time_ms']}") +def create_mkl_free_buffers_request(request_num): + print("mkl_free_buffers") + return {"request_id": "mkl_free_buffers_" + str(request_num), "control": 3} + def create_mem_usage_request(request_num): return {"request_id": "mem_" + str(request_num), "control": 2} @@ -410,13 +416,16 @@ def memory_usage(args): with open(args.input_file, 'w') as input_file: request_num = 0 - request_sizes = [10, 20, 30, 40, 50] + #request_sizes = [10, 20, 30, 40, 50] + request_sizes = [1, 2, 3, 4] write_request(create_mem_usage_request(request_num), input_file) for i in request_sizes: request_num = request_num + 1 write_request(create_inference_request(batch_size=i, num_tokens=512, request_num=request_num), input_file) write_request(create_mem_usage_request(request_num), input_file) + if i % 2 == 0: + write_request(create_mkl_free_buffers_request(request_num), 
input_file) launch_pytorch_app(args) @@ -453,6 +462,7 @@ def memory_usage(args): continue if 'error' in result: + print(result) print(f"Inference failed. Request: {result['error']['request_id']}, Msg: {result['error']['error']}") continue @@ -477,12 +487,7 @@ def main(): else: test_evaluation(args) finally: - if os.path.isfile(args.restore_file): - os.remove(args.restore_file) - if os.path.isfile(args.input_file): - os.remove(args.input_file) - if os.path.isfile(args.output_file): - os.remove(args.output_file) + pass if __name__ == "__main__": main() diff --git a/bin/pytorch_inference/main.py b/bin/pytorch_inference/main.py new file mode 100755 index 0000000000..dd3bf187eb --- /dev/null +++ b/bin/pytorch_inference/main.py @@ -0,0 +1,201 @@ +import argparse +import json +import os +import platform +import random +import stat +import subprocess +import time + + +# +# python3 signal9.py '/Users/davidkyle/Development/NLP Models/elser_2/elser_model_2.pt' --num_allocations=4 +# + +def parse_arguments(): + parser = argparse.ArgumentParser() + parser.add_argument('model', help='A TorchScript model with .pt extension') + parser.add_argument('exec_path', default='pytorch_inference', help='he path to the pytorch_inference executable') + parser.add_argument('input_file', default='pytorch_requests.txt', help='The path to the file containing the requests') + # The pipes are created by the C++ app + # The names must match those passed to app + parser.add_argument('--output_file', default='out.json') + parser.add_argument('--log_file', default='log.txt') + parser.add_argument('--num_threads_per_allocation', type=int, help='The number of inference threads used by LibTorch. Defaults to 1.') + parser.add_argument('--num_allocations', type=int, help='The number of allocations for parallel forwarding. Defaults to 1') + parser.add_argument('--cache_size', type=int, help='Cache size limit. 
Defaults to 0') + parser.add_argument('--valgrind_log_file', default='valgrind_out.txt', help='Valgrind output file') + + return parser.parse_args() + + +# def path_to_app2(): +# +# os_platform = platform.system() +# if os_platform == 'Darwin': +# if platform.machine() == 'arm64': +# sub_path = 'darwin-aarch64/controller.app/Contents/MacOS/' +# else: +# sub_path = 'darwin-x86_64/controller.app/Contents/MacOS/' +# elif os_platform == 'Linux': +# if platform.machine() == 'aarch64': +# sub_path = 'linux-aarch64/bin/' +# else: +# sub_path = 'linux-x86_64/bin/' +# elif os_platform == 'Windows': +# sub_path = 'windows-x86_64/bin/' +# else: +# raise RuntimeError('Unknown platform') +# +# return "../../build/distribution/platform/" + sub_path + "pytorch_inference" + + +def path_to_app(args): + return args.exec_path + + +def lauch_pytorch_app(args, input_pipe, restore_file): + # command = ['valgrind', '--leak-check=full', '--show-leak-kinds=all', '--track-origins=yes', '--verbose', + # '--log-file=' + args.valgrind_log_file, + # path_to_app(args), + command = [path_to_app(args), + '--restore=' + restore_file, + '--input=' + input_pipe, '--inputIsPipe', + '--output=' + args.output_file, + # '--logPipe=' + args.log_file, + '--validElasticLicenseKeyConfirmed=true', + ] + + if args.num_threads_per_allocation: + command.append('--numThreadsPerAllocation=' + str(args.num_threads_per_allocation)) + + if args.num_allocations: + command.append('--numAllocations=' + str(args.num_allocations)) + + cache_size_to_use = 0 + if args.cache_size: + cache_size_to_use = args.cache_size + + command.append('--cacheMemorylimitBytes=' + str(cache_size_to_use)) + + # For the memory benchmark always use the immediate executor + # if args.memory_benchmark: + # command.append('--useImmediateExecutor') + subprocess.Popen(command) + + +def stream_file(source, destination): + while True: + piece = source.read(8192) + if not piece: + break + + destination.write(piece) + + +def wait_for_pipe(file_name, num_retries=5): + ''' + the pipe must exist else it will be created as an + ordinary file when opened for write + ''' + while num_retries > 0: + try: + if stat.S_ISFIFO(os.stat(file_name).st_mode): + break + except Exception: + pass + + + num_retries = num_retries -1 + time.sleep(5) + + return stat.S_ISFIFO(os.stat(file_name).st_mode) + +def write_mem_usage_request(request_num, destination): + json.dump({"request_id": "mem_" + str(request_num), "control": 2}, destination) + +def write_random_request(request_id, destination): + json.dump(build_random_inference_request(request_id=request_id), destination) + +def build_random_inference_request(request_id): + num_tokens = 510 + tokens = [101] # CLS + for _ in range(num_tokens): + tokens.append(random.randrange(110, 28000)) + tokens.append(102) # SEP + + arg_1 = [1] * (num_tokens + 2) + arg_2 = [0] * (num_tokens + 2) + arg_3 = [i for i in range(num_tokens + 2)] + + request = { + "request_id": request_id, + "tokens": [tokens], + "arg_1": [arg_1], + "arg_2": [arg_2], + "arg_3": [arg_3], + } + + return request + + +def restore_model(model, restore_file_name): + # create the restore file + with open(restore_file_name, 'wb') as restore_file: + file_stats = os.stat(model) + file_size = file_stats.st_size + + # 4 byte unsigned int + b = (file_size).to_bytes(4, 'big') + restore_file.write(b) + + print("streaming model of size", file_size, flush=True) + + with open(model, 'rb') as source_file: + stream_file(source_file, restore_file) + + +def main(): + args = parse_arguments() + + 
input_pipe_name = "model_input" + restore_file_name = "model_restore_temp" + + # stream the torchscript model + restore_model(args.model, restore_file_name=restore_file_name) + + lauch_pytorch_app(args, input_pipe=input_pipe_name, restore_file=restore_file_name) + + if not wait_for_pipe(input_pipe_name): + print("Error: input pipe [{}] has not been created".format(input_pipe_name)) + return + + input_pipe = open(input_pipe_name, 'w') + + print("writing requests") + + request_num = 0 + with open(args.input_file) as file: + for line in file: + request_num = request_num + 1 + if request_num % 300 == 0: + print("Request number: ", request_num) + # print(line.rstrip()) + input_pipe.write(line.rstrip().rstrip('\n')) + + + # i = 0 + # while True: + # if i % 100 == 0: + # print("mem") + # write_mem_usage_request(str(i), input_pipe) + # else: + # write_random_request(str(i), input_pipe) + # + # i = i + 1 + + input_pipe.close() + + +if __name__ == "__main__": + main() diff --git a/bin/pytorch_inference/signal9.py b/bin/pytorch_inference/signal9.py new file mode 100644 index 0000000000..cb261bdeab --- /dev/null +++ b/bin/pytorch_inference/signal9.py @@ -0,0 +1,213 @@ +import argparse +import sys +import json +import os +import platform +import random +import stat +import subprocess +import time + + +# +# python3 signal9.py '/Users/davidkyle/Development/NLP Models/elser_2/elser_model_2.pt' --num_allocations=4 +# + +def parse_arguments(): + parser = argparse.ArgumentParser() + parser.add_argument('model', help='A TorchScript model with .pt extension') + # The pipes are created by the C++ app + # The names must match those passed to app + parser.add_argument('--output_file', default='out.json') + parser.add_argument('--log_file', default='log.txt') + parser.add_argument('--num_threads_per_allocation', type=int, help='The number of inference threads used by LibTorch. Defaults to 1.') + parser.add_argument('--num_allocations', type=int, help='The number of allocations for parallel forwarding. 
Defaults to 1') + + return parser.parse_args() + + +def path_to_app(): + + os_platform = platform.system() + if os_platform == 'Darwin': + if platform.machine() == 'arm64': + sub_path = 'darwin-aarch64/controller.app/Contents/MacOS/' + else: + sub_path = 'darwin-x86_64/controller.app/Contents/MacOS/' + elif os_platform == 'Linux': + if platform.machine() == 'aarch64': + sub_path = 'linux-aarch64/bin/' + else: + sub_path = 'linux-x86_64/bin/' + elif os_platform == 'Windows': + sub_path = 'windows-x86_64/bin/' + else: + raise RuntimeError('Unknown platform') + + return "../../build/distribution/platform/" + sub_path + "pytorch_inference" + +def launch_pytorch_app(args, input_pipe, restore_file): + + #command = ['valgrind', + #'--leak-check=full', + #'--show-leak-kinds=all', + #'--track-origins=yes', + #'--verbose', + #'--log-file=valgrind.log', + #path_to_app(), + #'--restore=' + restore_file, + #'--input=' + input_pipe, '--inputIsPipe', + #'--output=' + args.output_file, + ## '--logPipe=' + args.log_file, + #'--validElasticLicenseKeyConfirmed=true', + #] + + command = [ #'/heaptrack/build/bin/heaptrack', + path_to_app(), + '--restore=' + restore_file, + '--input=' + input_pipe, '--inputIsPipe', + '--output=' + args.output_file,# '--outputIsPipe', + # '--logPipe=' + args.log_file, + '--validElasticLicenseKeyConfirmed=true', + ] + + if args.num_threads_per_allocation: + command.append('--numThreadsPerAllocation=' + str(args.num_threads_per_allocation)) + + if args.num_allocations: + command.append('--numAllocations=' + str(args.num_allocations)) + + print(command) + + # For the memory benchmark always use the immediate executor + # if args.memory_benchmark: + # command.append('--useImmediateExecutor') + return subprocess.Popen(command) + + +def stream_file(source, destination) : + while True: + piece = source.read(8192) + if not piece: + break + + destination.write(piece) + +def wait_for_pipe(file_name, num_retries=5) : + ''' + the pipe must exist else it will be created as an + ordinary file when opened for write + ''' + while num_retries > 0: + try: + print("Checking " + file_name + " is fifo") + if stat.S_ISFIFO(os.stat(file_name).st_mode): + print(file_name + " fifo exists") + break + except Exception as e: + print("exception: " + str(e)) + pass + + + num_retries = num_retries -1 + time.sleep(5) + + print("returning: " + str( stat.S_ISFIFO(os.stat(file_name).st_mode))) + return stat.S_ISFIFO(os.stat(file_name).st_mode) + +def write_mem_usage_request(request_num, destination): + json.dump({"request_id": "mem_" + str(request_num), "control": 2}, destination) + +def write_random_request(request_id, destination): + json.dump(build_random_inference_request(request_id=request_id), destination) + #json.dump(build_random_inference_request(request_id=request_id), sys.stdout) + +def build_random_inference_request(request_id): + num_tokens = 510 + tokens = [101] # CLS + for _ in range(num_tokens): + tokens.append(random.randrange(110, 28000)) + tokens.append(102) # SEP + + arg_1 = [1] * (num_tokens + 2) + arg_2 = [0] * (num_tokens + 2) + arg_3 = [i for i in range(num_tokens + 2)] + + request = { + "request_id": request_id, + "tokens": [tokens], + "arg_1": [arg_1], + "arg_2": [arg_2], + "arg_3": [arg_3], + } + + return request + + + + + +def restore_model(model, restore_file_name): + # create the restore file + with open(restore_file_name, 'wb') as restore_file: + file_stats = os.stat(model) + file_size = file_stats.st_size + + # 4 byte unsigned int + b = (file_size).to_bytes(4, 'big') + 
restore_file.write(b) + + print("streaming model of size", file_size, flush=True) + + with open(model, 'rb') as source_file: + stream_file(source_file, restore_file) + + +def main(): + + os.environ["(ASAN_OPTIONS"]="halt_on_error=false,log_path=./as.log" + args = parse_arguments() + + input_pipe_name = "model_input" + restore_file_name = "model_restore_temp" + + # stream the torchscript model + restore_model(args.model, restore_file_name=restore_file_name) + + #os.mkfifo(input_pipe_name, 0o600) + + p = launch_pytorch_app(args, input_pipe=input_pipe_name, restore_file=restore_file_name) + + if not wait_for_pipe(input_pipe_name): + print("Error: input pipe [{}] has not been created".format(input_pipe_name)) + return + else: + print("Error: input pipe [{}] has been created".format(input_pipe_name)) + + with open(input_pipe_name, 'w') as input_pipe: + + print("writing requests") + + i = 0 + while True: + if i == 1: + #if i == 200000: + print("break") + break + if i % 100 == 0: + #print(str(i)+": mem") + #write_mem_usage_request(str(i), input_pipe) + write_random_request(str(i), input_pipe) + #time.sleep(1) + else: + write_random_request(str(i), input_pipe) + i = i + 1; + p.poll() + if p.returncode != None: + print("breaking") + break + + + +if __name__ == "__main__": + main()
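
Testing note: a minimal sketch of exercising the new E_FreeMklBuffers control message from a
standalone Python script, following the request format already used in evaluate.py. Only the
"control": 3 value and the request_id echo come from this patch; the input file name, the
trailing newline, and the exact shape of the ack written by writeSimpleAck() are assumptions
for illustration, not part of the change itself.

    import json

    def write_free_mkl_buffers_request(request_num, destination):
        # Control code 3 maps to CCommandParser::E_FreeMklBuffers; on receipt the C++
        # side calls mkl_free_buffers() and answers with a simple ack carrying the
        # same request_id.
        json.dump({"request_id": "mkl_free_buffers_" + str(request_num), "control": 3},
                  destination)
        destination.write('\n')  # assumed delimiter between requests

    # Hypothetical usage: write one free-buffers request into the input file that
    # launch_pytorch_app() passes to pytorch_inference via --input.
    if __name__ == "__main__":
        with open("inference_input.json", "w") as input_file:
            write_free_mkl_buffers_request(0, input_file)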