
Added DataReader for parallel training with one DB session
- Makes sure each solver accesses a different subset of the data
- Sequential reading of DB for performance
- Prefetches a configurable amount of data to host memory
- Distributes data to solvers in a round-robin way for determinism
cypof committed May 19, 2015
1 parent 01cbda5 commit 0bd8238
Showing 8 changed files with 249 additions and 35 deletions.
8 changes: 3 additions & 5 deletions include/caffe/data_layers.hpp
@@ -5,11 +5,11 @@
#include <utility>
#include <vector>

#include "boost/scoped_ptr.hpp"
#include "hdf5.h"

#include "caffe/blob.hpp"
#include "caffe/common.hpp"
#include "caffe/data_reader.hpp"
#include "caffe/data_transformer.hpp"
#include "caffe/filler.hpp"
#include "caffe/internal_thread.hpp"
@@ -90,8 +90,7 @@ class BasePrefetchingDataLayer :
template <typename Dtype>
class DataLayer : public BasePrefetchingDataLayer<Dtype> {
public:
explicit DataLayer(const LayerParameter& param)
: BasePrefetchingDataLayer<Dtype>(param) {}
explicit DataLayer(const LayerParameter& param);
virtual ~DataLayer();
virtual void DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
@@ -104,8 +103,7 @@ class DataLayer : public BasePrefetchingDataLayer<Dtype> {
protected:
virtual void load_batch(Batch<Dtype>* batch);

shared_ptr<db::DB> db_;
shared_ptr<db::Cursor> cursor_;
DataReader reader_;
};

/**
82 changes: 82 additions & 0 deletions include/caffe/data_reader.hpp
@@ -0,0 +1,82 @@
#ifndef CAFFE_DATA_READER_HPP_
#define CAFFE_DATA_READER_HPP_

#include <map>
#include <string>
#include <vector>

#include "caffe/common.hpp"
#include "caffe/internal_thread.hpp"
#include "caffe/util/blocking_queue.hpp"
#include "caffe/util/db.hpp"

namespace caffe {

/**
* @brief Reads data from a source to queues available to data layers.
* A single reading thread is created per source, even if multiple solvers
* are running in parallel, e.g. for multi-GPU training. This makes sure
* databases are read sequentially, and that each solver accesses a different
* subset of the database. Data is distributed to solvers in a round-robin
* way to keep parallel training deterministic.
*/
class DataReader {
public:
explicit DataReader(const LayerParameter& param);
~DataReader();

inline BlockingQueue<Datum*>& free() const {
return queue_pair_->free_;
}
inline BlockingQueue<Datum*>& full() const {
return queue_pair_->full_;
}

protected:
// Queue pairs are shared between a body and its readers

cdoersch commented on Jun 1, 2015:
The variable/class names in this file and the associated comments are pretty uninformative. I can't think of a better name than QueuePair (though it's pretty uninformative). However, 'body' could mean many different things. 'DataSource' would be better for body, but it would be good to have something even more descriptive if you can think of one.

class QueuePair {
public:
explicit QueuePair(int size);
~QueuePair();

BlockingQueue<Datum*> free_;
BlockingQueue<Datum*> full_;

cdoersch commented on Jun 1, 2015:
Whenever I see these two variables referenced elsewhere in the code, I think they're booleans. How about 'populated_datums_' and 'unpopulated_datums_'?


DISABLE_COPY_AND_ASSIGN(QueuePair);
};

// A single body is created per source

cdoersch commented on Jun 1, 2015:
You know there are problems with variable names when there are comments like this one. To me, this says there are these things called 'sources' and each one has a 'body', and nothing else. What do you mean by source? What types of sources are currently supported? A body is not just paired with a source: it is the code's primary representation of a source, and any access to a source must go through a body. But then why is it called a 'Body'? Why not call it a 'Source'?

cypof (author) replied on Jun 9, 2015:
OK, not very clear. The name Body is in the sense of the handle-body pattern (http://c2.com/cgi/wiki?HandleBodyPattern). I will probably switch to DataSource or something similar, and document that a single instance gets created even if multiple readers use it.
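
For context, a minimal sketch of the handle-body pattern cypof references; all names here are illustrative, not from the Caffe code. Many cheap handles are created, one per user, and all of them forward to a single shared body that owns the real resource:

// Hypothetical illustration of the handle-body pattern; not Caffe code.
#include <boost/shared_ptr.hpp>

class ConnectionBody {                      // body: one per resource
 public:
  void read() { /* actual I/O happens here */ }
};

class Connection {                          // handle: one per user, cheap
 public:
  explicit Connection(const boost::shared_ptr<ConnectionBody>& body)
      : body_(body) {}
  void read() { body_->read(); }            // forwards to the shared body
 private:
  boost::shared_ptr<ConnectionBody> body_;  // many handles, one body
};

In this commit, DataReader plays the handle (one per data layer) and Body plays the body (one per source, owning the DB session).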

class Body : public InternalThread {
public:
explicit Body(const LayerParameter& param);
virtual ~Body();

protected:
void InternalThreadEntry();
void read_one(db::Cursor* cursor, QueuePair* qp);

const LayerParameter param_;
BlockingQueue<shared_ptr<QueuePair> > new_queue_pairs_;

friend class DataReader;

DISABLE_COPY_AND_ASSIGN(Body);
};

// A source is uniquely identified by its layer name + path, in case
// the same database is read from two different locations in the net.
static inline string source_key(const LayerParameter& param) {
return param.name() + ":" + param.data_param().source();
}

const shared_ptr<QueuePair> queue_pair_;
shared_ptr<Body> body_;

static map<const string, boost::weak_ptr<DataReader::Body> > bodies_;

DISABLE_COPY_AND_ASSIGN(DataReader);
};

} // namespace caffe

#endif // CAFFE_DATA_READER_HPP_
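
A minimal sketch of the consumer side of this API, mirroring what data_layer.cpp does later in this commit: pop a populated Datum from full(), use it, then recycle the buffer through free() so the reading thread can refill it.

// Sketch of a DataReader consumer; see data_layer.cpp below for real usage.
void consume_one(caffe::DataReader& reader) {
  // Blocks until the single reading thread has populated a datum.
  caffe::Datum* datum = reader.full().pop("Waiting for data");
  // ... copy or transform *datum into the batch being assembled ...
  // Return the buffer to the free queue so it can be refilled.
  reader.free().push(datum);
}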
121 changes: 121 additions & 0 deletions src/caffe/data_reader.cpp
@@ -0,0 +1,121 @@
#include <boost/thread.hpp>
#include <map>
#include <string>
#include <vector>

#include "caffe/common.hpp"
#include "caffe/data_layers.hpp"
#include "caffe/data_reader.hpp"
#include "caffe/proto/caffe.pb.h"

namespace caffe {

using boost::weak_ptr;

map<const string, weak_ptr<DataReader::Body> > DataReader::bodies_;
static boost::mutex bodies_mutex_;

DataReader::DataReader(const LayerParameter& param)
: queue_pair_(new QueuePair( //
param.data_param().prefetch() * param.data_param().batch_size())) {
// Get or create a body

cdoersch commented on Jun 1, 2015:
I don't understand this comment.

cypof (author) replied on Jun 9, 2015:
A body gets created only if one doesn't exist for this source. A source here is identified by its source_key().
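
Put differently, the constructor below is the standard weak_ptr get-or-create idiom: the static bodies_ map remembers live bodies without owning them, so a Body is destroyed along with its last reader. A stripped-down sketch of the idiom, with names shortened for illustration:

// Sketch of the get-or-create idiom used by the DataReader constructor.
shared_ptr<Body> get_or_create_body(const string& key,
                                    const LayerParameter& param) {
  boost::mutex::scoped_lock lock(bodies_mutex_);  // guard the static map
  weak_ptr<Body>& weak = bodies_[key];            // slot may be empty/expired
  shared_ptr<Body> body = weak.lock();            // promote if still alive
  if (!body) {
    body.reset(new Body(param));                  // first reader creates it
    bodies_[key] = weak_ptr<Body>(body);          // register without owning
  }
  return body;                                    // readers share ownership
}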

boost::mutex::scoped_lock lock(bodies_mutex_);
string key = source_key(param);
weak_ptr<Body>& weak = bodies_[key];
body_ = weak.lock();
if (!body_) {
body_.reset(new Body(param));
bodies_[key] = weak_ptr<Body>(body_);
}
body_->new_queue_pairs_.push(queue_pair_);
}

DataReader::~DataReader() {
string key = source_key(body_->param_);
body_.reset();
boost::mutex::scoped_lock lock(bodies_mutex_);
if (bodies_[key].expired()) {
bodies_.erase(key);
}
}

//

DataReader::QueuePair::QueuePair(int size) {
// Initialize the free queue with requested number of datums
for (int i = 0; i < size; ++i) {
free_.push(new Datum());
}
}

DataReader::QueuePair::~QueuePair() {
Datum* datum;
while (free_.try_pop(&datum)) {
delete datum;
}
while (full_.try_pop(&datum)) {
delete datum;
}
}
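
Concretely, the DataReader constructor above sizes each QueuePair as prefetch × batch_size, so with the default prefetch of 4 and a hypothetical batch_size of 64, the free queue is seeded with 4 × 64 = 256 Datum objects per reader.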

//

DataReader::Body::Body(const LayerParameter& param)
: param_(param),
new_queue_pairs_() {
StartInternalThread();
}

DataReader::Body::~Body() {
StopInternalThread();
}

void DataReader::Body::InternalThreadEntry() {
shared_ptr<db::DB> db(db::GetDB(param_.data_param().backend()));
db->Open(param_.data_param().source(), db::READ);
shared_ptr<db::Cursor> cursor(db->NewCursor());
vector<shared_ptr<QueuePair> > qps;
try {
// int solver_count = param_.phase() == TRAIN ? Caffe::solver_count() : 1;
// TODO single solver until multi-gpu merge
int solver_count = 1;

cdoersch commented on Jun 1, 2015:
Maybe a nitpick, but it seems odd to me that this variable would be solver-specific, since it's possible for other code to use this for multithreaded access.

Perhaps more importantly, looking at your later code, it seems somewhat hacky to read this value from a global variable; it's a sort of hidden dependency. Is there no way for this to be passed in? I guess DataReaders are currently created by layers, but layers aren't really supposed to understand that they're multi-threaded. The necessity of this global variable access here suggests that this may be the wrong design. One alternative approach is to have this function wait for new_queue_pairs_ indefinitely, until it gets a NULL. Then we can have a separate static function in Body that's called at the end of network setup, which goes through all existing Body's and sends a NULL to each one's new_queue_pairs_. Not sure I like this approach that much better than what we have now, but I think it's an improvement.

Later we can change the bodies_ variable so that it is not static, but is instead passed in by Caffe during solver construction. In general, I think we want to get away from the assumption that there's only one solver running at a time, because there's nothing obviously blocking that pattern in the Python/Matlab interfaces.

cypof (author) replied on Jun 9, 2015:
The number of solvers is used in a few places. It would be great to have a concept of a group of solvers, or some object representing the current training task that can be passed around. I am not sure we should change things until we have a better idea of the right way to express this. For now, I feel it's OK to store this on the Caffe object (along with a couple of other variables, like whether this is the root solver, and the random generator). It's only visible to the current thread.
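
For concreteness, a rough sketch of the alternative cdoersch describes; this is hypothetical code, not part of this commit, and SignalSetupDone is an invented name:

// Hypothetical sketch of the NULL-sentinel alternative (not in this commit).
void DataReader::Body::InternalThreadEntry() {
  // ... open the db and cursor exactly as in the real implementation ...
  vector<shared_ptr<QueuePair> > qps;
  while (true) {
    shared_ptr<QueuePair> qp(new_queue_pairs_.pop());
    if (!qp) { break; }                // NULL marks the end of network setup
    read_one(cursor.get(), qp.get());  // let this solver peek one item
    qps.push_back(qp);
  }
  // ... main round-robin loop over qps, as in the current code ...
}
// A static Body::SignalSetupDone(), called once network setup completes,
// would push a NULL queue pair into every registered body.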


// To ensure deterministic runs, only start running once all solvers
// are ready. But solvers need to peek on one item during initialization,
// so read one item, then wait for the next solver.
for (int i = 0; i < solver_count; ++i) {
shared_ptr<QueuePair> qp(new_queue_pairs_.pop());
read_one(cursor.get(), qp.get());
qps.push_back(qp);
}
// Main loop
while (!must_stop()) {
for (int i = 0; i < solver_count; ++i) {
read_one(cursor.get(), qps[i].get());
}
// Check no additional readers have been created. This can happen if
// more than one net is trained at a time per process, whether single
// or multi solver. It might also happen if two data layers have same
// name and same source.
CHECK_EQ(new_queue_pairs_.size(), 0);
}
} catch (boost::thread_interrupted&) {
// Interrupted exception is expected on shutdown
}
}

void DataReader::Body::read_one(db::Cursor* cursor, QueuePair* qp) {
Datum* datum = qp->free_.pop();
// TODO deserialize in-place instead of copy?
datum->ParseFromString(cursor->value());
qp->full_.push(datum);

// go to the next iter
cursor->Next();
if (!cursor->valid()) {
DLOG(INFO) << "Restarting data prefetching from start.";
cursor->SeekToFirst();
}
}

} // namespace caffe
40 changes: 11 additions & 29 deletions src/caffe/layers/data_layer.cpp
@@ -11,11 +11,15 @@
#include "caffe/proto/caffe.pb.h"
#include "caffe/util/benchmark.hpp"
#include "caffe/util/io.hpp"
#include "caffe/util/math_functions.hpp"
#include "caffe/util/rng.hpp"

namespace caffe {

template <typename Dtype>
DataLayer<Dtype>::DataLayer(const LayerParameter& param)
: BasePrefetchingDataLayer<Dtype>(param),
reader_(param) {
}

template <typename Dtype>
DataLayer<Dtype>::~DataLayer() {
this->StopInternalThread();
@@ -24,23 +28,8 @@ DataLayer<Dtype>::~DataLayer() {
template <typename Dtype>
void DataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {
// Initialize DB
db_.reset(db::GetDB(this->layer_param_.data_param().backend()));
db_->Open(this->layer_param_.data_param().source(), db::READ);
cursor_.reset(db_->NewCursor());

// Check if we should randomly skip a few data points
if (this->layer_param_.data_param().rand_skip()) {
unsigned int skip = caffe_rng_rand() %
this->layer_param_.data_param().rand_skip();
LOG(INFO) << "Skipping first " << skip << " data points.";
while (skip-- > 0) {
cursor_->Next();
}
}
// Read a data point, and use it to initialize the top blob.
Datum datum;
datum.ParseFromString(cursor_->value());
Datum& datum = *(reader_.full().peek());

bool force_color = this->layer_param_.data_param().force_encoded_color();
if ((force_color && DecodeDatum(&datum, true)) ||
@@ -97,8 +86,7 @@ void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {
const int crop_size = this->layer_param_.transform_param().crop_size();
bool force_color = this->layer_param_.data_param().force_encoded_color();
if (batch_size == 1 && crop_size == 0) {
Datum datum;
datum.ParseFromString(cursor_->value());
Datum& datum = *(reader_.full().peek());
if (datum.encoded()) {
if (force_color) {
DecodeDatum(&datum, true);
@@ -121,9 +109,7 @@ void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {
for (int item_id = 0; item_id < batch_size; ++item_id) {
timer.Start();
// get a blob
Datum datum;
datum.ParseFromString(cursor_->value());

Datum& datum = *(reader_.full().pop("Waiting for data"));
cv::Mat cv_img;
if (datum.encoded()) {
if (force_color) {
@@ -153,12 +139,8 @@ void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {
top_label[item_id] = datum.label();
}
trans_time += timer.MicroSeconds();
// go to the next iter
cursor_->Next();
if (!cursor_->valid()) {
DLOG(INFO) << "Restarting data prefetching from start.";
cursor_->SeekToFirst();
}

reader_.free().push(const_cast<Datum*>(&datum));
}
batch_timer.Stop();
DLOG(INFO) << "Prefetch batch: " << batch_timer.MilliSeconds() << " ms.";
4 changes: 4 additions & 0 deletions src/caffe/proto/caffe.proto
@@ -455,6 +455,7 @@ message DataParameter {
// to avoid all asynchronous sgd clients to start at the same point. The skip
// point would be set as rand_skip * rand(0,1). Note that rand_skip should not
// be larger than the number of keys in the database.
// DEPRECATED. Each solver accesses a different subset of the database.
optional uint32 rand_skip = 7 [default = 0];
optional DB backend = 8 [default = LEVELDB];
// DEPRECATED. See TransformationParameter. For data pre-processing, we can do
@@ -470,6 +471,9 @@
optional bool mirror = 6 [default = false];
// Force the encoded image to have 3 color channels
optional bool force_encoded_color = 9 [default = false];
// Prefetch queue (Number of batches to prefetch to host memory, increase if
// data access bandwidth varies).
optional uint32 prefetch = 10 [default = 4];
}

message DropoutParameter {
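
For reference, a minimal sketch of setting the new prefetch field through the generated protobuf API; the source path and batch size here are illustrative:

// Sketch: configure a data layer with a deeper prefetch buffer.
LayerParameter param;
param.set_type("Data");
param.mutable_data_param()->set_source("examples/mnist/mnist_train_lmdb");
param.mutable_data_param()->set_batch_size(64);
param.mutable_data_param()->set_prefetch(8);  // buffer 8 batches in host RAM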
14 changes: 13 additions & 1 deletion src/caffe/test/test_layer_factory.cpp
@@ -1,11 +1,14 @@
#include <map>
#include <string>

#include "boost/scoped_ptr.hpp"
#include "gtest/gtest.h"

#include "caffe/common.hpp"
#include "caffe/layer.hpp"
#include "caffe/layer_factory.hpp"
#include "caffe/util/db.hpp"
#include "caffe/util/io.hpp"

#include "caffe/test/test_caffe_main.hpp"

@@ -21,11 +24,20 @@ TYPED_TEST(LayerFactoryTest, TestCreateLayer) {
typename LayerRegistry<Dtype>::CreatorRegistry& registry =
LayerRegistry<Dtype>::Registry();
shared_ptr<Layer<Dtype> > layer;
LayerParameter layer_param;
for (typename LayerRegistry<Dtype>::CreatorRegistry::iterator iter =
registry.begin(); iter != registry.end(); ++iter) {
// Special case: PythonLayer is checked by pytest
if (iter->first == "Python") { continue; }
LayerParameter layer_param;
// Data layers expect a DB
if (iter->first == "Data") {
string tmp;
MakeTempDir(&tmp);
boost::scoped_ptr<db::DB> db(db::GetDB(DataParameter_DB_LEVELDB));
db->Open(tmp, db::NEW);
db->Close();
layer_param.mutable_data_param()->set_source(tmp);
}
layer_param.set_type(iter->first);
layer = LayerRegistry<Dtype>::CreateLayer(layer_param);
EXPECT_EQ(iter->first, layer->type());
