diff --git a/ipv8/messaging/anonymization/payload.py b/ipv8/messaging/anonymization/payload.py index 13a9c58e0..dd4bd2d75 100644 --- a/ipv8/messaging/anonymization/payload.py +++ b/ipv8/messaging/anonymization/payload.py @@ -96,7 +96,7 @@ class ExtendPayload(CellablePayload): msg_id = 4 names = ['circuit_id', 'identifier', 'node_public_key', 'key', 'node_addr'] - format_list = ['I', 'H', 'varlenH', 'varlenH', 'ipv4'] + format_list = ['I', 'H', 'varlenH', 'varlenH', 'ip_address'] circuit_id: int identifier: int @@ -217,7 +217,7 @@ class RendezvousEstablishedPayload(CellablePayload): msg_id = 12 names = ['circuit_id', 'identifier', 'rendezvous_point_addr'] - format_list = ['I', 'H', 'ipv4'] + format_list = ['I', 'H', 'ip_address'] circuit_id: int identifier: int @@ -307,7 +307,7 @@ class IntroductionInfo(VariablePayload): """ names = ['address', 'key', 'seeder_pk', 'source'] - format_list = ['ipv4', 'varlenH', 'varlenH', 'B'] + format_list = ['ip_address', 'varlenH', 'varlenH', 'B'] address: Address key: bytes @@ -338,7 +338,7 @@ class RendezvousInfo(VariablePayload): """ names = ['address', 'key', 'cookie'] - format_list = ['ipv4', 'varlenH', '20s'] + format_list = ['ip_address', 'varlenH', '20s'] address: Address key: bytes diff --git a/ipv8/messaging/anonymization/tunnel.py b/ipv8/messaging/anonymization/tunnel.py index 93416cf60..d1748e35a 100644 --- a/ipv8/messaging/anonymization/tunnel.py +++ b/ipv8/messaging/anonymization/tunnel.py @@ -5,7 +5,15 @@ import socket import sys import time -from asyncio import CancelledError, DatagramProtocol, DatagramTransport, Future, ensure_future, gather, get_running_loop +from asyncio import ( + CancelledError, + DatagramProtocol, + DatagramTransport, + Future, + ensure_future, + gather, + get_running_loop, +) from binascii import hexlify from collections import deque from struct import unpack_from @@ -15,7 +23,7 @@ from ...keyvault.public.libnaclkey import LibNaCLPK from ...peer import Peer from ...taskmanager import TaskManager -from ...util import succeed +from ..interfaces.udp.endpoint import DomainAddress, UDPv4Address, UDPv6Address if TYPE_CHECKING: from ...keyvault.private.libnaclkey import LibNaCLSK @@ -173,7 +181,38 @@ def beat_heart(self) -> None: self.last_activity = time.time() -class TunnelExitSocket(Tunnel[Peer], DatagramProtocol, TaskManager): +class TunnelProtocol(DatagramProtocol): + """ + Protocol used by TunnelExitSocket. + """ + + def __init__(self, received_cb: Callable, local_addr: Address) -> None: + """ + Create a new TunnelProtocol. + """ + self.received_cb = received_cb + self.local_addr = local_addr + self.logger = logging.getLogger(self.__class__.__name__) + + async def open(self) -> DatagramTransport: # noqa: A003 + """ + Opens a datagram endpoint and returns the Transport. + """ + transport, _ = await get_running_loop().create_datagram_endpoint(lambda: self, + local_addr=self.local_addr) + + listen_addr = transport.get_extra_info('socket').getsockname()[:2] + self.logger.info('Listening on %s:%s', *listen_addr) + return cast(DatagramTransport, transport) + + def datagram_received(self, data: bytes, addr: Address) -> None: + """ + Callback for when data is received by the socket. + """ + self.received_cb(data, addr) + + +class TunnelExitSocket(Tunnel[Peer], TaskManager): """ Socket for exit nodes that communicates with the outside world. """ @@ -185,7 +224,8 @@ def __init__(self, circuit_id: int, peer: Peer, overlay: TunnelCommunity) -> Non Tunnel.__init__(self, circuit_id, peer) TaskManager.__init__(self) self.overlay = overlay - self.transport: DatagramTransport | None = None + self.transport_ipv4: DatagramTransport | None = None + self.transport_ipv6: DatagramTransport | None = None self.queue: deque[tuple[bytes, Address]] = deque(maxlen=10) self.enabled = False @@ -193,65 +233,87 @@ def enable(self) -> None: """ Allow data to be sent. - This creates the datagram endpoint that allows us to send messages. + This creates the datagram endpoints that allows us to send messages. """ if not self.enabled: self.enabled = True - async def create_transport() -> None: - self.transport, _ = await get_running_loop().create_datagram_endpoint(lambda: self, - local_addr=('0.0.0.0', 0)) - # Send any packets that have been waiting while the transport was being created + async def create_transports() -> None: + self.transport_ipv4 = await TunnelProtocol(self.datagram_received_ipv4, ('0.0.0.0', 0)).open() + self.transport_ipv6 = await TunnelProtocol(self.datagram_received_ipv6, ('::', 0)).open() + + # Send any packets that have been waiting while the transports were being created while self.queue: self.sendto(*self.queue.popleft()) - self.register_task("create_transport", create_transport) + + self.register_task("create_transports", create_transports) def sendto(self, data: bytes, destination: Address) -> None: """ Send o message over our datagram transporter. """ - if not self.transport: - self.queue.append((data, destination)) + if not self.is_allowed(data): return - transport = cast(DatagramTransport, self.transport) - self.beat_heart() - if self.is_allowed(data): - def on_ip_address(future: Future[str]) -> None: + # Since this call comes from the TunnelCommunity, we assume the destination + # address is either UDPv4Address/UDPv6Address/DomainAddress + if isinstance(destination, DomainAddress): + def on_address(future: Future[Address]) -> None: try: ip_address = future.result() except (CancelledError, Exception) as e: self.logger.exception("Can't resolve ip address for %s. Failure: %s", destination[0], e) return + self.sendto(data, ip_address) - self.logger.debug("Resolved hostname %s to ip_address %s", destination[0], ip_address) - try: - transport.sendto(data, (ip_address, destination[1])) - self.bytes_up += len(data) - except OSError as e: - self.logger.exception("Failed to write to transport. Destination: %r error: %r", destination, e) + task = ensure_future(self.resolve(destination)) + # If this fails, the TaskManager logs the packet. + self.register_anonymous_task("resolving_%r" % destination[0], task, + ignore=(OSError, ValueError)).add_done_callback(on_address) + return - try: - socket.inet_aton(destination[0]) - on_ip_address(succeed(destination[0])) - except (OSError, ValueError): - task = ensure_future(self.resolve(destination[0])) - # If this also fails, the TaskManager logs the packet. - # The host probably really does not exist. - self.register_anonymous_task("resolving_%r" % destination[0], task, - ignore=(OSError, ValueError)).add_done_callback(on_ip_address) + transport = self.transport_ipv6 if isinstance(destination, UDPv6Address) else self.transport_ipv4 + + if not transport: + self.queue.append((data, destination)) + return + + transport.sendto(data, destination) + self.bytes_up += len(data) + self.beat_heart() - async def resolve(self, host: str) -> str: + async def resolve(self, address: Address) -> Address: """ Using asyncio's getaddrinfo since the aiodns resolver seems to have issues. Returns [(family, type, proto, canonname, sockaddr)]. """ - infos = await get_running_loop().getaddrinfo(host, 0, family=socket.AF_INET) - return infos[0][-1][0] + info_list = await get_running_loop().getaddrinfo(address[0], 0) + # For the time being we prefer dealing with IPv4 addresses. + info_list.sort(key=lambda x: x[0]) + ip = info_list[0][-1][0] + family = info_list[0][0] + if family == socket.AF_INET6: + return UDPv6Address(ip, address[1]) + return UDPv4Address(ip, address[1]) + + def datagram_received_ipv4(self, data: bytes, source: Address) -> None: + """ + Callback for when data is received by the IPv4 socket. + """ + self.datagram_received(data, UDPv4Address(*source)) + + def datagram_received_ipv6(self, data: bytes, source: Address) -> None: + """ + Callback for when data is received by the IPv6 socket. + """ + if source[0][:7] == '::ffff:': + # We're not processing mapped IPv4, we have a separate endpoint for that. + return + self.datagram_received(data, UDPv6Address(*source[:2])) def datagram_received(self, data: bytes, source: Address) -> None: """ - Callback for when data is received by the socket. + Callback for when data is received by a IPv4/IPv6 socket. """ self.beat_heart() self.bytes_down += len(data) @@ -294,9 +356,12 @@ async def close(self) -> None: # The resolution tasks can't be cancelled, so we need to wait for # them to finish. await self.shutdown_task_manager() - if self.transport: - self.transport.close() - self.transport = None + if self.transport_ipv4: + self.transport_ipv4.close() + self.transport_ipv4 = None + if self.transport_ipv6: + self.transport_ipv6.close() + self.transport_ipv6 = None class Circuit(Tunnel[Optional[Peer]]): diff --git a/ipv8/messaging/interfaces/endpoint.py b/ipv8/messaging/interfaces/endpoint.py index 4fc5d1cdf..0dfef63df 100644 --- a/ipv8/messaging/interfaces/endpoint.py +++ b/ipv8/messaging/interfaces/endpoint.py @@ -256,8 +256,7 @@ def _guess_lan_address(self, addresses: Iterable[str]) -> str: Chooses the most likely Interface instance out of INTERFACES to use as our LAN address. """ for address in addresses: - if (not (self.is_ipv6_listener and not self._is_ipv6_address(address)) - and not (not self.is_ipv6_listener and self._is_ipv6_address(address))): + if self.is_ipv6_listener == self._is_ipv6_address(address): return address return "127.0.0.1" diff --git a/ipv8/messaging/serialization.py b/ipv8/messaging/serialization.py index 1ccf243c2..b2a9a90e7 100644 --- a/ipv8/messaging/serialization.py +++ b/ipv8/messaging/serialization.py @@ -4,6 +4,7 @@ import socket import typing from binascii import hexlify +from contextlib import suppress from struct import Struct, pack, unpack_from from typing import TYPE_CHECKING, cast @@ -259,17 +260,17 @@ def pack(self, address: tuple[str, int]) -> bytes: """ Pack a generic address as bytes. """ - if isinstance(address, UDPv6Address): - address = typing.cast(UDPv6Address, address) + with suppress(OSError): + return pack('>B4sH', ADDRESS_TYPE_IPV4, socket.inet_pton(socket.AF_INET, address[0]), address[1]) + with suppress(OSError): return pack('>B16sH', ADDRESS_TYPE_IPV6, - socket.inet_pton(socket.AF_INET6, address.ip), address.port) - if not self.ip_only and isinstance(address, DomainAddress): - address = typing.cast(DomainAddress, address) - host_bytes = address.host.encode() + socket.inet_pton(socket.AF_INET6, address[0]), address[1]) + + if not self.ip_only: + host_bytes = address[0].encode() return pack(f'>BH{len(host_bytes)}sH', ADDRESS_TYPE_DOMAIN_NAME, - len(host_bytes), host_bytes, address.port) - if isinstance(address, tuple): - return pack('>B4sH', ADDRESS_TYPE_IPV4, socket.inet_pton(socket.AF_INET, address[0]), address[1]) + len(host_bytes), host_bytes, address[1]) + msg = f"Unexpected address type {address}" raise PackError(msg) diff --git a/ipv8/test/REST/test_isolation_endpoint.py b/ipv8/test/REST/test_isolation_endpoint.py index e10c91c2e..70caab8a3 100644 --- a/ipv8/test/REST/test_isolation_endpoint.py +++ b/ipv8/test/REST/test_isolation_endpoint.py @@ -1,12 +1,13 @@ from __future__ import annotations +import unittest from typing import TYPE_CHECKING, cast from ...bootstrapping.dispersy.bootstrapper import DispersyBootstrapper from ...configuration import DISPERSY_BOOTSTRAPPER from ...messaging.anonymization.community import TunnelCommunity from ..mocking.community import MockCommunity -from ..mocking.endpoint import MockEndpoint, MockEndpointListener +from ..mocking.endpoint import AutoMockEndpoint, MockEndpoint, MockEndpointListener from ..REST.rest_base import RESTTestBase if TYPE_CHECKING: @@ -101,6 +102,7 @@ async def test_no_choice(self) -> None: self.assertFalse(response["success"]) + @unittest.skipIf(AutoMockEndpoint.IPV6_ADDRESSES, "IPv6 not supported") async def test_add_bootstrap(self) -> None: """ Check if bootstrap nodes are correctly added. @@ -117,6 +119,7 @@ async def test_add_bootstrap(self) -> None: self.assertIn(TestIsolationEndpoint.FAKE_BOOTSTRAP_ADDRESS, self.ipv8.network.blacklist) self.assertLessEqual(1, len(self.fake_endpoint_listener.received_packets)) + @unittest.skipIf(AutoMockEndpoint.IPV6_ADDRESSES, "IPv6 not supported") async def test_add_exit(self) -> None: """ Check if exit nodes are correctly added. diff --git a/ipv8/test/bootstrapping/dispersy/test_bootstrapper.py b/ipv8/test/bootstrapping/dispersy/test_bootstrapper.py index a9fc9cf18..3bb924714 100644 --- a/ipv8/test/bootstrapping/dispersy/test_bootstrapper.py +++ b/ipv8/test/bootstrapping/dispersy/test_bootstrapper.py @@ -1,9 +1,12 @@ +import unittest + from ....bootstrapping.dispersy.bootstrapper import DispersyBootstrapper from ...base import TestBase from ...mocking.community import MockCommunity from ...mocking.endpoint import AutoMockEndpoint, MockEndpointListener +@unittest.skipIf(AutoMockEndpoint.IPV6_ADDRESSES, "IPv6 not supported") class TestDispersyBootstrapper(TestBase): """ Tests related to Dispersy-style bootstrapping. diff --git a/ipv8/test/mocking/endpoint.py b/ipv8/test/mocking/endpoint.py index e59cc9fe4..063bdbf0a 100644 --- a/ipv8/test/mocking/endpoint.py +++ b/ipv8/test/mocking/endpoint.py @@ -1,5 +1,6 @@ from __future__ import annotations +import os import random from asyncio import get_running_loop from typing import TYPE_CHECKING @@ -142,7 +143,7 @@ class AutoMockEndpoint(MockEndpoint): Randomly generate LAN + WAN addresses that are globally unique and register them in the "internet" dictionary. """ - ADDRESS_TYPE = "UDPv4Address" + IPV6_ADDRESSES = bool(int(os.environ.get("TEST_IPV8_WITH_IPV6", 0))) def __init__(self) -> None: """ @@ -153,7 +154,7 @@ def __init__(self) -> None: self._port = 0 def _generate_address(self) -> UDPv4Address | UDPv6Address: - if self.ADDRESS_TYPE == "UDPv4Address": + if not self.IPV6_ADDRESSES: b0 = random.randint(0, 255) b1 = random.randint(0, 255) b2 = random.randint(0, 255) @@ -162,20 +163,17 @@ def _generate_address(self) -> UDPv4Address | UDPv6Address: return UDPv4Address('%d.%d.%d.%d' % (b0, b1, b2, b3), port) - if self.ADDRESS_TYPE == "UDPv6Address": - b0 = random.randint(0, 65535) - b1 = random.randint(0, 65535) - b2 = random.randint(0, 65535) - b3 = random.randint(0, 65535) - b4 = random.randint(0, 65535) - b5 = random.randint(0, 65535) - b6 = random.randint(0, 65535) - b7 = random.randint(0, 65535) - port = random.randint(0, 65535) - - return UDPv6Address(f"{b0:02x}:{b1:02x}:{b2:02x}:{b3:02x}:{b4:02x}:{b5:02x}:{b6:02x}:{b7:02x}", port) - - raise RuntimeError("Illegal address type specified: " + repr(self.ADDRESS_TYPE)) + b0 = random.randint(0, 65535) + b1 = random.randint(0, 65535) + b2 = random.randint(0, 65535) + b3 = random.randint(0, 65535) + b4 = random.randint(0, 65535) + b5 = random.randint(0, 65535) + b6 = random.randint(0, 65535) + b7 = random.randint(0, 65535) + port = random.randint(0, 65535) + + return UDPv6Address(f"{b0:02x}:{b1:02x}:{b2:02x}:{b3:02x}:{b4:02x}:{b5:02x}:{b6:02x}:{b7:02x}", port) def _is_lan(self, address: Address) -> bool: """ diff --git a/ipv8/test/peerdiscovery/test_community.py b/ipv8/test/peerdiscovery/test_community.py index 8e2f8c3a0..755423fce 100644 --- a/ipv8/test/peerdiscovery/test_community.py +++ b/ipv8/test/peerdiscovery/test_community.py @@ -1,4 +1,5 @@ import os +import unittest from functools import reduce from typing import cast @@ -10,6 +11,7 @@ from ..mocking.endpoint import AutoMockEndpoint +@unittest.skipIf(AutoMockEndpoint.IPV6_ADDRESSES, "IPv6 not supported") class TestDiscoveryCommunity(TestBase): """ Tests related to the DiscoveryCommunity. diff --git a/ipv8/test/test_community.py b/ipv8/test/test_community.py index 6dc0bb1e8..2a1e85c57 100644 --- a/ipv8/test/test_community.py +++ b/ipv8/test/test_community.py @@ -1,5 +1,6 @@ from __future__ import annotations +import unittest from typing import TYPE_CHECKING from ..bootstrapping.dispersy.bootstrapper import DispersyBootstrapper @@ -102,6 +103,7 @@ async def walk_from_to(self, from_i: int, to_i: int) -> None: self.overlay(from_i).walk_to(self.address(to_i)) await self.deliver_messages() + @unittest.skipIf(AutoMockEndpoint.IPV6_ADDRESSES, "IPv6 not supported") async def test_introduce_old(self) -> None: """ Check that no new-style messages are going to the old-style peer.