Skip to content

Commit

Permalink
FEC: faster recovery; mux_tunnels: avoid choosing congested channels
Browse files Browse the repository at this point in the history
  • Loading branch information
cnbatch committed Dec 2, 2023
1 parent e3f2ece commit 0b3289b
Show file tree
Hide file tree
Showing 17 changed files with 273 additions and 147 deletions.
2 changes: 2 additions & 0 deletions docs/fec_en.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ The FEC settings of the sender and receiver **must** be exactly the same, otherw

- The amount of original data (D value) should not be too small. Setting the value too low is almost equivalent to having no effect. This value should be greater than 15.
- Of course, greater is not better, because it will take a very long time to generate redundant data at low traffic.
- **Game data transmission is a special case**, and the lower the value of this option, the better, preferably **set to 1**. Game traffic itself is not high, and if the packet sending interval of the game program is longer than the link latency, FEC will have no effect. As it is sensitive to latency, this value can be set to 1.
- For gaming applications, it is preferable to set up a dedicated channel (such as a dedicated VPN tunnel) separately and avoid mixing with other applications.

- The higher amount of redundant data (R value) is not always better. Excessive amount of redundant data will cause unnecessary waste.

Expand Down
2 changes: 2 additions & 0 deletions docs/fec_zh-hans.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

- 原始数据量(D 值)不应该太少,数值过少等于几乎没有效果。该值最好大于 15。
- 当然了,并不是越大越好,因为在低流量时会导致很长时间才会生成冗余数据量。
- **游戏数据传输是特例**,该值越低越好,最好**设为 1**。游戏流量本身并不高,如果游戏程序的发包间隔大于线路延迟,那么 FEC 就不会有任何效果。但因为对延时敏感,所以可以设为 1。
- 对于游戏应用,请尽量设置单独的专用通道(例如专门的 VPN 通道),不要与其它应用混合使用。

- 冗余数据量(R 值)并非越多越好,过多的冗余数据量会造成不必要的浪费。

Expand Down
6 changes: 6 additions & 0 deletions docs/specific_examples_en.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ Here, it is assumed that the OpenVPN client and the KCPTube client are both runn

And then, OpenVPN client just need to connect to port# 1194 of KCP Client.

### If the VPN is a dedicated channel for gaming

For a dedicated channel for game data transmission, please add `blast=1` and change fast6 to a more responsive mode, like fast1 ~ fast4.

Then consider using [forward error correction (FEC)](fec_en.md) based on the packet loss rate.

### Additional matters that require attention

Due to the fact that the OpenVPN client is currently connected to the local IP address, it is necessary to selectively allow the IP address of the actual server in the routing table.
Expand Down
6 changes: 6 additions & 0 deletions docs/specific_examples_zh-hans.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ encryption_algorithm=AES-GCM

然后,OpenVPN 客户端程序只需要连接 KCPTube 客户端的 1194 端口即可。

### 如果 VPN 是游戏专用通道

对于专用于游戏数据传输的通道,请加上 `blast=1`,并把 fast6 改成更灵敏的模式,即 fast1 ~ fast4。

然后根据丢包率状况考虑是否使用[前向纠错,即 FEC](fec_zh-hans.md)

### 额外注意事项

由于 OpenVPN 客户端此时连接的是本机 IP 地址,因此需要在路由表中针对性地放行实际服务器的 IP 地址。
Expand Down
4 changes: 3 additions & 1 deletion src/3rd_party/ikcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,7 @@ namespace KCP
// ikcp_check when to call it again (without ikcp_input/_send calling).
// 'current' - current timestamp in millisec.
//---------------------------------------------------------------------
void kcp_core::update(uint32_t current)
int kcp_core::update(uint32_t current)
{
int32_t slap;

Expand Down Expand Up @@ -1147,6 +1147,8 @@ namespace KCP

flush();
}

return slap;
}


Expand Down
2 changes: 1 addition & 1 deletion src/3rd_party/ikcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ namespace KCP
// update state (call it repeatedly, every 10ms-100ms), or you can ask
// ikcp_check when to call it again (without ikcp_input/_send calling).
// 'current' - current timestamp in millisec.
void update(uint32_t current);
int update(uint32_t current);

// Determine when should you invoke ikcp_update:
// returns when you should invoke ikcp_update in millisec, if there
Expand Down
2 changes: 1 addition & 1 deletion src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
int main(int argc, char *argv[])
{
char app_name[] = "kcptube";
printf("%s version 20231126\n", app_name);
printf("%s version 20231202\n", app_name);

if (argc <= 1)
{
Expand Down
103 changes: 69 additions & 34 deletions src/networks/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,48 +403,54 @@ void client_mode::udp_forwarder_incoming_unpack(std::shared_ptr<KCP::KCP> kcp_pt
if (calculate_difference<int64_t>((uint32_t)timestamp, packet_timestamp) > gbv_time_gap_seconds)
return;

uint32_t conv = 0;
kcp_mappings *kcp_mappings_ptr = nullptr;
std::pair<std::unique_ptr<uint8_t[]>, size_t> original_data;
uint32_t fec_sn = 0;
uint8_t fec_sub_sn = 0;
if (current_settings.fec_data > 0 && current_settings.fec_redundant > 0)
{
auto [packet_header, kcp_data_ptr, kcp_data_size] = packet::unpack_fec(data.get(), plain_size);
if (packet_header.sub_sn >= current_settings.fec_data)
fec_sn = packet_header.sn;
fec_sub_sn = packet_header.sub_sn;
if (fec_sub_sn >= current_settings.fec_data)
{
auto [packet_header_redundant, redundant_data_ptr, redundant_data_size] = packet::unpack_fec_redundant(data.get(), plain_size);
kcp_ptr = verify_kcp_conv(kcp_ptr, packet_header_redundant.kcp_conv, peer);
kcp_mappings *kcp_mappings_ptr = (kcp_mappings *)kcp_ptr->GetUserData();
kcp_mappings_ptr = (kcp_mappings *)kcp_ptr->GetUserData();
original_data.first = std::make_unique<uint8_t[]>(redundant_data_size);
original_data.second = redundant_data_size;
std::copy_n(redundant_data_ptr, redundant_data_size, original_data.first.get());
kcp_mappings_ptr->fec_egress_control.fec_rcv_cache[packet_header_redundant.sn][packet_header_redundant.sub_sn] = std::move(original_data);
return;
if (!fec_find_missings(kcp_ptr.get(), kcp_mappings_ptr->fec_egress_control, fec_sn, current_settings.fec_data))
return;
data_ptr = nullptr;
packet_data_size = 0;
}
else
{
fec_sn = packet_header.sn;
fec_sub_sn = packet_header.sub_sn;
data_ptr = kcp_data_ptr;
packet_data_size = kcp_data_size;
original_data.first = std::make_unique<uint8_t[]>(kcp_data_size);
original_data.second = kcp_data_size;
std::copy_n(kcp_data_ptr, kcp_data_size, original_data.first.get());

conv = KCP::KCP::GetConv(data_ptr);
kcp_ptr = verify_kcp_conv(kcp_ptr, conv, peer);
kcp_mappings_ptr = (kcp_mappings *)kcp_ptr->GetUserData();
kcp_mappings_ptr->fec_egress_control.fec_rcv_cache[fec_sn][fec_sub_sn] = std::move(original_data);
fec_find_missings(kcp_ptr.get(), kcp_mappings_ptr->fec_egress_control, fec_sn, current_settings.fec_data);
}
}

uint32_t conv = KCP::KCP::GetConv(data_ptr);
kcp_ptr = verify_kcp_conv(kcp_ptr, conv, peer);

kcp_mappings *kcp_mappings_ptr = (kcp_mappings *)kcp_ptr->GetUserData();

if (current_settings.fec_data > 0 && current_settings.fec_redundant > 0)
else
{
kcp_mappings_ptr->fec_egress_control.fec_rcv_cache[fec_sn][fec_sub_sn] = std::move(original_data);
fec_find_missings(kcp_ptr.get(), kcp_mappings_ptr->fec_egress_control, fec_sn, current_settings.fec_data);
conv = KCP::KCP::GetConv(data_ptr);
kcp_ptr = verify_kcp_conv(kcp_ptr, conv, peer);
kcp_mappings_ptr = (kcp_mappings *)kcp_ptr->GetUserData();
}

if (kcp_ptr->Input((const char *)data_ptr, (long)packet_data_size) < 0)
return;
if (data_ptr != nullptr && packet_data_size != 0)
kcp_ptr->Input((const char *)data_ptr, (long)packet_data_size);

while (true)
{
Expand Down Expand Up @@ -698,24 +704,38 @@ std::shared_ptr<KCP::KCP> client_mode::pick_one_from_kcp_channels(protocol_type
}
else
{
size_t index = generate_random_number<size_t>(0, kcp_channels.size() - 1);
if (prtcl == protocol_type::tcp && index % 2 != 0)
std::map<int64_t, uint32_t> peaks_of_recv, peaks_of_sent; // peak value, kcp conv
std::map<uint32_t, int32_t> recv_peaks_by_index, sent_peaks_by_index; // kcp conv, peak-sorted index
for (auto iter = kcp_channels.begin(); iter != kcp_channels.end(); ++iter)
{
while (index % 2 != 0)
{
index = generate_random_number<size_t>(0, kcp_channels.size() - 1);
}
int64_t recv_peak_value = iter->second->egress_kcp->ReceivedDataAveragePeak();
int64_t sent_peak_value = iter->second->egress_kcp->SentDataAveragePeak();
peaks_of_recv[recv_peak_value] = iter->first;
peaks_of_sent[sent_peak_value] = iter->first;
}
if (prtcl == protocol_type::udp && index % 2 == 0)

int32_t index = 0;
for (auto [peak_value, conv] : peaks_of_recv)
recv_peaks_by_index[conv] = index++;

index = 0;
for (auto [peak_value, conv] : peaks_of_sent)
sent_peaks_by_index[conv] = index++;

recv_peaks_by_index.rbegin()->second = -1;
sent_peaks_by_index.rbegin()->second = -1;

std::multimap<int32_t, uint32_t> index_sum_of_conv; // index_sum, kcp conv
for (auto &[conv, mappings] : kcp_channels)
{
while (index % 2 == 0)
{
index = generate_random_number<size_t>(0, kcp_channels.size() - 1);
}
if (recv_peaks_by_index[conv] < 0 || sent_peaks_by_index[conv] < 0)
continue;
int32_t index_sum = recv_peaks_by_index[conv] + sent_peaks_by_index[conv];
index_sum_of_conv.insert({ index_sum, conv });
}
auto iter = kcp_channels.begin();
std::advance(iter, index);
kcp_ptr = iter->second->egress_kcp;

auto conv = index_sum_of_conv.begin()->second;
kcp_ptr = kcp_channels[conv]->egress_kcp;
}
locker_kcp_channels.unlock();

Expand Down Expand Up @@ -822,19 +842,32 @@ void client_mode::fec_maker(kcp_mappings *kcp_mappings_ptr, const uint8_t *input
}
}

void client_mode::fec_find_missings(KCP::KCP *kcp_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count)
bool client_mode::fec_find_missings(KCP::KCP *kcp_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count)
{
bool recovered = false;
for (auto iter = fec_controllor.fec_rcv_cache.begin(), next_iter = iter; iter != fec_controllor.fec_rcv_cache.end(); iter = next_iter)
{
++next_iter;
auto sn = iter->first;
if (fec_sn == sn)
continue;;
auto &mapped_data = iter->second;
if (mapped_data.size() < max_fec_data_count)
{
if (fec_sn - sn > gbv_fec_waits)
{
fec_controllor.fec_rcv_cache.erase(iter);
if (auto rcv_sn_iter = fec_controllor.fec_rcv_restored.find(sn);
rcv_sn_iter != fec_controllor.fec_rcv_restored.end())
fec_controllor.fec_rcv_restored.erase(rcv_sn_iter);
}
continue;
}
if (auto rcv_sn_iter = fec_controllor.fec_rcv_restored.find(sn); rcv_sn_iter != fec_controllor.fec_rcv_restored.end())
{
if (fec_sn - sn > gbv_fec_waits)
{
fec_controllor.fec_rcv_cache.erase(iter);
fec_controllor.fec_rcv_restored.erase(rcv_sn_iter);
}
continue;
}
auto [recv_data, fec_align_length] = compact_into_container(mapped_data, max_fec_data_count);
Expand All @@ -847,8 +880,10 @@ void client_mode::fec_find_missings(KCP::KCP *kcp_ptr, fec_control_data &fec_con
kcp_ptr->Input((const char *)missed_data_ptr, (long)missed_data_size);
}

fec_controllor.fec_rcv_cache.erase(iter);
fec_controllor.fec_rcv_restored.insert(sn);
recovered = true;
}
return recovered;
}

bool client_mode::get_udp_target(std::shared_ptr<forwarder> target_connector, udp::endpoint &udp_target)
Expand Down
2 changes: 1 addition & 1 deletion src/networks/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class client_mode
int kcp_sender(const char *buf, int len, void *user);
void data_sender(kcp_mappings *kcp_mappings_ptr, std::unique_ptr<uint8_t[]> new_buffer, size_t buffer_size);
void fec_maker(kcp_mappings *kcp_mappings_ptr, const uint8_t *input_data, int data_size);
void fec_find_missings(KCP::KCP *kcp_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count);
bool fec_find_missings(KCP::KCP *kcp_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count);

bool get_udp_target(std::shared_ptr<forwarder> target_connector, udp::endpoint &udp_target);
bool update_udp_target(std::shared_ptr<forwarder> target_connector, udp::endpoint &udp_target);
Expand Down
3 changes: 2 additions & 1 deletion src/networks/connections.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ constexpr uint32_t gbv_mux_min_cache_available = 16u;
constexpr uint32_t gbv_mux_min_cache_slice = 8u;
constexpr uint32_t gbv_tcp_slice = 2u;
constexpr uint32_t gbv_half_time = 2u;
constexpr uint32_t gbv_fec_waits = 3u;
constexpr uint16_t gbv_fec_waits = 3u;
constexpr size_t gbv_buffer_size = 2048u;
constexpr size_t gbv_buffer_expand_size = 128u;
constexpr size_t gbv_retry_times = 30u;
Expand Down Expand Up @@ -543,6 +543,7 @@ struct fec_control_data
std::atomic<uint32_t> fec_snd_sub_sn;
std::vector<std::pair<std::unique_ptr<uint8_t[]>, size_t>> fec_snd_cache;
std::map<uint32_t, std::map<uint16_t, std::pair<std::unique_ptr<uint8_t[]>, size_t>>> fec_rcv_cache; // uint32_t = snd_sn, uint16_t = sub_sn
std::unordered_set<uint32_t> fec_rcv_restored;
fecpp::fec_code fecc;
};

Expand Down
45 changes: 33 additions & 12 deletions src/networks/kcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ namespace KCP
{
kcp_ptr = std::make_unique<kcp_core>();
kcp_ptr->initialise(conv, this);
last_input_time.store(right_now());
last_input_time = right_now();
post_update = empty_function;
}

void KCP::MoveKCP(KCP &other) noexcept
{
kcp_ptr = std::move(other.kcp_ptr);
last_input_time.store(other.last_input_time.load());
last_input_time = other.last_input_time;
post_update = other.post_update;
}

Expand Down Expand Up @@ -92,7 +92,12 @@ namespace KCP

void KCP::SetOutput(std::function<int(const char *, int, void *)> output_func)
{
kcp_ptr->set_output(output_func);
//output = output_func;
kcp_ptr->set_output([this, output_func](const char *buf, int len, void *user) -> int
{
sent_data_average_peak = (7 * sent_data_average_peak + len) / 8;
return output_func(buf, len, user);
});
}

void KCP::SetPostUpdate(std::function<void(void *)> post_update_func)
Expand Down Expand Up @@ -121,17 +126,19 @@ namespace KCP
void KCP::Update(uint32_t current)
{
std::unique_lock locker{ mtx };
kcp_ptr->update(current);
int ret = kcp_ptr->update(current);
locker.unlock();
post_update(kcp_ptr->user);
if (ret >= 0)
post_update(kcp_ptr->user);
}

void KCP::Update()
{
std::unique_lock locker{ mtx };
kcp_ptr->update(TimeNowForKCP());
int ret = kcp_ptr->update(TimeNowForKCP());
locker.unlock();
post_update(kcp_ptr->user);
if (ret >= 0)
post_update(kcp_ptr->user);
}

uint32_t KCP::Check(uint32_t current)
Expand All @@ -150,7 +157,9 @@ namespace KCP
{
std::unique_lock unique_locker{ mtx };
kcp_ptr->flush(TimeNowForKCP());
return kcp_ptr->check(TimeNowForKCP());
uint32_t ret = kcp_ptr->check(TimeNowForKCP());
unique_locker.unlock();
return ret;
}

// when you received a low level packet (eg. UDP packet), call it
Expand All @@ -159,7 +168,9 @@ namespace KCP
std::unique_lock locker{ mtx };
auto ret = kcp_ptr->input(data, size);
locker.unlock();
last_input_time.store(right_now());
last_input_time = right_now();
if (ret > 0)
received_data_average_peak = (7 * received_data_average_peak + size) / 8; // same as rx_srtt calculation in update_ack()
return ret;
}

Expand Down Expand Up @@ -264,12 +275,22 @@ namespace KCP
inbound_bandwidth = in_bw;
}

int64_t KCP::LastInputTime()
int64_t KCP::LastInputTime() const
{
return last_input_time;
}

int64_t KCP::ReceivedDataAveragePeak() const
{
return received_data_average_peak;
}

int64_t KCP::SentDataAveragePeak() const
{
return last_input_time.load();
return sent_data_average_peak;
}

void* KCP::GetUserData()
void* KCP::GetUserData() const
{
return kcp_ptr->user;
}
Expand Down
Loading

0 comments on commit 0b3289b

Please sign in to comment.