Skip to content
This repository has been archived by the owner on Dec 21, 2023. It is now read-only.

Introducing OMP #3001

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Werror")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Werror")

# Go through and add in all the relevant build flags as appropriate
# Go through and add in all the relevant build flags as appropriate
check_and_set_compiler_flag(-ftemplate-depth=900 CXX)
check_and_set_compiler_flag(-Wno-c++11-narrowing CXX)
check_and_set_compiler_flag(-Wno-stdlibcxx-not-found CXX RESTRICT_CLANG)
Expand Down Expand Up @@ -154,17 +154,17 @@ check_and_set_compiler_flag(-mfpmath=sse RELEASE)

if(APPLE)
# This triggers a bug in clang; the 10.13 symbol ___chkstk_darwin is missing in
# 10.13, and the code generated doesn't run on that, even with the 10.12 target.
# 10.13, and the code generated doesn't run on that, even with the 10.12 target.
# The solution to this is to disable the stack checking globally, which seems
# to be enabled by default in clang 11 and later for objective C++ code.
# to be enabled by default in clang 11 and later for objective C++ code.
check_and_set_compiler_flag(-fno-stack-check)
check_and_set_compiler_flag(-fno-stack-protector)
endif()

# Turn on ARC globally.
check_and_set_compiler_flag(-fobjc-arc)

# This flag needs to be set after all other warning flags, or it may give false
# This flag needs to be set after all other warning flags, or it may give false
# positives.
check_and_set_compiler_flag(-Wno-unknown-warning-option)

Expand Down Expand Up @@ -193,7 +193,7 @@ Find_Int128_Types()
include(CompilerOddities)
Set_Compiler_Specific_Flags()

# Set up threads.
# Set up threads.
set(THREADS_PREFER_PTHREAD_FLAG TRUE)
set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
find_package(Threads REQUIRED)
Expand All @@ -212,13 +212,13 @@ if (APPLE)
find_library(CORE_VIDEO CoreVideo)
find_library(METAL NAMES Metal)
find_library(METAL_PERFORMANCE_SHADERS NAMES MetalPerformanceShaders)
set(_TC_APPLE_DEPENDENCIES
${ACCELERATE} ${CORE_GRAPHICS} ${JAVASCRIPT_CORE} ${FOUNDATION}
set(_TC_APPLE_DEPENDENCIES
${ACCELERATE} ${CORE_GRAPHICS} ${JAVASCRIPT_CORE} ${FOUNDATION}
${CORE_IMAGE} ${CORE_ML} ${CORE_VIDEO} ${METAL} ${METAL_PERFORMANCE_SHADERS})

if(NOT ${TC_BUILD_IOS})
find_library(APPKIT AppKit)
set(_TC_APPLE_DEPENDENCIES ${_TC_APPLE_DEPENDENCIES} ${APPKIT})
set(_TC_APPLE_DEPENDENCIES ${_TC_APPLE_DEPENDENCIES} ${APPKIT})
endif()
endif()

Expand Down Expand Up @@ -358,7 +358,7 @@ foreach(package ${packages})
add_dependencies(external_dependencies ex_${depname})
endforeach()

set(_TC_EXTERNAL_DEPENDENCIES
set(_TC_EXTERNAL_DEPENDENCIES
openssl libxml2 curl
Threads::Threads
${CMAKE_DL_LIBS})
Expand All @@ -368,7 +368,7 @@ set(_TC_EXTERNAL_DEPENDENCIES
# These are used by C API, unity_shared, etc.

if(NOT _TC_DEFAULT_SERVER_INITIALIZER)
set(_TC_DEFAULT_SERVER_INITIALIZER
set(_TC_DEFAULT_SERVER_INITIALIZER
"${CMAKE_SOURCE_DIR}/src/capi/default_server_initializer.cpp"
)
endif()
Expand Down Expand Up @@ -459,15 +459,20 @@ set(_TC_COMMON_REQUIREMENTS
boost
libbz2
z
omp
)

# if (NOT TC_BUILD_IOS)
guihao-liang marked this conversation as resolved.
Show resolved Hide resolved
# LIST(APPEND _TC_COMMON_REQUIREMENTS omp)
# endif()

if(APPLE)
set(_TC_COMMON_REQUIREMENTS
# External dependencies (from deps)
${_TC_COMMON_REQUIREMENTS}
${_TC_APPLE_DEPENDENCIES})
endif()

set(_TC_COMMON_OBJECTS
${_TC_COMMON_OBJECTS}
"$<TARGET_OBJECTS:annotation>"
Expand All @@ -491,7 +496,7 @@ if(APPLE)
${_TC_COMMON_OBJECTS}
"$<TARGET_OBJECTS:vega_renderer>"
)
set(_TC_COMMON_REQUIREMENTS
set(_TC_COMMON_REQUIREMENTS
${_TC_COMMON_REQUIREMENTS}
${APPKIT}
${CORE_GRAPHICS}
Expand Down Expand Up @@ -541,4 +546,3 @@ endif()

add_subdirectory(src)
add_subdirectory(test)

28 changes: 20 additions & 8 deletions src/core/parallel/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
project(Turi)

# Build the `parallel` object library: threading primitives plus the OpenMP
# lambda helpers. (This span was corrupted diff residue — old- and new-side
# lines of the hunk were interleaved; this is the reconstructed new-side call.)
make_library(
  parallel
  OBJECT
  SOURCES
    pthread_tools.cpp
    thread_pool.cpp
    execute_task_in_native_thread.cpp
    lambda_omp.cpp
  REQUIRES
    platform_config
    logger
  EXTERNAL_VISIBILITY)

if(NOT TC_BUILD_IOS)
  # OpenMP is used everywhere except iOS builds: link the omp runtime target
  # and enable OpenMP for the one translation unit that contains pragmas.
  # NOTE(review): "-Xpreprocessor -fopenmp" is the usual way to get Apple
  # clang to honor the pragmas without the unsupported -fopenmp driver mode —
  # confirm this is also the intent for non-Apple toolchains.
  target_link_libraries(parallel omp)
  set_source_files_properties(lambda_omp.cpp PROPERTIES COMPILE_FLAGS
                              "-Xpreprocessor -fopenmp")
endif()
18 changes: 18 additions & 0 deletions src/core/parallel/lambda_omp.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#include <omp.h>
guihao-liang marked this conversation as resolved.
Show resolved Hide resolved

#include <core/parallel/lambda_omp.hpp>
namespace turi {

/**
 * Runs fn exactly once on each OpenMP worker thread, passing that thread's
 * id in [0, num_threads) and the team size — matching the contract of the
 * thread_pool-based implementation this replaces.
 *
 * Bug fixed: the previous body nested a `#pragma omp parallel for` over
 * [0, nworkers) inside the `#pragma omp parallel` region. With nested
 * parallelism disabled (the OpenMP default), each outer thread executes the
 * inner construct as its own team of one and therefore runs the entire loop,
 * so fn was invoked nworkers times on every thread instead of once per
 * thread with its own id.
 */
void in_parallel(
    const std::function<void(size_t thread_id, size_t num_threads)>& fn) {
#pragma omp parallel
  {
    size_t nworkers = omp_get_num_threads();
    size_t thread_id = omp_get_thread_num();
    fn(thread_id, nworkers);
  }
}

}; // namespace turi
147 changes: 67 additions & 80 deletions src/core/parallel/lambda_omp.hpp
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
/* Copyright © 2017 Apple Inc. All rights reserved.
*
* Use of this source code is governed by a BSD-3-clause license that can
* be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
* be found in the LICENSE.txt file or at
* https://opensource.org/licenses/BSD-3-Clause
*/
#ifndef TURI_PARALLEL_LAMBDA_OMP_HPP
#define TURI_PARALLEL_LAMBDA_OMP_HPP
#include <iterator>
#include <utility>
#include <functional>
#include <type_traits>

#include <core/util/basic_types.hpp>
#include <core/parallel/thread_pool.hpp>
#include <core/util/basic_types.hpp>
#include <functional>
#include <iterator>
#include <type_traits>
#include <utility>

namespace turi {

Expand All @@ -32,25 +33,8 @@ namespace turi {
* \param fn The function to run. The function must take two size_t arguments:
* the thread ID and the number of threads.
*/
inline void in_parallel(const std::function<void (size_t thread_id,
size_t num_threads)>& fn) {
size_t nworkers = thread_pool::get_instance().size();

if (thread::get_tls_data().is_in_thread() || nworkers <= 1) {

fn(0, 1);
return;

} else {

parallel_task_queue threads(thread_pool::get_instance());

for (unsigned int i = 0;i < nworkers; ++i) {
threads.launch([&fn, i, nworkers]() { fn(i, nworkers); }, i);
}
threads.join();
}
}
void in_parallel(
const std::function<void(size_t thread_id, size_t num_threads)>& fn);

/**
* Returns the thread pool dedicated for running parallel for jobs.
Expand Down Expand Up @@ -90,38 +74,39 @@ thread_pool& get_parfor_thread_pool();
* argument which is a current index.
*/
template <typename FunctionType>
void parallel_for(size_t begin,
size_t end,
const FunctionType& fn) {

void parallel_for(size_t begin, size_t end, const FunctionType& fn) {
size_t nworkers = thread_pool::get_instance().size();

if (thread::get_tls_data().is_in_thread() || nworkers <= 1) {
// we do not support recursive calls to in parallel yet.
for(size_t i = begin; i < end; ++i) {
for (size_t i = begin; i < end; ++i) {
fn(i);
}
} else {
parallel_task_queue threads(thread_pool::get_instance());
size_t nlen = end - begin; // total range
double split_size = (double)nlen / nworkers; // size of range each worker gets
size_t nlen = end - begin; // total range
double split_size =
(double)nlen / nworkers; // size of range each worker gets
for (size_t i = 0; i < nworkers; ++i) {
size_t worker_begin = begin + split_size * i; // beginning of this worker's range
size_t worker_end = begin + split_size * (i + 1); // end of this worker's range
size_t worker_begin =
begin + split_size * i; // beginning of this worker's range
size_t worker_end =
begin + split_size * (i + 1); // end of this worker's range
if (i == nworkers - 1) worker_end = end;
threads.launch([&fn, worker_begin, worker_end]() {
size_t worker_iter = worker_begin;
while (worker_iter < worker_end) {
fn(worker_iter);
++worker_iter;
}
}, i);
threads.launch(
[&fn, worker_begin, worker_end]() {
size_t worker_iter = worker_begin;
while (worker_iter < worker_end) {
fn(worker_iter);
++worker_iter;
}
},
i);
}
threads.join();
}
}


/**
* Runs a map reduce operation for ranging from the integers 'begin' to 'end'.
* \ingroup threading
Expand Down Expand Up @@ -155,37 +140,40 @@ void parallel_for(size_t begin,
* argument which is a current index.
*/
template <typename FunctionType, typename ReduceType>
ReduceType fold_reduce (size_t begin,
size_t end,
const FunctionType& fn,
ReduceType base = ReduceType()) {
ReduceType fold_reduce(size_t begin, size_t end, const FunctionType& fn,
ReduceType base = ReduceType()) {
size_t nworkers = thread_pool::get_instance().size();

if (thread::get_tls_data().is_in_thread() || nworkers <= 1) {
// we do not support recursive calls to in parallel yet.
ReduceType acc = base;
for(size_t i = begin; i < end; ++i) {
for (size_t i = begin; i < end; ++i) {
fn(i, acc);
}
return acc;
} else {
parallel_task_queue threads(thread_pool::get_instance());

size_t nlen = end - begin; // total rangeS
double split_size = (double)nlen / nworkers; // size of range each worker gets
size_t nlen = end - begin; // total rangeS
double split_size =
(double)nlen / nworkers; // size of range each worker gets

std::vector<ReduceType> acc(nworkers, base);
for (size_t i = 0;i < nworkers; ++i) {
size_t worker_begin = begin + split_size * i; // beginning of this worker's range
size_t worker_end = begin + split_size * (i + 1); // end of this worker's range
for (size_t i = 0; i < nworkers; ++i) {
size_t worker_begin =
begin + split_size * i; // beginning of this worker's range
size_t worker_end =
begin + split_size * (i + 1); // end of this worker's range
if (i == nworkers - 1) worker_end = end;
threads.launch([&fn, &acc, worker_begin, worker_end, i]() {
size_t worker_iter = worker_begin;
while (worker_iter < worker_end) {
fn(worker_iter, acc[i]);
++worker_iter;
}
}, i);
threads.launch(
[&fn, &acc, worker_begin, worker_end, i]() {
size_t worker_iter = worker_begin;
while (worker_iter < worker_end) {
fn(worker_iter, acc[i]);
++worker_iter;
}
},
i);
}
threads.join();
ReduceType ret = base;
Expand Down Expand Up @@ -226,11 +214,11 @@ ReduceType fold_reduce (size_t begin,
* argument which is a current index.
*/
template <typename RandomAccessIterator, typename FunctionType>
inline void parallel_for(RandomAccessIterator iter_begin,
RandomAccessIterator iter_end,
const FunctionType& fn,
std::random_access_iterator_tag = typename std::iterator_traits<RandomAccessIterator>::iterator_category()) {

inline void parallel_for(
RandomAccessIterator iter_begin, RandomAccessIterator iter_end,
const FunctionType& fn,
std::random_access_iterator_tag = typename std::iterator_traits<
RandomAccessIterator>::iterator_category()) {
size_t nworkers = thread_pool::get_instance().size();

if (thread::get_tls_data().is_in_thread() || nworkers <= 1) {
Expand All @@ -242,29 +230,28 @@ inline void parallel_for(RandomAccessIterator iter_begin,
} else {
parallel_task_queue threads(thread_pool::get_instance());

size_t nlen = std::distance(iter_begin, iter_end); // number of elements
size_t nlen = std::distance(iter_begin, iter_end); // number of elements

double split_size = (double)nlen / nworkers; // number of elements per worker
double split_size =
(double)nlen / nworkers; // number of elements per worker

for (size_t i = 0;i < nworkers; ++i) {
size_t worker_begin = split_size * i; // index this worker starts at
size_t worker_end = split_size * (i + 1); // index this worker ends at
for (size_t i = 0; i < nworkers; ++i) {
size_t worker_begin = split_size * i; // index this worker starts at
size_t worker_end = split_size * (i + 1); // index this worker ends at
if (i == nworkers - 1) worker_end = nlen;
threads.launch(
[&fn, worker_begin, worker_end, &iter_begin]() {
RandomAccessIterator my_begin = iter_begin + worker_begin;
RandomAccessIterator my_end = iter_begin + worker_end;
while (my_begin != my_end) {
fn(*my_begin);
++my_begin;
}
} );
threads.launch([&fn, worker_begin, worker_end, &iter_begin]() {
RandomAccessIterator my_begin = iter_begin + worker_begin;
RandomAccessIterator my_end = iter_begin + worker_end;
while (my_begin != my_end) {
fn(*my_begin);
++my_begin;
}
});
}
threads.join();
}
}


};
}; // namespace turi

#endif
1 change: 1 addition & 0 deletions src/external/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ add_subdirectory(xgboost)
add_subdirectory(coremltools_wrap)
add_subdirectory(boost)
add_subdirectory(nanomsg)
add_subdirectory(openmp)

if(${TC_BUILD_REMOTEFS})
add_subdirectory(aws-sdk-cpp)
Expand Down
Loading