Added multi-gpu test
cypof committed Jul 6, 2015
1 parent eb2cfcc commit abdb736
Showing 4 changed files with 90 additions and 42 deletions.
2 changes: 1 addition & 1 deletion include/caffe/parallel.hpp
@@ -93,7 +93,7 @@ class P2PSync : public GPUParams<Dtype>, public Solver<Dtype>::Callback,
     return solver_;
   }

-  static void run(shared_ptr<Solver<Dtype> > root, const vector<int>& gpus);
+  void run(const vector<int>& gpus);

  protected:
   void on_start(Timer* timer, ostringstream* timing);
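Note on the API change: run() goes from a static helper that received the root solver to an instance method on a P2PSync constructed around it. A minimal before/after sketch of a call site (the "after" form is exactly the tools/caffe.cpp change below):

  // Before: the static helper constructed the root sync internally.
  // caffe::P2PSync<float>::run(solver, gpus);

  // After: the caller builds the root sync and keeps a handle to it,
  // which is what lets the new test store it in its sync_ member.
  caffe::P2PSync<float> sync(solver, NULL, solver->param());
  sync.run(gpus);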
17 changes: 8 additions & 9 deletions src/caffe/parallel.cpp
@@ -384,8 +384,7 @@ void P2PSync<Dtype>::on_gradients_ready(Timer* timer, ostringstream* timing) {
 }

 template<typename Dtype>
-void P2PSync<Dtype>::run(shared_ptr<Solver<Dtype> > root,
-    const vector<int>& gpus) {
+void P2PSync<Dtype>::run(const vector<int>& gpus) {
   // Pair devices for map-reduce synchronization
   vector<DevicePair> pairs;
   DevicePair::compute(gpus, &pairs);
@@ -395,26 +394,26 @@ void P2PSync<Dtype>::run(shared_ptr<Solver<Dtype> > root,
   }
   LOG(INFO)<< "GPUs pairs " << s.str();

-  SolverParameter param(root->param());
+  SolverParameter param(solver_->param());
   vector<shared_ptr<P2PSync<Dtype> > > syncs(gpus.size());
-  syncs[0].reset(new P2PSync<Dtype>(root, NULL, param));

   // Build the GPU tree by finding the parent for each solver
   for (int attempts = 0; attempts < pairs.size(); ++attempts) {
     for (int i = 1; i < pairs.size(); ++i) {
       if (!syncs[i].get()) {
         P2PSync<Dtype>* parent = NULL;
         for (int j = 0; j < syncs.size(); ++j) {
-          if (syncs[j]) {
-            const SolverParameter& p = syncs[j]->solver()->param();
+          P2PSync<Dtype>* sync = j == 0 ? this : syncs[j].get();
+          if (sync) {
+            const SolverParameter& p = sync->solver()->param();
             if (p.device_id() == pairs[i].parent()) {
-              parent = (P2PSync<Dtype>*) syncs[j].get();
+              parent = sync;
             }
           }
         }
         if (parent) {
           param.set_device_id(pairs[i].device());
-          syncs[i].reset(new P2PSync<Dtype>(root, parent, param));
+          syncs[i].reset(new P2PSync<Dtype>(solver_, parent, param));
           parent->children_.push_back((P2PSync<Dtype>*) syncs[i].get());
         }
       }
@@ -428,7 +427,7 @@ void P2PSync<Dtype>::run(shared_ptr<Solver<Dtype> > root,
   }

   // Run root solver on current thread
-  syncs[0]->solver_->Solve();
+  solver_->Solve();

   for (int i = 1; i < syncs.size(); ++i) {
     syncs[i]->StopInternalThread();
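With this change the calling object itself is node 0 of the GPU tree, so syncs[0] is never allocated. The nested construction loop is a fixed-point iteration: a child sync can only be built once the sync owning its parent device exists, and the outer attempts loop re-scans until every node is constructed. A self-contained toy sketch of that idea (the pair values are assumed for illustration; the real ones come from DevicePair::compute):

  #include <cstdio>
  #include <vector>

  struct Pair { int parent, device; };

  int main() {
    // Hypothetical pairing for 4 GPUs; entry 0 is the root ("this").
    std::vector<Pair> pairs = {{-1, 0}, {0, 1}, {0, 2}, {2, 3}};
    std::vector<bool> built(pairs.size(), false);
    built[0] = true;  // the root always exists
    // Re-scan until every node whose parent is built gets built too.
    for (size_t attempts = 0; attempts < pairs.size(); ++attempts) {
      for (size_t i = 1; i < pairs.size(); ++i) {
        if (built[i]) continue;
        for (size_t j = 0; j < pairs.size(); ++j) {
          if (built[j] && pairs[j].device == pairs[i].parent) {
            built[i] = true;  // parent exists, so this node can attach
          }
        }
      }
    }
    for (size_t i = 0; i < built.size(); ++i) {
      std::printf("device %d built: %s\n", pairs[i].device,
                  built[i] ? "yes" : "no");
    }
    return 0;
  }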
110 changes: 79 additions & 31 deletions src/caffe/test/test_gradient_based_solver.cpp
@@ -8,6 +8,7 @@
 #include "gtest/gtest.h"

 #include "caffe/common.hpp"
+#include "caffe/parallel.hpp"
 #include "caffe/proto/caffe.pb.h"
 #include "caffe/solver.hpp"

@@ -22,12 +23,12 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> {
   typedef typename TypeParam::Dtype Dtype;

  protected:
-  GradientBasedSolverTest() :
-      seed_(1701), num_(4), channels_(3), height_(10), width_(10) {}
+  GradientBasedSolverTest() {}

   shared_ptr<SGDSolver<Dtype> > solver_;
-  int seed_;
-  int num_, channels_, height_, width_;
+  shared_ptr<P2PSync<Dtype> > sync_;
+  static const int seed_ = 1701;
+  static const int num_ = 4, channels_ = 3, height_ = 10, width_ = 10;
   Dtype delta_;  // Stability constant for AdaGrad.

   virtual SolverParameter_SolverType solver_type() = 0;
@@ -55,22 +56,35 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> {
         param.delta() : 0;
   }

+  // Passed as method argument, explicit field names improve readability
+  class Run {
+   public:
+    Run() : num(num_), num_iters(1), iter_size(1), devices(1),
+        const_data(false) {
+    }
+
+    int num;
+    int num_iters;
+    int iter_size;
+    int devices;
+    int const_data;  // Required for multi-GPU solvers to have same sequences
+  };
+
   void RunLeastSquaresSolver(const Dtype learning_rate,
-      const Dtype weight_decay, const Dtype momentum, const int num_iters,
-      const int iter_size = 1) {
+      const Dtype weight_decay, const Dtype momentum, const Run& run) {
     ostringstream proto;
     proto <<
-       "max_iter: " << num_iters << " "
+       "max_iter: " << run.num_iters << " "
        "base_lr: " << learning_rate << " "
        "lr_policy: 'fixed' "
-       "iter_size: " << iter_size << " "
+       "iter_size: " << run.iter_size << " "
        "net_param { "
        "  name: 'TestNetwork' "
        "  layer { "
        "    name: 'data' "
        "    type: 'DummyData' "
        "    dummy_data_param { "
-       "      num: " << num_ / iter_size << " "
+       "      num: " << run.num << " "
        "      channels: " << channels_ << " "
        "      height: " << height_ << " "
        "      width: " << width_ << " "
@@ -82,7 +96,7 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> {
        "      value: 1.0 "
        "    } "
        "    data_filler { "
-       "      type: 'gaussian' "
+       "      type: '" << (run.const_data ? "constant" : "gaussian") << "' "
        "      std: 1.0 "
        "    } "
        "  } "
@@ -121,17 +135,30 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> {
     }
     Caffe::set_random_seed(this->seed_);
     this->InitSolverFromProtoString(proto.str());
-    this->solver_->Solve();
+    if (run.devices == 1) {
+      this->solver_->Solve();
+    } else {
+      LOG(INFO) << "Multi-GPU test on " << run.devices << " devices";
+      vector<int> gpus;
+      for (int i = 0; i < run.devices; ++i) {
+        gpus.push_back(i);
+      }
+      Caffe::set_solver_count(gpus.size());
+      this->sync_.reset(new P2PSync<Dtype>(
+          this->solver_, NULL, this->solver_->param()));
+      this->sync_->run(gpus);
+      Caffe::set_solver_count(1);
+    }
   }

   // Compute an update value given the current state of the train net,
   // using the analytical formula for the least squares gradient.
   // updated_params will store the updated weight and bias results,
   // using the blobs' diffs to hold the update values themselves.
   void ComputeLeastSquaresUpdate(const Dtype learning_rate,
-      const Dtype weight_decay, const Dtype momentum,
+      const Dtype weight_decay, const Dtype momentum, const int num,
       vector<shared_ptr<Blob<Dtype> > >* updated_params) {
-    const int N = num_;
+    const int N = num;
     const int D = channels_ * height_ * width_;

     // Run a forward pass, and manually compute the update values from the
@@ -281,8 +308,9 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> {
     const double kPrecision = 1e-2;
     const double kMinPrecision = 1e-7;
     // Solve without accumulation and save parameters.
-    this->RunLeastSquaresSolver(kLearningRate, kWeightDecay, kMomentum,
-        kNumIters);
+    Run run1;
+    run1.num_iters = kNumIters;
+    this->RunLeastSquaresSolver(kLearningRate, kWeightDecay, kMomentum, run1);
     // Save parameters for comparison.
     Net<Dtype>& net = *this->solver_->net();
     const vector<shared_ptr<Blob<Dtype> > >& param_blobs =
@@ -293,8 +321,11 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> {
       noaccum_params[i]->CopyFrom(*param_blobs[i], false, true);
     }
     // Solve by equivalent accumulation of gradients over divided batches.
-    this->RunLeastSquaresSolver(kLearningRate, kWeightDecay, kMomentum,
-        kNumIters, kIterSize);
+    Run run2;
+    run2.num = num_ / kIterSize;
+    run2.num_iters = kNumIters;
+    run2.iter_size = kIterSize;
+    this->RunLeastSquaresSolver(kLearningRate, kWeightDecay, kMomentum, run2);
     Net<Dtype>& net_accum = *this->solver_->net();
     const vector<shared_ptr<Blob<Dtype> > >& accum_params =
         net_accum.layer_by_name("innerprod")->blobs();
@@ -333,20 +364,37 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> {
   void TestLeastSquaresUpdate(const Dtype learning_rate = 1.0,
       const Dtype weight_decay = 0.0, const Dtype momentum = 0.0,
       const int iter_to_check = 0) {
-    // Initialize the solver and run K (= iter_to_check) solver iterations.
-    RunLeastSquaresSolver(learning_rate, weight_decay, momentum, iter_to_check);
-
-    // Compute the (K+1)th update using the analytic least squares gradient.
-    vector<shared_ptr<Blob<Dtype> > > updated_params;
-    ComputeLeastSquaresUpdate(learning_rate, weight_decay, momentum,
-        &updated_params);
-
-    // Reinitialize the solver and run K+1 solver iterations.
-    RunLeastSquaresSolver(learning_rate, weight_decay, momentum,
-        iter_to_check + 1);
-
-    // Check that the solver's solution matches ours.
-    CheckLeastSquaresUpdate(updated_params);
+    // Loop for each possible set of devices
+    int available_devices = 1;
+#ifndef CPU_ONLY
+    if (Caffe::mode() == Caffe::GPU) {
+      CUDA_CHECK(cudaGetDeviceCount(&available_devices));
+    }
+#endif
+    for (int devices = 1; devices <= available_devices; ++devices) {
+      // Initialize the solver and run K (= iter_to_check) solver iterations.
+      Run ref;
+      // Run on one device, but increase batch to get equivalent run
+      ref.num = num_ * devices;
+      ref.num_iters = iter_to_check;
+      ref.const_data = devices > 1;
+      RunLeastSquaresSolver(learning_rate, weight_decay, momentum, ref);
+
+      // Compute the (K+1)th update using the analytic least squares gradient.
+      vector<shared_ptr<Blob<Dtype> > > updated_params;
+      ComputeLeastSquaresUpdate(learning_rate, weight_decay, momentum, ref.num,
+          &updated_params);
+
+      // Reinitialize the solver and run K+1 solver iterations.
+      Run run;
+      run.num_iters = iter_to_check + 1;
+      run.devices = devices;
+      run.const_data = devices > 1;
+      RunLeastSquaresSolver(learning_rate, weight_decay, momentum, run);
+
+      // Check that the solver's solution matches ours.
+      CheckLeastSquaresUpdate(updated_params);
+    }
   }
 };

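The reworked TestLeastSquaresUpdate checks each multi-GPU run against a single-device reference whose batch is scaled by the device count and whose data is constant-filled, since each solver fills its own data and gaussian inputs would differ across devices. With identical samples everywhere, averaging per-device gradients equals the gradient of one batch that is devices times larger. A toy numeric check of that equivalence (a scalar least-squares model, assumed for illustration; not Caffe code):

  #include <cstdio>

  int main() {
    const int devices = 2, n = 4;            // n samples per device
    const double w = 0.5, x = 1.0, y = 2.0;  // one constant-filled sample
    // Gradient of 0.5 * (w*x - y)^2 with respect to w: (w*x - y) * x.
    const double g = (w * x - y) * x;
    double g_dev = 0;  // mean gradient over one device's batch
    for (int i = 0; i < n; ++i) g_dev += g;
    g_dev /= n;
    double g_multi = 0;  // average of the per-device gradients
    for (int d = 0; d < devices; ++d) g_multi += g_dev;
    g_multi /= devices;
    double g_ref = 0;  // one reference batch of devices * n samples
    for (int i = 0; i < devices * n; ++i) g_ref += g;
    g_ref /= devices * n;
    std::printf("multi-GPU %.6f  reference %.6f\n", g_multi, g_ref);
    return 0;
  }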
3 changes: 2 additions & 1 deletion tools/caffe.cpp
@@ -166,7 +166,8 @@ int train() {
   }

   if (gpus.size() > 1) {
-    caffe::P2PSync<float>::run(solver, gpus);
+    caffe::P2PSync<float> sync(solver, NULL, solver->param());
+    sync.run(gpus);
   } else {
     LOG(INFO) << "Starting Optimization";
     solver->Solve();
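Taken with the parallel.cpp change above, sync.run(gpus) blocks until training finishes: the root solver runs on the calling thread while the child solvers run on internal threads that are started and joined inside run(). A hypothetical helper combining this call pattern with the solver-count bookkeeping the new test performs (a sketch under those assumptions, not code from this commit):

  #include <vector>
  #include "boost/shared_ptr.hpp"
  #include "caffe/caffe.hpp"
  #include "caffe/parallel.hpp"

  void TrainMultiGPU(boost::shared_ptr<caffe::Solver<float> > solver,
                     const std::vector<int>& gpus) {
    caffe::Caffe::set_solver_count(gpus.size());  // as in the new test
    caffe::P2PSync<float> sync(solver, NULL, solver->param());
    sync.run(gpus);  // blocks; the root solver runs on this thread
    caffe::Caffe::set_solver_count(1);
  }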
