Skip to content

Commit

Permalink
updated thread processor code
Browse files Browse the repository at this point in the history
  • Loading branch information
acgreek committed Jun 2, 2016
1 parent 091bf61 commit 0386600
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 172 deletions.
101 changes: 101 additions & 0 deletions include/batch_processor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#pragma once
#include "thread_processor.hpp"
namespace ParallelDo {
/**
* Use this to post a series of jobs to run in parallel and then wait for
* them to complete in the scheduling thread
*/
class BatchTracker : boost::noncopyable
{
public:
BatchTracker(ThreadProcessor *threadProcessorp):
number_of_jobs_total(0), number_of_jobs_complete(0),
cond_(), mutex(), threadProcessorp_(threadProcessorp) { }
virtual ~BatchTracker() { };

/**
* post a function to run in parallel
* @func a function that takes no arguments to run in parallel, use
* a boost::bind to schedule with arguments
*/
void post(boost::function<void ()> func) {
threadProcessorp_->post(boost::bind(&BatchTracker::wrap, this, func));
incJobCount();
}
void postWorkList(boost::function<void ()> func) {
threadProcessorp_->post(boost::bind(&BatchTracker::wrap, this, func));
incJobCount();
}

/**
* @param seconds max number of seconds to wait for all jobs to
* complete
*/
bool wait_until_done(time_t seconds = 0) {
if (number_of_jobs_complete == number_of_jobs_total)
return true;
return wait_until_done_locked(seconds);
}

/**
* call this before re-using a batch tracker after calling
* wait_until_done
*/
void reset() {
number_of_jobs_total = number_of_jobs_complete = 0;
}

/**
* returns number of time post was called since instansiation or
* reset() called
*/
int scheduled() const {
return number_of_jobs_total;
}

/**
* number of jobs that have completed since instansiation or reset()
* called
*/
int complete() const {
return number_of_jobs_complete;
}

private:
int incJobCount() {
return number_of_jobs_total++;

}

void done() {
boost::mutex::scoped_lock lock(mutex);
number_of_jobs_complete++;
if (number_of_jobs_complete == number_of_jobs_total)
cond_.notify_one();
}

void wrap(boost::function<void ()> func) {
func();
done();
}

bool wait_until_done_locked(time_t max_seconds) {
boost::mutex::scoped_lock lock(mutex);
time_t start, now = start = time(NULL);
do {
cond_.wait(mutex);
if (number_of_jobs_complete != number_of_jobs_total) {
now = time(NULL);
}
} while (number_of_jobs_complete != number_of_jobs_total &&
(max_seconds == 0 || ((now - start) < max_seconds)));
return (number_of_jobs_complete == number_of_jobs_total);
}

volatile int number_of_jobs_total;
volatile int number_of_jobs_complete;
boost::condition cond_;
boost::mutex mutex;
ThreadProcessor *threadProcessorp_;
};
}
10 changes: 7 additions & 3 deletions include/cuda_emu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#ifdef __APPLE__
#include <string.h>
#else
#else
#include <malloc.h>
#endif
#include <boost/bind.hpp>
Expand All @@ -13,6 +13,10 @@
#include <boost/shared_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include "thread_processor.hpp"
#include "batch_processor.h"


using namespace ParallelDo;

#define UNUSED __attribute__((unused))

Expand All @@ -36,7 +40,7 @@ void aligned_free( void *mem )
free( ((void**)mem)[-1] );
}
// End shameless copy
#else
#else
#define aligned_malloc memalign
#define aligned_free free
#endif
Expand Down Expand Up @@ -105,7 +109,7 @@ cudaError_t cudaDeviceSynchronize()
#define __global__

#define __shared__ volatile static
#define __restrict__
#define __restrict__


#define blockIdx getBlockIdx()
Expand Down
Loading

0 comments on commit 0386600

Please sign in to comment.