-
Notifications
You must be signed in to change notification settings - Fork 2
/
worklist.h
200 lines (185 loc) · 6.51 KB
/
worklist.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
#pragma once
#include <emu_cxx_utils/replicated.h>
#include <emu_cxx_utils/intrinsics.h>
#include <emu_cxx_utils/execution_policy.h>
#include <cilk/cilk.h>
template<class Edge>
class worklist {
private:
// The first vertex in the work list
// If this is a replicated instance, there will be a different head on
// each nodelet
volatile long head_;
// The following arrays have one element per vertex. Together they function
// as a linked-list node using struct-of-arrays.
// Next vertex ID to process (next pointer in linked list)
emu::striped_array<long> next_vertex_;
// Pointer to start of edge list to process
emu::striped_array<Edge*> edges_begin_;
// Pointer past the end of edge list to process
emu::striped_array<Edge*> edges_end_;
public:
explicit worklist(long num_vertices)
: head_(-1)
, next_vertex_(num_vertices)
, edges_begin_(num_vertices)
, edges_end_(num_vertices)
{}
worklist(const worklist& other, emu::shallow_copy shallow)
: next_vertex_(other.next_vertex_, shallow)
, edges_begin_(other.edges_begin_, shallow)
, edges_end_(other.edges_end_, shallow)
{}
/**
* Reset all replicated copies of the work queue.
* Only valid to call on replicated instance
*/
void clear_all ()
{
assert(emu::pmanip::is_repl(this));
for (long nlet = 0; nlet < NODELETS(); ++nlet) {
get_nth(nlet).clear();
}
}
/**
* Reset the work queue so that new edges can be added
*/
void clear()
{
head_ = -1;
}
/**
* Returns the nth replicated copy of the work list
* Only valid to call on a replicated instance
* @param n nodelet ID
* @return the nth replicated copy of the work list
*/
worklist&
get_nth(long n)
{
return *emu::pmanip::get_nth(this, n);
}
/**
* Atomically append edges to the work queue.
*
* @param src source vertex for all edges
* @param edges_begin Pointer to start of edge list to append
* @param edges_end Pointer past the end of the edge list to append
*/
void append(long src, Edge * edges_begin, Edge * edges_end)
{
assert(emu::pmanip::is_repl(this));
// Get the pointer from the nodelet where src vertex lives
volatile long * head_ptr = &get_nth(src & (NODELETS()-1)).head_;
// Append to head of worklist
edges_begin_[src] = edges_begin;
edges_end_[src] = edges_end;
long prev_head;
do {
prev_head = *head_ptr;
next_vertex_[src] = prev_head;
} while (prev_head != emu::atomic_cas(head_ptr, prev_head, src));
}
private:
// Worker function spawned in dynamic process_all
template<class Visitor, long Grain>
void worker(Visitor visitor)
{
long grain = Grain;
// Walk through the worklist
for (long src = head_; src >= 0; src = next_vertex_[src]) {
// Get end pointer for the vertex list of src
auto edges_end = edges_end_[src];
// Try to atomically grab some edges to process for this vertex
Edge *e1, *e2;
for (e1 = emu::atomic_addms(&edges_begin_[src], grain);
e1 < edges_end;
e1 = emu::atomic_addms(&edges_begin_[src], grain))
{
// Compute endpoint of this granule
e2 = e1 + grain; if (e2 > edges_end) { e2 = edges_end; }
// Call visitor on the range of edges
visitor(src, e1, e2);
}
// This vertex is done, move to the next one
}
}
public:
/**
* Process the edges in the worklist in parallel
* Spawns local worker threads to pull items off of the work list.
* @param policy Dynamic policy: grain size indicates how many items each
* thread will pull off the worklist at a time.
* @param visitor Lambda function to call on each edge, with signature:
* @c void (long src, long dst)
*/
template<class Visitor, long Grain>
void process(emu::dynamic_policy<Grain> policy, Visitor visitor)
{
for (long t = 0; t < emu::threads_per_nodelet; ++t) {
cilk_spawn worker<Visitor, Grain>(visitor);
}
}
template<class Visitor, long Grain>
void process(emu::parallel_policy<Grain> policy, Visitor visitor)
{
// Walk through the worklist
constexpr long grain = Grain;
for (long src = head_; src >= 0; src = next_vertex_[src]) {
// Spawn a thread for each granule
auto begin = edges_begin_[src];
auto end = edges_end_[src];
for (auto e1 = begin; e1 < end; e1 += grain) {
// Compute endpoint of this granule
auto e2 = e1 + grain; if (e2 > end) { e2 = end; }
cilk_spawn visitor(src, e1, e2);
}
// This vertex is done, move to the next one
}
}
/**
* Process the edges in the worklist
* @param visitor Lambda function to call on each edge, with signature:
* @c void (long src, long dst)
*/
template<class Visitor>
void process(emu::sequenced_policy, Visitor visitor)
{
// Walk through the worklist
for (long src = head_; src >= 0; src = next_vertex_[src]) {
// Visit each edge for this vertex
visitor(src, edges_begin_[src], edges_end_[src]);
// This vertex is done, move to the next one
}
}
/**
* Process the edges in all replicated copies of the worklist
* Spawns local worker threads on each nodelet to pull items off of
* the work list.
* Only valid to call on a replicated instance
* @param policy Execution policy to use at each nodelet
* @param visitor Lambda function to call on each edge, with signature:
* @c void (long src, long dst)
*/
template<class Policy, class Visitor>
void process_all_ranges(Policy policy, Visitor visitor)
{
assert(emu::pmanip::is_repl(this));
emu::repl_for_each(emu::parallel_policy<1>(), *this,
[policy, visitor](worklist & w) {
w.process(policy, visitor);
}
);
}
template<class Policy, class Visitor>
void process_all_edges(Policy policy, Visitor visitor)
{
process_all_ranges(policy,
[visitor](long src, Edge * begin, Edge * end) {
for (auto e = begin; e != end; ++e) {
visitor(src, *e);
}
}
);
}
};