diff --git a/.vscode/launch.json b/.vscode/launch.json
index 9b55058..a45623f 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -8,7 +8,7 @@
"name": "(Linux) Launch",
"type": "cppdbg",
"request": "launch",
- "program": "${workspaceFolder}/build/ptop_rendezvous",
+ "program": "${workspaceFolder}/build/ptop",
"args": [],
"stopAtEntry": false,
"cwd": "${fileDirname}",
diff --git a/doc/happy path.drawio b/doc/happy path.drawio
index 84ea1c5..09e734b 100644
--- a/doc/happy path.drawio
+++ b/doc/happy path.drawio
@@ -1 +1 @@
-1ZdNc5swEIZ/Dcd2+IiJc0wcNzm0J3fS5pRRYANKBcsIYaC/vgIWYxV/tdPU5IT0akHadx/tDJa3SKo7ybL4C4YgLNcOK8u7tVzXcWxXPxql7pS5P+uESPKQggZhxX8CiTapBQ8hNwIVolA8M8UA0xQCZWhMSizNsBcU5q4Zi2AkrAImxuo3HqqYsnAvB/0eeBT3Ozv+VbeSsD6YMsljFmK5JXlLy1tIRNWNkmoBojGv96V779Oe1c3BJKTqlBfkGuJHl72G4dd7Uav84YGrD45Hh1N1nzGE2gCaolQxRpgysRzUG4lFGkLzWVvPhpjPiJkWHS2+glI1VZMVCrUUq0TQqj6xrL8373+c9dNH+lw7ua2MWU2zccrkQo6FDOBQnoQOkxGoQ3FUrMaErR3I0TvABPSBdIAEwRRfm5Qwgi3axA310AMqyZ+Up/vumomCdgoE18knjKejwpllKWOuYJWx1pZSX06zBHutXINUUB3MnVYverPoajsezcvhojg9/fHWJfHtN7LrcmTXOej+e0rdEyn1JwWpO3K9bbLXLQmp7nn5U47BD52R6wud1s2z1KOoGaWgG+J1H5Xp076gTEZFO8KyWbF/QbZtku05O8i++J9kO/47R9s/Ee35pNAeu74T7X1g77gB5yb7t569If1sPXu+32LIeHCkdTR+KMwma+/Vue11Zu+8cfSOHu0c7qQ6R3/sXVwfRboLeGqAzUBOjurNL80bUK2nwy9Ru7b1Y+ktfwE=
\ No newline at end of file
+5Zpbd5s4EIB/jU+fdg93u49ex213t028cU7b7EuOAhOjLUYcIXzJr18JhLloN6a1HcTpQ2I0DCAN34xmJEb2bL17T1ESfiIBRCPLCHYj+2pkWaZpWPxHSPaFZOK5hWBFcSCVKsESP4MUGlKa4QDShiIjJGI4aQp9Esfgs4YMUUq2TbUnEjWfmqAVKIKljyJV+gUHLJSjcI1K/gHwKmSHAcsza1QqS0EaooBsayJ7PrJnlBBWHK13M4iE8Uq7FNe9+5+zh45RiFmXC57/9ndGSn0wP/+VhH/8/ufSe/rFKe6yQVEmByw7y/alBSDgBpFNQllIViRG0byS/kZJFgcgHmPwVqXzkZCEC00u/AcY28u3izJGuChk60ieVYciR5eSjHf4hf6XSCC6AvaCnl3oibHUHiAN9R7IGhjdcwUKEWJ403z5SDK0OuhVZuYH0tLfYXVTsfqSj4Appm8adhtiBssE5QbZcnfraMQNUAa7F4ddnnUkq9JZrRLzbYW+WfIc1rD3jAtZ6u3A+bQ78jnRik9bsfpMhlcRffm/W+AGfd6QLOWNJVCOWO/w2trBa6p+Pix6Jx3pLSdrTfCdqOGV8yqwnU+v7h/e3dw+LKwFb0+v+Unj0/3D1fRuKuZsSFM+oafKWzpCb/MVnYFlt82y3ZHlQ/pxfpjVoDAsmEtIj9NsaUVz2e+a4b8gzPJklooH8r/ZzfX1fHb3sJjPbyuOhQolazHmUDTS/w7Urw33WEO43aHDbXWF29EL7vFPY3hPL8PrkVnDDrOv4vJfXdm6r5252sk75439sRB0xpd19nw8v3RKKdrXFBKCY5bW7rwQgipSToxmpHTaJX5L3xufpu96RgusoscVZoehn0CepZA3ZQzWyaG6+EAiMV0tstgPtZut7Ld9lxXW4DMxp6MblgPTJWaqq2X5uo0ogj/ilEGM49UhL7vhSZf4XQDQN0JF1tCYxL1D7UxaUDsdoZ5cDGpv6FB7XaF29YJaNXwrHLfWfg48L7LHCPv8YB4HSTGxaIa107WyuBzWeqRZJ2DddQ3I0msJ01QXgbpiTfEGMdCZa9fqm2t76DmI1XU1yD57DvJDpYDptULb+BVSdUtdemo50TTjWU7MsF94zJL436B/fzHb80DX9OZyObvTh3twC9J9rbwWzXt5u7xRFdh56/QKuwwDx6eLs28Z/JBbWe3q7lXcSq2AGyu6onCA0sMexRGq+ZaBNghH6FHUyD37mW23rNf7lpvt9epnZsPLKqc7v5+5vfnZaXFQXWhvoI8PM0ua+T7wl6Ib4/3PJWWH9JhLzMsx3rn00IxxtfQYGONu17r5cowPfUOuLBuO1xd6bciV/a7DC282IgmRn1gKXgt8i13lBDTYUzZbOyVe/wSrUWBgBHddpbfHehGsrtLfURSn6zzuikz6CUegIcPtKKwBw+ru/C34gPNwIK3oh1n8TaybyU9NtDBle8Hskqbkzeoj7qJErD6Ft+f/Ag==
\ No newline at end of file
diff --git a/src/client.cpp b/src/client.cpp
index a16397f..ef1f22d 100644
--- a/src/client.cpp
+++ b/src/client.cpp
@@ -9,208 +9,129 @@
#include
-
using namespace std::chrono;
-client_init_kit::client_init_kit(std::string server_address_pair, ::protocol chosen_protocol) : protocol(chosen_protocol) {
- _conn_socket = std::make_unique(server_address_pair, ServerListenPort, protocol);
+client_init_kit::client_init_kit(std::string server_address_pair, ::Protocol chosen_protocol) : protocol(chosen_protocol) {
+ _server_socket = std::make_unique(server_address_pair, ServerListenPort, protocol, "Server-Conn");
// Indicate to server we're ready for p2p
- last_send = std::chrono::system_clock::now();
- _conn_socket->send_data(create_message(MESSAGE_TYPE::MY_DATA, _conn_socket->get_myname_readable().to_bytes()));
- _conn_socket->send_data(create_message(MESSAGE_TYPE::READY_FOR_P2P));
- status = EXECUTION_STATUS::CONTINUE;
+ server_last_send = std::chrono::system_clock::now();
+ _server_socket->send_data(create_message(MESSAGE_TYPE::MY_DATA, _server_socket->get_myname_readable().to_bytes()));
+ _server_socket->send_data(create_message(MESSAGE_TYPE::READY_FOR_P2P));
//int value types will update themselves
protocol = chosen_protocol;
}
-std::unique_ptr& client_init_kit::get_conn_socket() {
- return _conn_socket; //yay bug
+std::unique_ptr& client_init_kit::get_server_socket() {
+ return _server_socket;
}
-void client_init_kit::set_conn_socket(std::unique_ptr&& input) {
- _conn_socket = std::move(input);
+void client_init_kit::set_server_socket(std::unique_ptr&& input) {
+ _server_socket = std::move(input);
if(input == nullptr)
std::cout << "client_init_kit: connection socket set to nullptr mmk?" << std::endl;
}
-client_auth_kit::client_auth_kit(client_init_kit& init_kit, const char* data, int i, MESSAGE_LENGTH_T data_len) {
- peer_public = read_peer_data(data, i, data_len);
- peer_private = read_peer_data(data, i, data_len);
- std::cout << "Target is: " << peer_private.ip_address << ":" << peer_private.port << "/" << peer_public.ip_address << ":" << peer_public.port << " priv/pub" << std::endl;
-
- auth_key_out = read_data(data, i, data_len);
- old_privatename = init_kit.get_conn_socket()->get_myname_raw();
- init_kit.set_conn_socket(nullptr);
+client_peer_kit::client_peer_kit() {
- listen_sock = std::make_unique(old_privatename, init_kit.protocol);
- listen_sock->listen();
- public_connector = std::make_unique(old_privatename, peer_public.ip_address, peer_public.port, init_kit.protocol);
- private_connector = std::make_unique(old_privatename, peer_private.ip_address, peer_private.port, init_kit.protocol);
-
- std::vector> unauthed_sockets{};
}
+void client_peer_kit::set_peer_data(client_init_kit& init_kit, const char* data, int next_data_index, MESSAGE_LENGTH_T data_len) {
+ public_info = read_peer_data(data, next_data_index, data_len);
+ private_info = read_peer_data(data, next_data_index, data_len);
+ std::cout << "Target is: " << private_info.ip_address << ":" << private_info.port << "/" << public_info.ip_address << ":" << public_info.port << " priv/pub" << std::endl;
-EXECUTION_STATUS process_auth(const Message& mess, std::unique_ptr& socket, int my_auth)
-{
- if (mess == Message::null_message)
- return EXECUTION_STATUS::FAILED;
+ old_privatename = init_kit.get_server_socket()->get_myname_raw();
+ init_kit.set_server_socket(nullptr); //need to close the server socket HERE to maintain the same session in the peer sockets
+ public_connector = std::make_unique(old_privatename, public_info.ip_address, public_info.port, init_kit.protocol, "HolePunch-Public");
+ private_connector = std::make_unique(old_privatename, private_info.ip_address, private_info.port, init_kit.protocol, "HolePunch-Private");
+
+ listen_sock = std::make_unique(old_privatename, init_kit.protocol, "HolePunch-Listen");
+ listen_sock->listen();
- const char* data = mess.Data.data();
- size_t data_len = mess.Length;
- int i = 0;
- int auth_key = 0;
- MESSAGE_TYPE type = mess.Type;
+ peer_connect_start_time = std::chrono::system_clock::now();
+}
- switch (type)
+EXECUTION_STATUS connect_public(client_init_kit& init_kit, client_peer_kit& peer_kit) {
+ auto status = peer_kit.public_connector->has_connected();
+ if (status == ConnectionStatus::SUCCESS)
{
- case MESSAGE_TYPE::AUTH_PLS:
- std::cout << "Peer (" << socket->get_endpoint_ip() << ":" << socket->get_endpoint_port() << ") requesting Auth, responding with key" << std::endl;
- socket->send_data(create_message(MESSAGE_TYPE::HERES_YOUR_AUTH, my_auth));
- return EXECUTION_STATUS::CONTINUE;
- case MESSAGE_TYPE::HERES_YOUR_AUTH:
- std::cout << "Peer (" << socket->get_endpoint_ip() << ":" << socket->get_endpoint_port() << ") has replied with key" << std::endl;
- if (!try_read_data(data, i, data_len, auth_key))
- {
- std::cout << "Failed to read key from peer" << std::endl;
- return EXECUTION_STATUS::FAILED;
- }
-
- if (auth_key == my_auth)
- {
- std::cout << "Key matches, we should be connected!" << std::endl;
- return EXECUTION_STATUS::CONNECTED;
- }
- std::cout << "Key did not match" << std::endl;
- return EXECUTION_STATUS::FAILED;
- default:
- std::cout << __func__ << "(" << __LINE__ << "): Ignoring Message with Type: " << mt_to_string(type) << std::endl;
- return EXECUTION_STATUS::CONTINUE;
+ std::cout << "Public Connection has connected" << std::endl;
+ peer_kit.peer_socket = std::make_unique(std::move(peer_kit.public_connector));
+ peer_kit.peer_socket->set_name("Public-Peer");
+
+ return EXECUTION_STATUS::PEER_CONNECTED;
}
+ else if (status == ConnectionStatus::FAILED)
+ {
+ std::cout << "Public Connection Failed, Retrying connection..." << std::endl;
+ peer_kit.public_connector = nullptr;
+ peer_kit.public_connector = std::make_unique(peer_kit.old_privatename, peer_kit.public_info.ip_address, peer_kit.public_info.port, init_kit.protocol, "HolePunch-Public");
+ }
+ return EXECUTION_STATUS::HOLE_PUNCH;
}
-bool check_for_auth_connection(client_init_kit& init_kit, client_auth_kit& auth_kit) {
- if (auth_kit.unauthed_sockets.size())
- return true;
-
- if (auth_kit.listen_sock->has_connection())
+EXECUTION_STATUS connect_private(client_init_kit& init_kit, client_peer_kit& peer_kit) {
+ auto status = peer_kit.private_connector->has_connected();
+ if (status == ConnectionStatus::SUCCESS)
{
- std::cout << "Successfully accepted peer connection, now authenticating them" << std::endl;
- auth_kit.unauthed_sockets.emplace_back(auth_kit.listen_sock->accept_connection());
+ std::cout << "Private Connection has connected" << std::endl;
+ peer_kit.peer_socket = std::make_unique(std::move(peer_kit.private_connector));
+ peer_kit.peer_socket->set_name("Private-Peer");
- if (init_kit.is_leader)
- auth_kit.unauthed_sockets.back()->send_data(create_message(MESSAGE_TYPE::AUTH_PLS));
-
- std::this_thread::sleep_for(100ms);
- return true;
+ return EXECUTION_STATUS::PEER_CONNECTED;
}
-
- if (auth_kit.private_connector != nullptr)
+ else if (status == ConnectionStatus::FAILED)
{
- auto status = auth_kit.private_connector->has_connected();
- if (status == ConnectionStatus::SUCCESS)
- {
- std::cout << "Private Connection has connected, now attempting to authenticate" << std::endl;
- auto analyser = std::make_unique(std::move(auth_kit.private_connector));
- auth_kit.unauthed_sockets.emplace_back(std::move(analyser));
- auth_kit.private_connector = nullptr;
-
- if (init_kit.is_leader)
- auth_kit.unauthed_sockets.back()->send_data(create_message(MESSAGE_TYPE::AUTH_PLS));
-
- std::this_thread::sleep_for(100ms);
- return true;
- }
- else if (status == ConnectionStatus::FAILED)
- {
- std::cout << "Private Connection attempt failed, retrying..." << std::endl;
- auth_kit.private_connector = std::make_unique(auth_kit.old_privatename, auth_kit.peer_private.ip_address, auth_kit.peer_private.port, init_kit.protocol);
- }
+ std::cout << "Private Connection attempt failed, retrying..." << std::endl;
+ peer_kit.private_connector = nullptr;
+ peer_kit.private_connector = std::make_unique(peer_kit.old_privatename, peer_kit.private_info.ip_address, peer_kit.private_info.port, init_kit.protocol, "HolePunch-Private");
}
+ return EXECUTION_STATUS::HOLE_PUNCH;
+}
- if (auth_kit.public_connector != nullptr)
+EXECUTION_STATUS connect_listener(client_peer_kit& peer_kit) {
+ if (peer_kit.listen_sock->has_connection())
{
- auto status = auth_kit.public_connector->has_connected();
- if (status == ConnectionStatus::SUCCESS)
- {
- std::cout << "Public Connection has connected, now authenticating" << std::endl;
- auth_kit.unauthed_sockets.emplace_back(std::make_unique(std::move(auth_kit.public_connector))); //add to the list of connected sockets ready to complete authentication
-
- if (init_kit.is_leader)
- auth_kit.unauthed_sockets.back()->send_data(create_message(MESSAGE_TYPE::AUTH_PLS));
+ std::cout << "Successfully accepted peer connection" << std::endl;
+ peer_kit.peer_socket = peer_kit.listen_sock->accept_connection();
- std::this_thread::sleep_for(100ms);
- return true;
- }
- else if (status == ConnectionStatus::FAILED)
- {
- std::cout << "Public Connection Failed, Retrying connection..." << std::endl;
- auth_kit.public_connector = std::make_unique(auth_kit.old_privatename, auth_kit.peer_public.ip_address, auth_kit.peer_public.port, init_kit.protocol);
- }
+ return EXECUTION_STATUS::PEER_CONNECTED;
}
+ return EXECUTION_STATUS::HOLE_PUNCH;
}
-EXECUTION_STATUS respond_to_auth(client_init_kit& init_kit, client_auth_kit& auth_kit) {
- for (size_t i = auth_kit.unauthed_sockets.size(); i-- > 0; )
- {
- auto& sock = auth_kit.unauthed_sockets[i];
+EXECUTION_STATUS connect_peer(client_init_kit& init_kit, client_peer_kit& peer_kit) {
+ if(peer_kit.peer_socket)
+ return EXECUTION_STATUS::PEER_CONNECTED;
- if (sock && sock->has_message())
- {
- auto status = process_auth(sock->receive_message(), sock, auth_kit.auth_key_out);
+ auto status = connect_listener(peer_kit);
+
+ if(status != EXECUTION_STATUS::PEER_CONNECTED)
+ status = connect_public(init_kit, peer_kit);
- if (status == EXECUTION_STATUS::FAILED)
- {
- std::cout << "Socket '" << sock->get_endpoint_ip() << ":" << sock->get_endpoint_port() << "' has failed to authenticate" << std::endl;
- auth_kit.unauthed_sockets.pop_back();
- }
+ if(status != EXECUTION_STATUS::PEER_CONNECTED)
+ status = connect_private(init_kit, peer_kit);
- else if (status == EXECUTION_STATUS::CONNECTED)
- {
- std::cout << "Socket '" << sock->get_endpoint_ip() << ":" << sock->get_endpoint_port() << "' has successfully authenticated" << std::endl;
- init_kit.set_conn_socket(std::move(sock)); //caused a bug if we dont return immediately after
- return EXECUTION_STATUS::CONNECTED; //we only care if either private or public sockets got punched, not both
- }
- }
- }
- return EXECUTION_STATUS::CONTINUE;
+ return status;
}
-/// Server giving us a peer to connect to
-/// Attempt to connect to peer by connecting to it and listening for a connection
-/// The connect socket must have same local address binding as the socket that connected to the server
-/// And we must disconnect the connection to the server
-EXECUTION_STATUS hole_punch(client_init_kit& kit, const char* data, int& auth_key_out, int i, MESSAGE_LENGTH_T data_len) {
+EXECUTION_STATUS hole_punch(client_init_kit& init_kit, client_peer_kit& peer_kit) {
try
- {
- std::cout << "Attempting to Hole Punch" << std::endl;
- client_auth_kit auth_kit{kit, data, i, data_len};
- auto start_time = std::chrono::system_clock::now();
- auto current_time = start_time;
+ {
+ EXECUTION_STATUS status = EXECUTION_STATUS::HOLE_PUNCH;
- bool connection_made = false;
+ if(init_kit.status != EXECUTION_STATUS::PEER_CONNECTED)
+ status = connect_peer(init_kit, peer_kit);
- do
- {
- if(connection_made == false)
- connection_made = check_for_auth_connection(kit, auth_kit);
+ auto current_time = std::chrono::system_clock::now();
- else
- {
- auto auth_status = respond_to_auth(kit, auth_kit);
-
- if (auth_status == EXECUTION_STATUS::CONNECTED)
- return auth_status;
- }
-
- std::this_thread::sleep_for(100ms);
- }
+ if(current_time - peer_kit.peer_connect_start_time > 15s) {
+ std::cerr << "Time out trying to hole punch reached" << std::endl;
+ return EXECUTION_STATUS::FAILED;
+ }
- while (current_time - start_time < 15s);
-
- std::cerr << "Time out trying to hole punch reached" << std::endl;
- return EXECUTION_STATUS::FAILED;
+ return status;
}
catch (const std::exception& e)
{
@@ -218,7 +139,7 @@ EXECUTION_STATUS hole_punch(client_init_kit& kit, const char* data, int& auth_ke
}
}
-EXECUTION_STATUS process_server_data(client_init_kit& kit, const Message& message)
+EXECUTION_STATUS process_server_data(client_init_kit& init_kit, client_peer_kit& peer_kit, const Message& message)
{
try
{
@@ -231,38 +152,26 @@ EXECUTION_STATUS process_server_data(client_init_kit& kit, const Message& messag
return EXECUTION_STATUS::COMPLETE;
}
- int i = 0;
+ int message_data_index = 0;
auto msg_type = message.Type;
switch (msg_type)
{
- case MESSAGE_TYPE::MSG:
- {
- std::string msg = read_string(data, i, data_len);
- std::cout << "Message received from server: " << msg << std::endl;
- return EXECUTION_STATUS::CONTINUE;
- }
- case MESSAGE_TYPE::FILE:
- {
- std::cout << "Received file from server" << std::endl;
- // TODO: actually read the file
- return EXECUTION_STATUS::CONTINUE;
- }
- case MESSAGE_TYPE::CONNECT_PEER:
- {
- return hole_punch(kit, data, kit.auth_key, i, data_len);
- }
-
- case MESSAGE_TYPE::CONNECT_PEER_AS_LEADER:
+ case MESSAGE_TYPE::CONNECT_TO_PEER:
{
- kit.is_leader = true;
- return hole_punch(kit, data, kit.auth_key, i, data_len);
+ if (init_kit.do_delay)
+ {
+ std::cout << "Delaying hole punching by 5s..." << std::endl;
+ std::this_thread::sleep_for(5s);
+ }
+ peer_kit.set_peer_data(init_kit, data, message_data_index, data_len);
+ return EXECUTION_STATUS::HOLE_PUNCH;
}
case MESSAGE_TYPE::NONE:
default:
std::cout << __func__ << "(" << __LINE__ << "): Ignoring Message with Type: " << mt_to_string(msg_type) << std::endl;
- return EXECUTION_STATUS::CONTINUE;
+ return EXECUTION_STATUS::RENDEZVOUS;
}
}
catch (const std::exception& e)
@@ -271,7 +180,7 @@ EXECUTION_STATUS process_server_data(client_init_kit& kit, const Message& messag
}
}
-EXECUTION_STATUS process_peer_data(const Message& mess, const std::unique_ptr& peer, int auth_key)
+EXECUTION_STATUS process_peer_data(const Message& mess, const std::unique_ptr& peer)
{
const char* data = mess.Data.data();
auto data_len = mess.Length;
@@ -286,114 +195,230 @@ EXECUTION_STATUS process_peer_data(const Message& mess, const std::unique_ptrsend_data(create_message(MESSAGE_TYPE::HERES_YOUR_AUTH, auth_key));
-
- return EXECUTION_STATUS::CONTINUE;
- }
- case MESSAGE_TYPE::MSG:
+ case MESSAGE_TYPE::PEER_MSG:
{
std::string msg = read_string(data, i, data_len);
std::cout << "Message received from peer: " << msg << std::endl;
- return EXECUTION_STATUS::CONTINUE;
+ return EXECUTION_STATUS::PEER_CONNECTED;
}
- case MESSAGE_TYPE::FILE:
+ case MESSAGE_TYPE::PEER_FILE:
{
std::cout << "Received file from peer" << std::endl;
// TODO: actually read the file
- return EXECUTION_STATUS::CONTINUE;
- }
- case MESSAGE_TYPE::CONNECT_PEER:
- {
- std::cout << "Received Connect Peer message when already connected" << std::endl;
-
- return EXECUTION_STATUS::CONTINUE;
+ return EXECUTION_STATUS::PEER_CONNECTED;
}
case MESSAGE_TYPE::NONE:
default:
std::cout << __func__ << "(" << __LINE__ << "): Ignoring Message with Type: " << mt_to_string(msg_type) << std::endl;
- return EXECUTION_STATUS::CONTINUE;
+ return EXECUTION_STATUS::PEER_CONNECTED;
}
- return EXECUTION_STATUS::CONTINUE;
+ return EXECUTION_STATUS::PEER_CONNECTED;
}
-
-void client_loop(std::string server_address_pair, protocol input_protocol)
+void get_user_input(thread_queue& msg_queue)
{
- std::cout << "Starting ptop!" << std::endl;
- std::cout << "Connecting to rendezvous server: " << server_address_pair << std::endl;
- client_init_kit kit{ server_address_pair, input_protocol };
- auto& connection_socket = kit.get_conn_socket();
-
- while (kit.status == EXECUTION_STATUS::CONTINUE) //listen at the start of protocol
- {
- auto now = std::chrono::system_clock::now();
-
- if (now - kit.last_send > 3s)
- {
- connection_socket->send_data(create_message(MESSAGE_TYPE::READY_FOR_P2P));
- kit.last_send = now;
- }
- if (connection_socket->has_message())
+ std::string input;
+ do
+ {
+ std::getline(std::cin, input); //waits until cin input
{
- auto message = connection_socket->receive_message();
- kit.status = process_server_data(kit, message);
+ std::unique_lock lock(msg_queue.queue_mutex);
+ msg_queue.messages.push(input);
}
+
std::this_thread::sleep_for(100ms);
- }
+ } while (true);
+}
- if (kit.status == EXECUTION_STATUS::CONNECTED)
- {
- kit.status = EXECUTION_STATUS::CONTINUE;
- std::cout << "connected to peer. enter your message!" << std::endl;
- thread_queue message_queue{};
+void print_help()
+{
+ auto space = "\t";
+ std::cout << "PTOP Peer v69.42 is running" << std::endl;
+ std::cout << "Runtime commands:" << std::endl;
+ std::cout << space << "file: [filename]" << std::endl;
+ std::cout << space << space << "sends a file to your peer (not currently implemented)" << std::endl;
+ std::cout << std::endl;
+ std::cout << space << "msg: [text]" << std::endl;
+ std::cout << space << space << "sends plain text message of [text] (without braces) to your peer" << std::endl;
+ std::cout << space << space << "example: \"msg: banana\" will send 'banana' to your peer" << std::endl;
+ std::cout << std::endl;
+ std::cout << space << "delay" << std::endl;
+ std::cout << space << space << "delays this peer's hole punch call by a set amount (changes and cbf updating this every time)" << std::endl;
+ std::cout << space << space << "this must be called before this peer tries to hole punch" << std::endl;
+ std::cout << std::endl;
+ std::cout << space << "quit" << std::endl;
+ std::cout << space << space << "closes the program" << std::endl;
+ std::cout << std::endl;
+ std::cout << space << "debug" << std::endl;
+ std::cout << space << space << "outputs current status and relevant information" << std::endl;
+}
- std::thread input_thread = std::thread([&message_queue]()
+// Returns whether to quit or not
+bool do_user_input(thread_queue& message_queue, std::unique_lock& take_message_lock, std::unique_ptr& peer_socket, client_init_kit& i_kit, client_peer_kit& peer_kit)
+{
+ if (take_message_lock.try_lock())
+ {
+ if (!message_queue.messages.empty())
{
- std::string input;
- do
+ std::string input_message = message_queue.messages.front();
+ message_queue.messages.pop();
+
+ if (input_message.substr(0, 5) == "msg: ")
{
- std::getline(std::cin, input); //waits until cin input
+ if (peer_socket)
{
- std::unique_lock lock(message_queue.queue_mutex);
- message_queue.messages.push(input);
+ std::string send_message = input_message.substr(5);
+ std::cout << "Sending string of: " << send_message << std::endl;
+ peer_socket->send_data(create_message(MESSAGE_TYPE::PEER_MSG, send_message));
}
-
- std::this_thread::sleep_for(100ms);
- } while (true);
- });
- input_thread.detach();
-
- std::unique_lock take_message_lock(message_queue.queue_mutex, std::defer_lock);
-
- do {
- if (connection_socket->has_message())
+ else
+ std::cout << "Can not send to peer, we have no peer connection" << std::endl;
+ }
+ else if (input_message.substr(0, 6) == "file: ")
{
- auto message = connection_socket->receive_message();
- kit.status = process_peer_data(message, connection_socket, kit.auth_key);
+ std::cout << "file sending not implemented" << std::endl;
}
-
+ else if (input_message.substr(0, 4) == "quit")
+ {
+ std::cout << "Quitting..." << std::endl;
+ take_message_lock.unlock();
+ return true;
+ }
+ else if (input_message.substr(0, 5) == "delay")
+ {
+ if (i_kit.status == EXECUTION_STATUS::RENDEZVOUS)
+ {
+ std::cout << "Delaying this peer's hole punch" << std::endl;
+ i_kit.do_delay = true;
+ }
+ else
+ {
+ std::cout << "Too late in execution to delay hole punching" << std::endl;
+ }
+ }
+ else if (input_message.substr(0, 5) == "debug")
{
- if (take_message_lock.try_lock())
+ std::cout << "Deburger:" << std::endl;
+ std::cout << "Protocol: " << (i_kit.protocol.is_tcp() ? "TCP" : (i_kit.protocol.is_udp() ? "UDP" : "Unknown...")) << std::endl;
+ std::cout << "Current State: ";
+ switch (i_kit.status)
{
- if (!message_queue.messages.empty())
+ default:
+ std::cout << es_to_string(i_kit.status) << " Client should not be in this state, potentially a bug" << std::endl;
+ break;
+ case EXECUTION_STATUS::RENDEZVOUS:
+ {
+ std::cout << "Rendezvousing with server" << std::endl;
+ auto& server_conn = i_kit.get_server_socket();
+ if (!server_conn)
+ std::cout << "Connection to server appears to be null" << std::endl;
+ else
{
- std::string input_message = message_queue.messages.front();
- message_queue.messages.pop();
- connection_socket->send_data(create_message(MESSAGE_TYPE::MSG, input_message));
+ std::cout << "Connected to server at: " << server_conn->get_identifier_str() << std::endl;
}
- take_message_lock.unlock();
+ }
+ break;
+ case EXECUTION_STATUS::HOLE_PUNCH:
+ std::cout << "Hole punching to peer" << std::endl;
+ if (!peer_kit.public_connector)
+ std::cout << "Public connector is null" << std::endl;
+ else
+ std::cout << "Public connector: " << peer_kit.public_connector->get_identifier_str() << std::endl;
+ if (!peer_kit.private_connector)
+ std::cout << "Private connector is null" << std::endl;
+ else
+ std::cout << "Private connector: " << peer_kit.private_connector->get_identifier_str() << std::endl;
+ if (!peer_kit.listen_sock)
+ std::cout << "Listen socket is null" << std::endl;
+ else
+ std::cout << "Listen socket: " << peer_kit.listen_sock->get_identifier_str() << std::endl;
+
+ break;
+ case EXECUTION_STATUS::PEER_CONNECTED:
+ std::cout << "Connected to peer" << std::endl;
+ if (!peer_kit.peer_socket)
+ std::cout << "Peer socket is null (a bug)" << std::endl;
+ else
+ std::cout << "Peer socket is: " << peer_kit.peer_socket->get_identifier_str() << std::endl;
+ break;
}
}
- std::this_thread::sleep_for(100ms);
+ else if (input_message.substr(0, 4) == "help")
+ {
+ print_help();
+ }
+ else if (input_message.size() && input_message.find_first_not_of(' ') != std::string::npos)
+ {
+ std::cout << "Unknown command: " << input_message << std::endl;
+ std::cout << "Type 'help' to see available commands" << std::endl;
+ }
- } while (kit.status == EXECUTION_STATUS::CONTINUE);
+ }
+ take_message_lock.unlock();
+ }
+ return false;
+}
- std::cout << "finished sending to peer" << std::endl;
+void client_loop(std::string server_address_pair, Protocol input_protocol)
+{
+ std::cout << "Starting ptop!" << std::endl;
+ std::cout << "Connecting to rendezvous server: " << server_address_pair << std::endl;
+ client_init_kit init_kit{ server_address_pair, input_protocol };
+ client_peer_kit peer_kit{};
+ auto& connection_socket = init_kit.get_server_socket();
+
+ thread_queue message_queue{};
+
+ std::thread input_thread = std::thread(get_user_input, std::ref(message_queue));
+ input_thread.detach();
+ std::unique_lock take_message_lock(message_queue.queue_mutex, std::defer_lock);
+
+ init_kit.status = EXECUTION_STATUS::RENDEZVOUS;
+
+ while (init_kit.status != EXECUTION_STATUS::COMPLETE && init_kit.status != EXECUTION_STATUS::FAILED) //listen at the start of protocol
+ {
+ switch (init_kit.status)
+ {
+ case EXECUTION_STATUS::RENDEZVOUS:
+ {
+ auto now = std::chrono::system_clock::now();
+
+ if (now - init_kit.server_last_send > 3s)
+ {
+ connection_socket->send_data(create_message(MESSAGE_TYPE::READY_FOR_P2P));
+ init_kit.server_last_send = now;
+ }
+ if (connection_socket->has_message())
+ {
+ auto message = connection_socket->receive_message();
+ init_kit.status = process_server_data(init_kit, peer_kit, message);
+ }
+ }
+ break;
+
+ case EXECUTION_STATUS::HOLE_PUNCH:
+ {
+ init_kit.status = hole_punch(init_kit, peer_kit);
+ }
+ break;
+
+ case EXECUTION_STATUS::PEER_CONNECTED:
+ {
+ if (peer_kit.peer_socket->has_message())
+ {
+ auto message = peer_kit.peer_socket->receive_message();
+ init_kit.status = process_peer_data(message, peer_kit.peer_socket);
+ }
+ }
+ break;
+ }
+
+ if (do_user_input(message_queue, take_message_lock, peer_kit.peer_socket, init_kit, peer_kit))
+ init_kit.status = EXECUTION_STATUS::COMPLETE;
+
+ std::this_thread::sleep_for(100ms);
}
}
+
diff --git a/src/client.h b/src/client.h
index 0bd1064..7052c86 100644
--- a/src/client.h
+++ b/src/client.h
@@ -9,34 +9,35 @@
#include "interfaces.h"
#include "platform.h"
-
-void client_loop(std::string server_address_pair, protocol input_protocol);
+void client_loop(std::string server_address_pair, Protocol input_protocol);
class client_init_kit {
public:
- client_init_kit(std::string server_address_pair, ::protocol input_protocol);
- std::chrono::system_clock::time_point last_send;
- int auth_key;
+ client_init_kit(std::string server_address_pair, ::Protocol input_protocol);
+ std::chrono::system_clock::time_point server_last_send;
EXECUTION_STATUS status;
- ::protocol protocol;
- bool is_leader;
+ ::Protocol protocol;
+ bool do_delay = false;
- std::unique_ptr& get_conn_socket();
- void set_conn_socket(std::unique_ptr&& input);
+ std::unique_ptr& get_server_socket();
+ void set_server_socket(std::unique_ptr&& input);
protected:
- std::unique_ptr _conn_socket;
+ std::unique_ptr _server_socket;
};
-class client_auth_kit {
+class client_peer_kit {
public:
- client_auth_kit(client_init_kit& init_kit, const char* data, int i, MESSAGE_LENGTH_T data_len);
- std::vector> unauthed_sockets;
- std::unique_ptr listen_sock;
- int auth_key_out;
- std::unique_ptr public_connector;
- std::unique_ptr private_connector;
- readable_ip_info peer_public;
- readable_ip_info peer_private;
+ client_peer_kit();
+ void set_peer_data(client_init_kit& init_kit, const char* data, int message_data_index, MESSAGE_LENGTH_T data_len);
+
+ std::unique_ptr public_connector;
+ std::unique_ptr private_connector;
+ std::unique_ptr listen_sock;
+ readable_ip_info public_info;
+ readable_ip_info private_info;
raw_name_data old_privatename;
+
+ std::chrono::system_clock::time_point peer_connect_start_time;
+ std::unique_ptr peer_socket;
};
diff --git a/src/clientmain.cpp b/src/clientmain.cpp
index 1400436..e96468d 100644
--- a/src/clientmain.cpp
+++ b/src/clientmain.cpp
@@ -60,7 +60,7 @@ int main(int argc, char** argv) {
std::string possible_protocol{};
std::cin >> possible_protocol;
- protocol validated{ possible_protocol };
+ Protocol validated{ possible_protocol };
client_loop(raw_ip, validated);
}
diff --git a/src/error.h b/src/error.h
index 098ca22..b144343 100644
--- a/src/error.h
+++ b/src/error.h
@@ -6,7 +6,7 @@
#define LINE_CONTEXT (std::string("(") + __func__ + ", " + std::to_string(__LINE__) + ")")
#endif
-void throw_new_exception(std::string input, std::string line_context);
+[[noreturn]] void throw_new_exception(std::string input, std::string line_context);
void throw_with_context(const std::exception& e, std::string context);
void print_exception(const std::exception& e, int level = 0);
\ No newline at end of file
diff --git a/src/interfaces.h b/src/interfaces.h
index 6da9f35..36732d2 100644
--- a/src/interfaces.h
+++ b/src/interfaces.h
@@ -5,6 +5,16 @@
#include "name_data.h"
#include "message.h"
+// BEGIN UDP CRAP
+
+struct udp_Message
+{
+ Message message;
+ raw_name_data from;
+};
+
+// END UDP CRAP
+
class ISocketWrapper
{
public:
@@ -20,7 +30,10 @@ class ISocketWrapper
virtual std::string get_my_port() const = 0;
virtual std::string get_endpoint_ip() const = 0;
virtual std::string get_endpoint_port() const = 0;
- virtual std::string get_identifier_str() const = 0;
+ virtual std::string get_identifier_str() = 0;
+
+ virtual const std::string& get_name() const = 0;
+ virtual void set_name(std::string name) = 0;
};
class IDataSocketWrapper : virtual public ISocketWrapper
diff --git a/src/ip.h b/src/ip.h
index 4253857..13bdd88 100644
--- a/src/ip.h
+++ b/src/ip.h
@@ -12,6 +12,7 @@ struct readable_ip_info
std::string port;
std::vector to_bytes() const;
+ inline std::string to_string() const { return "(" + ip_address + ":" + port + ")"; }
};
inline std::ostream& operator<<(std::ostream& os, const readable_ip_info& ip_info)
diff --git a/src/linux_platform.cpp b/src/linux_platform.cpp
index 3de25fa..2c762a4 100644
--- a/src/linux_platform.cpp
+++ b/src/linux_platform.cpp
@@ -38,8 +38,8 @@ std::string linux_error(int err_code)
Platform::Platform(PtopSocket&& socket)
: _socket(std::move(socket))
-{
- update_name_info();
+{
+ try_update_name_info();
if (_address == "Unassigned" || _address.empty() ||
_port == "Unassigned" || _port.empty()) {
@@ -47,6 +47,28 @@ Platform::Platform(PtopSocket&& socket)
}
}
+void Platform::try_update_name_info()
+{
+ try
+ {
+ update_name_info();
+ }
+ catch (const std::exception& e)
+ {
+ }
+}
+
+void Platform::try_update_endpoint_info()
+{
+ try
+ {
+ update_endpoint_info();
+ }
+ catch (const std::exception& e)
+ {
+ }
+}
+
void Platform::update_name_info()
{
auto name = get_myname_readable();
@@ -58,6 +80,11 @@ void Platform::update_endpoint_info()
{
try
{
+ if (_socket.is_tcp() && _socket.is_listen())
+ {
+ std::cout << "[Socket] Not updating endpoint as this socket " << get_identifier_str() << " is a listen socket" << std::endl;
+ return;
+ }
auto name = get_peername_readable();
_endpoint_address = name.ip_address;
_endpoint_port = name.port;
@@ -116,14 +143,14 @@ raw_name_data Platform::get_myname_raw() const
return _socket.get_name_raw();
}
-PtopSocket listen_construct(std::string port, protocol input_proto)
+PtopSocket listen_construct(std::string port, Protocol input_proto, std::string name)
{
- std::cout << "[Listen] Create new Socket on port (with localhost): " << port << std::endl;
- auto listen_socket = PtopSocket(input_proto);
+ std::cout << "[Listen] Creating new Socket on port (with localhost, named: " << name << "): " << port << std::endl;
+ auto listen_socket = PtopSocket(input_proto, name);
socklen_t cli_len;
if (listen_socket.is_invalid())
- throw std::runtime_error(std::string("[Listen] Failed to create linux socket: ") + linux_error());
+ throw std::runtime_error(std::string("[Listen] ") + name + " Failed to create linux socket : " + linux_error());
listen_socket.set_socket_reuse();
@@ -135,19 +162,19 @@ PtopSocket listen_construct(std::string port, protocol input_proto)
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(portno);
- std::cout << "Binding..." << std::endl;
+ std::cout << name << " Binding..." << std::endl;
listen_socket.bind_socket(raw_name_data{ *(sockaddr*)&serv_addr, sizeof(serv_addr) });
return listen_socket;
}
-PlatformListener::PlatformListener(std::string port, protocol input_proto) : Platform(listen_construct(port, input_proto))
+PlatformListener::PlatformListener(std::string port, Protocol input_proto, std::string name) : Platform(listen_construct(port, input_proto, name))
{
}
void PlatformListener::listen()
{
- std::cout << "[Listen] Socket now Listening (" << get_my_ip() << ":" << get_my_port() << ")" << std::endl;
+ std::cout << "[Listen] Socket " << get_name() << " now Listening(" << get_my_ip() << ":" << get_my_port() << ")" << std::endl;
_socket.start_listening();
}
@@ -158,7 +185,7 @@ bool PlatformListener::has_connection()
std::unique_ptr PlatformListener::accept_connection()
{
- std::cout << "[Listen] Socket Attempting to accept a connection" << std::endl;
+ std::cout << "[Listen] " << get_identifier_str() << " Attempting to accept a connection" << std::endl;
try {
auto tmp = _socket.accept_data_socket();
@@ -175,10 +202,11 @@ PtopSocket steal_construct(std::unique_ptr&& old)
{
try
{
- std::cout << "[Data] Moving linux_reusable_nonblocking_connection_socket " << old->get_identifier_str() << " to a data_socket" << std::endl;
- ReusableConnector& real_old = *dynamic_cast(old.get());
+ std::cout << "[Data] Moving INonBlockingConnector " << old->get_identifier_str() << " to a data_socket" << std::endl;
+ NonBlockingConnector& real_old = *dynamic_cast(old.get());
PtopSocket sup = real_old.release_socket();
sup.set_non_blocking(false);
+ sup.set_socket_no_reuse();
return sup;
}
catch (const std::exception& e)
@@ -190,7 +218,7 @@ PtopSocket steal_construct(std::unique_ptr&& old)
void PlatformAnalyser::process_socket_data()
{
std::cout << "[Data] Trying to receive new data from Socket: " << get_identifier_str() << std::endl;
- std::vector recv_data = _socket.recv_bytes();
+ std::vector recv_data = _socket.receive_bytes();
if (recv_data.size() > 0)
{
std::cout << "Received " << recv_data.size() << " bytes" << std::endl;
@@ -241,7 +269,7 @@ PlatformAnalyser::PlatformAnalyser(std::unique_ptr&& old)
{
try
{
- //update_endpoint_info();
+ try_update_endpoint_info();
}
catch (const std::exception& e)
{
@@ -258,9 +286,9 @@ PlatformAnalyser::PlatformAnalyser(PtopSocket&& socket) : Platform(std::move(soc
throw std::runtime_error("[Data] Invalid socket in Copy Constructor");
}
-PtopSocket data_connect_construct(std::string peer_address, std::string peer_port, protocol ip_proto)
+PtopSocket data_connect_construct(std::string peer_address, std::string peer_port, Protocol ip_proto, std::string name)
{
- std::cout << "[Data] Creating a Linux Data Socket connecting to: " << peer_address << ":" << peer_port << std::endl;
+ std::cout << "[Data] Creating a Linux Data Socket (named " << name << ") connecting to : " << peer_address << ":" << peer_port << std::endl;
struct addrinfo* result = NULL,
* ptr = NULL,
@@ -274,18 +302,19 @@ PtopSocket data_connect_construct(std::string peer_address, std::string peer_por
int n = getaddrinfo(peer_address.c_str(), peer_port.c_str(), &hints, &result);
if (n == SOCKET_ERROR)
- throw_new_exception("Failed to get address info for: " + peer_address + ":" + peer_port + " with: " + linux_error(), LINE_CONTEXT);
+ throw_new_exception("Failed to get address info for: (" + name + ") " + peer_address + ":" + peer_port + " with: " + linux_error(), LINE_CONTEXT);
- auto conn_socket = PtopSocket(ip_proto);
+ auto conn_socket = PtopSocket(ip_proto, name);
+ conn_socket.set_socket_reuse();
conn_socket.connect(result->ai_addr, result->ai_addrlen);
return conn_socket;
}
-PlatformAnalyser::PlatformAnalyser(std::string peer_address, std::string peer_port, protocol proto)
-: Platform(data_connect_construct(peer_address, peer_port, proto))
+PlatformAnalyser::PlatformAnalyser(std::string peer_address, std::string peer_port, Protocol proto, std::string name)
+: Platform(data_connect_construct(peer_address, peer_port, proto, name))
{
- update_endpoint_info();
+ try_update_endpoint_info();
}
Message PlatformAnalyser::receive_message()
@@ -299,7 +328,7 @@ Message PlatformAnalyser::receive_message()
return tmp;
}
- throw_new_exception("Failed to parse incoming data", LINE_CONTEXT);
+ return Message::null_message;
}
bool PlatformAnalyser::has_message()
@@ -319,15 +348,15 @@ bool PlatformAnalyser::send_data(const Message& message)
return false;
}
-PtopSocket reuse_listen_construct(raw_name_data data, protocol proto)
+PtopSocket reuse_listen_construct(raw_name_data data, Protocol proto, std::string name)
{
auto readable = convert_to_readable(data);
- std::cout << "[ListenReuseNoB] Creating Reusable Listen Socket on: " << readable.ip_address << ":" << readable.port << std::endl;
+ std::cout << "[ListenReuseNoB] Creating Reusable Listen Socket '" << name << "' on: " << readable.ip_address << ":" << readable.port << std::endl;
- auto listen_socket = PtopSocket(proto);
+ auto listen_socket = PtopSocket(proto, name);
if (listen_socket.is_invalid())
- throw std::runtime_error("[ListenReuseNoB] " + readable.ip_address + ":" + readable.port + " Failed to create reusable nonblocking listen socket: " + linux_error());
+ throw_new_exception("[ListenReuseNoB] (" + name + ") " + readable.ip_address + ":" + readable.port + " Failed to create reusable nonblocking listen socket: " + linux_error(), LINE_CONTEXT);
listen_socket.set_non_blocking(true);
listen_socket.set_socket_reuse();
@@ -337,36 +366,36 @@ PtopSocket reuse_listen_construct(raw_name_data data, protocol proto)
return listen_socket;
}
-ReusableListener::ReusableListener(raw_name_data data, protocol proto)
-: Platform(reuse_listen_construct(data, proto))
+NonBlockingListener::NonBlockingListener(raw_name_data data, Protocol proto, std::string name)
+: Platform(reuse_listen_construct(data, proto, name))
{
}
-void ReusableListener::listen()
+void NonBlockingListener::listen()
{
- std::cout << "[ListenReuseNoB] Now Listening on: " << get_my_ip() << ":" << get_my_port() << std::endl;
+ std::cout << "[ListenReuseNoB] " + get_identifier_str() + " Now Listening on port: " << get_my_port() << std::endl;
_socket.listen(4);
}
-bool ReusableListener::has_connection()
+bool NonBlockingListener::has_connection()
{
return _socket.has_connection();
}
-std::unique_ptr ReusableListener::accept_connection()
+std::unique_ptr NonBlockingListener::accept_connection()
{
- std::cout << "[ListenReuseNoB] Accepting Connection..." << std::endl;
+ std::cout << "[ListenReuseNoB] " + get_identifier_str() + " Accepting Connection..." << std::endl;
auto new_sock = _socket.accept_data_socket();
return std::make_unique(std::move(new_sock));
}
-PtopSocket reuse_connection_construct(raw_name_data data, protocol proto)
+PtopSocket reuse_connection_construct(raw_name_data data, Protocol proto, std::string name)
{
auto readable = convert_to_readable(data);
- std::cout << "[DataReuseNoB] Creating Connection socket bound to: " << readable.ip_address << ":" << readable.port << std::endl;
- auto conn_socket = PtopSocket(proto);
+ std::cout << "[DataReuseNoB] Creating Connection socket '" << name << "' bound to : " << readable.ip_address << ":" << readable.port << std::endl;
+ auto conn_socket = PtopSocket(proto, name);
if (conn_socket.is_invalid())
throw_new_exception("[DataReuseNoB] Failed to create nonblocking socket: " + linux_error(), LINE_CONTEXT);
@@ -374,14 +403,14 @@ PtopSocket reuse_connection_construct(raw_name_data data, protocol proto)
conn_socket.set_non_blocking(true);
conn_socket.set_socket_reuse();
- conn_socket.bind_socket(data, "[DataReuseNoB] Failed to bind");
- std::cout << "[DataReuseNoB] Successfully bound Data socket to: " << readable.ip_address << ":" << readable.port << std::endl;
+ conn_socket.bind_socket(data, std::string("[DataReuseNoB] (") + name + ") Failed to bind");
+ std::cout << "[DataReuseNoB] Successfully bound Data socket (" << name << ") to: " << readable.ip_address << ":" << readable.port << std::endl;
return conn_socket;
}
-ReusableConnector::ReusableConnector(raw_name_data data, std::string ip_address, std::string port, protocol proto)
-: Platform(reuse_connection_construct(data, proto))
+NonBlockingConnector::NonBlockingConnector(raw_name_data data, std::string ip_address, std::string port, Protocol proto, std::string name)
+: Platform(reuse_connection_construct(data, proto, name))
{
// if tcp?
try
@@ -394,11 +423,11 @@ ReusableConnector::ReusableConnector(raw_name_data data, std::string ip_address,
}
}
-void ReusableConnector::connect(std::string ip_address, std::string port)
+void NonBlockingConnector::connect(std::string ip_address, std::string port)
{
try
{
- std::cout << "[DataReuseNoB] Trying to connect to: " << ip_address << ":" << port << std::endl;
+ std::cout << "[DataReuseNoB] (" << get_name() << ") Trying to connect to : " << ip_address << ":" << port << std::endl;
struct addrinfo* results, hints;
bzero(&hints, sizeof(hints));
hints.ai_family = AF_INET;
@@ -409,13 +438,14 @@ void ReusableConnector::connect(std::string ip_address, std::string port)
iResult = getaddrinfo(ip_address.c_str(), port.c_str(), &hints, &results);
if (iResult != 0)
- throw_new_exception("Failed to getaddrinfo, error: " + std::to_string(iResult), LINE_CONTEXT);
+ throw_new_exception("Socket '" + get_name() + "' Failed to getaddrinfo, error: " + std::to_string(iResult), LINE_CONTEXT);
if (results == nullptr)
- throw_new_exception(("No possible sockets found for '") + ip_address + ":" + port + "'", LINE_CONTEXT);
+ throw_new_exception(("No possible sockets found for '") + ip_address + ":" + port + "' (socket '" + get_name() + "')", LINE_CONTEXT);
_socket.connect(results->ai_addr, results->ai_addrlen);
- std::cout << "[DataReuseNoB] Successfully BEGUN Connection to: " << ip_address << ":" << port << std::endl;
+ std::cout << "[DataReuseNoB] (" << get_name() << ") Successfully BEGUN Connection to : " << ip_address << ":" << port << std::endl;
+ try_update_endpoint_info();
}
catch (const std::exception& e)
{
@@ -423,7 +453,7 @@ void ReusableConnector::connect(std::string ip_address, std::string port)
}
}
-ConnectionStatus ReusableConnector::has_connected()
+ConnectionStatus NonBlockingConnector::has_connected()
{
try
{
@@ -435,7 +465,7 @@ ConnectionStatus ReusableConnector::has_connected()
auto sock_error = _socket.get_socket_option(SO_ERROR);
if (sock_error != 0 && sock_error != EAGAIN && sock_error != EINPROGRESS)
{
- std::cerr << LINE_CONTEXT << " [DataReuseNoB] Socket failed to connect with: " << linux_error(sock_error) << std::endl;
+ std::cerr << "[DataReuseNoB] " << LINE_CONTEXT << " Socket '" << get_name() << "' failed to connect with: " << linux_error(sock_error) << std::endl;
return ConnectionStatus::FAILED;
}
@@ -449,7 +479,7 @@ ConnectionStatus ReusableConnector::has_connected()
auto sock_error = _socket.get_socket_option(SO_ERROR);
- std::cerr << LINE_CONTEXT << " [DataReuseNoB] Socket failed to connect with: " << linux_error(sock_error) << std::endl;
+ std::cerr << "[DataReuseNoB] " << LINE_CONTEXT << " Socket '" << get_name() << "' failed to connect with: " << linux_error(sock_error) << std::endl;
return ConnectionStatus::FAILED;
}
diff --git a/src/loop.h b/src/loop.h
index 5b6c34b..a8213b7 100644
--- a/src/loop.h
+++ b/src/loop.h
@@ -1,5 +1,6 @@
#pragma once
+#include
#include
#include
#include
@@ -9,12 +10,40 @@
enum class EXECUTION_STATUS
{
- CONTINUE = 0,
- CONNECTED,
+ NONE = 0,
+ CONTINUE,
+ SERVER_CONNECTED,
COMPLETE,
FAILED,
+ RENDEZVOUS,
+ HOLE_PUNCH,
+ PEER_CONNECTED,
};
+inline std::string es_to_string(EXECUTION_STATUS status)
+{
+ switch (status)
+ {
+ default:
+ case EXECUTION_STATUS::NONE:
+ return "EXECUTION_STATUS::NONE: No Execution Status";
+ case EXECUTION_STATUS::CONTINUE:
+ return "EXECUTION_STATUS::CONTINUE: Continue Current Operation";
+ case EXECUTION_STATUS::SERVER_CONNECTED:
+ return "EXECUTION_STATUS::SERVER_CONNECTED: Connected to Server";
+ case EXECUTION_STATUS::COMPLETE:
+ return "EXECUTION_STATUS::COMPLETE: Program has completed";
+ case EXECUTION_STATUS::FAILED:
+ return "EXECUTION_STATUS::FAILED: Program has encountered an error";
+ case EXECUTION_STATUS::RENDEZVOUS:
+ return "EXECUTION_STATUS::RENDEZVOUS: Program is communicating with rendezvous server";
+ case EXECUTION_STATUS::HOLE_PUNCH:
+ return "EXECUTION_STATUS::HOLE_PUNCH: Program is attempting to hole punch to a peer";
+ case EXECUTION_STATUS::PEER_CONNECTED:
+ return "EXECUTION_STATUS::PEER_CONNECTED: Program has connected to the peer and is communicating with them";
+ }
+}
+
template::value>> // Only allow Plain-old-data to use this method
T read_data(const char* data, int& index, size_t data_len)
{
diff --git a/src/message.h b/src/message.h
index 2ceca70..14c1457 100644
--- a/src/message.h
+++ b/src/message.h
@@ -23,12 +23,9 @@ enum class MESSAGE_TYPE
NONE = 0,
MY_DATA,
READY_FOR_P2P,
- CONNECT_PEER,
- CONNECT_PEER_AS_LEADER,
- AUTH_PLS,
- HERES_YOUR_AUTH,
- MSG,
- FILE,
+ CONNECT_TO_PEER,
+ PEER_MSG,
+ PEER_FILE,
};
typedef uint32_t MESSAGE_LENGTH_T;
@@ -56,24 +53,15 @@ inline std::string mt_to_string(const MESSAGE_TYPE& t)
case MESSAGE_TYPE::READY_FOR_P2P:
return "READY_FOR_P2P: Connection is ready for P2P";
- case MESSAGE_TYPE::MSG:
+ case MESSAGE_TYPE::PEER_MSG:
return "MSG: Plain Text Msg";
- case MESSAGE_TYPE::FILE:
+ case MESSAGE_TYPE::PEER_FILE:
return "FILE: Incoming File";
- case MESSAGE_TYPE::CONNECT_PEER:
+ case MESSAGE_TYPE::CONNECT_TO_PEER:
return "CONNECT_PEER: Data required to connect to a peer";
- case MESSAGE_TYPE::CONNECT_PEER_AS_LEADER:
- return "CONNECT_PEER_AS_LEADER: your client was marked as first in the pair grouping";
-
- case MESSAGE_TYPE::AUTH_PLS:
- return "AUTH_PLS: Request for Auth";
-
- case MESSAGE_TYPE::HERES_YOUR_AUTH:
- return "HERES_YOUR_AUTH: Auth Request Response";
-
case MESSAGE_TYPE::NONE:
default:
return "NONE: None";
diff --git a/src/name_data.h b/src/name_data.h
index 4294566..d4e22f8 100644
--- a/src/name_data.h
+++ b/src/name_data.h
@@ -15,14 +15,16 @@
struct raw_name_data
{
raw_name_data() = default;
- raw_name_data(sockaddr addr) : name(addr), name_len(sizeof(addr)) {}
- raw_name_data(sockaddr addr, socklen_t len) : name(addr), name_len(len) {}
- raw_name_data(sockaddr_in addr) : name(*(sockaddr*)&addr), name_len(sizeof(addr)) {}
+ raw_name_data(sockaddr addr) : name(addr), name_len(sizeof(addr)), initialized(true) {};
+ raw_name_data(sockaddr addr, socklen_t len) : name(addr), name_len(len), initialized(true) {};
+ raw_name_data(sockaddr_in addr) : name(*(sockaddr*)&addr), name_len(sizeof(addr)), initialized(true) {};
sockaddr name;
socklen_t name_len;
-
+
+ bool initialized = false;
sockaddr_in& ipv4_addr() { return *(sockaddr_in*)&name; }
+ const sockaddr_in& ipv4_addr() const { return *(sockaddr_in*)&name; }
inline bool operator==(const raw_name_data& other) const
{
diff --git a/src/platform.cpp b/src/platform.cpp
index 2b1d01f..3141247 100644
--- a/src/platform.cpp
+++ b/src/platform.cpp
@@ -1,20 +1,31 @@
#include "platform.h"
#include "error.h"
+#include "loop.h"
#if defined(__linux__)
#include
#endif
#include
+#include
-std::string Platform::get_identifier_str() const {
- if (_endpoint_assigned == false)
- return std::string("(priv: ") + _address + ":" + _port + ", pub: N/A)";
+std::string Platform::get_identifier_str() {
+ std::string name_str = "Unnamed";
+ if (_socket.get_name().size())
+ name_str = _socket.get_name();
+ if (_socket.is_tcp() && _socket.is_listen())
+ return std::string("(") + name_str + " is a listen on: " + _address + ":" + _port + ")";
+
+ if (_endpoint_assigned == false)
+ try_update_endpoint_info();
+
+ if (_endpoint_assigned == false)
+ return std::string("(") + name_str + " priv: " + _address + ":" + _port + ", pub : N / A)";
- return std::string("(pub: ") + _address + ":" + _port + ")";
+ return std::string("(") + name_str + " pub: " + _endpoint_address + ":" + _endpoint_port + ")";
}
-readable_ip_info convert_to_readable(raw_name_data data)
+readable_ip_info convert_to_readable(const raw_name_data& data)
{
std::vector buf{ 50, '0', std::allocator() };
const char* str = inet_ntop(AF_INET, &data.ipv4_addr().sin_addr, buf.data(), buf.size());
@@ -39,3 +50,262 @@ Platform::~Platform()
else
std::cout << "Closing Dead socket" << std::endl;
}
+
+void UDPAcceptedConnector::throw_if_no_listener() const
+{
+ if (!_listen)
+ throw_new_exception("Accepted UDP Connector has no valid listener", LINE_CONTEXT);
+}
+
+UDPAcceptedConnector::UDPAcceptedConnector(UDPListener* listen, raw_name_data endpoint) : _listen(listen), _my_endpoint(endpoint)
+{
+}
+
+UDPAcceptedConnector::~UDPAcceptedConnector()
+{
+ if (_listen)
+ _listen->remove_connector(_my_endpoint, this);
+}
+
+Message UDPAcceptedConnector::receive_message()
+{
+ throw_if_no_listener();
+
+ return _listen->receive_message(_my_endpoint);
+}
+
+bool UDPAcceptedConnector::has_message()
+{
+ throw_if_no_listener();
+
+ return _listen->has_message(_my_endpoint);
+}
+
+bool UDPAcceptedConnector::send_data(const Message& message)
+{
+ throw_if_no_listener();
+
+ return _listen->send_data(message, _my_endpoint);
+}
+
+readable_ip_info UDPAcceptedConnector::get_peer_data() const
+{
+ return convert_to_readable(_my_endpoint);
+}
+
+raw_name_data UDPAcceptedConnector::get_peername_raw() const
+{
+ return _my_endpoint;
+}
+
+raw_name_data UDPAcceptedConnector::get_myname_raw() const
+{
+ throw_if_no_listener();
+
+ return _listen->my_data();
+}
+
+readable_ip_info UDPAcceptedConnector::get_peername_readable() const
+{
+ return convert_to_readable(get_peername_raw());
+}
+
+readable_ip_info UDPAcceptedConnector::get_myname_readable() const
+{
+ return convert_to_readable(get_myname_raw());
+}
+
+std::string UDPAcceptedConnector::get_my_ip() const
+{
+ auto readable = get_myname_readable();
+
+ return readable.ip_address;
+}
+
+std::string UDPAcceptedConnector::get_my_port() const
+{
+ auto readable = get_myname_readable();
+
+ return readable.port;
+}
+
+std::string UDPAcceptedConnector::get_endpoint_ip() const
+{
+ auto readable = get_peername_readable();
+
+ return readable.ip_address;
+}
+
+std::string UDPAcceptedConnector::get_endpoint_port() const
+{
+ auto readable = get_peername_readable();
+
+ return readable.port;
+}
+
+std::string UDPAcceptedConnector::get_identifier_str()
+{
+ return "(UDPAcceptedConnector on pub: " + get_peername_readable().to_string() + ")";
+}
+
+const std::string& UDPAcceptedConnector::get_name() const
+{
+ throw_if_no_listener();
+
+ return "UDPAccepterConnector of: " + _listen->get_name();
+}
+
+void UDPAcceptedConnector::set_name(std::string name)
+{
+}
+
+void process_into_messages(std::queue& message_queue, const std::vector& data)
+{
+ auto& recv_data = data;
+ if (recv_data.size() > 0)
+ {
+ std::cout << "Received (UDP) " << recv_data.size() << " bytes" << std::endl;
+
+ int data_read = 0;
+
+ while ((recv_data.size() - data_read) > 0)
+ {
+ MESSAGE_TYPE type;
+ MESSAGE_LENGTH_T length;
+ std::vector data;
+
+ if (!try_read_data(recv_data.data(), data_read, recv_data.size(), type))
+ {
+ return;
+ }
+ if (!try_read_data(recv_data.data(), data_read, recv_data.size(), length))
+ {
+ return;
+ }
+ if (data_read + length > recv_data.size())
+ {
+ return;
+ }
+ data = std::vector(recv_data.data() + data_read, recv_data.data() + data_read + length);
+ data_read += length;
+ auto new_message = Message{ type, length, std::move(data) };
+ message_queue.push(new_message);
+ }
+ }
+}
+
+void UDPListener::process_data()
+{
+ while (_socket.has_message())
+ {
+ auto received = _socket.receive_udp_bytes();
+
+ if (_messages.find(received.endpoint) == _messages.end())
+ {
+ _messages[received.endpoint] = std::queue();
+ process_into_messages(_messages[received.endpoint], received.bytes);
+ }
+
+ if (_connectors.find(received.endpoint) == _connectors.end())
+ {
+ if (std::find(_new_connections.begin(), _new_connections.end(), received.endpoint) == _new_connections.end())
+ _new_connections.push_back(received.endpoint);
+ }
+ }
+}
+
+void UDPListener::remove_connector(raw_name_data endpoint, UDPAcceptedConnector* conn)
+{
+ if (_connectors.find(endpoint) != _connectors.end())
+ {
+ if (_connectors[endpoint] != conn)
+ {
+ std::cerr << "Incorrect value for key given from destroying UDPAcceptedConnector" << std::endl;
+ return;
+ }
+ _connectors.erase(endpoint);
+ }
+}
+
+bool UDPListener::send_data(const Message& message, raw_name_data to)
+{
+ return _socket.send_udp_bytes(udp_bytes{ message.to_bytes(), to });
+}
+
+bool UDPListener::has_message(raw_name_data from)
+{
+ process_data();
+
+ if (_messages.find(from) == _messages.end())
+ return false;
+ return _messages[from].size();
+}
+
+Message UDPListener::receive_message(raw_name_data from)
+{
+ process_data();
+
+ if (_messages.find(from) == _messages.end())
+ throw_new_exception("Failure receiving message for specific UDP endpoint: " + convert_to_readable(from).to_string() + " could not find any messages for endpoint", LINE_CONTEXT);
+
+ auto& q = _messages[from];
+ auto tmp = q.front();
+ q.pop();
+ return tmp;
+}
+
+raw_name_data UDPListener::my_data()
+{
+ return _socket.get_name_raw();
+}
+
+PtopSocket construct_udp_listener(std::string port, Protocol proto, std::string name)
+{
+ if (!proto.is_udp())
+ throw_new_exception("UDP Listener must only be used with UDP", LINE_CONTEXT);
+
+ auto listen_socket = PtopSocket(proto, name);
+
+ struct sockaddr_in serv_addr;
+
+ int portno = atoi(port.c_str());
+#ifdef WIN32
+ ZeroMemory(&serv_addr, sizeof(serv_addr));
+#elif __linux__
+ bzero((char*)&serv_addr, sizeof(serv_addr));
+#endif
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_addr.s_addr = INADDR_ANY;
+ serv_addr.sin_port = htons(portno);
+
+ std::cout << "[UDPListen] Binding UDP Listener (" << name << ") to port " << port << std::endl;
+ listen_socket.bind_socket(raw_name_data{ *(sockaddr*)&serv_addr, sizeof(serv_addr) });
+
+ return listen_socket;
+}
+
+UDPListener::UDPListener(std::string port, Protocol proto, std::string name) : Platform(construct_udp_listener(port, proto, name))
+{
+}
+
+bool UDPListener::has_connection()
+{
+ process_data();
+
+ return _new_connections.size();
+}
+
+std::unique_ptr UDPListener::accept_connection()
+{
+ if (!has_connection())
+ return nullptr;
+
+ const auto& new_conn_endpoint = _new_connections.back();
+
+ auto new_conn = std::unique_ptr(new UDPAcceptedConnector(this, new_conn_endpoint));
+
+ _connectors[new_conn_endpoint] = new_conn.get();
+ _new_connections.pop_back();
+
+ return new_conn;
+}
diff --git a/src/platform.h b/src/platform.h
index 0504fc6..23ceb29 100644
--- a/src/platform.h
+++ b/src/platform.h
@@ -2,20 +2,18 @@
#include
#include
+#include
#include "ptop_socket.h"
#include "interfaces.h"
-#if defined(__linux__)
-const int SOCKET_ERROR = -1;
-#endif
#ifdef WIN32
#pragma warning(push)
#pragma warning(disable : 4250)
#endif
-readable_ip_info convert_to_readable(raw_name_data);
+readable_ip_info convert_to_readable(const raw_name_data&);
class Platform : public virtual ISocketWrapper {
protected:
@@ -45,7 +43,10 @@ class Platform : public virtual ISocketWrapper {
inline std::string get_my_port() const override { return _port; }
inline std::string get_endpoint_ip() const override { return _endpoint_address; }
inline std::string get_endpoint_port() const override { return _endpoint_port; }
- std::string get_identifier_str() const override;
+ std::string get_identifier_str() override;
+
+ inline const std::string& get_name() const override { return _socket.get_name(); }
+ inline void set_name(std::string name) override { return _socket.set_name(name); }
inline PtopSocket&& release_socket() { return std::move(_socket); }
virtual ~Platform();
@@ -53,7 +54,7 @@ class Platform : public virtual ISocketWrapper {
class PlatformListener : public Platform, public virtual IListenSocketWrapper {
public:
- PlatformListener(std::string port, protocol input_protocol);
+ PlatformListener(std::string port, Protocol input_protocol, std::string name);
void listen() override;
bool has_connection() override;
std::unique_ptr accept_connection() override;
@@ -66,7 +67,7 @@ class PlatformAnalyser : public Platform, public virtual IDataSocketWrapper {
public:
PlatformAnalyser(std::unique_ptr&& old);
PlatformAnalyser(PtopSocket&& socket);
- PlatformAnalyser(std::string peer_address, std::string peer_port, protocol input_protocol);
+ PlatformAnalyser(std::string peer_address, std::string peer_port, Protocol input_protocol, std::string name);
Message receive_message() override;
bool has_message() override;
@@ -74,23 +75,113 @@ class PlatformAnalyser : public Platform, public virtual IDataSocketWrapper {
bool send_data(const Message& message) override;
};
-class ReusableListener : public Platform, public virtual INonBlockingListener {
+class NonBlockingListener : public Platform, public virtual INonBlockingListener {
public:
- ReusableListener(raw_name_data data, protocol input_protocol);
+ NonBlockingListener(raw_name_data data, Protocol input_protocol, std::string name);
void listen() override;
bool has_connection() override;
std::unique_ptr accept_connection() override;
};
-class ReusableConnector : public Platform, public virtual INonBlockingConnector {
+class NonBlockingConnector : public Platform, public virtual INonBlockingConnector {
public:
- ReusableConnector(raw_name_data private_binding, std::string ip_address, std::string port, protocol input_protocol);
+ NonBlockingConnector(raw_name_data private_binding, std::string ip_address, std::string port, Protocol input_protocol, std::string);
void connect(std::string ip_address, std::string port) override; // Called in constructor, can be called again if it fails
ConnectionStatus has_connected() override;
};
+// Taken from https://stackoverflow.com/a/19195373
+template
+inline void hash_combine(std::size_t& s, const T& v)
+{
+ std::hash h;
+ s ^= h(v) + 0x9e3779b9 + (s << 6) + (s >> 2);
+}
+
+namespace std
+{
+ template<>
+ struct hash
+ {
+ std::size_t operator()(const raw_name_data& name_data) const
+ {
+ using std::size_t;
+ using std::hash;
+ using std::string;
+
+ auto readable = convert_to_readable(name_data);
+
+ size_t h = hash()(readable.ip_address);
+ hash_combine(h, readable.port);
+ return h;
+ }
+ };
+}
+
+
+class UDPListener;
+
+class UDPAcceptedConnector : public virtual IDataSocketWrapper
+{
+ void throw_if_no_listener() const;
+
+ UDPListener* _listen;
+ raw_name_data _my_endpoint;
+
+ friend class UDPListener;
+
+ UDPAcceptedConnector(UDPListener* listen, raw_name_data endpoint);
+
+public:
+ ~UDPAcceptedConnector();
+
+ Message receive_message() override;
+ bool has_message() override;
+
+ bool send_data(const Message& message) override;
+
+ // Inherited via IDataSocketWrapper
+ virtual readable_ip_info get_peer_data() const override;
+ virtual raw_name_data get_peername_raw() const override;
+ virtual raw_name_data get_myname_raw() const override;
+ virtual readable_ip_info get_peername_readable() const override;
+ virtual readable_ip_info get_myname_readable() const override;
+ virtual std::string get_my_ip() const override;
+ virtual std::string get_my_port() const override;
+ virtual std::string get_endpoint_ip() const override;
+ virtual std::string get_endpoint_port() const override;
+ virtual std::string get_identifier_str() override;
+ virtual const std::string& get_name() const override;
+ virtual void set_name(std::string name) override;
+};
+
+class UDPListener : public Platform, public virtual IListenSocketWrapper
+{
+ std::unordered_map> _messages;
+ std::unordered_map _connectors;
+ std::vector _new_connections;
+
+ friend class UDPAcceptedConnector;
+
+ void process_data();
+
+ void remove_connector(raw_name_data endpoint, UDPAcceptedConnector* conn);
+
+ bool send_data(const Message& message, raw_name_data to);
+ bool has_message(raw_name_data from);
+ Message receive_message(raw_name_data from);
+
+ raw_name_data my_data();
+public:
+ UDPListener(std::string port, Protocol proto, std::string name);
+
+ inline void listen() override {}
+ bool has_connection() override;
+ std::unique_ptr accept_connection() override;
+};
+
#ifdef WIN32
#pragma warning(pop)
#endif
\ No newline at end of file
diff --git a/src/protocol.cpp b/src/protocol.cpp
index 346849a..cfa31e2 100644
--- a/src/protocol.cpp
+++ b/src/protocol.cpp
@@ -1,5 +1,8 @@
#include "protocol.h"
#include "error.h"
+#include "ptop_socket.h"
+#include "socket.h"
+#include "platform.h"
#if defined(WIN32) | defined(_WIN64)
#include
@@ -15,7 +18,7 @@
using namespace std;
-protocol::protocol(string possible_protocol) {
+Protocol::Protocol(string possible_protocol) {
transform(possible_protocol.begin(), possible_protocol.end(), possible_protocol.begin(), ::tolower);
if (possible_protocol == "tcp") {
@@ -35,10 +38,96 @@ protocol::protocol(string possible_protocol) {
}
}
-bool protocol::is_tcp() const {
+bool Protocol::is_tcp() const {
return ai_protocol == IPPROTO_TCP;
}
-bool protocol::is_udp() const {
+bool Protocol::is_udp() const {
return ai_protocol == IPPROTO_UDP;
-}
\ No newline at end of file
+}
+
+bool Protocol::send_bytes(SOCKET handle, raw_name_data endpoint, std::vector bytes) {
+ if (is_tcp())
+ {
+ int result = send(handle, bytes.data(), (int)bytes.size(), 0);
+ if (result == SOCKET_ERROR)
+ return false;
+ return true;
+ }
+ else if (is_udp())
+ {
+ int result = sendto(handle, bytes.data(), (int)bytes.size(), 0, &endpoint.name, endpoint.name_len);
+ if (result == SOCKET_ERROR)
+ return false;
+ return true;
+ }
+ return false;
+}
+
+std::vector Protocol::receive_bytes(SOCKET handle, raw_name_data& expected_endpoint)
+{
+ if (is_tcp())
+ {
+ std::vector data(500, (char)0, std::allocator());
+ int result = ::recv(handle, data.data(), (int)data.size(), 0);
+ if (result == SOCKET_ERROR)
+ {
+ std::cout << "Receiving data failed" << std::endl;
+ return std::vector();
+ }
+ data.resize(result);
+ return data;
+ }
+ if (is_udp())
+ {
+ if (expected_endpoint.initialized == false){
+ expected_endpoint.name_len = sizeof(raw_name_data);
+ }
+ auto addr = sockaddr{};
+ socklen_t addr_len = sizeof(sockaddr_in);
+ std::vector data(500, (char)0, std::allocator());
+ int result = ::recvfrom(handle, data.data(), (int)data.size(), 0, &addr, &addr_len);
+ throw_if_socket_error(result, "Failed to receive UDP bytes with: " + get_last_error(), LINE_CONTEXT);
+
+ raw_name_data incoming{ addr, addr_len };
+ if (incoming != expected_endpoint && expected_endpoint.initialized == true)
+ {
+ auto readable = convert_to_readable(incoming);
+ auto message = "Receiving UDP data from an undesired endpoint (" + readable.ip_address + ":" + readable.port + ")";
+ throw_new_exception(message, LINE_CONTEXT);
+ }
+ if (result == SOCKET_ERROR)
+ {
+ auto message = "Receiving (UDP) data failed: " + get_last_error();
+ throw_new_exception(message, LINE_CONTEXT);
+ }
+ if (expected_endpoint.initialized == false)
+ std::memcpy(&expected_endpoint, &incoming, sizeof(raw_name_data));
+ expected_endpoint.initialized = true;
+ data.resize(result);
+ return data;
+ }
+ throw_new_exception("Invalid protocol", LINE_CONTEXT);
+}
+
+bool Protocol::has_died(SOCKET handle, bool has_message) {
+ if (is_tcp())
+ {
+ if (has_message)
+ {
+ std::vector recv_data{ 100, '0', std::allocator() };
+ int n = recv(handle, recv_data.data(), (int)recv_data.size(), MSG_PEEK);
+ if (n == SOCKET_ERROR)
+ {
+ std::cerr << "[Data] Failed to peek data from linux socket (trying to determine if closed): " << get_last_error() << std::endl;
+ return true;
+ }
+ return n == 0;
+ }
+ return false;
+ }
+ if (is_udp())
+ return false;
+ throw_new_exception("Invalid protocol", LINE_CONTEXT);
+ return true;
+}
diff --git a/src/protocol.h b/src/protocol.h
index f61169b..c606d11 100644
--- a/src/protocol.h
+++ b/src/protocol.h
@@ -1,10 +1,12 @@
#pragma once
+#include "socket.h"
#include
+#include "name_data.h"
-class protocol {
+class Protocol {
public:
- protocol(std::string possible_protocol);
+ Protocol(std::string possible_protocol);
private:
int ai_family;
@@ -17,7 +19,11 @@ class protocol {
inline int get_ai_socktype() const { return ai_socktype; }
inline int get_ai_protocol() const { return ai_protocol; }
inline int get_ai_flags() const { return ai_flags; }
-
+
bool is_tcp() const;
bool is_udp() const;
+
+ bool send_bytes(SOCKET handle, raw_name_data endpoint, std::vector bytes);
+ std::vector receive_bytes(SOCKET handle, raw_name_data& expected_endpoint);
+ bool has_died(SOCKET handle, bool has_message);
};
\ No newline at end of file
diff --git a/src/ptop_socket.cpp b/src/ptop_socket.cpp
index 04757e6..70e3421 100644
--- a/src/ptop_socket.cpp
+++ b/src/ptop_socket.cpp
@@ -23,9 +23,11 @@
#endif
#include
+#include
+using namespace std::chrono;
-PtopSocket::PtopSocket(protocol proto) : _protocol(proto)
+PtopSocket::PtopSocket(Protocol proto, std::string name) : _protocol(proto), _name(std::move(name))
{
int domain = _protocol.get_ai_family();
int type = _protocol.get_ai_socktype();
@@ -36,7 +38,7 @@ PtopSocket::PtopSocket(protocol proto) : _protocol(proto)
PtopSocket& PtopSocket::bind_socket(const raw_name_data& name, std::string error_message)
{
int result = bind(_handle, &name.name, name.name_len);
- throw_if_socket_error(result, error_message, LINE_CONTEXT);
+ throw_if_socket_error(result, error_message + " " + get_last_error(), LINE_CONTEXT);
return *this;
}
@@ -44,11 +46,15 @@ PtopSocket& PtopSocket::connect(sockaddr* addr, socklen_t len)
{
if (_protocol.is_tcp())
{
+ // cheeky little hack
+ set_socket_option(SO_KEEPALIVE, (int)1);
int n = ::connect(_handle, addr, len);
throw_if_socket_error(n, "Failed to connect " + get_last_error(), LINE_CONTEXT);
}
else if (_protocol.is_udp())
{
+ int n = ::connect(_handle, addr, len);
+ throw_if_socket_error(n, "Failed to connect " + get_last_error(), LINE_CONTEXT);
_endpoint = raw_name_data(*addr, len);
}
return *this;
@@ -76,7 +82,9 @@ PtopSocket PtopSocket::accept_data_socket()
{
throw_new_exception("Failed to accept incoming connection: " + get_last_error(), LINE_CONTEXT);
}
- return PtopSocket(new_socket, _protocol, raw_name_data(client_addr));
+ auto new_sock = PtopSocket(new_socket, _protocol, raw_name_data(client_addr));
+ new_sock.set_socket_option(SO_KEEPALIVE, (int)1);
+ return new_sock;
}
catch(std::exception& e) {
@@ -92,14 +100,19 @@ bool PtopSocket::try_connect(sockaddr* addr, socklen_t len)
void PtopSocket::listen(int max_conns)
{
- auto n = ::listen(_handle, max_conns);
- throw_if_socket_error(n, "Failed to listen on socket. " + get_last_error(), LINE_CONTEXT);
+ if (is_tcp())
+ {
+ auto n = ::listen(_handle, max_conns);
+ throw_if_socket_error(n, "Failed to listen on socket. " + get_last_error(), LINE_CONTEXT);
+ }
}
bool PtopSocket::has_connection() const
{
try
{
+ if (is_udp())
+ return false;
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 0;
@@ -175,41 +188,12 @@ bool PtopSocket::select_for(::select_for epic_for) const
bool PtopSocket::has_message() const
{
- struct timeval timeout;
- timeout.tv_sec = 0;
- timeout.tv_usec = 0;
-
- fd_set poll_read_set;
- FD_ZERO(&poll_read_set);
- FD_SET(_handle, &poll_read_set);
-
- int n = select((int)_handle + 1, &poll_read_set, 0, 0, &timeout);
- throw_if_socket_error(n, "Failed to poll linux socket readability " + get_last_error(), LINE_CONTEXT);
-
- return n > 0;
+ return select_for(select_for::READ);
}
-bool PtopSocket::has_died() const
+bool PtopSocket::has_died()
{
- if (is_tcp())
- {
- if (has_message())
- {
- std::vector recv_data{ 100, '0', std::allocator() };
- int n = recv(_handle, recv_data.data(), (int)recv_data.size(), MSG_PEEK);
- if (n == SOCKET_ERROR)
- {
- std::cerr << "[Data] Failed to peek data from linux socket (trying to determine if closed): " << get_last_error() << std::endl;
- return true;
- }
- return n == 0;
- }
- return false;
- }
- if (is_udp())
- return false;
- throw_new_exception("Invalid protocol", LINE_CONTEXT);
- return true;
+ return _protocol.has_died(_handle, has_message());
}
raw_name_data PtopSocket::get_peer_raw() const
@@ -240,62 +224,24 @@ raw_name_data PtopSocket::get_name_raw() const
bool PtopSocket::send_bytes(std::vector bytes)
{
- if (is_tcp())
- {
- int result = send(_handle, bytes.data(), (int)bytes.size(), 0);
- if (result == SOCKET_ERROR)
- return false;
- return true;
- }
- else if (is_udp())
- {
- int result = sendto(_handle, bytes.data(), (int)bytes.size(), 0, &_endpoint.name, _endpoint.name_len);
- if (result == SOCKET_ERROR)
- return false;
- return true;
- }
- else
- {
- throw_new_exception("Can not send data with an invalid protocol", LINE_CONTEXT);
- }
+ return _protocol.send_bytes(_handle, _endpoint, bytes);
}
-std::vector PtopSocket::recv_bytes()
+std::vector PtopSocket::receive_bytes()
{
- if (is_tcp())
- {
- std::vector data(500, (char)0, std::allocator());
- int result = ::recv(_handle, data.data(), (int)data.size(), 0);
- if (result == SOCKET_ERROR)
- {
- std::cout << "Receiving data failed" << std::endl;
- return std::vector();
- }
- data.resize(result);
- return data;
- }
- if (is_udp())
- {
- while (true)
- {
- sockaddr addr;
- socklen_t addr_len;
- std::vector data(500, (char)0, std::allocator());
- int result = ::recvfrom(_handle, data.data(), (int)data.size(), 0, &addr, &addr_len);
- raw_name_data incoming{ addr, addr_len };
-
- if (incoming != _endpoint)
- {
- auto readable = convert_to_readable(incoming);
- std::cout << "Receiving UDP data from an undesired endpoint (" << readable << ")" << std::endl;
- continue;
- }
- if (result == SOCKET_ERROR)
- {
- std::cerr << "Receiving (UDP) data failed: " << socket_error_to_string(result) << std::endl;
- return std::vector();
- }
- }
- }
- throw_new_exception("Invalid protocol", LINE_CONTEXT);
+ return _protocol.receive_bytes(_handle, _endpoint);
}
+
+bool PtopSocket::send_udp_bytes(udp_bytes bytes)
+{
+ return _protocol.send_bytes(_handle, bytes.endpoint, bytes.bytes);
+}
+
+udp_bytes PtopSocket::receive_udp_bytes(){
+ raw_name_data new_name;
+ auto result = _protocol.receive_bytes(_handle, new_name);
+ udp_bytes output = udp_bytes{
+ result, new_name
+ };
+ return output;
+}
\ No newline at end of file
diff --git a/src/ptop_socket.h b/src/ptop_socket.h
index 155a9aa..9a8f0fb 100644
--- a/src/ptop_socket.h
+++ b/src/ptop_socket.h
@@ -1,16 +1,10 @@
#pragma once
+#include "socket.h"
#include "name_data.h"
#include "protocol.h"
#include "error.h"
-
-#ifdef WIN32
-#include
-
-#elif defined(__linux__)
-#include
-using SOCKET = int;
-#endif
+#include "message.h"
#include
@@ -20,31 +14,52 @@ void throw_if_socket_error(int n, std::string message, std::string line_context)
std::string socket_error_to_string(int err);
std::string get_last_error();
-enum select_for
+enum class select_for
{
READ,
WRITE,
EXCEPT
};
+// UDP CRAP
+
+struct udp_bytes
+{
+ std::vector bytes;
+ raw_name_data endpoint;
+};
+
+// END UDP CRAP
+
class PtopSocket
{
private:
SOCKET _handle;
- protocol _protocol;
+ Protocol _protocol;
raw_name_data _endpoint;
- PtopSocket(SOCKET handle, protocol proto) : _handle(handle), _protocol(proto), _endpoint() {}
- PtopSocket(SOCKET handle, protocol proto, raw_name_data endpoint) : _handle(handle), _protocol(proto), _endpoint(endpoint) {}
+ std::string _name;
+ PtopSocket(SOCKET handle, Protocol proto, std::string name = "") : _handle(handle), _protocol(proto), _endpoint(), _name(name) {}
+ PtopSocket(SOCKET handle, Protocol proto, raw_name_data endpoint, std::string name = "") : _handle(handle), _protocol(proto), _endpoint(endpoint), _name(name) {}
public:
- explicit PtopSocket(protocol proto);
+ explicit PtopSocket(Protocol proto, std::string name = "");
- PtopSocket(PtopSocket&& other) : _handle(other._handle), _protocol(other._protocol) {
+ PtopSocket(PtopSocket&& other) : _handle(other._handle), _protocol(other._protocol), _endpoint(other._endpoint), _name(std::move(other._name)) {
other._handle = REALLY_INVALID_SOCKET;
};
~PtopSocket();
+ inline void set_name(std::string name)
+ {
+ _name = std::move(name);
+ }
+
+ inline const std::string& get_name() const
+ {
+ return _name;
+ }
+
template
PtopSocket& set_socket_option(int option_name, OptT optionVal, std::string error_message)
{
@@ -60,7 +75,7 @@ class PtopSocket
}
template
- OptT get_socket_option(int option_name)
+ OptT get_socket_option(int option_name) const
{
OptT opt;
socklen_t optSize = sizeof(OptT);
@@ -69,10 +84,10 @@ class PtopSocket
return opt;
}
-#ifdef __linux__
+#ifdef SO_REUSEPORT
inline PtopSocket& set_socket_reuse() { set_socket_option(SO_REUSEPORT, (int)1, "Failed to set socket (port) reusability"); return set_socket_option(SO_REUSEADDR, (int)1, "Failed to set socket reusability"); }
inline PtopSocket& set_socket_no_reuse() { set_socket_option(SO_REUSEPORT, (int)0, "Failed to set socket (port) un-reusability"); return set_socket_option(SO_REUSEADDR, (int)0, "Failed to set socket un-reusability"); }
-#elif defined(WIN32)
+#else
inline PtopSocket& set_socket_reuse() { return set_socket_option(SO_REUSEADDR, (int)1, "Failed to set socket reusability"); }
inline PtopSocket& set_socket_no_reuse() { return set_socket_option(SO_REUSEADDR, (int)0, "Failed to set socket un-reusability"); }
#endif
@@ -95,7 +110,7 @@ class PtopSocket
bool select_for(select_for s_for) const;
bool has_message() const;
- bool has_died() const;
+ bool has_died();
raw_name_data get_peer_raw() const;
raw_name_data get_name_raw() const;
@@ -104,10 +119,14 @@ class PtopSocket
inline bool is_valid() const { return _handle != REALLY_INVALID_SOCKET; }
inline bool is_tcp() const { return _protocol.is_tcp(); }
inline bool is_udp() const { return _protocol.is_udp(); }
+ inline bool is_listen() const { return get_socket_option(SO_ACCEPTCONN); }
inline SOCKET get_handle() const { return _handle; }
- inline const protocol& get_protocol() const { return _protocol; }
+ inline const Protocol& get_protocol() const { return _protocol; }
bool send_bytes(std::vector bytes);
- std::vector recv_bytes();
+ std::vector receive_bytes();
+
+ bool send_udp_bytes(udp_bytes bytes);
+ udp_bytes receive_udp_bytes();
};
\ No newline at end of file
diff --git a/src/server.cpp b/src/server.cpp
index 0667816..b5d12f9 100644
--- a/src/server.cpp
+++ b/src/server.cpp
@@ -13,12 +13,15 @@
using namespace std::chrono;
-server_init_kit::server_init_kit(protocol ip_proto) : proto(ip_proto) {
+server_init_kit::server_init_kit(Protocol ip_proto) : proto(ip_proto) {
clientA = nullptr;
clientB = nullptr;
cA = nullptr;
cB = nullptr;
- server_socket = std::make_unique(ServerListenPort, ip_proto);
+ if (ip_proto.is_udp())
+ server_socket = std::make_unique(ServerListenPort, ip_proto, "Server-Listener");
+ else
+ server_socket = std::make_unique(ServerListenPort, ip_proto, "Server-Listener");
server_socket->listen();
recv_data = std::vector();
//dont need to initialize structs. it will default its params by itself
@@ -36,8 +39,8 @@ void hole_punch_clients(IDataSocketWrapper*& clientA, IDataSocketWrapper*& clien
std::cout << "Hole punching clients: A(" << dataA.ip_address << ":" << dataA.port << "), B(" << dataB.ip_address << ":" << dataB.port << ")" << std::endl;
- clientA->send_data(create_message(MESSAGE_TYPE::CONNECT_PEER_AS_LEADER, dataB.to_bytes(), privB.to_bytes(), 69));
- clientB->send_data(create_message(MESSAGE_TYPE::CONNECT_PEER, dataA.to_bytes(), privA.to_bytes(), 69));
+ clientA->send_data(create_message(MESSAGE_TYPE::CONNECT_TO_PEER, dataB.to_bytes(), privB.to_bytes()));
+ clientB->send_data(create_message(MESSAGE_TYPE::CONNECT_TO_PEER, dataA.to_bytes(), privA.to_bytes()));
clientA = nullptr;
clientB = nullptr;
@@ -221,8 +224,8 @@ void server_loop()
std::thread user_input_thread{ input_thread_func, std::ref(user_input_queue) };
user_input_thread.detach();
- server_init_kit init_tcp{ protocol{"tcp"} };
- server_init_kit init_udp{ protocol{"udp"} };
+ server_init_kit init_tcp{ Protocol{"tcp"} };
+ server_init_kit init_udp{ Protocol{"udp"} };
std::cout << "Server is ready..." << std::endl;
while (true)
diff --git a/src/server.h b/src/server.h
index 81d6b48..2ad80d5 100644
--- a/src/server.h
+++ b/src/server.h
@@ -12,6 +12,8 @@
#include
#include
+
+
class server_init_kit;
void server_loop();
@@ -33,9 +35,9 @@ class server_init_kit {
std::vector recv_data;
EXECUTION_STATUS status;
- protocol proto;
+ Protocol proto;
- server_init_kit(protocol proto);
+ server_init_kit(Protocol proto);
server_init_kit(server_init_kit&& other);
server_init_kit& operator=(server_init_kit&& other);
diff --git a/src/socket.h b/src/socket.h
new file mode 100644
index 0000000..44d4ce2
--- /dev/null
+++ b/src/socket.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#ifdef WIN32
+#include
+
+#elif defined(__linux__)
+#include
+using SOCKET = int;
+const int SOCKET_ERROR = -1;
+#endif
+
diff --git a/src/windows_platform.cpp b/src/windows_platform.cpp
index 0b2d484..4460a55 100644
--- a/src/windows_platform.cpp
+++ b/src/windows_platform.cpp
@@ -16,9 +16,13 @@ Platform::Platform(PtopSocket&& socket)
: _socket(std::move(socket))
{
try_update_name_info();
- try_update_endpoint_info();
// big chungus
+ if(_socket.is_udp()) {
+ std::cout << "Platform constructed for UDP" << std::endl;
+ return;
+ }
+
if (_address == "Unassigned" || _address.empty() ||
_port == "Unassigned" || _port.empty()) {
auto message = "failed to update name info";
@@ -102,6 +106,11 @@ void Platform::update_endpoint_info()
{
try
{
+ if (_socket.is_tcp() && _socket.is_listen())
+ {
+ std::cout << "[Socket] Not updating endpoint as this socket " << get_identifier_str() << " is a listen socket" << std::endl;
+ return;
+ }
auto name = get_peername_readable();
_endpoint_address = name.ip_address;
_endpoint_port = name.port;
@@ -137,7 +146,7 @@ readable_ip_info Platform::get_peer_data() const
std::vector buf{ 50, '0', std::allocator() };
const char* str = inet_ntop(AF_INET, &peer_name.sin_addr, buf.data(), buf.size());
if (!str)
- throw std::runtime_error(std::string("Failed to convert sockaddr to string: ") + get_last_error());
+ throw std::runtime_error(std::string("Socket '") + get_name() + "' Failed to convert sockaddr to string : " + get_last_error());
std::string address = str;
@@ -154,7 +163,7 @@ raw_name_data Platform::get_peername_raw() const
socklen_t peer_size = sizeof(peer_name);
int n = getpeername(_socket.get_handle(), (sockaddr*)&peer_name, &peer_size);
if (n != 0) {
- auto error = std::string("[Socket] Failed to getpeername: ") + get_last_error();
+ auto error = std::string("[Socket] (" + get_name() + ") Failed to getpeername : ") + get_last_error();
throw_new_exception(error, LINE_CONTEXT);
}
@@ -171,7 +180,7 @@ raw_name_data Platform::get_myname_raw() const
int n = getsockname(_socket.get_handle(), (sockaddr*)&peer_name, &peer_size);
if (n != 0) {
- auto error = std::string("[Socket] Failed to getsockname: ") + get_last_error();
+ auto error = std::string("[Socket] (" + get_name() + ") Failed to getsockname : ") + get_last_error();
throw_new_exception(error, LINE_CONTEXT);
}
@@ -181,10 +190,10 @@ raw_name_data Platform::get_myname_raw() const
return raw_data;
}
-PtopSocket construct_windowslistensocket(std::string port, protocol input_protocol) {
+PtopSocket construct_windowslistensocket(std::string port, Protocol input_protocol, std::string name) {
try
{
- std::cout << "[Listen] Create new Socket on port (with localhost): " << port << std::endl;
+ std::cout << "[Listen] Creating new Socket on port (with localhost, named: " << name << "): " << port << std::endl;
int iResult;
struct addrinfo* result = NULL, * ptr = NULL, hints;
@@ -197,17 +206,17 @@ PtopSocket construct_windowslistensocket(std::string port, protocol input_protoc
iResult = getaddrinfo(NULL, port.c_str(), &hints, &result);
if (iResult != 0)
{
- throw std::exception((std::string("[Listen] Failed to create windows socket: getaddrinfo failed with") + std::to_string(iResult)).c_str());
+ throw std::exception((std::string("[Listen] (" + name + ") Failed to create windows socket : getaddrinfo failed with") + std::to_string(iResult)).c_str());
}
- PtopSocket conn_socket = PtopSocket(input_protocol);
+ PtopSocket conn_socket = PtopSocket(input_protocol, name);
if (conn_socket.is_invalid())
{
- throw std::exception((std::string("[Listen] Failed to create socket with WSA error: ") + get_last_error()).c_str());
+ throw std::exception((std::string("[Listen] (" + name + ") Failed to create socket with WSA error : ") + get_last_error()).c_str());
}
- std::cout << "Binding..." << std::endl;
+ std::cout << name << " is Binding..." << std::endl;
conn_socket.bind_socket(raw_name_data{ *result->ai_addr, (socklen_t)result->ai_addrlen });
return conn_socket;
@@ -218,13 +227,13 @@ PtopSocket construct_windowslistensocket(std::string port, protocol input_protoc
}
}
-PlatformListener::PlatformListener(std::string port, protocol input_protocol) : Platform(construct_windowslistensocket(port, input_protocol))
+PlatformListener::PlatformListener(std::string port, Protocol input_protocol, std::string name) : Platform(construct_windowslistensocket(port, input_protocol, name))
{
- std::cout << "[Listen] Post Bind Check: Bound to: " << get_my_ip() << ":" << get_my_port() << std::endl;
}
void PlatformListener::listen()
{
+ std::cout << "[Listen] Socket " << get_name() << " now Listening(" << get_my_ip() << ":" << get_my_port() << ")" << std::endl;
_socket.listen(4);
}
@@ -239,8 +248,8 @@ std::unique_ptr PlatformListener::accept_connection() {
return std::make_unique(std::move(tmp));
}
-PtopSocket construct_windows_data_socket(std::string peer_address, std::string peer_port, protocol input_protocol) {
- std::cout << "[Data] Creating a Windows Data Socket connecting to: " << peer_address << ":" << peer_port << std::endl;
+PtopSocket construct_windows_data_socket(std::string peer_address, std::string peer_port, Protocol input_protocol, std::string name) {
+ std::cout << "[Data] Creating a Windows Data Socket (" << name << ") connecting to : " << peer_address << " : " << peer_port << std::endl;
struct addrinfo* result = NULL,
*ptr = NULL,
hints;
@@ -255,21 +264,21 @@ PtopSocket construct_windows_data_socket(std::string peer_address, std::string p
if (iResult != 0) {
std::cerr << "Failed to resolve server: " << iResult << std::endl;
- throw std::exception((std::string("Failed to resolve peer address, error: ") + std::to_string(iResult)).c_str());
+ throw std::exception((std::string("Socket " + name + " Failed to resolve peer address, error: ") + std::to_string(iResult)).c_str());
}
- PtopSocket conn_socket = PtopSocket(input_protocol);
+ PtopSocket conn_socket = PtopSocket(input_protocol, name);
conn_socket.connect(result->ai_addr, (socklen_t)result->ai_addrlen);
return conn_socket;
}
-PlatformAnalyser::PlatformAnalyser(std::string peer_address, std::string peer_port, protocol input_protocol) : Platform(construct_windows_data_socket(peer_address, peer_port, input_protocol))
+PlatformAnalyser::PlatformAnalyser(std::string peer_address, std::string peer_port, Protocol input_protocol, std::string name) : Platform(construct_windows_data_socket(peer_address, peer_port, input_protocol, name))
{
try
{
- update_endpoint_info();
+ try_update_endpoint_info();
}
catch (const std::exception& e)
{
@@ -291,17 +300,18 @@ bool PlatformAnalyser::send_data(const Message& message)
PtopSocket windows_data_socket_steal_construct(std::unique_ptr&& old)
{
- std::cout << "[Data] Moving linux_reusable_nonblocking_connection_socket " << old->get_identifier_str() << " to a data_socket" << std::endl;
- ReusableConnector& real_old = *dynamic_cast(old.get());
+ std::cout << "[Data] Moving INonBlockingConnector " << old->get_identifier_str() << " to a PlatformAnalyzer" << std::endl;
+ NonBlockingConnector& real_old = *dynamic_cast(old.get());
PtopSocket epic = real_old.release_socket();
epic.set_non_blocking(false);
+ epic.set_socket_no_reuse();
return epic;
}
void PlatformAnalyser::process_socket_data()
{
std::cout << "[Data] Trying to receive new data from Socket: " << Platform::get_identifier_str() << std::endl;
- std::vector recv_data = _socket.recv_bytes();
+ std::vector recv_data = _socket.receive_bytes();
if (recv_data.size() > 0)
{
std::cout << "Received " << recv_data.size() << " bytes" << std::endl;
@@ -378,7 +388,7 @@ PlatformAnalyser::PlatformAnalyser(PtopSocket&& socket)
}
Message PlatformAnalyser::receive_message() {
- process_socket_data();
+ process_socket_data(); // Will block until it receives data
if (_stored_messages.size() > 0)
{
@@ -387,6 +397,7 @@ Message PlatformAnalyser::receive_message() {
return tmp;
}
+ // Only way we have no messages is if connection closed
return Message::null_message;
}
@@ -395,16 +406,16 @@ bool PlatformAnalyser::has_message()
return _socket.has_message();
}
-PtopSocket windows_reuse_nb_listen_construct(raw_name_data data, protocol proto)
+PtopSocket windows_reuse_nb_listen_construct(raw_name_data data, Protocol proto, std::string name)
{
try
{
auto readable = convert_to_readable(data);
- std::cout << "[ListenReuseNoB] Creating Reusable Listen Socket on: " << readable.ip_address << ":" << readable.port << std::endl;
+ std::cout << "[ListenReuseNoB] Creating Reusable Listen Socket '" << name << "' on: " << readable.ip_address << ":" << readable.port << std::endl;
- PtopSocket listen_socket = PtopSocket(proto);
+ PtopSocket listen_socket = PtopSocket(proto, name);
if (listen_socket.is_invalid())
- throw_new_exception("[ListenReuseNoB] " + readable.ip_address + ":" + readable.port + " Failed to create reusable nonblocking listen socket: " + get_last_error(), LINE_CONTEXT);
+ throw_new_exception("[ListenReuseNoB] (" + name + ") " + readable.ip_address + ":" + readable.port + " Failed to create reusable nonblocking listen socket: " + get_last_error(), LINE_CONTEXT);
listen_socket.set_non_blocking(true);
listen_socket.set_socket_reuse();
@@ -419,43 +430,45 @@ PtopSocket windows_reuse_nb_listen_construct(raw_name_data data, protocol proto)
}
}
-ReusableListener::ReusableListener(raw_name_data data, protocol input_protocol) : Platform(
- windows_reuse_nb_listen_construct(data, input_protocol))
+NonBlockingListener::NonBlockingListener(raw_name_data data, Protocol input_protocol, std::string name)
+ : Platform(
+ windows_reuse_nb_listen_construct(data, input_protocol, name)
+ )
{}
-void ReusableListener::listen()
+void NonBlockingListener::listen()
{
- std::cout << "[ListenReuseNoB] Now Listening on: " << Platform::get_my_ip() << ":" << Platform::get_my_port() << std::endl;
+ std::cout << "[ListenReuseNoB] " + get_identifier_str() + " Now Listening on port: " << get_my_port() << std::endl;
_socket.listen(4);
}
-bool ReusableListener::has_connection()
+bool NonBlockingListener::has_connection()
{
return _socket.has_connection();
}
-std::unique_ptr ReusableListener::accept_connection()
+std::unique_ptr NonBlockingListener::accept_connection()
{
- std::cout << "[ListenReuseNoB] Accepting Connection..." << std::endl;
+ std::cout << "[ListenReuseNoB] " + get_identifier_str() + " Accepting Connection..." << std::endl;
auto new_sock = _socket.accept_data_socket();
return std::make_unique(std::move(new_sock));
}
-PtopSocket windows_reuse_nb_construct(raw_name_data name, protocol proto)
+PtopSocket windows_reuse_nb_construct(raw_name_data data, Protocol proto, std::string name)
{
try {
- auto readable = convert_to_readable(name);
- std::cout << "[DataReuseNoB] Creating Connection socket bound to: " << readable.ip_address << ":" << readable.port << std::endl;
- auto conn_socket = PtopSocket(proto);
+ auto readable = convert_to_readable(data);
+ std::cout << "[DataReuseNoB] Creating Connection socket '" << name << "' bound to : " << readable.ip_address << ":" << readable.port << std::endl;
+ auto conn_socket = PtopSocket(proto, name);
if (conn_socket.is_invalid())
- throw std::runtime_error(std::string("[DataReuseNoB] Failed to create nonblocking socket: ") + get_last_error());
+ throw_new_exception(std::string("[DataReuseNoB] Failed to create nonblocking socket (" + name + "): ") + get_last_error(), LINE_CONTEXT);
conn_socket.set_non_blocking(true);
conn_socket.set_socket_reuse();
- conn_socket.bind_socket(name, "[DataReuseNoB] Failed to bind");
- std::cout << "[DataReuseNoB] Successfully bound Data socket to: " << readable.ip_address << ":" << readable.port << std::endl;
+ conn_socket.bind_socket(data, "[DataReuseNoB] '" + name + "' Failed to bind");
+ std::cout << "[DataReuseNoB] Successfully bound Data socket (" << name << ") to: " << readable.ip_address << " : " << readable.port << std::endl;
return conn_socket;
}
@@ -465,18 +478,20 @@ PtopSocket windows_reuse_nb_construct(raw_name_data name, protocol proto)
}
}
-ReusableConnector::ReusableConnector(
- raw_name_data name, std::string ip_address, std::string port, protocol input_protocol) : Platform(
- windows_reuse_nb_construct(name, input_protocol))
+NonBlockingConnector::NonBlockingConnector(
+ raw_name_data data, std::string ip_address, std::string port, Protocol input_protocol, std::string name)
+ : Platform(
+ windows_reuse_nb_construct(data, input_protocol, name)
+ )
{
connect(ip_address, port);
}
-void ReusableConnector::connect(std::string ip_address, std::string port)
+void NonBlockingConnector::connect(std::string ip_address, std::string port)
{
try
{
- std::cout << "[DataReuseNoB] Trying to connect to: " << ip_address << ":" << port << std::endl;
+ std::cout << "[DataReuseNoB] (" << get_name() << ") Trying to connect to : " << ip_address << ":" << port << std::endl;
struct addrinfo* results, hints;
ZeroMemory(&hints, sizeof(hints));
hints.ai_family = AF_INET;
@@ -487,13 +502,14 @@ void ReusableConnector::connect(std::string ip_address, std::string port)
iResult = getaddrinfo(ip_address.c_str(), port.c_str(), &hints, &results);
if (iResult != 0)
- throw std::exception((std::string("Failed to resolve peer address, error: ") + std::to_string(iResult)).c_str());
+ throw_new_exception("Socket '" + get_name() + "' Failed to getaddrinfo, error: " + std::to_string(iResult), LINE_CONTEXT);
if (results == nullptr)
- throw std::exception((std::string("Could not resolve '") + ip_address + ":" + port + "'").c_str());
+ throw_new_exception(("No possible sockets found for '") + ip_address + ":" + port + "' (socket '" + get_name() + "')", LINE_CONTEXT);
_socket.connect(results->ai_addr, (socklen_t)results->ai_addrlen);
- std::cout << "[DataReuseNoB] Successfully BEGUN Connection to: " << ip_address << ":" << port << std::endl;
+ std::cout << "[DataReuseNoB] (" << get_name() << ") Successfully BEGUN Connection to : " << ip_address << ":" << port << std::endl;
+ try_update_endpoint_info();
}
catch (const std::exception& e)
{
@@ -501,32 +517,40 @@ void ReusableConnector::connect(std::string ip_address, std::string port)
}
}
-ConnectionStatus ReusableConnector::has_connected()
+ConnectionStatus NonBlockingConnector::has_connected()
{
- try
- {
- if (_socket.is_invalid())
- return ConnectionStatus::FAILED;
+ try
+ {
+ if (_socket.is_invalid())
+ return ConnectionStatus::FAILED;
- if (_socket.poll_for(POLLWRNORM))
- {
- return ConnectionStatus::SUCCESS;
- }
+ if (_socket.select_for(select_for::WRITE))
+ {
+ auto sock_error = _socket.get_socket_option(SO_ERROR);
+ if (sock_error != 0 && sock_error != EAGAIN && sock_error != EINPROGRESS)
+ {
+ std::cerr << "[DataReuseNoB] " << LINE_CONTEXT << " Socket '" << get_name() << "' failed to connect with: " << get_win_error(sock_error) << std::endl;
+ return ConnectionStatus::FAILED;
+ }
+ update_endpoint_if_needed();
+ return ConnectionStatus::SUCCESS;
+ }
- if (!_socket.select_for(select_for::EXCEPT))
- return ConnectionStatus::PENDING;
- auto sock_error = _socket.get_socket_option(SO_ERROR);
+ if (!_socket.select_for(select_for::EXCEPT))
+ return ConnectionStatus::PENDING;
- std::cerr << "Socket has error code: " << sock_error << std::endl;
+ auto sock_error = _socket.get_socket_option(SO_ERROR);
- return ConnectionStatus::FAILED;
- }
- catch (const std::exception& e)
- {
- throw_with_context(e, LINE_CONTEXT);
- }
+ std::cerr << "[DataReuseNoB] " << LINE_CONTEXT << " Socket '" << get_name() << "' failed to connect with: " << get_win_error(sock_error) << std::endl;
+
+ return ConnectionStatus::FAILED;
+ }
+ catch (const std::exception& e)
+ {
+ throw_with_context(e, LINE_CONTEXT);
+ }
}
#endif
\ No newline at end of file