Skip to content

Commit

Permalink
events overhaul
Browse files Browse the repository at this point in the history
  • Loading branch information
boeschf committed Oct 16, 2024
1 parent eb6fbe7 commit cc12918
Show file tree
Hide file tree
Showing 25 changed files with 383 additions and 720 deletions.
9 changes: 0 additions & 9 deletions arbor/backends/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include <arbor/fvm_types.hpp>
#include <arbor/serdes.hpp>
#include <arbor/mechanism_abi.h>
#include <arbor/generic_event.hpp>

// Structures for the representation of event delivery targets and
// staged events.
Expand Down Expand Up @@ -46,9 +45,6 @@ struct deliverable_event {
ARB_SERDES_ENABLE(deliverable_event, time, weight, handle);
};

// Trait marking deliverable_event as carrying a per-stream index (its
// target's mech_index); streams use this to decide whether event data
// must be stable-sorted by event_index before delivery.
template<>
struct has_event_index<deliverable_event> : public std::true_type {};

// Subset of event information required for mechanism delivery.
struct deliverable_event_data {
cell_local_size_type mech_index; // same as target_handle::mech_index
Expand All @@ -61,11 +57,6 @@ struct deliverable_event_data {
weight);
};

// Stream index accessor function for multi_event_stream:
// extracts the mechanism instance index (same as target_handle::mech_index)
// used to bucket raw event data into per-mechanism streams.
inline cell_local_size_type event_index(const arb_deliverable_event_data& ed) {
    return ed.mech_index;
}

// Delivery data accessor function for multi_event_stream:
inline arb_deliverable_event_data event_data(const deliverable_event& ev) {
return {ev.handle.mech_index, ev.weight};
Expand Down
107 changes: 74 additions & 33 deletions arbor/backends/event_stream_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

#include <vector>

#include <arbor/generic_event.hpp>
#include <arbor/mechanism_abi.h>


#include "backends/event.hpp"
#include "backends/event_stream_state.hpp"
#include "event_lane.hpp"
Expand All @@ -20,8 +18,7 @@ template <typename Event>
struct event_stream_base {
using size_type = std::size_t;
using event_type = Event;
using event_time_type = ::arb::event_time_type<Event>;
using event_data_type = ::arb::event_data_type<Event>;
using event_data_type = decltype(event_data(std::declval<Event>()));

protected: // members
std::vector<event_data_type> ev_data_;
Expand Down Expand Up @@ -62,29 +59,28 @@ struct event_stream_base {
index_ = 0;
}

// Construct a mapping of mech_id to a stream s.t. streams are partitioned into
// time step buckets by `ev_span`
protected:
// backend specific initializations
virtual void init() = 0;
};

struct spike_event_stream_base : event_stream_base<deliverable_event> {
template<typename EventStream>
static std::enable_if_t<std::is_base_of_v<event_stream_base<deliverable_event>, EventStream>>
multi_event_stream(const event_lane_subrange& lanes,
const std::vector<target_handle>& handles,
const std::vector<std::size_t>& divs,
const timestep_range& steps,
std::unordered_map<unsigned, EventStream>& streams) {
friend void initialize(const event_lane_subrange& lanes,
const std::vector<target_handle>& handles,
const std::vector<std::size_t>& divs,
const timestep_range& steps,
std::unordered_map<unsigned, EventStream>& streams) {
arb_assert(lanes.size() < divs.size());

// temporary data structures to hold
// - the number of events in every time interval per mechanism
// - time sorted events per mechanism
std::unordered_map<unsigned, std::vector<std::size_t>> dt_sizes;
std::unordered_map<unsigned, std::vector<deliverable_event>> evts;

// reset streams and allocate sufficient space for temporaries
auto n_steps = steps.size();
for (auto& [k, v]: streams) {
v.clear();
dt_sizes[k].resize(n_steps, 0);
evts[k].reserve(v.ev_data_.capacity());
v.spike_counter_.clear();
v.spike_counter_.resize(steps.size(), 0);
v.spikes_.clear();
v.spikes_.reserve(v.ev_data_.capacity());
}

// loop over lanes: group events by mechanism and sort them by time
Expand All @@ -102,29 +98,74 @@ struct event_stream_base {
if (step >= n_steps) break;
arb_assert(div + target < handles.size());
const auto& handle = handles[div + target];
auto& sorted_evts = evts[handle.mech_id];
sorted_evts.emplace_back(time, handle, weight);
auto& stream = streams[handle.mech_id];
stream.spikes_.emplace_back(step, time, handle.mech_index, weight);
// insertion sort with last element as pivot
auto first = sorted_evts.begin();
auto last = sorted_evts.end();
auto first = stream.spikes_.begin();
auto last = stream.spikes_.end();
auto pivot = std::prev(last, 1);
std::rotate(std::upper_bound(first, pivot, *pivot, [](auto const& l, auto const& r) noexcept { return l.time < r.time; }),
std::rotate(std::upper_bound(first, pivot, *pivot, [](auto const& l, auto const& r) noexcept {
if (l.step < r.step) return true;
if (l.step > r.step) return false;
if (l.mech_index < r.mech_index) return true;
if (l.mech_index > r.mech_index) return false;
return l.time < r.time; }),
pivot, last);
// increment count in current time interval
dt_sizes[handle.mech_id][step]++;
stream.spike_counter_[step]++;
}
}

for (auto& [id, stream]: streams) {
// copy temporary deliverable_events into stream's ev_data_
auto& sorted_evts = evts[id];
stream.ev_data_.reserve(sorted_evts.size());
std::transform(sorted_evts.begin(), sorted_evts.end(), std::back_inserter(stream.ev_data_),
[](auto const& e) noexcept -> arb_deliverable_event_data { return {e.handle.mech_index, e.weight}; });
// scan over dt_sizes[id] written to ev_spans_
util::make_partition(stream.ev_spans_, dt_sizes[id]);
stream.init();
stream.ev_data_.reserve(stream.spikes_.size());
std::transform(stream.spikes_.begin(), stream.spikes_.end(), std::back_inserter(stream.ev_data_),
[](auto const& e) noexcept -> arb_deliverable_event_data {
return {e.mech_index, e.weight}; });
// scan over spike_counter_ and write the resulting partition to ev_spans_
util::make_partition(stream.ev_spans_, stream.spike_counter_);
// delegate to derived class init: static cast necessary to access protected init()
static_cast<spike_event_stream_base&>(stream).init();
}
}

protected: // members
// Transient per-event record staged while building the stream; entries are
// kept sorted by (step, mech_index, time) before being flattened into
// ev_data_ for delivery.
struct spike_data {
    arb_size_type step = 0;               // index of the time step bin the event falls into
    time_type time = 0;                   // event delivery time
    cell_local_size_type mech_index = 0;  // mechanism instance index (copied from target_handle::mech_index)
    float weight = 0;                     // event weight passed to the mechanism
};
std::vector<spike_data> spikes_;
std::vector<std::size_t> spike_counter_;
};

// Event stream specialization for sample events. Unlike spike events,
// samples arrive pre-binned: `staged` holds one vector of events per
// time step, so no per-event sorting is required.
struct sample_event_stream_base : event_stream_base<sample_event> {
    // Populate `stream` from `staged` (outer vector = time step bins).
    // Flattens all events' raw payloads into ev_data_ and records one
    // end-offset per bin in ev_spans_, then delegates to the backend's init().
    friend void initialize(const std::vector<std::vector<sample_event>>& staged,
                           sample_event_stream_base& stream) {
        // clear previous data
        stream.clear();

        // return if there are no timestep bins
        if (!staged.size()) return;

        // return if there are no events
        auto num_events = util::sum_by(staged, [] (const auto& v) {return v.size();});
        if (!num_events) return;

        // allocate space for spans and data
        stream.ev_spans_.reserve(staged.size() + 1);
        stream.ev_data_.reserve(num_events);

        // add event data and spans; only staged.size() offsets are pushed here,
        // so the size assert below presumably relies on clear() seeding
        // ev_spans_ with a leading 0 — TODO(review): confirm in the base class.
        for (const auto& v : staged) {
            for (const auto& ev: v) stream.ev_data_.push_back(ev.raw);
            stream.ev_spans_.push_back(stream.ev_data_.size());
        }

        arb_assert(num_events == stream.ev_data_.size());
        arb_assert(staged.size() + 1 == stream.ev_spans_.size());
        stream.init();
    }
};

Expand Down
126 changes: 20 additions & 106 deletions arbor/backends/gpu/event_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,113 +2,33 @@

// Indexed collection of pop-only event queues --- CUDA back-end implementation.

#include <arbor/mechanism_abi.h>

#include "backends/event_stream_base.hpp"
#include "util/transform.hpp"
#include "threading/threading.hpp"
#include "timestep_range.hpp"
#include "memory/memory.hpp"

namespace arb {
namespace gpu {

template <typename Event>
struct event_stream: public event_stream_base<Event> {
public:
using base = event_stream_base<Event>;
using size_type = typename base::size_type;
using event_data_type = typename base::event_data_type;
using device_array = memory::device_vector<event_data_type>;

using base::clear;
using base::ev_data_;
using base::ev_spans_;
using base::base_ptr_;

event_stream() = default;
event_stream(task_system_handle t): base(), thread_pool_{t} {}

// Initialize event streams from a vector of vector of events
// Outer vector represents time step bins
void init(const std::vector<std::vector<Event>>& staged) {
// clear previous data
clear();

// return if there are no timestep bins
if (!staged.size()) return;

// return if there are no events
const size_type num_events = util::sum_by(staged, [] (const auto& v) {return v.size();});
if (!num_events) return;

// allocate space for spans and data
ev_spans_.resize(staged.size() + 1);
ev_data_.resize(num_events);
resize(device_ev_data_, num_events);

// compute offsets by exclusive scan over staged events
util::make_partition(ev_spans_,
util::transform_view(staged, [](const auto& v) { return v.size(); }),
0ull);

// assign, copy to device (and potentially sort) the event data in parallel
arb_assert(thread_pool_);
arb_assert(ev_spans_.size() == staged.size() + 1);
threading::parallel_for::apply(0, ev_spans_.size() - 1, thread_pool_.get(),
[this, &staged](size_type i) {
const auto beg = ev_spans_[i];
const auto end = ev_spans_[i + 1];
arb_assert(end >= beg);
const auto len = end - beg;

auto host_span = memory::make_view(ev_data_)(beg, end);

// make event data and copy
std::copy_n(util::transform_view(staged[i],
[](const auto& x) { return event_data(x); }).begin(),
len,
host_span.begin());
// sort if necessary
if constexpr (has_event_index<Event>::value) {
util::stable_sort_by(host_span,
[](const event_data_type& ed) { return event_index(ed); });
}
// copy to device
auto device_span = memory::make_view(device_ev_data_)(beg, end);
memory::copy_async(host_span, device_span);
});

base_ptr_ = device_ev_data_.data();
template<typename BaseEventStream>
struct event_stream : BaseEventStream {
public:
ARB_SERDES_ENABLE(event_stream<BaseEventStream>,
ev_data_,
ev_spans_,
device_ev_data_,
index_);

arb_assert(num_events == device_ev_data_.size());
arb_assert(num_events == ev_data_.size());
protected:
// Backend-specific initialization: mirror the host-staged event data
// (ev_data_) into device memory and expose the device pointer.
void init() override final {
    // Size the device buffer by the HOST data about to be copied.
    // Resizing device_ev_data_ to its own current size was a no-op
    // (the resize helper only grows when d.size() < size), leaving the
    // buffer stale/undersized for the subsequent copy.
    resize(this->device_ev_data_, this->ev_data_.size());
    memory::copy_async(this->ev_data_, this->device_ev_data_);
    this->base_ptr_ = this->device_ev_data_.data();
}

// Initialize event stream assuming ev_data_ and ev_spans_ have
// been set previously (e.g. by `base::multi_event_stream`)
void init() {
resize(device_ev_data_, ev_data_.size());
base_ptr_ = device_ev_data_.data();

threading::parallel_for::apply(0, ev_spans_.size() - 1, thread_pool_.get(),
[this](size_type i) {
const auto beg = ev_spans_[i];
const auto end = ev_spans_[i + 1];
arb_assert(end >= beg);

auto host_span = memory::make_view(ev_data_)(beg, end);
auto device_span = memory::make_view(device_ev_data_)(beg, end);
private: // device memory
using event_data_type = typename BaseEventStream::event_data_type;
using device_array = memory::device_vector<event_data_type>;

// sort if necessary
if constexpr (has_event_index<Event>::value) {
util::stable_sort_by(host_span,
[](const event_data_type& ed) { return event_index(ed); });
}
// copy to device
memory::copy_async(host_span, device_span);
});
}
device_array device_ev_data_;

template<typename D>
static void resize(D& d, std::size_t size) {
Expand All @@ -117,16 +37,10 @@ struct event_stream: public event_stream_base<Event> {
d = D(size);
}
}

ARB_SERDES_ENABLE(event_stream<Event>,
ev_data_,
ev_spans_,
device_ev_data_,
index_);

task_system_handle thread_pool_;
device_array device_ev_data_;
};

using spike_event_stream = event_stream<spike_event_stream_base>;
using sample_event_stream = event_stream<sample_event_stream_base>;

} // namespace gpu
} // namespace arb
2 changes: 0 additions & 2 deletions arbor/backends/gpu/fvm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ struct backend {
using threshold_watcher = arb::gpu::threshold_watcher;
using cable_solver = arb::gpu::matrix_state_fine<arb_value_type, arb_index_type>;
using diffusion_solver = arb::gpu::diffusion_state<arb_value_type, arb_index_type>;
using deliverable_event_stream = arb::gpu::deliverable_event_stream;
using sample_event_stream = arb::gpu::sample_event_stream;

using shared_state = arb::gpu::shared_state;
using ion_state = arb::gpu::ion_state;
Expand Down
3 changes: 0 additions & 3 deletions arbor/backends/gpu/gpu_store_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ using array = memory::device_vector<arb_value_type>;
using iarray = memory::device_vector<arb_index_type>;
using sarray = memory::device_vector<arb_size_type>;

using deliverable_event_stream = arb::gpu::event_stream<deliverable_event>;
using sample_event_stream = arb::gpu::event_stream<sample_event>;

} // namespace gpu
} // namespace arb

12 changes: 2 additions & 10 deletions arbor/backends/gpu/shared_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ shared_state::shared_state(task_system_handle tp,
time_since_spike(n_cell*n_detector),
src_to_spike(make_const_view(src_to_spike_)),
cbprng_seed(cbprng_seed_),
sample_events(thread_pool),
sample_events(),
watcher{n_cv_, src_to_spike.data(), detector_info}
{
memory::fill(time_since_spike, -1.0);
Expand Down Expand Up @@ -240,7 +240,7 @@ void shared_state::instantiate(mechanism& m,

if (storage.count(id)) throw arb::arbor_internal_error("Duplicate mech id in shared state");
auto& store = storage.emplace(id, mech_storage{}).first->second;
streams[id] = deliverable_event_stream{thread_pool};
streams[id] = spike_event_stream{};

// Allocate view pointers
store.state_vars_ = std::vector<arb_value_type*>(m.mech_.n_state_vars);
Expand Down Expand Up @@ -388,14 +388,6 @@ void shared_state::take_samples() {
}
}

void shared_state::init_events(const event_lane_subrange& lanes,
const std::vector<target_handle>& handles,
const std::vector<size_t>& divs,
const timestep_range& dts) {
arb::gpu::event_stream<deliverable_event>::multi_event_stream(lanes, handles, divs, dts, streams);
}


// Debug interface
ARB_ARBOR_API std::ostream& operator<<(std::ostream& o, shared_state& s) {
using io::csv;
Expand Down
Loading

0 comments on commit cc12918

Please sign in to comment.