-
Notifications
You must be signed in to change notification settings - Fork 3
/
loop.c
431 lines (303 loc) · 13.7 KB
/
loop.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
// loop implements the getting of IP packets and distributing of events
/* uSockets is entierly opaque so we can use the real header straight up */
#include "../uWebSockets.js/uWebSockets/uSockets/src/libusockets.h"
#include "internal.h"
// we print statistics such as numer of out of sync, number of "healed" sockets due to drop, etc
void print_statistics(struct us_loop_t *loop) {
printf("Packets out of order so far: %d\n", loop->packets_out_of_order);
printf("Healed sockets so far: %d\n", loop->healed_sockets);
printf("Duplicated packets: %d\n\n", loop->duplicated_packets);
printf("Packets received: %lld\n", loop->packets_received);
}
#include <sys/time.h>
// should be more like read() from a file
int fetchPackageBatch(struct us_loop_t *loop) {
// wait for one is pointless here
return recvmmsg(loop->fd, loop->msgs, 1024, /*MSG_WAITFORONE*/ 0, 0);
}
void releaseSend(struct us_loop_t *loop) {
// this is not blocking, should block!
// release aka send
if (loop->queuedBuffersNum) {
//printf("Sending %d packages\n", loop->queuedBuffersNum);
struct mmsghdr sendVec[1024] = {};
struct sockaddr_in sin[1024] = {};
int packages = loop->queuedBuffersNum;
for (int i = 0; i < loop->queuedBuffersNum; i++) {
IpHeader *ipHeader = (IpHeader *) loop->outBuffer[i];
struct TcpHeader *tcpHeader = (struct TcpHeader *) IpHeader_getData(ipHeader);
int length = IpHeader_getTotalLength(ipHeader);//ipHeader->getTotalLength();
sin[i].sin_family = AF_INET;
sin[i].sin_port = tcpHeader->header.dest;
sin[i].sin_addr.s_addr = ipHeader->daddr;
loop->messages[i].iov_base = ipHeader;
loop->messages[i].iov_len = length;
// send out of order!
// sendVec[i].msg_hdr.msg_iov = &messages[packages - i - 1];
// sendVec[i].msg_hdr.msg_iovlen = 1;
// sendVec[i].msg_hdr.msg_name = &sin[packages - i - 1];
// sendVec[i].msg_hdr.msg_namelen = sizeof(sockaddr_in);
sendVec[i].msg_hdr.msg_iov = &loop->messages[i];
sendVec[i].msg_hdr.msg_iovlen = 1;
sendVec[i].msg_hdr.msg_name = &sin[i];
sendVec[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_in);
}
int sent = 0;
// we just block until we're done
while (sent != loop->queuedBuffersNum) {
int tmp = sendmmsg(loop->send_fd, &sendVec[sent], loop->queuedBuffersNum - sent, 0);
if (tmp > 0) {
sent += tmp;
}
/*if (tmp != loop->queuedBuffersNum) {
printf("COULD NOT SEND ALL PACKETS WITHOUT BLOCKING!\n");
exit(0);
}*/
//break;
}
loop->queuedBuffersNum = 0;
//std::cout << "Sent now" << std::endl;
}
}
IpHeader *getIpPacket(struct us_loop_t *loop, int index, unsigned int *length) {
IpHeader *ipHeader = (IpHeader *) loop->iovecs[index].iov_base;
*length = loop->iovecs[index].iov_len;
return ipHeader;
}
IpHeader *getIpPacketBuffer(struct us_loop_t *loop) {
if (loop->queuedBuffersNum == 1024) {
//std::cout << "Releasing IP buffers in getIpPacketBuffer" << std::endl;
printf("SENDING OVERFLOW!\n");
exit(0);
releaseSend(loop);
}
return (IpHeader *) loop->outBuffer[loop->queuedBuffersNum++];
}
#include <sys/epoll.h>
void us_internal_loop_link(struct us_loop_t *loop, struct us_socket_context_t *context) {
/* Insert this context as the head of loop */
context->next = loop->head;
context->prev = 0;
if (loop->head) {
loop->head->prev = context;
}
loop->head = context;
}
struct us_loop_t *us_create_loop(void *hint, void (*wakeup_cb)(struct us_loop_t *loop), void (*pre_cb)(struct us_loop_t *loop), void (*post_cb)(struct us_loop_t *loop), unsigned int ext_size) {
struct us_loop_t *loop = (struct us_loop_t *) malloc(sizeof(struct us_loop_t) + ext_size);
loop->listen_socket = 0;
loop->context = 0;
loop->close_list = 0;
loop->pre_cb = pre_cb;
loop->post_cb = post_cb;
loop->wakeup_cb = wakeup_cb;
loop->head = 0;
loop->iterator = 0;
//
loop->packets_out_of_order = 0;
loop->healed_sockets = 0;
loop->duplicated_packets = 0;
loop->packets_received = 0;
loop->epfd = epoll_create1(0);
loop->timer = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK);
struct epoll_event event;
event.events = EPOLLIN;
event.data.u64 = 0; // mark the timer as 0
epoll_ctl(loop->epfd, EPOLL_CTL_ADD, loop->timer, &event);
/* An IPPROTO_RAW socket is send only. If you really want to receive
all IP packets, use a packet(7) socket with the ETH_P_IP protocol.
Note that packet sockets don't reassemble IP fragments, unlike raw
sockets. */
loop->send_fd = socket(AF_INET, SOCK_RAW, IPPROTO_RAW);
loop->fd = socket(AF_INET, SOCK_RAW | SOCK_NONBLOCK, IPPROTO_TCP); // close(fd) vid stängning
if (loop->fd == -1) {
//throw IP_ERR;
printf("Kan inte skapa IP socket!\n");
exit(0);
}
//struct epoll_event event;
event.events = EPOLLIN;
event.data.u64 = 1; // mark the raw socket as 1
epoll_ctl(loop->epfd, EPOLL_CTL_ADD, loop->fd, &event);
for (int i = 0; i < 1024; i++) {
loop->buffer[i] = malloc(1024 * 32);
loop->outBuffer[i] = malloc(1024 * 32);
}
printf("Loop's first out buffer is: %p\n", loop->outBuffer[0]);
const int VLEN = 1024;
memset(loop->msgs, 0, sizeof(loop->msgs));
for (int i = 0; i < VLEN; i++) {
loop->iovecs[i].iov_base = loop->buffer[i];
loop->iovecs[i].iov_len = 1024 * 32;
loop->msgs[i].msg_hdr.msg_iov = &loop->iovecs[i];
loop->msgs[i].msg_hdr.msg_iovlen = 1;
}
int one = 1;
const int *val = &one;
if (setsockopt (loop->fd, IPPROTO_IP, IP_HDRINCL, val, sizeof (one)) < 0) {
//throw IP_ERR;
}
loop->queuedBuffersNum = 0;
return loop;
}
void us_wakeup_loop(struct us_loop_t *loop) {
/* We do this immediately as of now, could be delayed to next iteration */
loop->wakeup_cb(loop);
}
void us_loop_free(struct us_loop_t *loop) {
free(loop);
}
void *us_loop_ext(struct us_loop_t *loop) {
return loop + 1;
}
#include "internal.h"
// we have this somewhere
extern struct us_socket_t *global_s;
/* Either the loop holds all sockets or the socket context do? */
#include "uthash.h"
/*
*
* #define HASH_FIND_INT(head,findint,out) \
HASH_FIND(hh,head,findint,sizeof(int),out)*/
// us_internal_socket_context_read_tcp borde returnera själva socketen antingen utbytt, skapad eller så
// sen kan vi i loopen hålla koll på alla sockets i en hashmap från 64+32 = 96 bit hash key = 12 bytes
// this is the hash table
struct us_socket_t *sockets = NULL;
void remove_socket(struct us_socket_t *s) {
HASH_DEL(sockets, s);
free(s);
global_s = 0;
}
/* We also need to merge this with uSockets and enable TLS over it */
/* WITH_USERSPACE=1 */
void us_internal_small_tick();
/* We need to have timeout by now, and every tick should print statistics on packet loss */
void us_loop_run(struct us_loop_t *loop) {
printf("Getting ip packets now\n");
int repeat_ms = 250;
int ms = 250;
int small_ticks = 0;
struct itimerspec timer_spec = {
{repeat_ms / 1000, ((long)repeat_ms * 1000000) % 1000000000},
{ms / 1000, ((long)ms * 1000000) % 1000000000}
};
timerfd_settime(loop->timer, 0, &timer_spec, NULL);
while(1) {
// epoll_wait on two fds: time and raw socket
struct epoll_event events[2];
int numEvents = epoll_wait(loop->epfd, events, 2, -1);
for (int i = 0; i < numEvents; i++) {
if (events[i].data.u64 == 0) {
//printf("Timerfd tick!\n");
us_internal_small_tick();
small_ticks++;
if (small_ticks == 16) {
//print_statistics(loop);
us_internal_timer_sweep(loop);
small_ticks = 0;
}
uint64_t buf;
read(loop->timer, &buf, 8);
}
if (events[i].data.u64 == 1) {
// a packet
int messages = fetchPackageBatch(loop);
// should never happen
if (messages == -1) {
continue;
}
//printf("Read %d packets\n", messages);
// this should never happen, if it does we read too slowly and lag behind (but should be reset whatever we miss anyways)
if (messages == 1024) {
printf("WE ARE NOT READING PACKAGES FAST ENOUGH!\n");
//exit(0);
}
for (int i = 0; i < messages; i++) {
unsigned int length;
IpHeader *ipHeader = getIpPacket(loop, i, &length);
/* First we filter out everything that isn't tcp over ipv4 */
if (ipHeader->version != 4 || ipHeader->protocol != IPPROTO_TCP) {
continue;
}
/* Now we know this is tcp */
struct TcpHeader *tcpHeader = (struct TcpHeader *) IpHeader_getData(ipHeader);
/* OMG! WE DO NOT HANDLE DUPLICATE SYN! PROPERLY!! */
// has to be: try and get this socket, in case it exists, handle the segment otherwise crete a new socket for SYN
/* Is this packet SYN? */
if (tcpHeader->header.syn && !tcpHeader->header.ack) {
/* Loop over all contexts */
for (struct us_socket_context_t *context = loop->context; context; context = context->next) {
/* Loop over all listen sockets */
for (struct us_listen_socket_t *listen_socket = context->listen_socket; listen_socket; ) {
if (listen_socket->port == TcpHeader_getDestinationPort(tcpHeader)) {
//global_s = 0;
/* NOTE: WE NEED TO CHECK IF OR NOT THIS SOCKET ALREADY EXISTS!!! */
struct SOCKET_KEY key = {
TcpHeader_getSourcePort(tcpHeader),
TcpHeader_getDestinationPort(tcpHeader),
ipHeader->saddr,
ipHeader->daddr
};
struct us_socket_t *s;
HASH_FIND(hh, sockets, &key, sizeof(struct SOCKET_KEY), s);
if (s) {
printf("GOT DUPLICATE SYN!!!!!!\n");
goto here;
}
us_internal_socket_context_read_tcp(NULL, listen_socket->context, ipHeader, tcpHeader, length);
// vi kommer att ha ändrat global_s om det finns en ny socket!
if (global_s) {
struct SOCKET_KEY key = {
TcpHeader_getSourcePort(tcpHeader),
TcpHeader_getDestinationPort(tcpHeader),
ipHeader->saddr,
ipHeader->daddr
};
global_s->key = key;
HASH_ADD(hh, sockets, key, sizeof(struct SOCKET_KEY), global_s);
global_s = 0;
}
}
/* We only have one listen socket for now */
break;
}
}
} else {
struct us_socket_t *s;
struct SOCKET_KEY key = {
TcpHeader_getSourcePort(tcpHeader),
TcpHeader_getDestinationPort(tcpHeader),
ipHeader->saddr,
ipHeader->daddr
};
HASH_FIND(hh, sockets, &key, sizeof(struct SOCKET_KEY), s);
if (s) {
us_internal_socket_context_read_tcp(s, s->context, ipHeader, tcpHeader, length);
}
}
here:
continue;
}
releaseSend(loop);
}
}
}
}
// vi behöver få in timers, och svepa över de websockets som inte får något meddelande i tid - som en absolut måttstock på stabilitet oavsett outof order etc
void us_internal_timer_sweep(struct us_loop_t *loop) {
for (loop->iterator = loop->head; loop->iterator; loop->iterator = loop->iterator->next) {
struct us_socket_context_t *context = loop->iterator;
for (context->iterator = context->head; context->iterator; ) {
struct us_socket_t *s = context->iterator;
if (s->timeout && --(s->timeout) == 0) {
context->on_socket_timeout(s);
/* Check for unlink / link */
if (s == context->iterator) {
context->iterator = s->next;
}
} else {
context->iterator = s->next;
}
}
}
}