forked from uNetworking/uWebSockets
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththroughput_socketio.cpp
175 lines (152 loc) · 6.09 KB
/
throughput_socketio.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
/* This is a very high performance general WebSocket server throughput benchmark */
/* This version is hardcoded for measuring Socket.IO throughput and only work with 20 byte messages */
#include <iostream>
#include <vector>
#include <chrono>
#include <uv.h>
#include <cstring>
#include <endian.h>
using namespace std;
using namespace chrono;
uv_loop_t *loop;
uv_buf_t upgradeHeader;
uv_buf_t framePack;
int byteSize, framesPerSend;
unsigned char *framePackBuffer;
int framePackBufferLength = 0;
const char upgradeHeaderBuffer[] = "GET /socket.io/?EIO=4&transport=websocket HTTP/1.1\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==\r\n"
"Host: server.example.com\r\n"
"Sec-WebSocket-Version: 13\r\n\r\n";
int connections, remainingBytes;
vector<uv_stream_t *> sockets;
sockaddr_in addr;
unsigned long sent = 0;
auto startPoint = time_point<high_resolution_clock>();
void echo()
{
for (int i = 0; i < connections; i++) {
// Write message on random socket
uv_write(new uv_write_t, sockets[rand() % connections], &framePack, 1, [](uv_write_t *write_t, int status) {
if (status < 0) {
cout << "Write error" << endl;
exit(0);
}
delete write_t;
});
// Server does not send a mask of 4 bytes
remainingBytes += framePackBufferLength - 4 * framesPerSend;
sent += framesPerSend;
}
}
void newConnection()
{
uv_tcp_t *socket = new uv_tcp_t;
socket->data = nullptr;
uv_tcp_init(loop, socket);
uv_tcp_connect(new uv_connect_t, socket, (sockaddr *) &addr, [](uv_connect_t *connect, int status) {
if (status < 0) {
cout << "Connection error" << endl;
exit(-1);
} else {
uv_read_start(connect->handle, [](uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
buf->base = new char[suggested_size];
buf->len = suggested_size;
}, [](uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
if (stream->data) {
// If we now received all bytes we expect from last echoing, echo again
remainingBytes -= nread;
if (!remainingBytes) {
cout << "Echo performance: " << double(sent) / (1e-3 * duration_cast<microseconds>(high_resolution_clock::now() - startPoint).count()) << " echoes/ms" << endl;
echo();
}
} else {
// Receive Socket.IO connection messages and ignore them
char message[1024];
int fd;
uv_fileno((uv_handle_t *) stream, &fd);
for (int r = nread; r < 221; ) {
r += std::max<int>(0, recv(fd, message, sizeof(message), 0));
}
// WebSocket connection established here
stream->data = (void *) 1;
sockets.push_back(stream);
cout << "Connections: " << sockets.size() << endl;
// Perform first batch of echo sending
if (sockets.size() == connections) {
startPoint = high_resolution_clock::now();
echo();
} else {
newConnection();
}
}
delete [] buf->base;
});
// Send upgrade header
uv_write(new uv_write_t, connect->handle, &upgradeHeader, 1, [](uv_write_t *write_t, int status) {
if (status < 0) {
cout << "Connection error" << endl;
exit(-1);
}
delete write_t;
});
}
});
}
int main(int argc, char *argv[])
{
// Read arguments
if (argc != 5) {
cout << "Usage: throughput numberOfConnections payloadByteSize(20 only) framesPerSend port" << endl;
return -1;
}
connections = atoi(argv[1]);
byteSize = 20; // we can only do 20 bytes in this version!
framesPerSend = atoi(argv[3]);
int port = atoi(argv[4]);
// Init
loop = uv_default_loop();
uv_ip4_addr("127.0.0.1", port, &addr);
upgradeHeader.base = (char *) upgradeHeaderBuffer;
upgradeHeader.len = sizeof(upgradeHeaderBuffer) - 1;
// Fill with random data
int allocLength = (byteSize + 14) * framesPerSend;
framePackBuffer = new unsigned char[allocLength];
for (int i = 0; i < allocLength; i++) {
framePackBuffer[i] = rand() % 255;
}
// This is the 20 byte message (JSON) according to Socket.IO
unsigned char socketio[] = "xxxx42[\"message\",\"yolo\"]";
for (int i = 4; i < 24; i++) {
socketio[i] ^= 'x';
cout << (int) socketio[i] << endl;
}
// Format message frame(s)
unsigned char *framePackBufferOffset = framePackBuffer;
for (int i = 0; i < framesPerSend; i++) {
framePackBufferOffset[0] = 129;
if (byteSize < 126) {
framePackBufferLength += byteSize + 6;
framePackBufferOffset[1] = 128 | byteSize;
memcpy(framePackBufferOffset + 2, socketio, 24);
framePackBufferOffset += byteSize + 6;
} else if (byteSize <= UINT16_MAX) {
framePackBufferLength += byteSize + 8;
framePackBufferOffset[1] = 128 | 126;
*((uint16_t *) &framePackBufferOffset[2]) = htons(byteSize);
framePackBufferOffset += byteSize + 8;
} else {
framePackBufferLength += byteSize + 14;
framePackBufferOffset[1] = 128 | 127;
*((uint64_t *) &framePackBufferOffset[2]) = htobe64(byteSize);
framePackBufferOffset += byteSize + 14;
}
}
framePack.base = (char *) framePackBuffer;
framePack.len = framePackBufferLength;
// Connect to echo server
newConnection();
return uv_run(loop, UV_RUN_DEFAULT);
}