Skip to content

Commit

Permalink
Data queues, prefetching and multi-source
Browse files Browse the repository at this point in the history
  • Loading branch information
cypof committed Apr 2, 2015
1 parent 2a7fe03 commit 60db09a
Show file tree
Hide file tree
Showing 20 changed files with 702 additions and 192 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
PROJECT := caffe

CONFIG_FILE := Makefile.config
CONFIG_FILE ?= Makefile.config
include $(CONFIG_FILE)

BUILD_DIR_LINK := $(BUILD_DIR)
Expand Down Expand Up @@ -270,6 +270,8 @@ endif
# Debugging
ifeq ($(DEBUG), 1)
COMMON_FLAGS += -DDEBUG -g -O0
# Compile issue in DEBUG on MAC (https://svn.boost.org/trac/boost/ticket/9392)
COMMON_FLAGS += -DBOOST_NOINLINE='__attribute__ ((noinline))'
NVCCFLAGS += -G
else
COMMON_FLAGS += -DNDEBUG -O2
Expand Down
4 changes: 3 additions & 1 deletion include/caffe/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ class Caffe {
// freed in a non-pinned way, which may cause problems - I haven't verified
// it personally but better to note it here in the header file.
inline static void set_mode(Brew mode) { Get().mode_ = mode; }
// Sets the random seed of both boost and curand
// Random seed of both boost and curand
static unsigned int get_random_seed();
static void set_random_seed(const unsigned int seed);
// Sets the device. Since we have cublas and curand stuff, set device also
// requires us to reset those values.
Expand All @@ -156,6 +157,7 @@ class Caffe {
curandGenerator_t curand_generator_;
#endif
shared_ptr<RNG> random_generator_;
unsigned int random_generator_seed_;

Brew mode_;
static shared_ptr<Caffe> singleton_;
Expand Down
92 changes: 75 additions & 17 deletions include/caffe/data_layers.hpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
#ifndef CAFFE_DATA_LAYERS_HPP_
#define CAFFE_DATA_LAYERS_HPP_

#include <map>
#include <string>
#include <utility>
#include <vector>

#include "boost/scoped_ptr.hpp"
#include "boost/random/mersenne_twister.hpp"
#include "boost/random/uniform_real.hpp"
#include "boost/random/variate_generator.hpp"
#include "boost/weak_ptr.hpp"
#include "hdf5.h"

#include "caffe/blob.hpp"
Expand All @@ -16,10 +20,16 @@
#include "caffe/layer.hpp"
#include "caffe/net.hpp"
#include "caffe/proto/caffe.pb.h"
#include "caffe/util/blocking_queue.hpp"
#include "caffe/util/db.hpp"

namespace caffe {

using boost::weak_ptr;
using boost::mt19937;
using boost::uniform_real;
using boost::variate_generator;

/**
* @brief Provides base for data layers that feed blobs to the Net.
*
Expand Down Expand Up @@ -52,12 +62,17 @@ class BaseDataLayer : public Layer<Dtype> {
bool output_labels_;
};

template <typename Dtype>
class Batch {
 public:
  // One prefetched unit of work: the input blob and its matching label blob.
  Blob<Dtype> data_;
  Blob<Dtype> label_;
};

template <typename Dtype>
class BasePrefetchingDataLayer :
public BaseDataLayer<Dtype>, public InternalThread {
public:
explicit BasePrefetchingDataLayer(const LayerParameter& param)
: BaseDataLayer<Dtype>(param) {}
explicit BasePrefetchingDataLayer(const LayerParameter& param);
virtual ~BasePrefetchingDataLayer() {}
// LayerSetUp: implements common data layer setup functionality, and calls
// DataLayerSetUp to do special data layer setup for individual layer types.
Expand All @@ -70,22 +85,63 @@ class BasePrefetchingDataLayer :
virtual void Forward_gpu(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);

virtual void CreatePrefetchThread();
virtual void JoinPrefetchThread();
// The thread's function
virtual void InternalThreadEntry() {}
// Prefetches batches (asynchronously if to GPU memory)
static const int PREFETCH_COUNT = 3;

protected:
Blob<Dtype> prefetch_data_;
Blob<Dtype> prefetch_label_;
virtual void InternalThreadEntry();
virtual void load_batch(Batch<Dtype>* batch) = 0;

Batch<Dtype> prefetch_[PREFETCH_COUNT];
blocking_queue<Batch<Dtype>*> prefetch_free_;
blocking_queue<Batch<Dtype>*> prefetch_full_;
int device_;

Blob<Dtype> transformed_data_;
};

// Prefetches datums to host memory that can be read by multiple data layers.
// Loaders constructed for the same source share one reader thread (Body),
// looked up by source key in instances_, so the DB is only opened once.
class DataLoader {
 public:
  DataLoader(const DataParameter& param, int index);
  ~DataLoader();

  // Queue of recycled datums, waiting to be refilled by the reader thread.
  inline blocking_queue<Datum*>& free() {
    return body_->free_;
  }
  // Queue of datums filled from the DB, ready for consumption by layers.
  inline blocking_queue<Datum*>& full() {
    return body_->full_;
  }

 protected:
  // Reader thread shared by all loaders on the same source.
  class Body : public InternalThread {
   public:
    Body(const DataParameter& param, int index);
    ~Body();

    void InternalThreadEntry();

    shared_ptr<db::DB> db_;
    shared_ptr<db::Cursor> cursor_;

    blocking_queue<Datum*> free_;
    blocking_queue<Datum*> full_;

    DISABLE_COPY_AND_ASSIGN(Body);
  };

  // Source -> shared reader thread. NOTE(review): access appears to be
  // unsynchronized — presumably layers are created from a single thread;
  // confirm against the .cpp before relying on concurrent construction.
  static map<string, weak_ptr<Body> > instances_;

  const string source_;
  shared_ptr<Body> body_;

  DISABLE_COPY_AND_ASSIGN(DataLoader);
};

template <typename Dtype>
class DataLayer : public BasePrefetchingDataLayer<Dtype> {
class DataLayer: public BasePrefetchingDataLayer<Dtype> {
public:
explicit DataLayer(const LayerParameter& param)
: BasePrefetchingDataLayer<Dtype>(param) {}
explicit DataLayer(const LayerParameter& param);
virtual ~DataLayer();
virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
Expand All @@ -96,10 +152,12 @@ class DataLayer : public BasePrefetchingDataLayer<Dtype> {
virtual inline int MaxTopBlobs() const { return 2; }

protected:
virtual void InternalThreadEntry();
virtual void load_batch(Batch<Dtype>* batch);
DataLoader* next_loader();

shared_ptr<db::DB> db_;
shared_ptr<db::Cursor> cursor_;
vector<shared_ptr<DataLoader> > loaders_;
mt19937 rand_engine_;
uniform_real<float> rand_;
};

/**
Expand Down Expand Up @@ -238,7 +296,7 @@ class ImageDataLayer : public BasePrefetchingDataLayer<Dtype> {
protected:
shared_ptr<Caffe::RNG> prefetch_rng_;
virtual void ShuffleImages();
virtual void InternalThreadEntry();
virtual void load_batch(Batch<Dtype>* batch);

vector<std::pair<std::string, int> > lines_;
int lines_id_;
Expand Down Expand Up @@ -310,7 +368,7 @@ class WindowDataLayer : public BasePrefetchingDataLayer<Dtype> {

protected:
virtual unsigned int PrefetchRand();
virtual void InternalThreadEntry();
virtual void load_batch(Batch<Dtype>* batch);

shared_ptr<Caffe::RNG> prefetch_rng_;
vector<std::pair<std::string, vector<int> > > image_database_;
Expand Down
9 changes: 7 additions & 2 deletions include/caffe/internal_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,28 @@ namespace caffe {
*/
class InternalThread {
public:
InternalThread() : thread_() {}
InternalThread() : thread_(), must_stop_() {}
virtual ~InternalThread();

/** Returns true if the thread was successfully started. **/
bool StartInternalThread();

/** Will not return until the internal thread has exited. */
bool WaitForInternalThreadToExit();
bool StopInternalThread();

bool is_started() const;

bool must_stop() {
return must_stop_;
}

protected:
/* Implement this method in your subclass
with the code you want your thread to run. */
virtual void InternalThreadEntry() {}

shared_ptr<boost::thread> thread_;
bool must_stop_;
};

} // namespace caffe
Expand Down
4 changes: 4 additions & 0 deletions include/caffe/syncedmem.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ class SyncedMemory {
SyncedHead head() { return head_; }
size_t size() { return size_; }

#ifndef CPU_ONLY
void async_gpu_push(const cudaStream_t& stream);
#endif

private:
void to_cpu();
void to_gpu();
Expand Down
50 changes: 50 additions & 0 deletions include/caffe/util/blocking_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#ifndef CAFFE_UTIL_BLOCKING_QUEUE_H_
#define CAFFE_UTIL_BLOCKING_QUEUE_H_

#include <cstdint>  // uint64_t
#include <ctime>    // time_t
#include <queue>
#include <string>

#include "caffe/common.hpp"

namespace caffe {

// Thread-safe queue for passing items between threads; synchronization
// details live in the .cpp (see the sync note below).
template<typename T>
class blocking_queue {
 public:
  blocking_queue();
  virtual ~blocking_queue();

  // Adds an element to the queue.
  void push(const T& t);

  bool empty() const;

  // Non-blocking pop: returns false if the queue is empty.
  bool try_pop(T* t);

  // Pops an element, waiting until one is available. If log_on_wait is
  // non-empty it is presumably logged while waiting (rate-limited via
  // last_wait_log_) — see the .cpp.
  T pop(const string& log_on_wait = "");

  // Return element without removing it
  T peek();

  // Number of elements popped since construction.
  inline uint64_t pops() {
    return pops_;
  }

 protected:
  /**
   Move synchronization fields out instead of including boost/thread.hpp
   to avoid a boost/NVCC issues (#1009, #1010) on OSX. Also fails on
   Linux CUDA 7.0.18.
   */
  class sync;

  std::queue<T> queue_;
  shared_ptr<sync> sync_;
  time_t last_wait_log_;
  uint64_t pops_;

  DISABLE_COPY_AND_ASSIGN(blocking_queue);
};

}  // namespace caffe

#endif  // CAFFE_UTIL_BLOCKING_QUEUE_H_
10 changes: 10 additions & 0 deletions src/caffe/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,14 @@ Caffe::Caffe()

Caffe::~Caffe() { }

unsigned int Caffe::get_random_seed() {
return Get().random_generator_seed_;
}

void Caffe::set_random_seed(const unsigned int seed) {
// RNG seed
Get().random_generator_.reset(new RNG(seed));
Get().random_generator_seed_ = seed;
}

void Caffe::SetDevice(const int device_id) {
Expand Down Expand Up @@ -108,6 +113,10 @@ Caffe::~Caffe() {
}
}

unsigned int Caffe::get_random_seed() {
  // Return the last seed passed to set_random_seed().
  return Get().random_generator_seed_;  // stored alongside the boost RNG
}

void Caffe::set_random_seed(const unsigned int seed) {
// Curand seed
static bool g_curand_availability_logged = false;
Expand All @@ -124,6 +133,7 @@ void Caffe::set_random_seed(const unsigned int seed) {
}
// RNG seed
Get().random_generator_.reset(new RNG(seed));
Get().random_generator_seed_ = seed;
}

void Caffe::SetDevice(const int device_id) {
Expand Down
9 changes: 6 additions & 3 deletions src/caffe/internal_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
namespace caffe {

InternalThread::~InternalThread() {
WaitForInternalThreadToExit();
StopInternalThread();
}

bool InternalThread::is_started() const {
Expand All @@ -13,9 +13,10 @@ bool InternalThread::is_started() const {


bool InternalThread::StartInternalThread() {
if (!WaitForInternalThreadToExit()) {
if (!StopInternalThread()) {
return false;
}
must_stop_ = false;
try {
thread_.reset(
new boost::thread(&InternalThread::InternalThreadEntry, this));
Expand All @@ -26,8 +27,10 @@ bool InternalThread::StartInternalThread() {
}

/** Will not return until the internal thread has exited. */
bool InternalThread::WaitForInternalThreadToExit() {
bool InternalThread::StopInternalThread() {
must_stop_ = true;
if (is_started()) {
thread_->interrupt();
try {
thread_->join();
} catch (...) {
Expand Down
Loading

0 comments on commit 60db09a

Please sign in to comment.