-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.cc
334 lines (316 loc) · 11.2 KB
/
main.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
#include "core/sleep.hh"
#include "core/app-template.hh"
#include "core/reactor.hh"
#include "core/gate.hh"
#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <exception>
#include <iostream>
#include <map>
#include <pthread.h>
#include <queue>
#include <string>
#include <utility>
// Makes the chrono duration literals (e.g. 500ms) usable throughout the file.
using namespace std::chrono_literals;

// Per-thread shutdown flag. The "stop" command handler sets it on every shard.
thread_local bool stop_flag = false;

// Per-thread pointer to the listening socket owned by ip_listen_loop();
// nullptr whenever no listener is active on this thread.
thread_local seastar::server_socket* common_listener = nullptr;

class Req;
// Reference-counted handle through which requests are passed around.
using ReqPtr = seastar::shared_ptr<Req>;
// A named request whose processing is simulated by sleeping for `wait_ms`.
class Req {
public:
    std::string name;                   // label printed in every log line
    int col_id;                         // key used to select a queue / shard
    std::chrono::milliseconds wait_ms;  // how long process() sleeps

    Req(const char* _name, int _col_id, std::chrono::milliseconds _wait_ms)
        : name(_name)
        , col_id(_col_id)
        , wait_ms(_wait_ms)
    {}

    // Logs the destruction so that request lifetimes are visible in the output.
    virtual ~Req() {
        std::cout << "~" << name.c_str() << std::endl;
    }

    // Logs the start, sleeps for `wait_ms`, then logs the completion.
    // The continuation captures `this`, so the caller has to keep the request
    // alive until the returned future resolves.
    seastar::future<> process() {
        std::cout << "processing:" << name.c_str() << std::endl;
        return seastar::sleep(wait_ms).then([this] {
            std::cout << "processed:" << name.c_str() << std::endl;
        });
    }
};
// Serializes request processing: the `limit` semaphore (initial count 1)
// lets at most one request of a Q be processed at a time.
//
// Every member function that returns a future refers to members of this
// object from its continuations, so the Q must stay alive until those
// futures have resolved.
class Q {
    // Requests accepted by queue2() while another request was in flight, each
    // paired with the promise that reports its outcome to the submitter.
    std::queue<std::pair<seastar::promise<>, ReqPtr>> q;
    // True while a request started through _queue2() has not finished yet.
    bool waiting = false;
    std::string name;
    seastar::semaphore limit;
public:
    Q(const char* _name = "") : name(_name), limit(1) {
    }
    Q(Q&& other) :
        q(std::move(other.q)),
        waiting(other.waiting),
        name(std::move(other.name)),
        limit(std::move(other.limit)) {
    }
    // Logs the destruction so that queue lifetimes are visible in the output.
    virtual ~Q() {
        std::cout<<"~"<<name.c_str()<<std::endl;
    }
    // True when no request is in flight through _queue2() and none is pending.
    bool is_empty() const {
        return !waiting && q.empty();
    }
    void set_name(const char* _name) {
        name = _name;
    }
    const char* get_name() const {
        return name.c_str();
    }
    // Processes `r` under the semaphore and reports the outcome through `p`
    // (value on success, the original exception on failure). Afterwards the
    // next pending request, if any, is started.
    //
    // The returned future resolves (always successfully) once the outcome has
    // been handed to `p`; submitters should wait on the future of `p`.
    //
    // `p` is completed only after the semaphore unit has been released and as
    // the very last access to this object: the submitter may destroy the Q
    // from its continuation (StoreShard::distribute erases empty queues), so
    // nothing belonging to the Q may be touched after that point.
    seastar::future<> _queue2(seastar::promise<>&& p, ReqPtr r) {
        waiting = true;
        return with_semaphore(limit, 1, [r] {
            // do_with keeps the request alive for as long as process() runs.
            return do_with(ReqPtr(r), [] (auto& req) {
                return req->process();
            });
        }).then_wrapped([this, p = std::move(p)] (seastar::future<> done) mutable {
            // Reset the state even when processing failed; otherwise the queue
            // would stay "busy" forever and pending requests would never run.
            waiting = false;
            if (!q.empty()) {
                auto next = std::move(q.front());
                q.pop();
                // The outcome travels through the promise; this future only
                // signals completion and never fails.
                (void)_queue2(std::move(next.first), std::move(next.second));
            }
            if (done.failed()) {
                p.set_exception(done.get_exception());
            } else {
                p.set_value();
            }
            return seastar::make_ready_future<>();
        });
    }
    // Submits `r`; requests are processed one at a time in submission order.
    // The returned future resolves when `r` has been processed and fails with
    // the exception raised by its processing, if any.
    seastar::future<> queue2(ReqPtr r) {
        std::cout<<"queue2:"<<r->name.c_str()<<std::endl;
        seastar::promise<> done;
        auto result = done.get_future();
        if (q.size() || waiting) {
            q.emplace(std::move(done), r);
        } else {
            // Completion and failure are both reported through `done`.
            (void)_queue2(std::move(done), r);
        }
        return result;
    }
    // Processes `r` under the semaphore and resolves to `r` itself.
    seastar::future<ReqPtr> queue(ReqPtr r) {
        std::cout<<"queue:"<<r->name.c_str()<<std::endl;
        return with_semaphore(limit, 1, [r] {
            return r->process().then([r] {
                return seastar::make_ready_future<ReqPtr>(r);
            });
        });
    }
    // Processes `r` under the semaphore; the request is kept alive internally.
    seastar::future<> queue_and_forget(ReqPtr r) {
        std::cout<<"queue:"<<r->name.c_str()<<std::endl;
        return with_semaphore(limit, 1, [r] {
            return do_with(ReqPtr(r), [] (auto&r) {
                return r->process().then([] {
                    return seastar::make_ready_future<>();
                });
            });
        });
    }
    // Runs `fn(r)` under the semaphore instead of r->process(). `fn` is copied
    // into the continuation and must return a seastar::future<>.
    template <typename Func>
    seastar::future<> queue_fn(ReqPtr r,
            Func&& fn) {
        std::cout<<"queue_fn:"<<r->name.c_str()<<std::endl;
        return with_semaphore(limit, 1, [r, fn] {
            return do_with(ReqPtr(r), [fn] (auto&r) {
                return fn(r).then([] {
                    return seastar::make_ready_future<>();
                });
            });
        });
    }
};
typedef seastar::shared_ptr<Q> QPtr;
class StoreShard;
typedef seastar::shared_ptr<StoreShard> StoreShardPtr;
// Per-thread request dispatcher: routes every request to the queue of its
// col_id, creating queues on demand and dropping them once they are idle.
class StoreShard
{
    // One queue per col_id that currently has work.
    std::map<int, QPtr> col_queues;
public:
    StoreShard()
    {
    }
    virtual ~StoreShard() {
        std::cout<<"~StoreShard:"<<this<<std::endl;
    }
    // Queues `r` on the queue of r->col_id of this shard. The returned future
    // resolves once the request has been processed; afterwards the queue is
    // removed if nothing else is pending on it.
    //
    // Throws std::string (synchronously) when termination has been requested.
    seastar::future<> distribute(ReqPtr r) {
        if (stop_flag) {
            throw std::string("termination is in progress");
        }
        auto it = col_queues.find(r->col_id);
        if (it == col_queues.end()) {
            // std::to_string formats the id as the signed int it is (the
            // previous sprintf used "%u" for an int argument).
            const std::string queue_name = "que_" + std::to_string(r->col_id);
            it = col_queues.emplace(r->col_id,
                    seastar::make_shared<Q>(queue_name.c_str())).first;
        }
        std::cout<<"forward "<<r->name.c_str()<<" to "<<this<<":"<<it->second->get_name()<<std::endl;
        return it->second->queue2(r).finally([this, col_id = r->col_id] {
            std::cout<<"RemoveQueue?"<<col_id<<std::endl;
            auto it = col_queues.find(col_id);
            if (it != col_queues.end() &&
                    it->second->is_empty()) {
                col_queues.erase(it);
                std::cout<<"RemoveQueue!"<<col_id<<std::endl;
            }
        });
    }
    // Polls every 500ms and completes once termination has been requested and
    // all queues have drained.
    seastar::future<> loop() {
        return seastar::repeat([this] {
            return seastar::sleep(500ms).then([this] {
                return stop_flag && col_queues.empty() ?
                    seastar::stop_iteration::yes :
                    seastar::stop_iteration::no;
            });
        });
    }
    // The dispatcher instance of the current thread; created by run().
    static thread_local StoreShardPtr thread_shard;
    // Creates the dispatcher of the calling thread and runs its loop().
    static seastar::future<> run() {
        assert(thread_shard.get() == nullptr);
        thread_shard = seastar::make_shared<StoreShard>();
        return thread_shard->loop();
    }
    // Sends the request to the shard selected by col_id and queues it there.
    //
    // seastar::shared_ptr is not thread-safe, so the caller's pointer must
    // not be copied or released on another shard. The request data is
    // therefore copied by value and a fresh Req is created on the target
    // shard; the caller's object is released on the calling shard.
    // NOTE(review): a class derived from Req would be sliced by this copy;
    // no such class is visible in this file.
    static seastar::future<> process(ReqPtr r) {
        // Same arithmetic as `int % unsigned`: the id is converted to unsigned.
        const unsigned shard_no =
            static_cast<unsigned>(r->col_id) % seastar::smp::count;
        std::cout<<"process submitting to shard="<<shard_no<<std::endl;
        return seastar::smp::submit_to(shard_no,
                [name = r->name, col_id = r->col_id, wait_ms = r->wait_ms] {
            assert(thread_shard.get() != nullptr);
            return thread_shard->distribute(
                    seastar::make_shared<Req>(name.c_str(), col_id, wait_ms));
        });
    }
};
thread_local StoreShardPtr StoreShard::thread_shard;
// Manual check of Q::queue2(): submits four requests to a single queue and
// reports once all of them have completed.
seastar::future<> q_test() {
    return seastar::do_with(Q ("q"), [] (auto& queue) {
        ReqPtr first = seastar::make_shared<Req>("first_1", 1, 2000ms);
        ReqPtr second = seastar::make_shared<Req>("second_2", 2, 3000ms);
        ReqPtr third = seastar::make_shared<Req>("third_1", 1, 1000ms);
        ReqPtr fourth = seastar::make_shared<Req>("forth_2", 2, 3000ms);

        // Submission order matters: the queue processes requests in this order.
        auto first_done = queue.queue2(first);
        auto second_done = queue.queue2(second);
        auto third_done = queue.queue2(third);
        auto fourth_done = queue.queue2(fourth);

        auto everything = seastar::when_all(std::move(first_done),
                std::move(second_done),
                std::move(third_done),
                std::move(fourth_done));
        return std::move(everything).then([] (auto results) {
            std::cout << "all done" << std::endl;
        });
    });
}
// Manual check of StoreShard::distribute(): four requests spread over two
// col_ids are handed to one dispatcher; reports once all have completed.
seastar::future<> StoreShardTest()
{
    return seastar::do_with(seastar::make_shared<StoreShard>(), [] (auto& shard) {
        ReqPtr first = seastar::make_shared<Req>("first_1", 1, 2000ms);
        ReqPtr second = seastar::make_shared<Req>("second_2", 2, 3000ms);
        ReqPtr third = seastar::make_shared<Req>("third_1", 2, 1000ms);
        ReqPtr fourth = seastar::make_shared<Req>("forth_2", 1, 3000ms);

        // Requests are distributed strictly in this order.
        auto first_done = shard->distribute(first);
        auto second_done = shard->distribute(second);
        auto third_done = shard->distribute(third);
        auto fourth_done = shard->distribute(fourth);

        auto everything = seastar::when_all(std::move(first_done),
                std::move(second_done),
                std::move(third_done),
                std::move(fourth_done));
        return std::move(everything).then([] (auto results) {
            std::cout << "all done for shard" << std::endl;
        });
    });
}
// Extracts the arguments of a "req<name> <col_id> <wait_ms>" command.
// `msg` must start with "req"; tokens are separated by spaces or tabs.
// Returns false when a token is missing or a number has no digits.
static bool parse_req_command(const std::string& msg, std::string& name,
        long& col_id, long& wait_ms) {
    static const char* const delims = " \t";
    std::string::size_type pos = 3;  // skip the "req" keyword
    auto next_token = [&msg, &pos] (std::string& token) {
        const auto first = msg.find_first_not_of(delims, pos);
        if (first == std::string::npos) {
            return false;
        }
        const auto last = msg.find_first_of(delims, first);
        token = msg.substr(first,
                last == std::string::npos ? std::string::npos : last - first);
        pos = last;  // npos here makes the next search fail, as intended
        return true;
    };
    auto to_long = [] (const std::string& token, long& value) {
        char* end = nullptr;
        value = std::strtol(token.c_str(), &end, 10);
        return end != token.c_str();  // at least one digit was consumed
    };
    std::string col_token;
    std::string wait_token;
    return next_token(name) && next_token(col_token) && next_token(wait_token)
        && to_long(col_token, col_id) && to_long(wait_token, wait_ms);
}

// Serves one client connection until the peer closes it.
//
// Every received buffer is interpreted as one command:
//   "stop..."                     - request termination on all shards
//   "req<name> <col_id> <wait_ms>" - submit a request via StoreShard::process
//   anything else                 - echoed back to the client
//
// The received bytes are copied into a std::string before inspection because
// the read buffer is not NUL-terminated.
seastar::future<> handle_connection(seastar::connected_socket s,
        seastar::socket_address a) {
    auto out = s.output();
    auto in = s.input();
    std::cout<<"handling con..."<<std::endl;
    // do_with keeps the socket and both streams alive for the whole exchange.
    return do_with(std::move(s), std::move(out), std::move(in),
            [] (auto& s, auto& out, auto& in) {
        return seastar::repeat([&out, &in] {
            return in.read().then([&out] (auto buf) {
                if (!buf) {
                    // Empty buffer: the peer closed the connection.
                    return seastar::make_ready_future<seastar::stop_iteration>(
                            seastar::stop_iteration::yes);
                }
                const std::string msg(buf.get(), buf.size());
                if (msg.compare(0, 4, "stop") == 0) {
                    std::cout<<"stop received"<<std::endl;
                    // Fire and forget: every shard raises its flag and aborts
                    // its pending accept().
                    (void)seastar::smp::invoke_on_all([] {
                        stop_flag = true;
                        if (common_listener) {
                            common_listener->abort_accept();
                        }
                        return seastar::make_ready_future<>();
                    });
                    return seastar::make_ready_future<seastar::stop_iteration>(
                            seastar::stop_iteration::no);
                }
                if (msg.compare(0, 3, "req") == 0) {
                    std::string name;
                    long col_id = 0;
                    long ms_long = 0;
                    if (!parse_req_command(msg, name, col_id, ms_long)) {
                        // Input comes from the network: never trust its shape.
                        std::cout<<"malformed req command ignored"<<std::endl;
                    } else {
                        std::chrono::milliseconds ms(ms_long);
                        ReqPtr r = seastar::make_shared<Req>(name.c_str(),
                                static_cast<int>(col_id), ms);
                        // Runs in the background; failures (e.g. termination
                        // in progress) are logged instead of being dropped.
                        (void)StoreShard::process(r).handle_exception(
                                [] (std::exception_ptr) {
                            std::cout<<"req failed"<<std::endl;
                        });
                    }
                    return seastar::make_ready_future<seastar::stop_iteration>(
                            seastar::stop_iteration::no);
                }
                std::cout<<"<<"<<msg<<std::flush;
                return out.write(std::move(buf)).then([&out] {
                    return out.flush();
                }).then([] {
                    return seastar::stop_iteration::no;
                });
            });
        }).finally([&out] {
            // Close the stream on errors too, not only on a clean shutdown.
            std::cout<<"closing con..."<<std::endl;
            return out.close();
        });
    });
}
// Accepts connections on port 1234 until termination is requested or
// accept() fails, then waits for the connections still being served.
//
// While the loop runs, `common_listener` points at the listening socket so
// that the "stop" command can abort a pending accept().
seastar::future<> ip_listen_loop() {
    seastar::listen_options lo;
    lo.reuse_address = true;
    return seastar::do_with(seastar::gate(), seastar::listen(seastar::make_ipv4_address({1234}), lo),
            [] (auto &g, auto& listener) {
        common_listener = &listener;
        return seastar::repeat([&g, &listener] {
            if (stop_flag) {
                return seastar::make_ready_future<seastar::stop_iteration>(
                        seastar::stop_iteration::yes);
            }
            return listener.accept().then([&g]
                    (seastar::connected_socket s, seastar::socket_address a) {
                // The connection is served in the background. Returning the
                // handler's future makes the gate track it, so g.close()
                // below really waits for the connection to finish.
                (void)seastar::with_gate(g, [&s, &a] {
                    return handle_connection(std::move(s), std::move(a));
                }).handle_exception([] (std::exception_ptr) {
                    std::cout<<"connection terminated with an error"<<std::endl;
                });
                return seastar::stop_iteration::no;
            }).handle_exception([] (std::exception_ptr) {
                // Also reached when abort_accept() interrupts accept().
                std::cout<<"exception while handling connection"<<std::endl;
                return seastar::make_ready_future<seastar::stop_iteration>(
                        seastar::stop_iteration::yes);
            });
        }).then( [&g] {
            // Resolves once every tracked connection handler has finished.
            return g.close();
        });
    }).finally([] {
        common_listener = nullptr;
        return seastar::make_ready_future<>();
    });
}
// Range of shard ids in use; replaced by the real range once the app runs.
boost::integer_range<unsigned> effective_shards(0, 1);

// Starts the request dispatcher and the listen loop on every shard and runs
// until both have finished everywhere. The result of app.run() becomes the
// process exit code, so failures are visible to the caller.
int main(int argc, char** argv) {
    seastar::app_template app;
    return app.run(argc, argv, [] {
        effective_shards = seastar::smp::all_cpus();
        std::cout<<"RUN: "<<effective_shards<< " shards/cpus .."<<std::endl;
        auto f0 = seastar::smp::invoke_on_all([] {
            return StoreShard::run();
        });
        auto f1 = seastar::smp::invoke_on_all([] {
            return ip_listen_loop();
        });
        return seastar::when_all(std::move(f0), std::move(f1)).then([] (auto t) {
            std::cout<<"all done"<<std::endl;
            return seastar::make_ready_future<>();
        });
    });
}