Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
leemaguire committed Jul 8, 2024
1 parent ccdc22d commit 03425a1
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 41 deletions.
11 changes: 5 additions & 6 deletions src/cpprealm/internal/network/network_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ namespace realm::networking {
const bool is_ipv4 = is_valid_ipv4(host);
const URLScheme url_scheme = get_url_scheme(uri.get_scheme());

if (port.empty()) {
port = is_localhost ? "9090" : "443";
}

try {
auto resolver = realm::sync::network::Resolver{service};
if (m_proxy_config) {
Expand Down Expand Up @@ -183,12 +187,7 @@ namespace realm::networking {
if (m_proxy_config) {
realm::sync::HTTPRequest req;
req.method = realm::sync::HTTPMethod::Connect;

// if (is_ipv4) {
req.headers.emplace("Host", util::format("%1:%2", host, port));
// } else {
// req.headers.emplace("Host", util::format("%1:%2", host, is_localhost ? "9090" : port));
// }
req.headers.emplace("Host", util::format("%1:%2", host, port));

if (m_proxy_config->username_password) {
auto userpass = util::format("%1:%2", m_proxy_config->username_password->first, m_proxy_config->username_password->second);
Expand Down
21 changes: 3 additions & 18 deletions src/cpprealm/networking/networking.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,28 +63,13 @@ namespace realm::internal::networking {

::realm::sync::WebSocketEndpoint to_core_websocket_endpoint(const ::realm::networking::websocket_endpoint& ep) {
::realm::sync::WebSocketEndpoint core_ep;

auto uri = util::Uri(ep.url);
auto protocol = to_protocol_envelope(uri.get_scheme());

auto uri_path = uri.get_auth();
if (uri_path.find("//") == 0) {
uri_path = uri_path.substr(2);
}

std::string address;
std::string port;
size_t colon_pos = uri_path.find(':');
if (colon_pos != std::string::npos) {
// Extract the address
address = uri_path.substr(0, colon_pos);
// Extract the port
port = uri_path.substr(colon_pos + 1);
} else {
REALM_TERMINATE("Invalid URL");
}
std::string userinfo, host, port;
uri.get_auth(userinfo, host, port);

core_ep.address = address;
core_ep.address = host;
core_ep.port = std::stoi(port);
core_ep.path = uri.get_path() + uri.get_query();
core_ep.protocols = ep.protocols;
Expand Down
97 changes: 92 additions & 5 deletions tests/sync/networking_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,43 @@ TEST_CASE("sends plaintext data to proxy", "[proxy]") {
cfg.server_uses_ssl = false; // Set to true if using services.cloud.mongodb.com
tests::utils::proxy_server server(std::move(cfg));

std::set<tests::utils::proxy_server::event> proxy_events;
server.set_callback([&proxy_events](tests::utils::proxy_server::event e) {
proxy_events.insert(e);
});

proxy_config pc;
pc.port = 1234;
pc.address = "127.0.0.1";
realm::App::configuration config;
config.proxy_configuration = pc;
config.app_id =Admin::Session::shared().cached_app_id();
config.app_id = Admin::Session::shared().cached_app_id();
config.base_url = Admin::Session::shared().base_url();

bool provider_called, http_transport_called = false;

struct foo_socket_provider : public ::realm::networking::default_socket_provider {
foo_socket_provider(bool& provider_called) : m_called(provider_called) { }

std::unique_ptr<::realm::networking::websocket_interface> connect(std::unique_ptr<::realm::networking::websocket_observer> o,
::realm::networking::websocket_endpoint&& ep) override {
const std::string from = "wss:";
const std::string to = "ws:";
if (ep.url.find(from) == 0) {
ep.url.replace(0, from.length(), to);
}

m_called = true;
return ::realm::networking::default_socket_provider::connect(std::move(o), std::move(ep));
}

private:
bool& m_called;
};
config.sync_socket_provider = std::make_shared<foo_socket_provider>();
config.sync_socket_provider = std::make_shared<foo_socket_provider>(provider_called);

struct foo_http_transport : public ::realm::networking::default_http_transport {
foo_http_transport(bool& provider_called) : m_called(provider_called) { }

void send_request_to_server(const ::realm::networking::request& request,
std::function<void(const ::realm::networking::response&)>&& completion) override {
auto req_copy = request;
Expand All @@ -42,11 +56,14 @@ TEST_CASE("sends plaintext data to proxy", "[proxy]") {
if (req_copy.url.find(from) == 0) {
req_copy.url.replace(0, from.length(), to);
}

m_called = true;
return ::realm::networking::default_http_transport::send_request_to_server(req_copy, std::move(completion));
}

private:
bool& m_called;
};
config.http_transport_client = std::make_shared<foo_http_transport>();
config.http_transport_client = std::make_shared<foo_http_transport>(http_transport_called);

auto app = realm::App(config);
app.get_sync_manager().set_log_level(logger::level::all);
Expand Down Expand Up @@ -74,4 +91,74 @@ TEST_CASE("sends plaintext data to proxy", "[proxy]") {
CHECK(sub.name == "foo-strings");
CHECK(sub.object_class_name == "AllTypesObject");
CHECK(sub.query_string == "str_col == \"foo\"");

std::set<tests::utils::proxy_server::event> expected_events;
expected_events.insert(tests::utils::proxy_server::event::connect);
expected_events.insert(tests::utils::proxy_server::event::client);
expected_events.insert(tests::utils::proxy_server::event::nonssl);
expected_events.insert(tests::utils::proxy_server::event::websocket_upgrade);
expected_events.insert(tests::utils::proxy_server::event::websocket);

bool is_subset = std::includes(expected_events.begin(), expected_events.end(), proxy_events.begin(), proxy_events.end());
CHECK(is_subset);
CHECK(provider_called);
CHECK(http_transport_called);
}

TEST_CASE("proxy roundtrip", "[proxy]") {

tests::utils::proxy_server::config cfg;
cfg.port = 1234;
cfg.server_uses_ssl = false; // Set to true if using services.cloud.mongodb.com
tests::utils::proxy_server server(std::move(cfg));

std::set<tests::utils::proxy_server::event> proxy_events;
server.set_callback([&proxy_events](tests::utils::proxy_server::event e) {
proxy_events.insert(e);
});

proxy_config pc;
pc.port = 1234;
pc.address = "127.0.0.1";
realm::App::configuration config;
config.proxy_configuration = pc;
config.app_id = Admin::Session::shared().cached_app_id();
config.base_url = Admin::Session::shared().base_url();

auto app = realm::App(config);
app.get_sync_manager().set_log_level(logger::level::all);

auto user = app.login(realm::App::credentials::anonymous()).get();
auto flx_sync_config = user.flexible_sync_configuration();
auto synced_realm = db(flx_sync_config);

auto update_success = synced_realm.subscriptions().update([](realm::mutable_sync_subscription_set &subs) {
subs.clear();
}).get();
CHECK(update_success == true);
CHECK(synced_realm.subscriptions().size() == 0);

update_success = synced_realm.subscriptions().update([](realm::mutable_sync_subscription_set &subs) {
subs.add<AllTypesObject>("foo-strings", [](auto &obj) {
return obj.str_col == "foo";
});
subs.add<AllTypesObjectLink>("foo-link");
}).get();
CHECK(update_success == true);
CHECK(synced_realm.subscriptions().size() == 2);

auto sub = *synced_realm.subscriptions().find("foo-strings");
CHECK(sub.name == "foo-strings");
CHECK(sub.object_class_name == "AllTypesObject");
CHECK(sub.query_string == "str_col == \"foo\"");

std::set<tests::utils::proxy_server::event> expected_events;
expected_events.insert(tests::utils::proxy_server::event::connect);
expected_events.insert(tests::utils::proxy_server::event::client);
expected_events.insert(tests::utils::proxy_server::event::nonssl);
expected_events.insert(tests::utils::proxy_server::event::websocket_upgrade);
expected_events.insert(tests::utils::proxy_server::event::websocket);

bool is_subset = std::includes(expected_events.begin(), expected_events.end(), proxy_events.begin(), proxy_events.end());
CHECK(is_subset);
}
66 changes: 54 additions & 12 deletions tests/utils/networking/proxy_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ namespace realm::tests::utils {

class proxy_session : public std::enable_shared_from_this<proxy_session> {
public:
proxy_session(tcp::socket client_socket, std::shared_ptr<asio::ssl::context> ctx, bool server_uses_ssl)
proxy_session(tcp::socket client_socket, std::shared_ptr<asio::ssl::context> ctx, bool server_uses_ssl, std::function<void(proxy_server::event)> fn)
: m_client_socket(std::move(client_socket)),
m_server_socket(m_client_socket.get_executor()),
m_resolver(m_client_socket.get_executor()),
m_ssl_ctx(ctx),
m_ssl_server_socket(m_client_socket.get_executor(), *ctx),
m_server_uses_ssl(server_uses_ssl) {}
m_server_uses_ssl(server_uses_ssl),
m_event_handler(std::move(fn)) {}

void start() {
do_read_client_request();
Expand Down Expand Up @@ -47,6 +48,10 @@ namespace realm::tests::utils {
std::string host = request.substr(host_start, host_end - host_start);
std::string port = request.substr(host_end + 1, request.find(" ", host_end) - host_end - 1);

if (m_event_handler) {
m_event_handler(proxy_server::event::connect);
}

if (host == "127.0.0.1" || host == "localhost") {
tcp::endpoint endpoint(boost::asio::ip::make_address("127.0.0.1"), std::stoi(port));
connect_to_server(endpoint, host);
Expand Down Expand Up @@ -96,6 +101,10 @@ namespace realm::tests::utils {
void do_ssl_handshake_to_server(const std::string &hostname) {
auto self(shared_from_this());

if (m_event_handler) {
m_event_handler(proxy_server::event::ssl_handshake);
}

m_ssl_server_socket.set_verify_callback(asio::ssl::host_name_verification(hostname));
m_ssl_server_socket.set_verify_mode(boost::asio::ssl::verify_peer);

Expand Down Expand Up @@ -128,6 +137,9 @@ namespace realm::tests::utils {

void do_ssl_write_to_server(std::size_t length) {
auto self(shared_from_this());
if (m_event_handler) {
m_event_handler(proxy_server::event::ssl);
}
boost::asio::async_write(m_ssl_server_socket, boost::asio::buffer(std::string(m_client_buffer.data()), length),
[this, self](boost::system::error_code ec, std::size_t) {
if (!ec) {
Expand All @@ -140,6 +152,9 @@ namespace realm::tests::utils {

void do_ssl_read_server() {
auto self(shared_from_this());
if (m_event_handler) {
m_event_handler(proxy_server::event::ssl);
}
m_ssl_server_socket.async_read_some(boost::asio::buffer(m_server_buffer),
[this, self](boost::system::error_code ec, std::size_t length) {
if (!ec) {
Expand All @@ -152,6 +167,9 @@ namespace realm::tests::utils {

void do_read_client() {
auto self(shared_from_this());
if (m_event_handler) {
m_event_handler(proxy_server::event::client);
}
m_client_socket.async_read_some(boost::asio::buffer(m_client_buffer),
[this, self](boost::system::error_code ec, std::size_t length) {
if (!ec) {
Expand All @@ -170,6 +188,9 @@ namespace realm::tests::utils {

void do_read_from_websocket_client() {
auto self(shared_from_this());
if (m_event_handler) {
m_event_handler(proxy_server::event::websocket);
}
m_client_socket.async_read_some(boost::asio::buffer(m_client_buffer),
[this, self](boost::system::error_code ec, std::size_t length) {
if (!ec) {
Expand All @@ -184,6 +205,9 @@ namespace realm::tests::utils {

void do_write_to_websocket_server(std::size_t length) {
auto self(shared_from_this());
if (m_event_handler) {
m_event_handler(proxy_server::event::websocket);
}
if (m_server_uses_ssl) {
boost::asio::async_write(m_ssl_server_socket, boost::asio::buffer(m_client_buffer.data(), length),
[this, self](boost::system::error_code ec, std::size_t) {
Expand All @@ -207,6 +231,9 @@ namespace realm::tests::utils {

void do_read_from_websocket_server() {
auto self(shared_from_this());
if (m_event_handler) {
m_event_handler(proxy_server::event::websocket);
}
if (m_server_uses_ssl) {
m_ssl_server_socket.async_read_some(boost::asio::buffer(m_server_buffer),
[this, self](boost::system::error_code ec, std::size_t length) {
Expand All @@ -232,24 +259,32 @@ namespace realm::tests::utils {

void do_write_to_websocket_client(std::size_t length) {
auto self(shared_from_this());
boost::asio::async_write(m_client_socket, boost::asio::buffer(m_server_buffer.data(), length),
[this, self](boost::system::error_code ec, std::size_t) {
if (!ec) {
do_read_from_websocket_server();
} else {
REALM_TERMINATE("Proxy: Error writing from websocket client.");
}
});

if (m_event_handler) {
m_event_handler(proxy_server::event::websocket);
}
boost::asio::async_write(m_client_socket, boost::asio::buffer(m_server_buffer.data(), length),
[this, self](boost::system::error_code ec, std::size_t) {
if (!ec) {
do_read_from_websocket_server();
} else {
REALM_TERMINATE("Proxy: Error writing from websocket client.");
}
});
}

void upgrade_client_to_websocket(std::size_t length) {
if (m_event_handler) {
m_event_handler(proxy_server::event::websocket_upgrade);
}
do_read_from_websocket_client();
do_read_from_websocket_server();
}

void do_write_to_server(std::size_t length) {
auto self(shared_from_this());
if (m_event_handler) {
m_event_handler(proxy_server::event::nonssl);
}
boost::asio::async_write(m_server_socket, boost::asio::buffer(std::string(m_client_buffer.data()), length),
[this, self](boost::system::error_code ec, std::size_t) {
if (!ec) {
Expand All @@ -262,6 +297,9 @@ namespace realm::tests::utils {

void do_read_server() {
auto self(shared_from_this());
if (m_event_handler) {
m_event_handler(proxy_server::event::nonssl);
}
m_server_socket.async_read_some(boost::asio::buffer(m_server_buffer),
[this, self](boost::system::error_code ec, std::size_t length) {
if (!ec) {
Expand All @@ -274,6 +312,9 @@ namespace realm::tests::utils {

void do_write_to_client(std::size_t length) {
auto self(shared_from_this());
if (m_event_handler) {
m_event_handler(proxy_server::event::client);
}
auto res = std::string(m_server_buffer.data(), length);
bool upgrade_to_websocket = res.find("HTTP/1.1 101 Switching Protocols") != std::string::npos;

Expand Down Expand Up @@ -309,6 +350,7 @@ namespace realm::tests::utils {
std::array<char, 8192> m_server_buffer;

const std::string server_endpoint = "services.cloud.mongodb.com";
std::function<void(proxy_server::event)> m_event_handler;
};

proxy_server::proxy_server(const config &cfg) : m_config(cfg), m_strand(m_io_context) {
Expand Down Expand Up @@ -337,7 +379,7 @@ namespace realm::tests::utils {
asio::bind_executor(m_strand,
[this](std::error_code ec, asio::ip::tcp::socket socket) {
if (!ec) {
std::make_shared<proxy_session>(std::move(socket), m_ssl_ctx, m_config.server_uses_ssl)->start();
std::make_shared<proxy_session>(std::move(socket), m_ssl_ctx, m_config.server_uses_ssl, std::move(m_event_handler))->start();
}
do_accept();
}));
Expand Down
Loading

0 comments on commit 03425a1

Please sign in to comment.