[ML] Framework for testing effect of various MKL settings #2714

Draft · wants to merge 1 commit into base: main
2 changes: 1 addition & 1 deletion 3rd_party/3rd_party.cmake
@@ -71,7 +71,7 @@ elseif ("${HOST_SYSTEM_NAME}" STREQUAL "linux")
set(MKL_LOCATION "/usr/local/gcc103/lib")
set(MKL_EXTENSION ".so.2")
set(MKL_PREFIX "libmkl_")
set(MKL_LIBRARIES "avx2" "avx512" "core" "def" "gnu_thread" "intel_lp64" "mc3" "vml_avx2" "vml_avx512" "vml_cmpt" "vml_def" "vml_mc3")
set(MKL_LIBRARIES "avx2" "avx512" "core" "def" "gnu_thread" "intel_lp64" "mc3" "sequential" "vml_avx2" "vml_avx512" "vml_cmpt" "vml_def" "vml_mc3")
endif()
set(BOOST_EXTENSION mt-${BOOST_ARCH}-1_83.so.1.83.0)
set(BOOST_LIBRARIES "atomic" "chrono" "date_time" "filesystem" "iostreams" "log" "log_setup" "program_options" "regex" "system" "thread" "unit_test_framework")
2 changes: 2 additions & 0 deletions bin/pytorch_inference/CCommandParser.cc
@@ -143,6 +143,7 @@ CCommandParser::validateControlMessageJson(const json::object& doc,
}
case E_ClearCache:
case E_ProcessStats:
case E_FreeMklBuffers:
// No extra arguments needed
break;
case E_Unknown:
@@ -276,6 +277,7 @@ CCommandParser::SControlMessage CCommandParser::jsonToControlMessage(const json:
doc.at(REQUEST_ID).as_string()};
case E_ClearCache:
case E_ProcessStats:
case E_FreeMklBuffers:
return {controlMessageType, 0, doc.at(REQUEST_ID).as_string()};
case E_Unknown:
break;
1 change: 1 addition & 0 deletions bin/pytorch_inference/CCommandParser.h
@@ -126,6 +126,7 @@ class CCommandParser {
E_NumberOfAllocations,
E_ClearCache,
E_ProcessStats,
E_FreeMklBuffers,
E_Unknown
};

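For reference, control messages reach pytorch_inference as small JSON documents on its input stream, and the test scripts below address the new message by its numeric control code. A minimal sketch of the two control documents used in the memory experiments, with the codes taken from evaluate.py (the request IDs are arbitrary strings):

import json

# Control code 2 asks the process to report its memory stats (RSS);
# control code 3 asks it to call mkl_free_buffers(). Both codes match
# the helper functions in evaluate.py below.
process_stats_request = {"request_id": "mem_0", "control": 2}
free_mkl_buffers_request = {"request_id": "mkl_free_buffers_0", "control": 3}

for request in (process_stats_request, free_mkl_buffers_request):
    print(json.dumps(request))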
21 changes: 21 additions & 0 deletions bin/pytorch_inference/CMakeLists.txt
@@ -21,6 +21,23 @@ set(ML_LINK_LIBRARIES
${C10_LIB}
)

if (LINK_TCMALLOC)
message(AUTHOR_WARNING "Linking libtcmalloc. Build is not for production release.")
list(APPEND ML_LINK_LIBRARIES tcmalloc)
endif ()

if (LINK_PROFILER)
message(AUTHOR_WARNING "Linking libprofiler. Build is not for production release.")
list(APPEND ML_LINK_LIBRARIES profiler)
endif ()

set(MKL_ARCH intel64)
set(MKL_THREADING sequential)
set(MKL_INTERFACE lp64)
find_package(MKL CONFIG REQUIRED PATHS $ENV{MKLROOT})
message(STATUS "Imported oneMKL targets: ${MKL_IMPORTED_TARGETS}")


ml_add_executable(pytorch_inference
CBufferedIStreamAdapter.cc
CCmdLineParser.cc
@@ -29,4 +46,8 @@ ml_add_executable(pytorch_inference
CThreadSettings.cc
)

target_compile_options(pytorch_inference PUBLIC $<TARGET_PROPERTY:MKL::MKL,INTERFACE_COMPILE_OPTIONS>)
target_include_directories(pytorch_inference PUBLIC $<TARGET_PROPERTY:MKL::MKL,INTERFACE_INCLUDE_DIRECTORIES>)
target_link_libraries(pytorch_inference PUBLIC $<LINK_ONLY:MKL::MKL>)

ml_codesign(pytorch_inference)
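With find_package(MKL CONFIG REQUIRED PATHS $ENV{MKLROOT}) the build now expects MKLROOT to point at a oneMKL installation, and LINK_TCMALLOC / LINK_PROFILER are ordinary CMake cache options. As a rough illustration only (this is not the project's normal build entry point, and the oneMKL path is just an example), a configure step could be driven like this:

import os
import subprocess

# Example oneMKL location; adjust to the local oneAPI installation.
env = dict(os.environ, MKLROOT="/opt/intel/oneapi/mkl/latest")

# LINK_TCMALLOC and LINK_PROFILER are the optional flags added above; both
# emit an AUTHOR_WARNING because the result is not a production build.
subprocess.run(
    ["cmake", "-S", ".", "-B", "cmake-build-mkl", "-DLINK_TCMALLOC=ON"],
    check=True,
    env=env,
)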
18 changes: 18 additions & 0 deletions bin/pytorch_inference/Main.cc
@@ -32,6 +32,7 @@

#include <ATen/Parallel.h>
#include <ATen/ops/cat.h>
#include <mkl.h>
#include <torch/csrc/api/include/torch/types.h>
#include <torch/script.h>

@@ -135,6 +136,11 @@ void handleControlMessage(const ml::torch::CCommandParser::SControlMessage& cont
ml::core::CProcessStats::residentSetSize(),
ml::core::CProcessStats::maxResidentSetSize());
break;
case ml::torch::CCommandParser::E_FreeMklBuffers:
LOG_INFO(<< "Freeing MKL buffers on request.");
mkl_free_buffers();
resultWriter.writeSimpleAck(controlMessage.s_RequestId);
break;
case ml::torch::CCommandParser::E_Unknown:
std::string message{"Attempt to handle unknown control message"};
LOG_ERROR(<< message);
@@ -307,6 +313,18 @@ int main(int argc, char** argv) {
LOG_DEBUG(<< "Using a single allocation");
}

MKLVersion mkl_version;
mkl_get_version(&mkl_version);

LOG_INFO(<< "Using oneMKL " << mkl_version.MajorVersion << "." << mkl_version.UpdateVersion);

int fastMmDisabled = mkl_disable_fast_mm();
if (fastMmDisabled != 1) {
LOG_ERROR(<< "Disabling MKL fast MM failed.");
} else {
LOG_INFO(<< "Disabled MKL fast MM.");
}

commandParser.ioLoop(
[&module_, &resultWriter](ml::torch::CCommandParser::CRequestCacheInterface& cache,
ml::torch::CCommandParser::SRequest request) -> bool {
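Because mkl_free_buffers() is now reachable through a control message, its effect on resident memory can be observed from the driver side by bracketing it with process-stats requests. A small sketch, assuming the write_request, create_mem_usage_request and create_mkl_free_buffers_request helpers from evaluate.py below:

def write_free_buffers_probe(input_file, request_num):
    # RSS before freeing the MKL buffers ...
    write_request(create_mem_usage_request(request_num), input_file)
    # ... ask pytorch_inference to call mkl_free_buffers() ...
    write_request(create_mkl_free_buffers_request(request_num), input_file)
    # ... and RSS afterwards, so the two stats responses can be compared.
    write_request(create_mem_usage_request(request_num + 1), input_file)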
21 changes: 13 additions & 8 deletions bin/pytorch_inference/evaluate.py
@@ -147,7 +147,9 @@ def path_to_app():

def launch_pytorch_app(args):

command = [path_to_app(),
os.environ["HEAPPROFILE"] = "/tmp/heapprof"
command = [#'/heaptrack/build/bin/heaptrack',
path_to_app(),
'--restore=' + args.restore_file,
'--input=' + args.input_file,
'--output=' + args.output_file,
@@ -385,6 +387,10 @@ def threading_benchmark(args):
print(f"{result['inference_threads']},{result['num_allocations']},{result['run_time_ms']},{result['avg_time_ms']}")


def create_mkl_free_buffers_request(request_num):
print("mkl_free_buffers")
return {"request_id": "mkl_free_buffers_" + str(request_num), "control": 3}

def create_mem_usage_request(request_num):
return {"request_id": "mem_" + str(request_num), "control": 2}

@@ -410,13 +416,16 @@ def memory_usage(args):
with open(args.input_file, 'w') as input_file:

request_num = 0
request_sizes = [10, 20, 30, 40, 50]
#request_sizes = [10, 20, 30, 40, 50]
request_sizes = [1, 2, 3, 4]

write_request(create_mem_usage_request(request_num), input_file)
for i in request_sizes:
request_num = request_num + 1
write_request(create_inference_request(batch_size=i, num_tokens=512, request_num=request_num), input_file)
write_request(create_mem_usage_request(request_num), input_file)
if i % 2 == 0:
write_request(create_mkl_free_buffers_request(request_num), input_file)

launch_pytorch_app(args)

@@ -453,6 +462,7 @@ def memory_usage(args):
continue

if 'error' in result:
print(result)
print(f"Inference failed. Request: {result['error']['request_id']}, Msg: {result['error']['error']}")
continue

@@ -477,12 +487,7 @@ def main():
else:
test_evaluation(args)
finally:
if os.path.isfile(args.restore_file):
os.remove(args.restore_file)
if os.path.isfile(args.input_file):
os.remove(args.input_file)
if os.path.isfile(args.output_file):
os.remove(args.output_file)
pass

if __name__ == "__main__":
main()
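Environment variables are another way to vary the MKL settings under test without rebuilding. Since launch_pytorch_app above already sets HEAPPROFILE before spawning the process, the same place could export MKL variables; a sketch with example values only:

import os

def set_mkl_environment():
    # Documented oneMKL environment variables; the values here are examples.
    os.environ["MKL_VERBOSE"] = "1"           # log each MKL call with timing
    os.environ["MKL_DISABLE_FAST_MM"] = "1"   # env-var counterpart of mkl_disable_fast_mm()
    # No effect with the sequential threading layer selected in CMakeLists.txt,
    # but relevant if MKL_THREADING is changed for an experiment.
    os.environ["MKL_NUM_THREADS"] = "1"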
201 changes: 201 additions & 0 deletions bin/pytorch_inference/main.py
@@ -0,0 +1,201 @@
import argparse
import json
import os
import platform
import random
import stat
import subprocess
import time


#
# python3 signal9.py '/Users/davidkyle/Development/NLP Models/elser_2/elser_model_2.pt' --num_allocations=4
#

def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument('model', help='A TorchScript model with .pt extension')
    parser.add_argument('exec_path', default='pytorch_inference', help='The path to the pytorch_inference executable')
parser.add_argument('input_file', default='pytorch_requests.txt', help='The path to the file containing the requests')
# The pipes are created by the C++ app
# The names must match those passed to app
parser.add_argument('--output_file', default='out.json')
parser.add_argument('--log_file', default='log.txt')
parser.add_argument('--num_threads_per_allocation', type=int, help='The number of inference threads used by LibTorch. Defaults to 1.')
parser.add_argument('--num_allocations', type=int, help='The number of allocations for parallel forwarding. Defaults to 1')
parser.add_argument('--cache_size', type=int, help='Cache size limit. Defaults to 0')
parser.add_argument('--valgrind_log_file', default='valgrind_out.txt', help='Valgrind output file')

return parser.parse_args()


# def path_to_app2():
#
# os_platform = platform.system()
# if os_platform == 'Darwin':
# if platform.machine() == 'arm64':
# sub_path = 'darwin-aarch64/controller.app/Contents/MacOS/'
# else:
# sub_path = 'darwin-x86_64/controller.app/Contents/MacOS/'
# elif os_platform == 'Linux':
# if platform.machine() == 'aarch64':
# sub_path = 'linux-aarch64/bin/'
# else:
# sub_path = 'linux-x86_64/bin/'
# elif os_platform == 'Windows':
# sub_path = 'windows-x86_64/bin/'
# else:
# raise RuntimeError('Unknown platform')
#
# return "../../build/distribution/platform/" + sub_path + "pytorch_inference"


def path_to_app(args):
return args.exec_path


def launch_pytorch_app(args, input_pipe, restore_file):
# command = ['valgrind', '--leak-check=full', '--show-leak-kinds=all', '--track-origins=yes', '--verbose',
# '--log-file=' + args.valgrind_log_file,
# path_to_app(args),
command = [path_to_app(args),
'--restore=' + restore_file,
'--input=' + input_pipe, '--inputIsPipe',
'--output=' + args.output_file,
# '--logPipe=' + args.log_file,
'--validElasticLicenseKeyConfirmed=true',
]

if args.num_threads_per_allocation:
command.append('--numThreadsPerAllocation=' + str(args.num_threads_per_allocation))

if args.num_allocations:
command.append('--numAllocations=' + str(args.num_allocations))

cache_size_to_use = 0
if args.cache_size:
cache_size_to_use = args.cache_size

command.append('--cacheMemorylimitBytes=' + str(cache_size_to_use))

# For the memory benchmark always use the immediate executor
# if args.memory_benchmark:
# command.append('--useImmediateExecutor')
subprocess.Popen(command)


def stream_file(source, destination):
while True:
piece = source.read(8192)
if not piece:
break

destination.write(piece)


def wait_for_pipe(file_name, num_retries=5):
    '''
    The pipe must exist, otherwise it will be created as an
    ordinary file when opened for write.
    '''
    while num_retries > 0:
        try:
            if stat.S_ISFIFO(os.stat(file_name).st_mode):
                return True
        except OSError:
            pass

        num_retries = num_retries - 1
        time.sleep(5)

    return False

def write_mem_usage_request(request_num, destination):
json.dump({"request_id": "mem_" + str(request_num), "control": 2}, destination)

def write_random_request(request_id, destination):
json.dump(build_random_inference_request(request_id=request_id), destination)

def build_random_inference_request(request_id):
num_tokens = 510
tokens = [101] # CLS
for _ in range(num_tokens):
tokens.append(random.randrange(110, 28000))
tokens.append(102) # SEP

arg_1 = [1] * (num_tokens + 2)
arg_2 = [0] * (num_tokens + 2)
arg_3 = [i for i in range(num_tokens + 2)]

request = {
"request_id": request_id,
"tokens": [tokens],
"arg_1": [arg_1],
"arg_2": [arg_2],
"arg_3": [arg_3],
}

return request


def restore_model(model, restore_file_name):
# create the restore file
with open(restore_file_name, 'wb') as restore_file:
file_stats = os.stat(model)
file_size = file_stats.st_size

# 4 byte unsigned int
b = (file_size).to_bytes(4, 'big')
restore_file.write(b)

print("streaming model of size", file_size, flush=True)

with open(model, 'rb') as source_file:
stream_file(source_file, restore_file)


def main():
args = parse_arguments()

input_pipe_name = "model_input"
restore_file_name = "model_restore_temp"

# stream the torchscript model
restore_model(args.model, restore_file_name=restore_file_name)

    launch_pytorch_app(args, input_pipe=input_pipe_name, restore_file=restore_file_name)

if not wait_for_pipe(input_pipe_name):
print("Error: input pipe [{}] has not been created".format(input_pipe_name))
return

input_pipe = open(input_pipe_name, 'w')

print("writing requests")

request_num = 0
with open(args.input_file) as file:
for line in file:
request_num = request_num + 1
if request_num % 300 == 0:
print("Request number: ", request_num)
# print(line.rstrip())
input_pipe.write(line.rstrip().rstrip('\n'))


# i = 0
# while True:
# if i % 100 == 0:
# print("mem")
# write_mem_usage_request(str(i), input_pipe)
# else:
# write_random_request(str(i), input_pipe)
#
# i = i + 1

input_pipe.close()


if __name__ == "__main__":
main()
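main.py replays a pre-generated requests file line by line, so a companion step is needed to produce that file. A minimal sketch that reuses build_random_inference_request from above and writes one JSON document per line (file name and counts are arbitrary):

import json

from main import build_random_inference_request  # assumes this file is importable as 'main'

def generate_requests(file_name="pytorch_requests.txt", num_requests=1000):
    with open(file_name, "w") as out:
        for i in range(num_requests):
            if i % 100 == 0:
                # Interleave a process-stats request (control code 2).
                request = {"request_id": "mem_" + str(i), "control": 2}
            else:
                request = build_random_inference_request(request_id=str(i))
            out.write(json.dumps(request) + "\n")

if __name__ == "__main__":
    generate_requests()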