forked from uNetworking/uWebSockets
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththroughput.cpp
158 lines (137 loc) · 5.27 KB
/
throughput.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
/* This is a very high performance general WebSocket server throughput benchmark */
#include <endian.h>

#include <cerrno>
#include <chrono>
#include <climits>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <vector>

#include <uv.h>

using namespace std;
using namespace chrono;
uv_loop_t *loop;
uv_buf_t upgradeHeader;
uv_buf_t framePack;
int byteSize, framesPerSend;
unsigned char *framePackBuffer;
int framePackBufferLength = 0;
const char upgradeHeaderBuffer[] = "GET / HTTP/1.1\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==\r\n"
"Host: server.example.com\r\n"
"Sec-WebSocket-Version: 13\r\n\r\n";
int connections, remainingBytes;
vector<uv_stream_t *> sockets;
sockaddr_in addr;
unsigned long sent = 0;
auto startPoint = time_point<high_resolution_clock>();
void echo()
{
for (int i = 0; i < connections; i++) {
// Write message on random socket
uv_write(new uv_write_t, sockets[rand() % connections], &framePack, 1, [](uv_write_t *write_t, int status) {
if (status < 0) {
cout << "Write error" << endl;
exit(0);
}
delete write_t;
});
// Server does not send a mask of 4 bytes
remainingBytes += framePackBufferLength - 4 * framesPerSend;
sent += framesPerSend;
}
}
void newConnection()
{
uv_tcp_t *socket = new uv_tcp_t;
socket->data = nullptr;
uv_tcp_init(loop, socket);
uv_tcp_connect(new uv_connect_t, socket, (sockaddr *) &addr, [](uv_connect_t *connect, int status) {
if (status < 0) {
cout << "Connection error" << endl;
exit(-1);
} else {
uv_read_start(connect->handle, [](uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
buf->base = new char[suggested_size];
buf->len = suggested_size;
}, [](uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
if (stream->data) {
// If we now received all bytes we expect from last echoing, echo again
remainingBytes -= nread;
if (!remainingBytes) {
cout << "Echo performance: " << double(sent) / (1e-3 * duration_cast<microseconds>(high_resolution_clock::now() - startPoint).count()) << " echoes/ms" << endl;
echo();
}
} else {
// WebSocket connection established here
stream->data = (void *) 1;
sockets.push_back(stream);
cout << "Connections: " << sockets.size() << endl;
// Perform first batch of echo sending
if (sockets.size() == connections) {
startPoint = high_resolution_clock::now();
echo();
} else {
newConnection();
}
}
delete [] buf->base;
});
// Send upgrade header
uv_write(new uv_write_t, connect->handle, &upgradeHeader, 1, [](uv_write_t *write_t, int status) {
if (status < 0) {
cout << "Connection error" << endl;
exit(-1);
}
delete write_t;
});
}
});
}
int main(int argc, char *argv[])
{
// Read arguments
if (argc != 5) {
cout << "Usage: throughput numberOfConnections payloadByteSize framesPerSend port" << endl;
return -1;
}
connections = atoi(argv[1]);
byteSize = atoi(argv[2]);
framesPerSend = atoi(argv[3]);
int port = atoi(argv[4]);
// Init
loop = uv_default_loop();
uv_ip4_addr("127.0.0.1", port, &addr);
upgradeHeader.base = (char *) upgradeHeaderBuffer;
upgradeHeader.len = sizeof(upgradeHeaderBuffer) - 1;
// Fill with random data
int allocLength = (byteSize + 14) * framesPerSend;
framePackBuffer = new unsigned char[allocLength];
for (int i = 0; i < allocLength; i++) {
framePackBuffer[i] = rand() % 255;
}
// Format message frame(s)
unsigned char *framePackBufferOffset = framePackBuffer;
for (int i = 0; i < framesPerSend; i++) {
framePackBufferOffset[0] = 130;
if (byteSize < 126) {
framePackBufferLength += byteSize + 6;
framePackBufferOffset[1] = 128 | byteSize;
framePackBufferOffset += byteSize + 6;
} else if (byteSize <= UINT16_MAX) {
framePackBufferLength += byteSize + 8;
framePackBufferOffset[1] = 128 | 126;
*((uint16_t *) &framePackBufferOffset[2]) = htons(byteSize);
framePackBufferOffset += byteSize + 8;
} else {
framePackBufferLength += byteSize + 14;
framePackBufferOffset[1] = 128 | 127;
*((uint64_t *) &framePackBufferOffset[2]) = htobe64(byteSize);
framePackBufferOffset += byteSize + 14;
}
}
framePack.base = (char *) framePackBuffer;
framePack.len = framePackBufferLength;
// Connect to echo server
newConnection();
return uv_run(loop, UV_RUN_DEFAULT);
}