-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.cpp
189 lines (154 loc) · 6.31 KB
/
main.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
#include <libgen.h>
#include <signal.h>

#include <atomic>
#include <cstdlib>
#include <cstring>
#include <initializer_list>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <tuple>

#include "net.h"
#include "locked_ostream.h"
#include "client_set.h"
// Single mutex handed to both stream macros below, so everything written
// through cout_locked() and cerr_locked() is guarded by the same lock.
static std::mutex console_lock;
// Each macro builds a LockedOStream temporary over std::cout / std::cerr paired
// with console_lock.
// NOTE(review): presumably the temporary holds the lock for the duration of one
// chained `<<` expression -- confirm against locked_ostream.h.
#define cout_locked() LockedOStream(std::cout, console_lock)
#define cerr_locked() LockedOStream(std::cerr, console_lock)
/**
 * Holds a set of signal numbers and blocks / unblocks them for the calling
 * thread through pthread_sigmask().
 *
 * The destructor always issues an unblock for the held set (whether or not
 * block() was called), so an instance on the stack guarantees the signals are
 * unblocked again when the scope is left, including via an exception.
 */
class SignalBlocker {
public:
    /** Creates a blocker with an empty signal set. */
    SignalBlocker() {
        sigemptyset(&signal_mask);
    }

    /**
     * Creates a blocker holding every signal in `signals`.
     * @throws std::runtime_error if any entry is rejected by sigaddset().
     */
    SignalBlocker(std::initializer_list<int> signals)
        : SignalBlocker()
    {
        for (const int signo : signals) {
            addSignal(signo);
        }
    }

    /**
     * Best-effort unblock of the held set. This deliberately does not go
     * through unblock(): destructors are implicitly noexcept, so letting an
     * exception escape here would call std::terminate.
     */
    ~SignalBlocker() {
        static_cast<void>(pthread_sigmask(SIG_UNBLOCK, &signal_mask, nullptr));
    }

    /**
     * Adds one signal number to the held set (does not change the thread's
     * mask until block() is called).
     * @throws std::runtime_error if sigaddset() rejects the signal number.
     */
    void addSignal(int signal) {
        if (sigaddset(&signal_mask, signal) != 0) {
            throw std::runtime_error("Invalid signal number: " + std::to_string(signal));
        }
    }

    /**
     * Blocks every held signal for the calling thread.
     * @throws std::runtime_error if pthread_sigmask() reports an error.
     */
    void block() {
        const int rc = pthread_sigmask(SIG_BLOCK, &signal_mask, nullptr);
        if (rc != 0) {
            throw std::runtime_error("Couldn't block signal: " + std::string(std::strerror(rc)));
        }
    }

    /**
     * Unblocks every held signal for the calling thread.
     * @throws std::runtime_error if pthread_sigmask() reports an error.
     */
    void unblock() {
        const int rc = pthread_sigmask(SIG_UNBLOCK, &signal_mask, nullptr);
        if (rc != 0) {
            throw std::runtime_error("Couldn't unblock signal: " + std::string(std::strerror(rc)));
        }
    }

private:
    sigset_t signal_mask;  // signals affected by block() / unblock()
};
/**
* Any packet received on this port registers the
* sender as a target for outflows.
*/
static void outflow_admin(int port, ClientSet *registrations) {
try {
Udp listener;
listener.bind("0.0.0.0", port);
cout_locked() << "Waiting for clients to register on " << port << "\n";
while (true) {
std::string client;
int port;
std::string data;
std::tie(client, port, data) = listener.recv();
if (data == "unregister") {
if (registrations->removeClient(client, port)) {
cout_locked() << "Unregistered " << client << ":" << port << "\n";
} else {
cerr_locked() << "Client " << client << ":" << port << " not registered or already unregistered\n";
}
} else if (data == "register") {
if (registrations->addClient(client, port)) {
cout_locked() << "Registered client " << client << ":" << port << ". Payload (" << data << ")\n";
} else {
cerr_locked() << "Client " << client << ":" << port << " already registered\n";
}
} else {
cerr_locked() << "Unsupported command (" << data << ") from client " << client << ":" << port << "\n";
}
}
cout_locked() << "Admin thread finished" << std::endl;
} catch (const std::exception& e) {
cerr_locked() << "Unhandled exception in admin processor: " << e.what() << std::endl;
exit(2);
}
}
/**
 * Mirror loop. Binds a UDP socket on `incoming_port` and re-sends the payload
 * of every datagram received there to each client currently returned by
 * `registrations->getClientList()`.
 *
 * A failed send to one client is logged (with the reason) and does not stop
 * delivery to the remaining clients. A std::exception escaping the loop is
 * logged and the process exits with status 2.
 */
static void udp_mirror(int incoming_port, ClientSet *registrations) {
    try {
        Udp listener;
        listener.bind("0.0.0.0", incoming_port);
        cout_locked() << "Listening for incoming UDP data on " << incoming_port << "\n";
        while (true) {
            std::string sender;
            int sender_port;
            std::string data;
            std::tie(sender, sender_port, data) = listener.recv();

            // Take the client list once per datagram and fan the payload out.
            auto registered = registrations->getClientList();
            for (const auto& client : registered) {
                try {
                    // TODO: spoof src IP/PORT to be sender/sender_port
                    listener.send(std::get<0>(client), std::get<1>(client), data);
                } catch (const std::exception& e) {
                    // Report the cause as well as the target; previously the
                    // exception text was dropped.
                    cerr_locked() << "Couldn't send re-broadcast packet to " << std::get<0>(client) << ":" << std::get<1>(client)
                                  << ": " << e.what() << "\n";
                }
            }
            // cout_locked() << "Re-broadcast datagram " << data.size() << " bytes to " << registered.size() << " clients\n";
        }
        // Not reached while the loop above has no exit.
        cout_locked() << "UDP mirror thread finished" << std::endl;
    } catch (const std::exception& e) {
        cerr_locked() << "Unhandled exception in receiver: " << e.what() << std::endl;
        exit(2);
    }
}
int main(int argc, char **argv) {
try {
int nargs = argc - 1;
if (nargs != 2) {
cerr_locked() << "Expecting 2 arguments, but got " << nargs << "\n"
<< "Usage: " << basename(argv[0]) << " <udp port> <admin port>" << "\n"
<< " udp port The port to listen for UDP packets on. These are then\n"
<< " re-transmitted based to the registered listeners\n"
<< " admin port The port to listen on for registration/deregistration commands.\n"
;
return 1;
}
int incoming_port = atoi(argv[1]);
int outflow_admin_port = atoi(argv[2]);
if (incoming_port == 0 || outflow_admin_port == 0) {
if (incoming_port == 0) {
cerr_locked() << "Invalid UDP fanout listen port '" << argv[1] << "'\n";
}
if (outflow_admin_port == 0) {
cerr_locked() << "Invalid admin port '" << argv[2] << "'\n";
}
return 1;
}
ClientSet registrations;
// mask out the SIGINT signal so that when we create the child threads, we know that
// there will not be signals delivered to them.
SignalBlocker signalBlocker = {SIGINT};
signalBlocker.block();
//cout_locked() << "Starting mirror thread" << std::endl;
std::thread mirror_thread(udp_mirror, incoming_port, ®istrations);
//cout_locked() << "Starting admin thread" << std::endl;
std::thread admin_thread(outflow_admin, outflow_admin_port, ®istrations);
// child threads are created (inherting the block of the required signals), we can re-enable signals
// and know that only the main thread will process signals
signalBlocker.unblock();
//cout_locked() << "Detaching mirror thread" << std::endl;
mirror_thread.detach();
//cout_locked() << "Waiting for admin thread to finish" << std::endl;
admin_thread.join();
return 0;
} catch (const std::exception& e) {
cerr_locked() << "Unhandled exception in main: " << e.what() << std::endl;
} catch (...) {
cerr_locked() << "Unknown exception in main";
}
return 2;
}