diff --git a/src/main.cpp b/src/main.cpp index 1b3bea9..045a8e2 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -16,7 +16,7 @@ int main(int argc, char *argv[]) { #ifdef __cpp_lib_format - std::cout << std::format("{} version 20241021\n", app_name); + std::cout << std::format("{} version 20241026\n", app_name); if (argc <= 1) { std::cout << std::format("Usage: {} config1.conf\n", app_name); @@ -24,7 +24,7 @@ int main(int argc, char *argv[]) return 0; } #else - std::cout << app_name << " version 20241021\n"; + std::cout << app_name << " version 20241026\n"; if (argc <= 1) { std::cout << "Usage: " << app_name << " config1.conf\n"; diff --git a/src/networks/client.cpp b/src/networks/client.cpp index 23e96fa..72a41ad 100644 --- a/src/networks/client.cpp +++ b/src/networks/client.cpp @@ -498,7 +498,12 @@ void client_mode::cleanup_expiring_forwarders() int64_t time_elapsed = calculate_difference(time_right_now, expire_time); - if (time_elapsed > CLEANUP_WAITS / 2 && + if (time_elapsed > CLEANUP_WAITS / 3 && + time_elapsed <= CLEANUP_WAITS / 3 * 2 && + forwarder_ptr != nullptr) + forwarder_ptr->pause(true); + + if (time_elapsed > CLEANUP_WAITS / 3 * 2 && forwarder_ptr != nullptr) forwarder_ptr->stop(); @@ -556,7 +561,7 @@ void client_mode::loop_timeout_sessions() std::shared_ptr egress_forwarder = std::atomic_load(&(udp_session_ptr->egress_forwarder)); old_forwarders.push_back(egress_forwarder); - egress_forwarder->stop(); + egress_forwarder->pause(true); #if __GNUC__ == 12 && __GNUC_MINOR__ < 3 udp_session_ptr->egress_forwarder.store(nullptr); #else @@ -691,84 +696,69 @@ void client_mode::test_before_change(std::shared_ptr udp_mappings_ uint16_t destination_port_end = current_settings.destination_port_end; asio::error_code ec; - if (std::shared_ptr egress_hopping_forwarder = std::atomic_load(&(udp_mappings_ptr->egress_hopping_forwarder)); - egress_hopping_forwarder == nullptr || destination_port_start == destination_port_end) + std::shared_ptr udp_forwarder = nullptr; + try { - std::shared_ptr udp_forwarder = nullptr; - try - { - auto bind_push_func = std::bind(&ttp::task_group_pool::push_task_peer, &sequence_task_pool, _1, _2, _3); - auto bind_check_limit_func = [this](size_t number) -> bool {return sequence_task_pool.get_peer_network_task_count(number) > TASK_COUNT_LIMIT; }; - auto udp_func = std::bind(&client_mode::udp_forwarder_incoming_to_udp, this, _1, _2, _3, _4, _5); - udp_forwarder = std::make_shared(io_context, bind_push_func, bind_check_limit_func, udp_mappings_ptr, udp_func, current_settings.ip_version_only); - if (udp_forwarder == nullptr) - { - udp_mappings_ptr->hopping_timestamp.store(time_right_now + current_settings.dynamic_port_refresh); - return; - } - } - catch (std::exception &ex) - { - std::string error_message = time_to_string_with_square_brackets() + "Cannot switch to new port, error: " + ex.what() + "\n"; - std::cerr << error_message; - print_message_to_file(error_message, current_settings.log_messages); - udp_mappings_ptr->hopping_timestamp.store(time_right_now + current_settings.dynamic_port_refresh); - return; - } - - std::shared_ptr egress_target_endpoint = std::atomic_load(&(udp_mappings_ptr->egress_target_endpoint)); - if (destination_port_start == destination_port_end) - { - std::atomic_store(&(udp_mappings_ptr->hopping_endpoint), std::make_shared(*egress_target_endpoint)); - } - else - { - uint16_t current_port_number = egress_target_endpoint->port(); - uint16_t new_port_numer = generate_new_port_number(destination_port_start, destination_port_end); - for (size_t retry_times = 0; new_port_numer == current_port_number && retry_times < RETRY_TIMES; retry_times++) - { - new_port_numer = generate_new_port_number(destination_port_start, destination_port_end); - } - std::shared_ptr target = std::atomic_load(&(target_address)); - std::atomic_store(&(udp_mappings_ptr->hopping_endpoint), std::make_shared(*target, new_port_numer)); - } - - std::shared_ptr new_forwarder = udp_forwarder; - std::vector keep_alive_packet = create_empty_data(current_settings.encryption_password, current_settings.encryption, EMPTY_PACKET_SIZE); - udp_mappings_ptr->wrapper_ptr->write_iden(keep_alive_packet.data()); - - if (current_settings.ip_version_only == ip_only_options::ipv4) - new_forwarder->send_out(std::move(keep_alive_packet), local_empty_target_v4, ec); - else - new_forwarder->send_out(std::move(keep_alive_packet), local_empty_target_v6, ec); - - if (ec) + auto bind_push_func = std::bind(&ttp::task_group_pool::push_task_peer, &sequence_task_pool, _1, _2, _3); + auto bind_check_limit_func = [this](size_t number) -> bool {return sequence_task_pool.get_peer_network_task_count(number) > TASK_COUNT_LIMIT; }; + auto udp_func = std::bind(&client_mode::udp_forwarder_incoming_to_udp, this, _1, _2, _3, _4, _5); + udp_forwarder = std::make_shared(io_context, bind_push_func, bind_check_limit_func, udp_mappings_ptr, udp_func, current_settings.ip_version_only); + if (udp_forwarder == nullptr) { udp_mappings_ptr->hopping_timestamp.store(time_right_now + current_settings.dynamic_port_refresh); return; } + } + catch (std::exception &ex) + { + std::string error_message = time_to_string_with_square_brackets() + "Cannot switch to new port, error: " + ex.what() + "\n"; + std::cerr << error_message; + print_message_to_file(error_message, current_settings.log_messages); + udp_mappings_ptr->hopping_timestamp.store(time_right_now + current_settings.dynamic_port_refresh); + return; + } - new_forwarder->async_receive(); - if (egress_hopping_forwarder != nullptr) - { - egress_hopping_forwarder->pause(true); - std::scoped_lock lock_expiring_forwarders{ mutex_expiring_forwarders }; - expiring_forwarders[egress_hopping_forwarder] = right_now(); - } - std::atomic_store(&(udp_mappings_ptr->egress_hopping_forwarder), new_forwarder); + std::shared_ptr egress_target_endpoint = std::atomic_load(&(udp_mappings_ptr->egress_target_endpoint)); + if (destination_port_start == destination_port_end) + { + std::atomic_store(&(udp_mappings_ptr->hopping_endpoint), std::make_shared(*egress_target_endpoint)); } else { - std::shared_ptr hopping_endpoint = std::atomic_load(&(udp_mappings_ptr->hopping_endpoint)); - uint16_t current_port_number = hopping_endpoint->port(); + uint16_t current_port_number = egress_target_endpoint->port(); uint16_t new_port_numer = generate_new_port_number(destination_port_start, destination_port_end); for (size_t retry_times = 0; new_port_numer == current_port_number && retry_times < RETRY_TIMES; retry_times++) { new_port_numer = generate_new_port_number(destination_port_start, destination_port_end); } - hopping_endpoint->port(new_port_numer); - std::atomic_store(&(udp_mappings_ptr->hopping_endpoint), hopping_endpoint); + std::shared_ptr target = std::atomic_load(&(target_address)); + std::atomic_store(&(udp_mappings_ptr->hopping_endpoint), std::make_shared(*target, new_port_numer)); + } + + std::shared_ptr new_forwarder = udp_forwarder; + std::vector keep_alive_packet = create_empty_data(current_settings.encryption_password, current_settings.encryption, EMPTY_PACKET_SIZE); + udp_mappings_ptr->wrapper_ptr->write_iden(keep_alive_packet.data()); + + if (current_settings.ip_version_only == ip_only_options::ipv4) + new_forwarder->send_out(std::move(keep_alive_packet), local_empty_target_v4, ec); + else + new_forwarder->send_out(std::move(keep_alive_packet), local_empty_target_v6, ec); + + if (ec) + { + udp_mappings_ptr->hopping_timestamp.store(time_right_now + current_settings.dynamic_port_refresh); + return; + } + + new_forwarder->async_receive(); + if (std::shared_ptr egress_hopping_forwarder = std::atomic_load(&(udp_mappings_ptr->egress_hopping_forwarder)); + egress_hopping_forwarder != nullptr) + { + egress_hopping_forwarder->pause(true); + std::scoped_lock lock_expiring_forwarders{ mutex_expiring_forwarders }; + expiring_forwarders[egress_hopping_forwarder] = right_now(); } + std::atomic_store(&(udp_mappings_ptr->egress_hopping_forwarder), new_forwarder); udp_mappings_ptr->hopping_available.store(hop_status::testing); udp_mappings *udp_session_ptr = udp_mappings_ptr.get(); @@ -788,8 +778,9 @@ void client_mode::switch_new_port(std::shared_ptr udp_mappings_ptr udp_mappings_ptr->hopping_timestamp.store(right_now() + current_settings.dynamic_port_refresh); udp_mappings_ptr->hopping_available.store(hop_status::pending); + std::shared_ptr hopping_endpoint = std::atomic_load(&(udp_mappings_ptr->hopping_endpoint)); std::atomic_store(&(udp_mappings_ptr->egress_previous_target_endpoint), std::atomic_load(&(udp_mappings_ptr->egress_target_endpoint))); - std::atomic_store(&(udp_mappings_ptr->egress_target_endpoint), std::make_shared(*std::atomic_load(&(udp_mappings_ptr->hopping_endpoint)))); + std::atomic_store(&(udp_mappings_ptr->egress_target_endpoint), std::make_shared(*hopping_endpoint)); std::shared_ptr new_forwarder = std::atomic_load(&(udp_mappings_ptr->egress_hopping_forwarder)); std::shared_ptr old_forwarder = std::atomic_load(&(udp_mappings_ptr->egress_forwarder)); diff --git a/src/networks/relay.cpp b/src/networks/relay.cpp index aa3af22..2b7d0c6 100644 --- a/src/networks/relay.cpp +++ b/src/networks/relay.cpp @@ -747,7 +747,12 @@ void relay_mode::cleanup_expiring_data_connections() int64_t time_elapsed = calculate_difference(time_right_now, expire_time); std::shared_ptr egress_forwarder = std::atomic_load(&udp_session_ptr->egress_forwarder); - if (time_elapsed > CLEANUP_WAITS / 2 && + if (time_elapsed > CLEANUP_WAITS / 3 && + time_elapsed <= CLEANUP_WAITS / 3 * 2 && + egress_forwarder != nullptr) + egress_forwarder->pause(true); + + if (time_elapsed > CLEANUP_WAITS / 3 * 2 && egress_forwarder != nullptr) egress_forwarder->stop(); @@ -984,84 +989,69 @@ void relay_mode::test_before_change(std::shared_ptr udp_mappings_p uint16_t destination_port_end = current_settings.egress->destination_port_end; asio::error_code ec; - if (std::shared_ptr egress_hopping_forwarder = std::atomic_load(&(udp_mappings_ptr->egress_hopping_forwarder)); - egress_hopping_forwarder == nullptr || destination_port_start == destination_port_end) + std::shared_ptr udp_forwarder = nullptr; + try { - std::shared_ptr udp_forwarder = nullptr; - try - { - auto bind_push_func = std::bind(&ttp::task_group_pool::push_task_peer, &sequence_task_pool, _1, _2, _3); - auto bind_check_limit_func = [this](size_t number) -> bool {return sequence_task_pool.get_peer_network_task_count(number) > TASK_COUNT_LIMIT; }; - auto udp_func = std::bind(&relay_mode::udp_forwarder_incoming_to_udp, this, _1, _2, _3, _4, _5); - udp_forwarder = std::make_shared(io_context, bind_push_func, bind_check_limit_func, udp_mappings_ptr, udp_func, current_settings.egress->ip_version_only); - if (udp_forwarder == nullptr) - { - udp_mappings_ptr->hopping_timestamp.store(time_right_now + current_settings.dynamic_port_refresh); - return; - } - } - catch (std::exception &ex) - { - std::string error_message = time_to_string_with_square_brackets() + "Cannot switch to new port, error: " + ex.what() + "\n"; - std::cerr << error_message; - print_message_to_file(error_message, current_settings.log_messages); - udp_mappings_ptr->hopping_timestamp.store(time_right_now + current_settings.dynamic_port_refresh); - return; - } - - std::shared_ptr egress_target_endpoint = std::atomic_load(&(udp_mappings_ptr->egress_target_endpoint)); - if (destination_port_start == destination_port_end) - { - std::atomic_store(&(udp_mappings_ptr->hopping_endpoint), std::make_shared(*egress_target_endpoint)); - } - else - { - uint16_t current_port_number = egress_target_endpoint->port(); - uint16_t new_port_numer = generate_new_port_number(destination_port_start, destination_port_end); - for (size_t retry_times = 0; new_port_numer == current_port_number && retry_times < RETRY_TIMES; retry_times++) - { - new_port_numer = generate_new_port_number(destination_port_start, destination_port_end); - } - std::shared_ptr target = std::atomic_load(&(target_address)); - std::atomic_store(&(udp_mappings_ptr->hopping_endpoint), std::make_shared(*target, new_port_numer)); - } - - std::shared_ptr new_forwarder = udp_forwarder; - std::vector keep_alive_packet = create_empty_data(current_settings.egress->encryption_password, current_settings.egress->encryption, EMPTY_PACKET_SIZE); - udp_mappings_ptr->wrapper_ptr->write_iden(keep_alive_packet.data()); - - if (current_settings.egress->ip_version_only == ip_only_options::ipv4) - new_forwarder->send_out(std::move(keep_alive_packet), local_empty_target_v4, ec); - else - new_forwarder->send_out(std::move(keep_alive_packet), local_empty_target_v6, ec); - - if (ec) + auto bind_push_func = std::bind(&ttp::task_group_pool::push_task_peer, &sequence_task_pool, _1, _2, _3); + auto bind_check_limit_func = [this](size_t number) -> bool {return sequence_task_pool.get_peer_network_task_count(number) > TASK_COUNT_LIMIT; }; + auto udp_func = std::bind(&relay_mode::udp_forwarder_incoming_to_udp, this, _1, _2, _3, _4, _5); + udp_forwarder = std::make_shared(io_context, bind_push_func, bind_check_limit_func, udp_mappings_ptr, udp_func, current_settings.egress->ip_version_only); + if (udp_forwarder == nullptr) { udp_mappings_ptr->hopping_timestamp.store(time_right_now + current_settings.dynamic_port_refresh); return; } + } + catch (std::exception &ex) + { + std::string error_message = time_to_string_with_square_brackets() + "Cannot switch to new port, error: " + ex.what() + "\n"; + std::cerr << error_message; + print_message_to_file(error_message, current_settings.log_messages); + udp_mappings_ptr->hopping_timestamp.store(time_right_now + current_settings.dynamic_port_refresh); + return; + } - new_forwarder->async_receive(); - if (egress_hopping_forwarder != nullptr) - { - egress_hopping_forwarder->pause(true); - std::scoped_lock lock_expiring_forwarders{ mutex_expiring_forwarders }; - expiring_forwarders[egress_hopping_forwarder] = right_now(); - } - std::atomic_store(&(udp_mappings_ptr->egress_hopping_forwarder), new_forwarder); + std::shared_ptr egress_target_endpoint = std::atomic_load(&(udp_mappings_ptr->egress_target_endpoint)); + if (destination_port_start == destination_port_end) + { + std::atomic_store(&(udp_mappings_ptr->hopping_endpoint), std::make_shared(*egress_target_endpoint)); } else { - std::shared_ptr hopping_endpoint = std::atomic_load(&(udp_mappings_ptr->hopping_endpoint)); - uint16_t current_port_number = hopping_endpoint->port(); + uint16_t current_port_number = egress_target_endpoint->port(); uint16_t new_port_numer = generate_new_port_number(destination_port_start, destination_port_end); for (size_t retry_times = 0; new_port_numer == current_port_number && retry_times < RETRY_TIMES; retry_times++) { new_port_numer = generate_new_port_number(destination_port_start, destination_port_end); } - hopping_endpoint->port(new_port_numer); - std::atomic_store(&(udp_mappings_ptr->hopping_endpoint), hopping_endpoint); + std::shared_ptr target = std::atomic_load(&(target_address)); + std::atomic_store(&(udp_mappings_ptr->hopping_endpoint), std::make_shared(*target, new_port_numer)); + } + + std::shared_ptr new_forwarder = udp_forwarder; + std::vector keep_alive_packet = create_empty_data(current_settings.egress->encryption_password, current_settings.egress->encryption, EMPTY_PACKET_SIZE); + udp_mappings_ptr->wrapper_ptr->write_iden(keep_alive_packet.data()); + + if (current_settings.egress->ip_version_only == ip_only_options::ipv4) + new_forwarder->send_out(std::move(keep_alive_packet), local_empty_target_v4, ec); + else + new_forwarder->send_out(std::move(keep_alive_packet), local_empty_target_v6, ec); + + if (ec) + { + udp_mappings_ptr->hopping_timestamp.store(time_right_now + current_settings.dynamic_port_refresh); + return; + } + + new_forwarder->async_receive(); + if (std::shared_ptr egress_hopping_forwarder = std::atomic_load(&(udp_mappings_ptr->egress_hopping_forwarder)); + egress_hopping_forwarder != nullptr) + { + egress_hopping_forwarder->pause(true); + std::scoped_lock lock_expiring_forwarders{ mutex_expiring_forwarders }; + expiring_forwarders[egress_hopping_forwarder] = right_now(); } + std::atomic_store(&(udp_mappings_ptr->egress_hopping_forwarder), new_forwarder); udp_mappings_ptr->hopping_available.store(hop_status::testing); @@ -1082,8 +1072,9 @@ void relay_mode::switch_new_port(std::shared_ptr udp_mappings_ptr) udp_mappings_ptr->hopping_timestamp.store(right_now() + current_settings.dynamic_port_refresh); udp_mappings_ptr->hopping_available.store(hop_status::pending); + std::shared_ptr hopping_endpoint = std::atomic_load(&(udp_mappings_ptr->hopping_endpoint)); std::atomic_store(&(udp_mappings_ptr->egress_previous_target_endpoint), std::atomic_load(&(udp_mappings_ptr->egress_target_endpoint))); - std::atomic_store(&(udp_mappings_ptr->egress_target_endpoint), std::make_shared(*std::atomic_load(&(udp_mappings_ptr->hopping_endpoint)))); + std::atomic_store(&(udp_mappings_ptr->egress_target_endpoint), std::make_shared(*hopping_endpoint)); std::shared_ptr new_forwarder = std::atomic_load(&(udp_mappings_ptr->egress_hopping_forwarder)); std::shared_ptr old_forwarder = std::atomic_load(&(udp_mappings_ptr->egress_forwarder));