Skip to content

Commit

Permalink
Merge pull request #1234 from egbertbouman/tunnel_ipv6
Browse files Browse the repository at this point in the history
Added IPv6 support to TunnelCommunity
  • Loading branch information
egbertbouman authored Oct 18, 2023
2 parents 6eaa728 + 0b2041f commit 8c5e1b9
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 70 deletions.
8 changes: 4 additions & 4 deletions ipv8/messaging/anonymization/payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class ExtendPayload(CellablePayload):

msg_id = 4
names = ['circuit_id', 'identifier', 'node_public_key', 'key', 'node_addr']
format_list = ['I', 'H', 'varlenH', 'varlenH', 'ipv4']
format_list = ['I', 'H', 'varlenH', 'varlenH', 'ip_address']

circuit_id: int
identifier: int
Expand Down Expand Up @@ -217,7 +217,7 @@ class RendezvousEstablishedPayload(CellablePayload):

msg_id = 12
names = ['circuit_id', 'identifier', 'rendezvous_point_addr']
format_list = ['I', 'H', 'ipv4']
format_list = ['I', 'H', 'ip_address']

circuit_id: int
identifier: int
Expand Down Expand Up @@ -307,7 +307,7 @@ class IntroductionInfo(VariablePayload):
"""

names = ['address', 'key', 'seeder_pk', 'source']
format_list = ['ipv4', 'varlenH', 'varlenH', 'B']
format_list = ['ip_address', 'varlenH', 'varlenH', 'B']

address: Address
key: bytes
Expand Down Expand Up @@ -338,7 +338,7 @@ class RendezvousInfo(VariablePayload):
"""

names = ['address', 'key', 'cookie']
format_list = ['ipv4', 'varlenH', '20s']
format_list = ['ip_address', 'varlenH', '20s']

address: Address
key: bytes
Expand Down
141 changes: 103 additions & 38 deletions ipv8/messaging/anonymization/tunnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,15 @@
import socket
import sys
import time
from asyncio import CancelledError, DatagramProtocol, DatagramTransport, Future, ensure_future, gather, get_running_loop
from asyncio import (
CancelledError,
DatagramProtocol,
DatagramTransport,
Future,
ensure_future,
gather,
get_running_loop,
)
from binascii import hexlify
from collections import deque
from struct import unpack_from
Expand All @@ -15,7 +23,7 @@
from ...keyvault.public.libnaclkey import LibNaCLPK
from ...peer import Peer
from ...taskmanager import TaskManager
from ...util import succeed
from ..interfaces.udp.endpoint import DomainAddress, UDPv4Address, UDPv6Address

if TYPE_CHECKING:
from ...keyvault.private.libnaclkey import LibNaCLSK
Expand Down Expand Up @@ -173,7 +181,38 @@ def beat_heart(self) -> None:
self.last_activity = time.time()


class TunnelExitSocket(Tunnel[Peer], DatagramProtocol, TaskManager):
class TunnelProtocol(DatagramProtocol):
"""
Protocol used by TunnelExitSocket.
"""

def __init__(self, received_cb: Callable, local_addr: Address) -> None:
"""
Create a new TunnelProtocol.
"""
self.received_cb = received_cb
self.local_addr = local_addr
self.logger = logging.getLogger(self.__class__.__name__)

async def open(self) -> DatagramTransport: # noqa: A003
"""
Opens a datagram endpoint and returns the Transport.
"""
transport, _ = await get_running_loop().create_datagram_endpoint(lambda: self,
local_addr=self.local_addr)

listen_addr = transport.get_extra_info('socket').getsockname()[:2]
self.logger.info('Listening on %s:%s', *listen_addr)
return cast(DatagramTransport, transport)

def datagram_received(self, data: bytes, addr: Address) -> None:
"""
Callback for when data is received by the socket.
"""
self.received_cb(data, addr)


class TunnelExitSocket(Tunnel[Peer], TaskManager):
"""
Socket for exit nodes that communicates with the outside world.
"""
Expand All @@ -185,73 +224,96 @@ def __init__(self, circuit_id: int, peer: Peer, overlay: TunnelCommunity) -> Non
Tunnel.__init__(self, circuit_id, peer)
TaskManager.__init__(self)
self.overlay = overlay
self.transport: DatagramTransport | None = None
self.transport_ipv4: DatagramTransport | None = None
self.transport_ipv6: DatagramTransport | None = None
self.queue: deque[tuple[bytes, Address]] = deque(maxlen=10)
self.enabled = False

def enable(self) -> None:
"""
Allow data to be sent.
This creates the datagram endpoint that allows us to send messages.
This creates the datagram endpoints that allows us to send messages.
"""
if not self.enabled:
self.enabled = True

async def create_transport() -> None:
self.transport, _ = await get_running_loop().create_datagram_endpoint(lambda: self,
local_addr=('0.0.0.0', 0))
# Send any packets that have been waiting while the transport was being created
async def create_transports() -> None:
self.transport_ipv4 = await TunnelProtocol(self.datagram_received_ipv4, ('0.0.0.0', 0)).open()
self.transport_ipv6 = await TunnelProtocol(self.datagram_received_ipv6, ('::', 0)).open()

# Send any packets that have been waiting while the transports were being created
while self.queue:
self.sendto(*self.queue.popleft())
self.register_task("create_transport", create_transport)

self.register_task("create_transports", create_transports)

def sendto(self, data: bytes, destination: Address) -> None:
"""
Send o message over our datagram transporter.
"""
if not self.transport:
self.queue.append((data, destination))
if not self.is_allowed(data):
return
transport = cast(DatagramTransport, self.transport)

self.beat_heart()
if self.is_allowed(data):
def on_ip_address(future: Future[str]) -> None:
# Since this call comes from the TunnelCommunity, we assume the destination
# address is either UDPv4Address/UDPv6Address/DomainAddress
if isinstance(destination, DomainAddress):
def on_address(future: Future[Address]) -> None:
try:
ip_address = future.result()
except (CancelledError, Exception) as e:
self.logger.exception("Can't resolve ip address for %s. Failure: %s", destination[0], e)
return
self.sendto(data, ip_address)

self.logger.debug("Resolved hostname %s to ip_address %s", destination[0], ip_address)
try:
transport.sendto(data, (ip_address, destination[1]))
self.bytes_up += len(data)
except OSError as e:
self.logger.exception("Failed to write to transport. Destination: %r error: %r", destination, e)
task = ensure_future(self.resolve(destination))
# If this fails, the TaskManager logs the packet.
self.register_anonymous_task("resolving_%r" % destination[0], task,
ignore=(OSError, ValueError)).add_done_callback(on_address)
return

try:
socket.inet_aton(destination[0])
on_ip_address(succeed(destination[0]))
except (OSError, ValueError):
task = ensure_future(self.resolve(destination[0]))
# If this also fails, the TaskManager logs the packet.
# The host probably really does not exist.
self.register_anonymous_task("resolving_%r" % destination[0], task,
ignore=(OSError, ValueError)).add_done_callback(on_ip_address)
transport = self.transport_ipv6 if isinstance(destination, UDPv6Address) else self.transport_ipv4

if not transport:
self.queue.append((data, destination))
return

transport.sendto(data, destination)
self.bytes_up += len(data)
self.beat_heart()

async def resolve(self, host: str) -> str:
async def resolve(self, address: Address) -> Address:
"""
Using asyncio's getaddrinfo since the aiodns resolver seems to have issues.
Returns [(family, type, proto, canonname, sockaddr)].
"""
infos = await get_running_loop().getaddrinfo(host, 0, family=socket.AF_INET)
return infos[0][-1][0]
info_list = await get_running_loop().getaddrinfo(address[0], 0)
# For the time being we prefer dealing with IPv4 addresses.
info_list.sort(key=lambda x: x[0])
ip = info_list[0][-1][0]
family = info_list[0][0]
if family == socket.AF_INET6:
return UDPv6Address(ip, address[1])
return UDPv4Address(ip, address[1])

def datagram_received_ipv4(self, data: bytes, source: Address) -> None:
"""
Callback for when data is received by the IPv4 socket.
"""
self.datagram_received(data, UDPv4Address(*source))

def datagram_received_ipv6(self, data: bytes, source: Address) -> None:
"""
Callback for when data is received by the IPv6 socket.
"""
if source[0][:7] == '::ffff:':
# We're not processing mapped IPv4, we have a separate endpoint for that.
return
self.datagram_received(data, UDPv6Address(*source[:2]))

def datagram_received(self, data: bytes, source: Address) -> None:
"""
Callback for when data is received by the socket.
Callback for when data is received by a IPv4/IPv6 socket.
"""
self.beat_heart()
self.bytes_down += len(data)
Expand Down Expand Up @@ -294,9 +356,12 @@ async def close(self) -> None:
# The resolution tasks can't be cancelled, so we need to wait for
# them to finish.
await self.shutdown_task_manager()
if self.transport:
self.transport.close()
self.transport = None
if self.transport_ipv4:
self.transport_ipv4.close()
self.transport_ipv4 = None
if self.transport_ipv6:
self.transport_ipv6.close()
self.transport_ipv6 = None


class Circuit(Tunnel[Optional[Peer]]):
Expand Down
3 changes: 1 addition & 2 deletions ipv8/messaging/interfaces/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,7 @@ def _guess_lan_address(self, addresses: Iterable[str]) -> str:
Chooses the most likely Interface instance out of INTERFACES to use as our LAN address.
"""
for address in addresses:
if (not (self.is_ipv6_listener and not self._is_ipv6_address(address))
and not (not self.is_ipv6_listener and self._is_ipv6_address(address))):
if self.is_ipv6_listener == self._is_ipv6_address(address):
return address

return "127.0.0.1"
Expand Down
19 changes: 10 additions & 9 deletions ipv8/messaging/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import socket
import typing
from binascii import hexlify
from contextlib import suppress
from struct import Struct, pack, unpack_from
from typing import TYPE_CHECKING, cast

Expand Down Expand Up @@ -259,17 +260,17 @@ def pack(self, address: tuple[str, int]) -> bytes:
"""
Pack a generic address as bytes.
"""
if isinstance(address, UDPv6Address):
address = typing.cast(UDPv6Address, address)
with suppress(OSError):
return pack('>B4sH', ADDRESS_TYPE_IPV4, socket.inet_pton(socket.AF_INET, address[0]), address[1])
with suppress(OSError):
return pack('>B16sH', ADDRESS_TYPE_IPV6,
socket.inet_pton(socket.AF_INET6, address.ip), address.port)
if not self.ip_only and isinstance(address, DomainAddress):
address = typing.cast(DomainAddress, address)
host_bytes = address.host.encode()
socket.inet_pton(socket.AF_INET6, address[0]), address[1])

if not self.ip_only:
host_bytes = address[0].encode()
return pack(f'>BH{len(host_bytes)}sH', ADDRESS_TYPE_DOMAIN_NAME,
len(host_bytes), host_bytes, address.port)
if isinstance(address, tuple):
return pack('>B4sH', ADDRESS_TYPE_IPV4, socket.inet_pton(socket.AF_INET, address[0]), address[1])
len(host_bytes), host_bytes, address[1])

msg = f"Unexpected address type {address}"
raise PackError(msg)

Expand Down
5 changes: 4 additions & 1 deletion ipv8/test/REST/test_isolation_endpoint.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from __future__ import annotations

import unittest
from typing import TYPE_CHECKING, cast

from ...bootstrapping.dispersy.bootstrapper import DispersyBootstrapper
from ...configuration import DISPERSY_BOOTSTRAPPER
from ...messaging.anonymization.community import TunnelCommunity
from ..mocking.community import MockCommunity
from ..mocking.endpoint import MockEndpoint, MockEndpointListener
from ..mocking.endpoint import AutoMockEndpoint, MockEndpoint, MockEndpointListener
from ..REST.rest_base import RESTTestBase

if TYPE_CHECKING:
Expand Down Expand Up @@ -101,6 +102,7 @@ async def test_no_choice(self) -> None:

self.assertFalse(response["success"])

@unittest.skipIf(AutoMockEndpoint.IPV6_ADDRESSES, "IPv6 not supported")
async def test_add_bootstrap(self) -> None:
"""
Check if bootstrap nodes are correctly added.
Expand All @@ -117,6 +119,7 @@ async def test_add_bootstrap(self) -> None:
self.assertIn(TestIsolationEndpoint.FAKE_BOOTSTRAP_ADDRESS, self.ipv8.network.blacklist)
self.assertLessEqual(1, len(self.fake_endpoint_listener.received_packets))

@unittest.skipIf(AutoMockEndpoint.IPV6_ADDRESSES, "IPv6 not supported")
async def test_add_exit(self) -> None:
"""
Check if exit nodes are correctly added.
Expand Down
3 changes: 3 additions & 0 deletions ipv8/test/bootstrapping/dispersy/test_bootstrapper.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import unittest

from ....bootstrapping.dispersy.bootstrapper import DispersyBootstrapper
from ...base import TestBase
from ...mocking.community import MockCommunity
from ...mocking.endpoint import AutoMockEndpoint, MockEndpointListener


@unittest.skipIf(AutoMockEndpoint.IPV6_ADDRESSES, "IPv6 not supported")
class TestDispersyBootstrapper(TestBase):
"""
Tests related to Dispersy-style bootstrapping.
Expand Down
30 changes: 14 additions & 16 deletions ipv8/test/mocking/endpoint.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import os
import random
from asyncio import get_running_loop
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -142,7 +143,7 @@ class AutoMockEndpoint(MockEndpoint):
Randomly generate LAN + WAN addresses that are globally unique and register them in the "internet" dictionary.
"""

ADDRESS_TYPE = "UDPv4Address"
IPV6_ADDRESSES = bool(int(os.environ.get("TEST_IPV8_WITH_IPV6", 0)))

def __init__(self) -> None:
"""
Expand All @@ -153,7 +154,7 @@ def __init__(self) -> None:
self._port = 0

def _generate_address(self) -> UDPv4Address | UDPv6Address:
if self.ADDRESS_TYPE == "UDPv4Address":
if not self.IPV6_ADDRESSES:
b0 = random.randint(0, 255)
b1 = random.randint(0, 255)
b2 = random.randint(0, 255)
Expand All @@ -162,20 +163,17 @@ def _generate_address(self) -> UDPv4Address | UDPv6Address:

return UDPv4Address('%d.%d.%d.%d' % (b0, b1, b2, b3), port)

if self.ADDRESS_TYPE == "UDPv6Address":
b0 = random.randint(0, 65535)
b1 = random.randint(0, 65535)
b2 = random.randint(0, 65535)
b3 = random.randint(0, 65535)
b4 = random.randint(0, 65535)
b5 = random.randint(0, 65535)
b6 = random.randint(0, 65535)
b7 = random.randint(0, 65535)
port = random.randint(0, 65535)

return UDPv6Address(f"{b0:02x}:{b1:02x}:{b2:02x}:{b3:02x}:{b4:02x}:{b5:02x}:{b6:02x}:{b7:02x}", port)

raise RuntimeError("Illegal address type specified: " + repr(self.ADDRESS_TYPE))
b0 = random.randint(0, 65535)
b1 = random.randint(0, 65535)
b2 = random.randint(0, 65535)
b3 = random.randint(0, 65535)
b4 = random.randint(0, 65535)
b5 = random.randint(0, 65535)
b6 = random.randint(0, 65535)
b7 = random.randint(0, 65535)
port = random.randint(0, 65535)

return UDPv6Address(f"{b0:02x}:{b1:02x}:{b2:02x}:{b3:02x}:{b4:02x}:{b5:02x}:{b6:02x}:{b7:02x}", port)

def _is_lan(self, address: Address) -> bool:
"""
Expand Down
Loading

0 comments on commit 8c5e1b9

Please sign in to comment.