From 0b3289b58bb97d245b43a7cd9f301a285804d449 Mon Sep 17 00:00:00 2001 From: cnbatch Date: Sat, 2 Dec 2023 16:48:22 +0800 Subject: [PATCH] FEC: faster recovery; mux_tunnels: avoid choosing congested channels --- docs/fec_en.md | 2 + docs/fec_zh-hans.md | 2 + docs/specific_examples_en.md | 6 ++ docs/specific_examples_zh-hans.md | 6 ++ src/3rd_party/ikcp.cpp | 4 +- src/3rd_party/ikcp.hpp | 2 +- src/main.cpp | 2 +- src/networks/client.cpp | 103 +++++++++++++++-------- src/networks/client.hpp | 2 +- src/networks/connections.hpp | 3 +- src/networks/kcp.cpp | 45 +++++++--- src/networks/kcp.hpp | 10 ++- src/networks/mux_tunnel.cpp | 6 +- src/networks/relay.cpp | 132 ++++++++++++++++++------------ src/networks/relay.hpp | 2 +- src/networks/server.cpp | 91 ++++++++++++-------- src/networks/server.hpp | 2 +- 17 files changed, 273 insertions(+), 147 deletions(-) diff --git a/docs/fec_en.md b/docs/fec_en.md index ca27fa7..d5c5477 100644 --- a/docs/fec_en.md +++ b/docs/fec_en.md @@ -20,6 +20,8 @@ The FEC settings of the sender and receiver **must** be exactly the same, otherw - The amount of original data (D value) should not be too small. Setting the value too low is almost equivalent to having no effect. This value should be greater than 15. - Of course, greater is not better, because it will take a very long time to generate redundant data at low traffic. + - **Game data transmission is a special case**, and the lower the value of this option, the better, preferably **set to 1**. Game traffic itself is not high, and if the packet sending interval of the game program is longer than the link latency, FEC will have no effect. As it is sensitive to latency, this value can be set to 1. + - For gaming applications, it is preferable to set up a dedicated channel (such as a dedicated VPN tunnel) separately and avoid mixing with other applications. - The higher amount of redundant data (R value) is not always better. Excessive amount of redundant data will cause unnecessary waste. diff --git a/docs/fec_zh-hans.md b/docs/fec_zh-hans.md index 7907e5c..2bf986a 100644 --- a/docs/fec_zh-hans.md +++ b/docs/fec_zh-hans.md @@ -20,6 +20,8 @@ - 原始数据量(D 值)不应该太少,数值过少等于几乎没有效果。该值最好大于 15。 - 当然了,并不是越大越好,因为在低流量时会导致很长时间才会生成冗余数据量。 + - **游戏数据传输是特例**,该值越低越好,最好**设为 1**。游戏流量本身并不高,如果游戏程序的发包间隔大于线路延迟,那么 FEC 就不会有任何效果。但因为对延时敏感,所以可以设为 1。 + - 对于游戏应用,请尽量设置单独的专用通道(例如专门的 VPN 通道),不要与其它应用混合使用。 - 冗余数据量(R 值)并非越多越好,过多的冗余数据量会造成不必要的浪费。 diff --git a/docs/specific_examples_en.md b/docs/specific_examples_en.md index cc7a1b9..dffb628 100644 --- a/docs/specific_examples_en.md +++ b/docs/specific_examples_en.md @@ -37,6 +37,12 @@ Here, it is assumed that the OpenVPN client and the KCPTube client are both runn And then, OpenVPN client just need to connect to port# 1194 of KCP Client. +### If the VPN is a dedicated channel for gaming + +For a dedicated channel for game data transmission, please add `blast=1` and change fast6 to a more responsive mode, like fast1 ~ fast4. + +Then consider using [forward error correction (FEC)](fec_en.md) based on the packet loss rate. + ### Additional matters that require attention Due to the fact that the OpenVPN client is currently connected to the local IP address, it is necessary to selectively allow the IP address of the actual server in the routing table. diff --git a/docs/specific_examples_zh-hans.md b/docs/specific_examples_zh-hans.md index 1276058..7a3b30e 100644 --- a/docs/specific_examples_zh-hans.md +++ b/docs/specific_examples_zh-hans.md @@ -37,6 +37,12 @@ encryption_algorithm=AES-GCM 然后,OpenVPN 客户端程序只需要连接 KCPTube 客户端的 1194 端口即可。 +### 如果 VPN 是游戏专用通道 + +对于专用于游戏数据传输的通道,请加上 `blast=1`,并把 fast6 改成更灵敏的模式,即 fast1 ~ fast4。 + +然后根据丢包率状况考虑是否使用[前向纠错,即 FEC](fec_zh-hans.md)。 + ### 额外注意事项 由于 OpenVPN 客户端此时连接的是本机 IP 地址,因此需要在路由表中针对性地放行实际服务器的 IP 地址。 diff --git a/src/3rd_party/ikcp.cpp b/src/3rd_party/ikcp.cpp index d80c8ee..ad134ef 100644 --- a/src/3rd_party/ikcp.cpp +++ b/src/3rd_party/ikcp.cpp @@ -1119,7 +1119,7 @@ namespace KCP // ikcp_check when to call it again (without ikcp_input/_send calling). // 'current' - current timestamp in millisec. //--------------------------------------------------------------------- - void kcp_core::update(uint32_t current) + int kcp_core::update(uint32_t current) { int32_t slap; @@ -1147,6 +1147,8 @@ namespace KCP flush(); } + + return slap; } diff --git a/src/3rd_party/ikcp.hpp b/src/3rd_party/ikcp.hpp index 1104306..9f64a30 100644 --- a/src/3rd_party/ikcp.hpp +++ b/src/3rd_party/ikcp.hpp @@ -136,7 +136,7 @@ namespace KCP // update state (call it repeatedly, every 10ms-100ms), or you can ask // ikcp_check when to call it again (without ikcp_input/_send calling). // 'current' - current timestamp in millisec. - void update(uint32_t current); + int update(uint32_t current); // Determine when should you invoke ikcp_update: // returns when you should invoke ikcp_update in millisec, if there diff --git a/src/main.cpp b/src/main.cpp index 78b625e..4f21fe4 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -19,7 +19,7 @@ int main(int argc, char *argv[]) { char app_name[] = "kcptube"; - printf("%s version 20231126\n", app_name); + printf("%s version 20231202\n", app_name); if (argc <= 1) { diff --git a/src/networks/client.cpp b/src/networks/client.cpp index cb31b16..8fcd602 100644 --- a/src/networks/client.cpp +++ b/src/networks/client.cpp @@ -403,48 +403,54 @@ void client_mode::udp_forwarder_incoming_unpack(std::shared_ptr kcp_pt if (calculate_difference((uint32_t)timestamp, packet_timestamp) > gbv_time_gap_seconds) return; + uint32_t conv = 0; + kcp_mappings *kcp_mappings_ptr = nullptr; std::pair, size_t> original_data; uint32_t fec_sn = 0; uint8_t fec_sub_sn = 0; if (current_settings.fec_data > 0 && current_settings.fec_redundant > 0) { auto [packet_header, kcp_data_ptr, kcp_data_size] = packet::unpack_fec(data.get(), plain_size); - if (packet_header.sub_sn >= current_settings.fec_data) + fec_sn = packet_header.sn; + fec_sub_sn = packet_header.sub_sn; + if (fec_sub_sn >= current_settings.fec_data) { auto [packet_header_redundant, redundant_data_ptr, redundant_data_size] = packet::unpack_fec_redundant(data.get(), plain_size); kcp_ptr = verify_kcp_conv(kcp_ptr, packet_header_redundant.kcp_conv, peer); - kcp_mappings *kcp_mappings_ptr = (kcp_mappings *)kcp_ptr->GetUserData(); + kcp_mappings_ptr = (kcp_mappings *)kcp_ptr->GetUserData(); original_data.first = std::make_unique(redundant_data_size); original_data.second = redundant_data_size; std::copy_n(redundant_data_ptr, redundant_data_size, original_data.first.get()); kcp_mappings_ptr->fec_egress_control.fec_rcv_cache[packet_header_redundant.sn][packet_header_redundant.sub_sn] = std::move(original_data); - return; + if (!fec_find_missings(kcp_ptr.get(), kcp_mappings_ptr->fec_egress_control, fec_sn, current_settings.fec_data)) + return; + data_ptr = nullptr; + packet_data_size = 0; } else { - fec_sn = packet_header.sn; - fec_sub_sn = packet_header.sub_sn; data_ptr = kcp_data_ptr; packet_data_size = kcp_data_size; original_data.first = std::make_unique(kcp_data_size); original_data.second = kcp_data_size; std::copy_n(kcp_data_ptr, kcp_data_size, original_data.first.get()); + + conv = KCP::KCP::GetConv(data_ptr); + kcp_ptr = verify_kcp_conv(kcp_ptr, conv, peer); + kcp_mappings_ptr = (kcp_mappings *)kcp_ptr->GetUserData(); + kcp_mappings_ptr->fec_egress_control.fec_rcv_cache[fec_sn][fec_sub_sn] = std::move(original_data); + fec_find_missings(kcp_ptr.get(), kcp_mappings_ptr->fec_egress_control, fec_sn, current_settings.fec_data); } } - - uint32_t conv = KCP::KCP::GetConv(data_ptr); - kcp_ptr = verify_kcp_conv(kcp_ptr, conv, peer); - - kcp_mappings *kcp_mappings_ptr = (kcp_mappings *)kcp_ptr->GetUserData(); - - if (current_settings.fec_data > 0 && current_settings.fec_redundant > 0) + else { - kcp_mappings_ptr->fec_egress_control.fec_rcv_cache[fec_sn][fec_sub_sn] = std::move(original_data); - fec_find_missings(kcp_ptr.get(), kcp_mappings_ptr->fec_egress_control, fec_sn, current_settings.fec_data); + conv = KCP::KCP::GetConv(data_ptr); + kcp_ptr = verify_kcp_conv(kcp_ptr, conv, peer); + kcp_mappings_ptr = (kcp_mappings *)kcp_ptr->GetUserData(); } - if (kcp_ptr->Input((const char *)data_ptr, (long)packet_data_size) < 0) - return; + if (data_ptr != nullptr && packet_data_size != 0) + kcp_ptr->Input((const char *)data_ptr, (long)packet_data_size); while (true) { @@ -698,24 +704,38 @@ std::shared_ptr client_mode::pick_one_from_kcp_channels(protocol_type } else { - size_t index = generate_random_number(0, kcp_channels.size() - 1); - if (prtcl == protocol_type::tcp && index % 2 != 0) + std::map peaks_of_recv, peaks_of_sent; // peak value, kcp conv + std::map recv_peaks_by_index, sent_peaks_by_index; // kcp conv, peak-sorted index + for (auto iter = kcp_channels.begin(); iter != kcp_channels.end(); ++iter) { - while (index % 2 != 0) - { - index = generate_random_number(0, kcp_channels.size() - 1); - } + int64_t recv_peak_value = iter->second->egress_kcp->ReceivedDataAveragePeak(); + int64_t sent_peak_value = iter->second->egress_kcp->SentDataAveragePeak(); + peaks_of_recv[recv_peak_value] = iter->first; + peaks_of_sent[sent_peak_value] = iter->first; } - if (prtcl == protocol_type::udp && index % 2 == 0) + + int32_t index = 0; + for (auto [peak_value, conv] : peaks_of_recv) + recv_peaks_by_index[conv] = index++; + + index = 0; + for (auto [peak_value, conv] : peaks_of_sent) + sent_peaks_by_index[conv] = index++; + + recv_peaks_by_index.rbegin()->second = -1; + sent_peaks_by_index.rbegin()->second = -1; + + std::multimap index_sum_of_conv; // index_sum, kcp conv + for (auto &[conv, mappings] : kcp_channels) { - while (index % 2 == 0) - { - index = generate_random_number(0, kcp_channels.size() - 1); - } + if (recv_peaks_by_index[conv] < 0 || sent_peaks_by_index[conv] < 0) + continue; + int32_t index_sum = recv_peaks_by_index[conv] + sent_peaks_by_index[conv]; + index_sum_of_conv.insert({ index_sum, conv }); } - auto iter = kcp_channels.begin(); - std::advance(iter, index); - kcp_ptr = iter->second->egress_kcp; + + auto conv = index_sum_of_conv.begin()->second; + kcp_ptr = kcp_channels[conv]->egress_kcp; } locker_kcp_channels.unlock(); @@ -822,19 +842,32 @@ void client_mode::fec_maker(kcp_mappings *kcp_mappings_ptr, const uint8_t *input } } -void client_mode::fec_find_missings(KCP::KCP *kcp_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count) +bool client_mode::fec_find_missings(KCP::KCP *kcp_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count) { + bool recovered = false; for (auto iter = fec_controllor.fec_rcv_cache.begin(), next_iter = iter; iter != fec_controllor.fec_rcv_cache.end(); iter = next_iter) { ++next_iter; auto sn = iter->first; - if (fec_sn == sn) - continue;; auto &mapped_data = iter->second; if (mapped_data.size() < max_fec_data_count) { if (fec_sn - sn > gbv_fec_waits) + { fec_controllor.fec_rcv_cache.erase(iter); + if (auto rcv_sn_iter = fec_controllor.fec_rcv_restored.find(sn); + rcv_sn_iter != fec_controllor.fec_rcv_restored.end()) + fec_controllor.fec_rcv_restored.erase(rcv_sn_iter); + } + continue; + } + if (auto rcv_sn_iter = fec_controllor.fec_rcv_restored.find(sn); rcv_sn_iter != fec_controllor.fec_rcv_restored.end()) + { + if (fec_sn - sn > gbv_fec_waits) + { + fec_controllor.fec_rcv_cache.erase(iter); + fec_controllor.fec_rcv_restored.erase(rcv_sn_iter); + } continue; } auto [recv_data, fec_align_length] = compact_into_container(mapped_data, max_fec_data_count); @@ -847,8 +880,10 @@ void client_mode::fec_find_missings(KCP::KCP *kcp_ptr, fec_control_data &fec_con kcp_ptr->Input((const char *)missed_data_ptr, (long)missed_data_size); } - fec_controllor.fec_rcv_cache.erase(iter); + fec_controllor.fec_rcv_restored.insert(sn); + recovered = true; } + return recovered; } bool client_mode::get_udp_target(std::shared_ptr target_connector, udp::endpoint &udp_target) diff --git a/src/networks/client.hpp b/src/networks/client.hpp index 33f62e7..a6c0355 100644 --- a/src/networks/client.hpp +++ b/src/networks/client.hpp @@ -70,7 +70,7 @@ class client_mode int kcp_sender(const char *buf, int len, void *user); void data_sender(kcp_mappings *kcp_mappings_ptr, std::unique_ptr new_buffer, size_t buffer_size); void fec_maker(kcp_mappings *kcp_mappings_ptr, const uint8_t *input_data, int data_size); - void fec_find_missings(KCP::KCP *kcp_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count); + bool fec_find_missings(KCP::KCP *kcp_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count); bool get_udp_target(std::shared_ptr target_connector, udp::endpoint &udp_target); bool update_udp_target(std::shared_ptr target_connector, udp::endpoint &udp_target); diff --git a/src/networks/connections.hpp b/src/networks/connections.hpp index 1b695c8..458cece 100644 --- a/src/networks/connections.hpp +++ b/src/networks/connections.hpp @@ -32,7 +32,7 @@ constexpr uint32_t gbv_mux_min_cache_available = 16u; constexpr uint32_t gbv_mux_min_cache_slice = 8u; constexpr uint32_t gbv_tcp_slice = 2u; constexpr uint32_t gbv_half_time = 2u; -constexpr uint32_t gbv_fec_waits = 3u; +constexpr uint16_t gbv_fec_waits = 3u; constexpr size_t gbv_buffer_size = 2048u; constexpr size_t gbv_buffer_expand_size = 128u; constexpr size_t gbv_retry_times = 30u; @@ -543,6 +543,7 @@ struct fec_control_data std::atomic fec_snd_sub_sn; std::vector, size_t>> fec_snd_cache; std::map, size_t>>> fec_rcv_cache; // uint32_t = snd_sn, uint16_t = sub_sn + std::unordered_set fec_rcv_restored; fecpp::fec_code fecc; }; diff --git a/src/networks/kcp.cpp b/src/networks/kcp.cpp index 67796ae..4bf7f66 100644 --- a/src/networks/kcp.cpp +++ b/src/networks/kcp.cpp @@ -35,14 +35,14 @@ namespace KCP { kcp_ptr = std::make_unique(); kcp_ptr->initialise(conv, this); - last_input_time.store(right_now()); + last_input_time = right_now(); post_update = empty_function; } void KCP::MoveKCP(KCP &other) noexcept { kcp_ptr = std::move(other.kcp_ptr); - last_input_time.store(other.last_input_time.load()); + last_input_time = other.last_input_time; post_update = other.post_update; } @@ -92,7 +92,12 @@ namespace KCP void KCP::SetOutput(std::function output_func) { - kcp_ptr->set_output(output_func); + //output = output_func; + kcp_ptr->set_output([this, output_func](const char *buf, int len, void *user) -> int + { + sent_data_average_peak = (7 * sent_data_average_peak + len) / 8; + return output_func(buf, len, user); + }); } void KCP::SetPostUpdate(std::function post_update_func) @@ -121,17 +126,19 @@ namespace KCP void KCP::Update(uint32_t current) { std::unique_lock locker{ mtx }; - kcp_ptr->update(current); + int ret = kcp_ptr->update(current); locker.unlock(); - post_update(kcp_ptr->user); + if (ret >= 0) + post_update(kcp_ptr->user); } void KCP::Update() { std::unique_lock locker{ mtx }; - kcp_ptr->update(TimeNowForKCP()); + int ret = kcp_ptr->update(TimeNowForKCP()); locker.unlock(); - post_update(kcp_ptr->user); + if (ret >= 0) + post_update(kcp_ptr->user); } uint32_t KCP::Check(uint32_t current) @@ -150,7 +157,9 @@ namespace KCP { std::unique_lock unique_locker{ mtx }; kcp_ptr->flush(TimeNowForKCP()); - return kcp_ptr->check(TimeNowForKCP()); + uint32_t ret = kcp_ptr->check(TimeNowForKCP()); + unique_locker.unlock(); + return ret; } // when you received a low level packet (eg. UDP packet), call it @@ -159,7 +168,9 @@ namespace KCP std::unique_lock locker{ mtx }; auto ret = kcp_ptr->input(data, size); locker.unlock(); - last_input_time.store(right_now()); + last_input_time = right_now(); + if (ret > 0) + received_data_average_peak = (7 * received_data_average_peak + size) / 8; // same as rx_srtt calculation in update_ack() return ret; } @@ -264,12 +275,22 @@ namespace KCP inbound_bandwidth = in_bw; } - int64_t KCP::LastInputTime() + int64_t KCP::LastInputTime() const + { + return last_input_time; + } + + int64_t KCP::ReceivedDataAveragePeak() const + { + return received_data_average_peak; + } + + int64_t KCP::SentDataAveragePeak() const { - return last_input_time.load(); + return sent_data_average_peak; } - void* KCP::GetUserData() + void* KCP::GetUserData() const { return kcp_ptr->user; } diff --git a/src/networks/kcp.hpp b/src/networks/kcp.hpp index 6634258..2495d29 100644 --- a/src/networks/kcp.hpp +++ b/src/networks/kcp.hpp @@ -38,7 +38,9 @@ namespace KCP std::unique_ptr kcp_ptr; uint64_t outbound_bandwidth = 0; uint64_t inbound_bandwidth = 0; - std::atomic last_input_time{0}; + int64_t last_input_time = 0; + int64_t received_data_average_peak = 0; + int64_t sent_data_average_peak = 0; mutable std::shared_mutex mtx; //std::function output; // int(*output)(const char *buf, int len, void *user) //std::function writelog; //void(*writelog)(const char *log, void *user) @@ -140,9 +142,11 @@ namespace KCP int32_t& RxMinRTO(); void SetBandwidth(uint64_t out_bw, uint64_t in_bw); - int64_t LastInputTime(); + int64_t LastInputTime() const; + int64_t ReceivedDataAveragePeak() const; + int64_t SentDataAveragePeak() const; - void* GetUserData(); + void* GetUserData() const; void SetUserData(void *user_data); bool WaitQueueIsFull(); bool WaitQueueIsEmpty(); diff --git a/src/networks/mux_tunnel.cpp b/src/networks/mux_tunnel.cpp index c9d4e43..538ab2e 100644 --- a/src/networks/mux_tunnel.cpp +++ b/src/networks/mux_tunnel.cpp @@ -521,14 +521,12 @@ void mux_tunnel::refresh_mux_queue(const std::shared_ptr &kcp_ptr) std::shared_lock tcp_cache_shared_locker{ mutex_mux_tcp_cache }; auto cache_iter = mux_tcp_cache.find(kcp_ptr); - auto size_iter = mux_tcp_cache_max_size.find(kcp_ptr); - if (cache_iter == mux_tcp_cache.end() || size_iter == mux_tcp_cache_max_size.end()) + if (cache_iter == mux_tcp_cache.end()) return; size_t tcp_cache_size = cache_iter->second.size(); - uint32_t cache_max_size = size_iter->second; tcp_cache_shared_locker.unlock(); - if (tcp_cache_size > cache_max_size / gbv_tcp_slice) + if (tcp_cache_size > 0) return; std::shared_lock locker{ mutex_id_map_to_mux_records }; diff --git a/src/networks/relay.cpp b/src/networks/relay.cpp index 4f1c050..b10fc95 100644 --- a/src/networks/relay.cpp +++ b/src/networks/relay.cpp @@ -144,16 +144,20 @@ void relay_mode::udp_listener_incoming_unpack(std::unique_ptr data, s if (calculate_difference((uint32_t)timestamp, packet_timestamp) > gbv_time_gap_seconds) return; + std::shared_ptr kcp_mappings_ptr; + std::shared_ptr kcp_ptr_ingress; + std::shared_ptr kcp_ptr_egress; std::pair, size_t> original_data; uint32_t fec_sn = 0; uint8_t fec_sub_sn = 0; if (current_settings.ingress->fec_data > 0 && current_settings.ingress->fec_redundant > 0) { auto [packet_header, kcp_data_ptr, kcp_data_size] = packet::unpack_fec(data.get(), plain_size); + fec_sn = packet_header.sn; + fec_sub_sn = packet_header.sub_sn; if (packet_header.sub_sn >= current_settings.ingress->fec_data) { auto [packet_header_redundant, redundant_data_ptr, redundant_data_size] = packet::unpack_fec_redundant(data.get(), plain_size); - std::shared_ptr kcp_mappings_ptr = nullptr; std::shared_lock locker_id_map_to_both_sides{ mutex_id_map_to_both_sides }; if (auto kcp_channels_iter = id_map_to_both_sides.find(packet_header_redundant.kcp_conv); kcp_channels_iter == id_map_to_both_sides.end()) return; @@ -167,14 +171,14 @@ void relay_mode::udp_listener_incoming_unpack(std::unique_ptr data, s original_data.first = std::make_unique(redundant_data_size); original_data.second = redundant_data_size; std::copy_n(redundant_data_ptr, redundant_data_size, original_data.first.get()); - kcp_mappings_ptr->fec_ingress_control.fec_rcv_cache[packet_header_redundant.sn][packet_header_redundant.sub_sn] = std::move(original_data); - - return; + kcp_mappings_ptr->fec_ingress_control.fec_rcv_cache[fec_sn][fec_sub_sn] = std::move(original_data); + if (!fec_find_missings(kcp_mappings_ptr->ingress_kcp.get(), kcp_mappings_ptr->fec_egress_control, fec_sn, current_settings.ingress->fec_data)) + return; + data_ptr = nullptr; + packet_data_size = 0; } else { - fec_sn = packet_header.sn; - fec_sub_sn = packet_header.sub_sn; data_ptr = kcp_data_ptr; packet_data_size = kcp_data_size; original_data.first = std::make_unique(kcp_data_size); @@ -183,39 +187,42 @@ void relay_mode::udp_listener_incoming_unpack(std::unique_ptr data, s } } - uint32_t conv = KCP::KCP::GetConv(data_ptr); - if (conv == 0) + if (data_ptr != nullptr) { - udp_listener_incoming_new_connection(std::move(data), plain_size, peer, server_port_number); - return; - } + uint32_t conv = KCP::KCP::GetConv(data_ptr); + if (conv == 0) + { + udp_listener_incoming_new_connection(std::move(data), plain_size, peer, server_port_number); + return; + } - std::shared_ptr kcp_mappings_ptr; - std::shared_lock locker_id_map_to_both_sides{ mutex_id_map_to_both_sides }; - if (auto kcp_channels_iter = id_map_to_both_sides.find(conv); kcp_channels_iter == id_map_to_both_sides.end()) - return; - else - kcp_mappings_ptr = kcp_channels_iter->second; - locker_id_map_to_both_sides.unlock(); + if (kcp_mappings_ptr == nullptr) + { + std::shared_lock locker_id_map_to_both_sides{ mutex_id_map_to_both_sides }; + if (auto kcp_channels_iter = id_map_to_both_sides.find(conv); kcp_channels_iter == id_map_to_both_sides.end()) + return; + else + kcp_mappings_ptr = kcp_channels_iter->second; + locker_id_map_to_both_sides.unlock(); - if (kcp_mappings_ptr == nullptr) - return; + if (kcp_mappings_ptr == nullptr) + return; + } - if (kcp_mappings_ptr->ingress_source_endpoint == nullptr || *kcp_mappings_ptr->ingress_source_endpoint != peer) - kcp_mappings_ptr->ingress_source_endpoint = std::make_shared(peer); + if (kcp_mappings_ptr->ingress_source_endpoint == nullptr || *kcp_mappings_ptr->ingress_source_endpoint != peer) + kcp_mappings_ptr->ingress_source_endpoint = std::make_shared(peer); - std::shared_ptr kcp_ptr_ingress = kcp_mappings_ptr->ingress_kcp; - std::shared_ptr kcp_ptr_egress = kcp_mappings_ptr->egress_kcp; - std::shared_ptr forwarder_ptr_egress = kcp_mappings_ptr->egress_forwarder; + kcp_ptr_ingress = kcp_mappings_ptr->ingress_kcp; + kcp_ptr_egress = kcp_mappings_ptr->egress_kcp; - if (current_settings.ingress->fec_data > 0 && current_settings.ingress->fec_redundant > 0) - { - kcp_mappings_ptr->fec_ingress_control.fec_rcv_cache[fec_sn][fec_sub_sn] = std::move(original_data); - fec_find_missings(kcp_ptr_ingress.get(), kcp_mappings_ptr->fec_egress_control, fec_sn, current_settings.ingress->fec_data); - } + if (current_settings.ingress->fec_data > 0 && current_settings.ingress->fec_redundant > 0) + { + kcp_mappings_ptr->fec_ingress_control.fec_rcv_cache[fec_sn][fec_sub_sn] = std::move(original_data); + fec_find_missings(kcp_ptr_ingress.get(), kcp_mappings_ptr->fec_egress_control, fec_sn, current_settings.ingress->fec_data); + } - if (kcp_ptr_ingress->Input((const char *)data_ptr, (long)packet_data_size) < 0) - return; + kcp_ptr_ingress->Input((const char *)data_ptr, (long)packet_data_size); + } while (true) { @@ -489,12 +496,16 @@ void relay_mode::udp_forwarder_incoming_unpack(std::shared_ptr kcp_ptr if (calculate_difference((uint32_t)timestamp, packet_timestamp) > gbv_time_gap_seconds) return; + uint32_t conv = 0; + kcp_mappings *kcp_mappings_ptr = nullptr; std::pair, size_t> original_data; uint32_t fec_sn = 0; uint8_t fec_sub_sn = 0; if (current_settings.egress->fec_data > 0 && current_settings.egress->fec_redundant > 0) { auto [packet_header, kcp_data_ptr, kcp_data_size] = packet::unpack_fec(data.get(), plain_size); + fec_sn = packet_header.sn; + fec_sub_sn = packet_header.sub_sn; if (packet_header.sub_sn >= current_settings.egress->fec_data) { auto [packet_header_redundant, redundant_data_ptr, redundant_data_size] = packet::unpack_fec_redundant(data.get(), plain_size); @@ -503,34 +514,35 @@ void relay_mode::udp_forwarder_incoming_unpack(std::shared_ptr kcp_ptr original_data.first = std::make_unique(redundant_data_size); original_data.second = redundant_data_size; std::copy_n(redundant_data_ptr, redundant_data_size, original_data.first.get()); - kcp_mappings_ptr->fec_egress_control.fec_rcv_cache[packet_header_redundant.sn][packet_header_redundant.sub_sn] = std::move(original_data); - return; + kcp_mappings_ptr->fec_egress_control.fec_rcv_cache[fec_sn][fec_sub_sn] = std::move(original_data); + fec_find_missings(kcp_ptr.get(), kcp_mappings_ptr->fec_egress_control, fec_sn, current_settings.egress->fec_data); + data_ptr = nullptr; + packet_data_size = 0; } else { - fec_sn = packet_header.sn; - fec_sub_sn = packet_header.sub_sn; data_ptr = kcp_data_ptr; packet_data_size = kcp_data_size; original_data.first = std::make_unique(kcp_data_size); original_data.second = kcp_data_size; std::copy_n(kcp_data_ptr, kcp_data_size, original_data.first.get()); + + conv = KCP::KCP::GetConv(data_ptr); + kcp_ptr = verify_kcp_conv(kcp_ptr, conv, peer); + kcp_mappings_ptr = (kcp_mappings *)kcp_ptr->GetUserData(); + kcp_mappings_ptr->fec_egress_control.fec_rcv_cache[fec_sn][fec_sub_sn] = std::move(original_data); + fec_find_missings(kcp_ptr.get(), kcp_mappings_ptr->fec_egress_control, fec_sn, current_settings.egress->fec_data); } } - - uint32_t conv = KCP::KCP::GetConv(data_ptr); - kcp_ptr = verify_kcp_conv(kcp_ptr, conv, peer); - - kcp_mappings *kcp_mappings_ptr = (kcp_mappings *)kcp_ptr->GetUserData(); - - if (current_settings.egress->fec_data > 0 && current_settings.egress->fec_redundant > 0) + else { - kcp_mappings_ptr->fec_egress_control.fec_rcv_cache[fec_sn][fec_sub_sn] = std::move(original_data); - fec_find_missings(kcp_ptr.get(), kcp_mappings_ptr->fec_egress_control, fec_sn, current_settings.egress->fec_data); + conv = KCP::KCP::GetConv(data_ptr); + kcp_ptr = verify_kcp_conv(kcp_ptr, conv, peer); + kcp_mappings_ptr = (kcp_mappings *)kcp_ptr->GetUserData(); } - - if (kcp_ptr->Input((const char *)data_ptr, (long)packet_data_size) < 0) - return; + + if (data_ptr != nullptr && packet_data_size != 0) + kcp_ptr->Input((const char *)data_ptr, (long)packet_data_size); while (true) { @@ -918,20 +930,32 @@ void relay_mode::data_sender_via_forwarder(kcp_mappings *kcp_mappings_ptr, std:: change_new_port(kcp_mappings_ptr); } -void relay_mode::fec_find_missings(KCP::KCP *kcp_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count) +bool relay_mode::fec_find_missings(KCP::KCP *kcp_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count) { + bool recovered = false; for (auto iter = fec_controllor.fec_rcv_cache.begin(), next_iter = iter; iter != fec_controllor.fec_rcv_cache.end(); iter = next_iter) { ++next_iter; auto sn = iter->first; - if (fec_sn == sn) - continue; - auto &mapped_data = iter->second; if (mapped_data.size() < max_fec_data_count) { if (fec_sn - sn > gbv_fec_waits) + { fec_controllor.fec_rcv_cache.erase(iter); + if (auto rcv_sn_iter = fec_controllor.fec_rcv_restored.find(sn); + rcv_sn_iter != fec_controllor.fec_rcv_restored.end()) + fec_controllor.fec_rcv_restored.erase(rcv_sn_iter); + } + continue; + } + if (auto rcv_sn_iter = fec_controllor.fec_rcv_restored.find(sn); rcv_sn_iter != fec_controllor.fec_rcv_restored.end()) + { + if (fec_sn - sn > gbv_fec_waits) + { + fec_controllor.fec_rcv_cache.erase(iter); + fec_controllor.fec_rcv_restored.erase(rcv_sn_iter); + } continue; } auto [recv_data, fec_align_length] = compact_into_container(mapped_data, max_fec_data_count); @@ -944,8 +968,10 @@ void relay_mode::fec_find_missings(KCP::KCP *kcp_ptr, fec_control_data &fec_cont kcp_ptr->Input((const char *)missed_data_ptr, (long)missed_data_size); } - fec_controllor.fec_rcv_cache.erase(iter); + fec_controllor.fec_rcv_restored.insert(sn); + recovered = true; } + return recovered; } void relay_mode::fec_maker_via_listener(kcp_mappings *kcp_mappings_ptr, const uint8_t *input_data, int data_size) diff --git a/src/networks/relay.hpp b/src/networks/relay.hpp index 09d54f6..1bdad5f 100644 --- a/src/networks/relay.hpp +++ b/src/networks/relay.hpp @@ -65,7 +65,7 @@ class relay_mode std::shared_ptr verify_kcp_conv(std::shared_ptr kcp_ptr, uint32_t conv, const udp::endpoint &peer); void data_sender_via_listener(kcp_mappings *kcp_mappings_ptr, std::unique_ptr new_buffer, size_t buffer_size); void data_sender_via_forwarder(kcp_mappings *kcp_mappings_ptr, std::unique_ptr new_buffer, size_t buffer_size); - void fec_find_missings(KCP::KCP *kcp_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count); + bool fec_find_missings(KCP::KCP *kcp_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count); void fec_maker_via_listener(kcp_mappings *kcp_mappings_ptr, const uint8_t *input_data, int data_size); void fec_maker_via_forwarder(kcp_mappings *kcp_mappings_ptr, const uint8_t *input_data, int data_size); diff --git a/src/networks/server.cpp b/src/networks/server.cpp index 443c2e6..cebb8e9 100644 --- a/src/networks/server.cpp +++ b/src/networks/server.cpp @@ -146,16 +146,21 @@ void server_mode::udp_listener_incoming_unpack(std::unique_ptr data, if (calculate_difference((uint32_t)timestamp, packet_timestamp) > gbv_time_gap_seconds) return; + uint32_t conv = 0; + std::shared_ptr kcp_mappings_ptr; + std::shared_ptr kcp_ptr; std::pair, size_t> original_data; uint32_t fec_sn = 0; uint8_t fec_sub_sn = 0; if (current_settings.fec_data > 0 && current_settings.fec_redundant > 0) { auto [packet_header, kcp_data_ptr, kcp_data_size] = packet::unpack_fec(data.get(), plain_size); + fec_sn = packet_header.sn; + fec_sub_sn = packet_header.sub_sn; if (packet_header.sub_sn >= current_settings.fec_data) { auto [packet_header_redundant, redundant_data_ptr, redundant_data_size] = packet::unpack_fec_redundant(data.get(), plain_size); - std::shared_ptr kcp_mappings_ptr = nullptr; + conv = packet_header_redundant.kcp_conv; std::shared_lock locker_kcp_channels{ mutex_kcp_channels }; if (auto kcp_channel_iter = kcp_channels.find(packet_header_redundant.kcp_conv); kcp_channel_iter != kcp_channels.end()) kcp_mappings_ptr = kcp_channel_iter->second; @@ -167,14 +172,14 @@ void server_mode::udp_listener_incoming_unpack(std::unique_ptr data, original_data.first = std::make_unique(redundant_data_size); original_data.second = redundant_data_size; std::copy_n(redundant_data_ptr, redundant_data_size, original_data.first.get()); - kcp_mappings_ptr->fec_ingress_control.fec_rcv_cache[packet_header_redundant.sn][packet_header_redundant.sub_sn] = std::move(original_data); - - return; + kcp_mappings_ptr->fec_ingress_control.fec_rcv_cache[fec_sn][fec_sub_sn] = std::move(original_data); + if (!fec_find_missings(kcp_mappings_ptr->ingress_kcp.get(), kcp_mappings_ptr->fec_ingress_control, fec_sn, current_settings.fec_data)) + return; + data_ptr = nullptr; + packet_data_size = 0; } else { - fec_sn = packet_header.sn; - fec_sub_sn = packet_header.sub_sn; data_ptr = kcp_data_ptr; packet_data_size = kcp_data_size; original_data.first = std::make_unique(kcp_data_size); @@ -183,37 +188,41 @@ void server_mode::udp_listener_incoming_unpack(std::unique_ptr data, } } - uint32_t conv = KCP::KCP::GetConv(data_ptr); - if (conv == 0) + if (data_ptr != nullptr) { - udp_listener_incoming_new_connection(std::move(data), plain_size, peer, server_port_number); - return; - } + conv = KCP::KCP::GetConv(data_ptr); + if (conv == 0) + { + udp_listener_incoming_new_connection(std::move(data), plain_size, peer, server_port_number); + return; + } - std::shared_ptr kcp_mappings_ptr = nullptr; - std::shared_lock locker_kcp_channels{ mutex_kcp_channels }; - if (auto kcp_channel_iter = kcp_channels.find(conv); kcp_channel_iter != kcp_channels.end()) - kcp_mappings_ptr = kcp_channel_iter->second; - locker_kcp_channels.unlock(); + if (kcp_mappings_ptr == nullptr) + { + std::shared_lock locker_kcp_channels{ mutex_kcp_channels }; + if (auto kcp_channel_iter = kcp_channels.find(conv); kcp_channel_iter != kcp_channels.end()) + kcp_mappings_ptr = kcp_channel_iter->second; + locker_kcp_channels.unlock(); - if (kcp_mappings_ptr == nullptr) - return; + if (kcp_mappings_ptr == nullptr) + return; + } - if (kcp_mappings_ptr->ingress_source_endpoint == nullptr || *kcp_mappings_ptr->ingress_source_endpoint != peer) - kcp_mappings_ptr->ingress_source_endpoint = std::make_shared(peer); + if (kcp_mappings_ptr->ingress_source_endpoint == nullptr || *kcp_mappings_ptr->ingress_source_endpoint != peer) + kcp_mappings_ptr->ingress_source_endpoint = std::make_shared(peer); - std::shared_ptr kcp_ptr = kcp_mappings_ptr->ingress_kcp; + kcp_ptr = kcp_mappings_ptr->ingress_kcp; - if (current_settings.fec_data > 0 && current_settings.fec_redundant > 0) - { - kcp_mappings_ptr->fec_ingress_control.fec_rcv_cache[fec_sn][fec_sub_sn] = std::move(original_data); - fec_find_missings(kcp_ptr.get(), kcp_mappings_ptr->fec_ingress_control, fec_sn, current_settings.fec_data); - } + if (current_settings.fec_data > 0 && current_settings.fec_redundant > 0) + { + kcp_mappings_ptr->fec_ingress_control.fec_rcv_cache[fec_sn][fec_sub_sn] = std::move(original_data); + fec_find_missings(kcp_ptr.get(), kcp_mappings_ptr->fec_ingress_control, fec_sn, current_settings.fec_data); + } - if (kcp_ptr->Input((const char *)data_ptr, (long)packet_data_size) < 0) - return; + kcp_ptr->Input((const char *)data_ptr, (long)packet_data_size); + } - while (true) + while (kcp_ptr != nullptr) { int buffer_size = kcp_ptr->PeekSize(); if (buffer_size <= 0) @@ -906,20 +915,32 @@ void server_mode::fec_maker(kcp_mappings *kcp_mappings_ptr, const uint8_t *input } } -void server_mode::fec_find_missings(KCP::KCP *kcp_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count) +bool server_mode::fec_find_missings(KCP::KCP *kcp_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count) { + bool recovered = false; for (auto iter = fec_controllor.fec_rcv_cache.begin(), next_iter = iter; iter != fec_controllor.fec_rcv_cache.end(); iter = next_iter) { ++next_iter; auto sn = iter->first; - if (fec_sn == sn) - continue; - auto &mapped_data = iter->second; if (mapped_data.size() < max_fec_data_count) { if (fec_sn - sn > gbv_fec_waits) + { fec_controllor.fec_rcv_cache.erase(iter); + if (auto rcv_sn_iter = fec_controllor.fec_rcv_restored.find(sn); + rcv_sn_iter != fec_controllor.fec_rcv_restored.end()) + fec_controllor.fec_rcv_restored.erase(rcv_sn_iter); + } + continue; + } + if (auto rcv_sn_iter = fec_controllor.fec_rcv_restored.find(sn); rcv_sn_iter != fec_controllor.fec_rcv_restored.end()) + { + if (fec_sn - sn > gbv_fec_waits) + { + fec_controllor.fec_rcv_cache.erase(iter); + fec_controllor.fec_rcv_restored.erase(rcv_sn_iter); + } continue; } auto [recv_data, fec_align_length] = compact_into_container(mapped_data, max_fec_data_count); @@ -932,8 +953,10 @@ void server_mode::fec_find_missings(KCP::KCP *kcp_ptr, fec_control_data &fec_con kcp_ptr->Input((const char *)missed_data_ptr, (long)missed_data_size); } - fec_controllor.fec_rcv_cache.erase(iter); + fec_controllor.fec_rcv_restored.insert(sn); + recovered = true; } + return recovered; } void server_mode::process_tcp_disconnect(tcp_session *session, std::weak_ptr kcp_ptr_weak, bool inform_peer) diff --git a/src/networks/server.hpp b/src/networks/server.hpp index 24688d1..e01a19e 100644 --- a/src/networks/server.hpp +++ b/src/networks/server.hpp @@ -66,7 +66,7 @@ class server_mode int kcp_sender(const char *buf, int len, void *user); void data_sender(kcp_mappings *kcp_mappings_ptr, std::unique_ptr new_buffer, size_t buffer_size); void fec_maker(kcp_mappings *kcp_mappings_ptr, const uint8_t *input_data, int data_size); - void fec_find_missings(KCP::KCP *kcp_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count); + bool fec_find_missings(KCP::KCP *kcp_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count); void process_tcp_disconnect(tcp_session *session, std::weak_ptr kcp_ptr_weak, bool inform_peer = true); void process_tcp_disconnect(tcp_session *session, std::weak_ptr kcp_ptr_weak, std::weak_ptr mux_records_weak);