From 03866005088ec05f44727f753265f12db0b310d3 Mon Sep 17 00:00:00 2001 From: Anthony Car Date: Wed, 1 Jun 2016 23:20:09 -0400 Subject: [PATCH] updated thread processor code --- include/batch_processor.h | 101 +++++++++++++ include/cuda_emu.hpp | 10 +- include/thread_processor.hpp | 282 ++++++++++++++--------------------- 3 files changed, 221 insertions(+), 172 deletions(-) create mode 100644 include/batch_processor.h diff --git a/include/batch_processor.h b/include/batch_processor.h new file mode 100644 index 0000000..7b3eb40 --- /dev/null +++ b/include/batch_processor.h @@ -0,0 +1,101 @@ +#pragma once +#include "thread_processor.hpp" +namespace ParallelDo { +/** + * Use this to post a series of jobs to run in parallel and then wait for + * them to complete in the scheduling thread. The ThreadProcessor pointer is stored as given and is never deleted by this class. + */ +class BatchTracker : boost::noncopyable +{ + public: + BatchTracker(ThreadProcessor *threadProcessorp): + number_of_jobs_total(0), number_of_jobs_complete(0), + cond_(), mutex(), threadProcessorp_(threadProcessorp) { } + virtual ~BatchTracker() { }; + + /** + * post a function to run in parallel; it is wrapped with wrap() so that its completion is counted + * @param func a function that takes no arguments to run in parallel, use + * a boost::bind to schedule with arguments + */ + void post(boost::function func) { + threadProcessorp_->post(boost::bind(&BatchTracker::wrap, this, func)); + incJobCount(); /* NOTE(review): the total is raised only after the job has been handed to the pool, and without taking `mutex`; confirm done() cannot compare against a stale total */ + } + void postWorkList(boost::function func) { /* NOTE(review): the body is identical to post() and the parameter is a single function, not a list; confirm the intent */ + threadProcessorp_->post(boost::bind(&BatchTracker::wrap, this, func)); + incJobCount(); + } + + /** + * @param seconds max number of seconds to wait for all jobs to + * complete; 0 (the default) means no time limit. @return true if the completed count equals the scheduled count + */ + bool wait_until_done(time_t seconds = 0) { + if (number_of_jobs_complete == number_of_jobs_total) /* fast path: compared without taking the lock */ + return true; + return wait_until_done_locked(seconds); + } + + /** + * call this before re-using a batch tracker after calling + * wait_until_done; sets both job counters back to zero + */ + void reset() { + number_of_jobs_total = number_of_jobs_complete = 0; + } + + /** + * returns number of times post() or postWorkList() was called since instantiation or + * reset() called + 
*/ + int scheduled() const { + return number_of_jobs_total; + } + + /** + * number of jobs that have completed since instantiation or reset() + * called + */ + int complete() const { + return number_of_jobs_complete; + } + + private: + int incJobCount() { /* no lock is taken; returns the total as it was before the increment */ + return number_of_jobs_total++; + + } + + void done() { /* called by wrap() after the job's function has returned */ + boost::mutex::scoped_lock lock(mutex); + number_of_jobs_complete++; + if (number_of_jobs_complete == number_of_jobs_total) + cond_.notify_one(); /* all scheduled jobs are complete: wake one thread waiting on cond_ */ + } + + void wrap(boost::function func) { /* runs func, then counts it as complete; done() is not reached if func throws */ + func(); + done(); + } + + bool wait_until_done_locked(time_t max_seconds) { /* max_seconds == 0 means no time limit; returns true if the completed count equals the scheduled count */ + boost::mutex::scoped_lock lock(mutex); + time_t start, now = start = time(NULL); + do { /* NOTE(review): the wait below runs before the counters are compared under the lock; confirm a notification sent before this point cannot be missed */ + cond_.wait(mutex); /* the elapsed time is only re-evaluated after this call returns */ + if (number_of_jobs_complete != number_of_jobs_total) { + now = time(NULL); + } + } while (number_of_jobs_complete != number_of_jobs_total && + (max_seconds == 0 || ((now - start) < max_seconds))); + return (number_of_jobs_complete == number_of_jobs_total); + } + + volatile int number_of_jobs_total; /* raised by incJobCount(), zeroed by reset() */ + volatile int number_of_jobs_complete; /* raised by done(), zeroed by reset() */ + boost::condition cond_; + boost::mutex mutex; + ThreadProcessor *threadProcessorp_; /* stored as given; never deleted here */ +}; +} /* namespace ParallelDo */ diff --git a/include/cuda_emu.hpp b/include/cuda_emu.hpp index e2330aa..356c9b3 100644 --- a/include/cuda_emu.hpp +++ b/include/cuda_emu.hpp @@ -3,7 +3,7 @@ #ifdef __APPLE__ #include -#else +#else #include #endif #include @@ -13,6 +13,10 @@ #include #include #include "thread_processor.hpp" +#include "batch_processor.h" + + +using namespace ParallelDo; #define UNUSED __attribute__((unused)) @@ -36,7 +40,7 @@ void aligned_free( void *mem ) free( ((void**)mem)[-1] ); } // End shameless copy -#else +#else #define aligned_malloc memalign #define aligned_free free #endif @@ -105,7 +109,7 @@ cudaError_t cudaDeviceSynchronize() #define __global__ #define __shared__ volatile static -#define __restrict__ +#define __restrict__ #define blockIdx getBlockIdx() diff --git a/include/thread_processor.hpp b/include/thread_processor.hpp index 82c39a2..271b189 100644 --- 
a/include/thread_processor.hpp +++ b/include/thread_processor.hpp @@ -1,3 +1,4 @@ +// vim: set noexpandtab #ifndef THREAD_PROCESSOR_HPP #define THREAD_PROCESSOR_HPP @@ -6,191 +7,134 @@ #include #include +namespace ParallelDo { /* ThreadProcessor, declared next, runs posted tasks on a group of worker threads */ class ThreadProcessor { -public: - ThreadProcessor(int max_wait_for_job_sec = 200, int number_of_worker_threads = 0): - io_mutex_(), cond_(), - initialized_(false), actors_(), - number_messages_(0), done_(false), message_list_(), - max_wait_for_job_sec_(max_wait_for_job_sec), - number_of_worker_threads_(number_of_worker_threads) { - start_workers(); - } - - void set_done() { - boost::mutex::scoped_lock lock(io_mutex_); - done_ = true; - } - - ~ThreadProcessor() { - set_done(); - cond_.notify_all(); - //actors_.interrupt_all(); - actors_.join_all(); - - } + public: + ThreadProcessor(int max_wait_for_job = 500, int number_of_worker_threads = 0, int jobs_per_worker=5): /* max_wait_for_job: how long an idle worker waits before re-checking, in milliseconds (see getJobs()); number_of_worker_threads: 0 selects boost::thread::hardware_concurrency(); jobs_per_worker: batch size used by getJobs() */ + io_mutex_(), cond_(), + initialized_(false), actors_(), + number_messages_(0), done_(false), message_list_(), + max_wait_for_job_(max_wait_for_job), + number_of_worker_threads_(number_of_worker_threads), jobs_per_worker_(jobs_per_worker),exit_func_(NULL) { + start_workers(); /* the worker threads are created from the constructor */ + } + typedef boost::function work_t; /* one unit of work; post() documents it as a function that takes no arguments */ - typedef boost::function work_t; + void set_exit_func(ThreadProcessor::work_t exit_func) { /* exit_func is called by each worker after it has run a fetched batch (see worker()); assigned without taking io_mutex_ */ + exit_func_ = exit_func; + } - /** - * you can use this to schedule a task to run if you don't care to - * wait for it to complete - * - * func should be a function that takes no arguments - */ - void post(work_t func) { - boost::mutex::scoped_lock lock(io_mutex_); - message_list_.push_back(func); + void set_done() { /* sets done_ under io_mutex_; does not notify cond_ itself */ + boost::mutex::scoped_lock lock(io_mutex_); + done_ = true; + } - number_messages_++; - cond_.notify_one(); - } + ~ThreadProcessor() { /* flags done_, wakes every waiting worker, then joins all worker threads */ + set_done(); + cond_.notify_all(); + //actors_.interrupt_all(); + actors_.join_all(); + } - int queued() const { //this is intensionally not locked - return number_messages_; - } + /** + * you can use this to schedule a task to run 
if you don't care to + * wait for it to complete + * + * func should be a function that takes no arguments; it is appended to the queue and one worker is notified + */ + void post(work_t func) { + boost::mutex::scoped_lock lock(io_mutex_); + message_list_.push_back(func); + number_messages_++; + lock.unlock(); /* the lock is released before the notification is sent */ + cond_.notify_one(); + } + void postWorkList(std::list &worklist) { /* moves every entry of worklist to the end of the queue with splice, which leaves worklist empty; only one worker is notified here */ + boost::mutex::scoped_lock lock(io_mutex_); + int size = worklist.size(); + message_list_.splice(message_list_.end(),worklist ); + number_messages_+=size; + lock.unlock(); + cond_.notify_one(); + } -private: - bool getJob(work_t &job) { - boost::mutex::scoped_lock lock(io_mutex_); - while (false == done_ && queued() == 0) { - boost::system_time tAbsoluteTime = boost::get_system_time() + boost::posix_time::seconds(max_wait_for_job_sec_); - cond_.timed_wait(io_mutex_, tAbsoluteTime); + int queued() const { //this is intentionally not locked + return number_messages_; } - if (queued() == 0) - return false; - job = message_list_.front(); - message_list_.pop_front(); - number_messages_--; - return true; - } + private: + bool getJobs(std::list &jobs) { /* waits until work is queued or done_ is set, then moves a batch from the front of the queue onto the end of jobs; returns false when nothing is queued */ + boost::mutex::scoped_lock lock(io_mutex_); + while (false == done_ && queued() == 0) { + boost::system_time tAbsoluteTime = boost::get_system_time() + boost::posix_time::milliseconds(max_wait_for_job_); + cond_.timed_wait(io_mutex_, tAbsoluteTime); + } + int available = queued(); + if (available == 0) + return false; + int jobs_to_get; + if (available <= jobs_per_worker_) /* no more than jobs_per_worker_ queued: take a single job */ + jobs_to_get = 1; + else + jobs_to_get = jobs_per_worker_; + std::list::iterator itr = message_list_.begin(); + int i; + for (i=0; i < jobs_to_get; i++) { /* advance itr one past the last job of the batch */ + ++itr; + } - void worker(int worker_id __attribute__((unused))) { - while (!done_) { - work_t job; - bool success = getJob(job); - if (false == success) - break; - job(); + jobs.splice(jobs.end(), message_list_, message_list_.begin(), itr); + number_messages_-=i; + if (number_messages_> 0) { /* jobs remain in the queue: release the lock and notify another worker */ + lock.unlock(); + cond_.notify_one(); + } + return true; } - } - void start_workers() { 
- if (false == initialized_) { - int number_of_threads = - 0 == number_of_worker_threads_ ? boost::thread::hardware_concurrency() : number_of_worker_threads_; - for (int i = 0; i < number_of_threads; i++) { - actors_.create_thread(boost::bind(&ThreadProcessor::worker, this, i)); + void worker(int worker_id __attribute__((unused))) { /* thread body: fetch a batch with getJobs(), run each job in order, then call exit_func_ if it is set; the loop ends when done_ is set or getJobs() returns false */ + + std::list myWorkList; + while (!done_) { + work_t job; /* NOTE(review): never used; the inner loop declares its own job */ + bool success = getJobs(myWorkList); + if (false == success) + break; + while (myWorkList.size() > 0) { + work_t job = myWorkList.front(); + myWorkList.pop_front(); + job(); + } + if (exit_func_) + exit_func_(); } - initialized_ = true; } - } - boost::mutex io_mutex_; - boost::condition cond_; - bool initialized_; - boost::thread_group actors_; + void start_workers() { /* creates the worker threads unless initialized_ is already true */ + if (false == initialized_) { + int number_of_threads = + 0 == number_of_worker_threads_ ? boost::thread::hardware_concurrency() : number_of_worker_threads_; + for (int i = 0; i < number_of_threads; i++) { + actors_.create_thread(boost::bind(&ThreadProcessor::worker, this, i)); + } + initialized_ = true; + } + } + boost::mutex io_mutex_; + boost::condition cond_; - volatile int number_messages_; - volatile bool done_; + bool initialized_; + boost::thread_group actors_; - std::list message_list_; - int max_wait_for_job_sec_; - int number_of_worker_threads_; -public: + volatile int number_messages_; /* entry count of message_list_; written under io_mutex_, read unlocked by queued() */ + volatile bool done_; + std::list message_list_; + int max_wait_for_job_; /* milliseconds; see getJobs() */ + int number_of_worker_threads_; + int jobs_per_worker_; /* batch size taken by getJobs() when more than this many jobs are queued */ + work_t exit_func_; /* optional; called by a worker after each batch */ + public: }; -/** - * Use this to post a series of jobs to run in parallel and then wait for - * them to complete in the scheduling thread - */ -class BatchTracker : boost::noncopyable { -public: - BatchTracker(ThreadProcessor *threadProcessorp): - number_of_jobs_total(0), number_of_jobs_complete(0), - cond_(), mutex(), threadProcessorp_(threadProcessorp) { } - virtual ~BatchTracker() { }; - - /** - * post a function to run in parallel - * @func a function that takes 
no arguments to run in parallel, use - * a boost::bind to schedule with arguments - */ - void post(boost::function func) { - threadProcessorp_->post(boost::bind(&BatchTracker::wrap, this, func)); - incJobCount(); - } - - /** - * @param seconds max number of seconds to wait for all jobs to - * complete - */ - bool wait_until_done(time_t seconds = 0) { - if (number_of_jobs_complete == number_of_jobs_total) - return true; - return wait_until_done_locked(seconds); - } - - /** - * call this before re-using a batch tracker after calling - * wait_until_done - */ - void reset() { - number_of_jobs_total = number_of_jobs_complete = 0; - } - - /** - * returns number of time post was called since instansiation or - * reset() called - */ - int scheduled() const { - return number_of_jobs_total; - } - - /** - * number of jobs that have completed since instansiation or reset() - * called - */ - int complete() const { - return number_of_jobs_complete; - } - -private: - int incJobCount() { - return number_of_jobs_total++; - - } - - void done() { - boost::mutex::scoped_lock lock(mutex); - number_of_jobs_complete++; - if (number_of_jobs_complete == number_of_jobs_total) - cond_.notify_one(); - } - - void wrap(boost::function func) { - func(); - done(); - } - - bool wait_until_done_locked(time_t max_seconds) { - boost::mutex::scoped_lock lock(mutex); - time_t start, now = start = time(NULL); - do { - cond_.wait(mutex); - if (number_of_jobs_complete != number_of_jobs_total) { - now = time(NULL); - } - } while (number_of_jobs_complete != number_of_jobs_total && - (max_seconds == 0 || ((now - start) < max_seconds))); - return (number_of_jobs_complete == number_of_jobs_total); - } - - volatile int number_of_jobs_total; - volatile int number_of_jobs_complete; - boost::condition cond_; - boost::mutex mutex; - ThreadProcessor *threadProcessorp_; -}; +} /* namespace ParallelDo */ #endif