Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support listening both IPv4 and IPv6 + NodeRecord extension #178

Merged
merged 14 commits into from
Jun 25, 2024
16 changes: 14 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
version: 2.1
executors:
medium_executor:
docker:
- image: cimg/openjdk:11.0.13
machine:
image: ubuntu-2204:current
resource_class: medium
working_directory: ~/project
environment:
JAVA_TOOL_OPTIONS: -Xmx2g
GRADLE_OPTS: -Dorg.gradle.daemon=false -Dorg.gradle.parallel=true -Dorg.gradle.workers.max=2

commands:
install_java:
description: "Install Java 11"
steps:
- run:
name: Install Java 11
command: |
sudo apt-get update
sudo apt-get install -y openjdk-11-jdk
sudo update-alternatives --set java /usr/lib/jvm/java-11-openjdk-amd64/bin/java
sudo update-alternatives --set javac /usr/lib/jvm/java-11-openjdk-amd64/bin/javac
prepare:
description: "Prepare"
steps:
Expand All @@ -30,6 +40,7 @@ jobs:
build:
executor: medium_executor
steps:
- install_java
- prepare
- run:
name: Assemble
Expand All @@ -52,6 +63,7 @@ jobs:
publish:
executor: medium_executor
steps:
- install_java
- prepare
- attach_workspace:
at: ~/project
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@
import static org.ethereum.beacon.discovery.util.Utils.RECOVERABLE_ERRORS_PREDICATE;

import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.NioDatagramChannel;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -63,7 +67,7 @@ public class DiscoveryManagerImpl implements DiscoveryManager {
private static final Logger LOG = LogManager.getLogger();

private final ReplayProcessor<NetworkParcel> outgoingMessages = ReplayProcessor.cacheLast();
private final NettyDiscoveryServer discoveryServer;
private final List<NettyDiscoveryServer> discoveryServers;
private final Pipeline incomingPipeline = new PipelineImpl();
private final Pipeline outgoingPipeline = new PipelineImpl();
private final LocalNodeRecordStore localNodeRecordStore;
Expand All @@ -72,7 +76,7 @@ public class DiscoveryManagerImpl implements DiscoveryManager {
private final NodeSessionManager nodeSessionManager;

public DiscoveryManagerImpl(
final NettyDiscoveryServer discoveryServer,
final List<NettyDiscoveryServer> discoveryServers,
final KBuckets nodeBucketStorage,
final LocalNodeRecordStore localNodeRecordStore,
final SecretKey homeNodeSecretKey,
Expand All @@ -86,7 +90,7 @@ public DiscoveryManagerImpl(
this.addressAccessPolicy = addressAccessPolicy;
final NodeRecord homeNodeRecord = localNodeRecordStore.getLocalNodeRecord();

this.discoveryServer = discoveryServer;
this.discoveryServers = discoveryServers;
nodeSessionManager =
new NodeSessionManager(
localNodeRecordStore,
Expand Down Expand Up @@ -140,16 +144,30 @@ private void requestUpdatedEnr(final NodeRecord record) {
public CompletableFuture<Void> start() {
incomingPipeline.build();
outgoingPipeline.build();
Flux.from(discoveryServer.getIncomingPackets())
.doOnNext(incomingPipeline::push)
.onErrorContinue(
RECOVERABLE_ERRORS_PREDICATE,
(err, msg) -> LOG.debug("Error while processing message: " + err))
.subscribe();
return discoveryServer
.start()
.thenAccept(
channel -> discoveryClient = new NettyDiscoveryClientImpl(outgoingMessages, channel));
discoveryServers.forEach(
discoveryServer ->
Flux.from(discoveryServer.getIncomingPackets())
.doOnNext(incomingPipeline::push)
.onErrorContinue(
RECOVERABLE_ERRORS_PREDICATE,
(err, msg) -> LOG.debug("Error while processing message", err))
.subscribe());
final Map<InternetProtocolFamily, NioDatagramChannel> channels = new ConcurrentHashMap<>();
return CompletableFuture.allOf(
discoveryServers.stream()
.map(
discoveryServer ->
discoveryServer
.start()
.thenAccept(
channel -> {
final InternetProtocolFamily ipFamily =
InternetProtocolFamily.of(
discoveryServer.getListenAddress().getAddress());
channels.put(ipFamily, channel);
}))
.toArray(CompletableFuture<?>[]::new))
.thenRun(() -> discoveryClient = new NettyDiscoveryClientImpl(outgoingMessages, channels));
}

@Override
Expand All @@ -158,7 +176,7 @@ public void stop() {
if (client != null) {
client.stop();
}
discoveryServer.stop();
discoveryServers.forEach(NettyDiscoveryServer::stop);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.socket.InternetProtocolFamily;
import java.net.InetSocketAddress;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -44,7 +49,7 @@ public class DiscoverySystemBuilder {

private static final AtomicInteger COUNTER = new AtomicInteger();
private List<NodeRecord> bootnodes = Collections.emptyList();
private Optional<InetSocketAddress> listenAddress = Optional.empty();
private Optional<List<InetSocketAddress>> listenAddresses = Optional.empty();
private NodeRecord localNodeRecord;
private SecretKey secretKey;
private NodeRecordFactory nodeRecordFactory = NodeRecordFactory.DEFAULT;
Expand All @@ -56,8 +61,8 @@ public class DiscoverySystemBuilder {
private Duration lifeCheckInterval = DiscoveryTaskManager.DEFAULT_LIVE_CHECK_INTERVAL;
private int trafficReadLimit = 250000; // bytes per sec
private TalkHandler talkHandler = TalkHandler.NOOP;
private NettyDiscoveryServer discoveryServer = null;
private ExternalAddressSelector externalAddressSelector = null;
private List<NettyDiscoveryServer> discoveryServers;
private ExternalAddressSelector externalAddressSelector;
private AddressAccessPolicy addressAccessPolicy = AddressAccessPolicy.ALLOW_ALL;
private final Clock clock = Clock.systemUTC();
private final LivenessChecker livenessChecker = new LivenessChecker(clock);
Expand All @@ -73,7 +78,16 @@ public DiscoverySystemBuilder localNodeRecord(final NodeRecord localNodeRecord)
}

public DiscoverySystemBuilder listen(final String listenAddress, final int listenPort) {
this.listenAddress = Optional.of(new InetSocketAddress(listenAddress, listenPort));
this.listenAddresses = Optional.of(List.of(new InetSocketAddress(listenAddress, listenPort)));
return this;
}

public DiscoverySystemBuilder listen(final InetSocketAddress... listenAddresses) {
Preconditions.checkArgument(
listenAddresses.length == 1 || listenAddresses.length == 2,
"Can define only 1 or 2 listen addresses - IPv4/IPv6 or IPv4 and IPv6");
validateListenAddresses(Arrays.stream(listenAddresses).collect(Collectors.toList()));
this.listenAddresses = Optional.of(Arrays.asList(listenAddresses));
return this;
}

Expand All @@ -82,7 +96,7 @@ public DiscoverySystemBuilder secretKey(final SecretKey secretKey) {
return this;
}

public DiscoverySystemBuilder nodeRecordFactory(NodeRecordFactory nodeRecordFactory) {
public DiscoverySystemBuilder nodeRecordFactory(final NodeRecordFactory nodeRecordFactory) {
this.nodeRecordFactory = nodeRecordFactory;
return this;
}
Expand Down Expand Up @@ -142,7 +156,19 @@ public DiscoverySystemBuilder talkHandler(final TalkHandler talkHandler) {
}

public DiscoverySystemBuilder discoveryServer(final NettyDiscoveryServer discoveryServer) {
this.discoveryServer = discoveryServer;
this.discoveryServers = List.of(discoveryServer);
return this;
}

public DiscoverySystemBuilder discoveryServers(final NettyDiscoveryServer... discoveryServers) {
Preconditions.checkArgument(
discoveryServers.length == 1 || discoveryServers.length == 2,
"Can define only 1 or 2 discovery servers - IPv4/IPv6 or IPv4 and IPv6");
validateListenAddresses(
Arrays.stream(discoveryServers)
.map(NettyDiscoveryServer::getListenAddress)
.collect(Collectors.toList()));
this.discoveryServers = Arrays.asList(discoveryServers);
return this;
}

Expand All @@ -157,6 +183,20 @@ public DiscoverySystemBuilder addressAccessPolicy(final AddressAccessPolicy addr
return this;
}

private void validateListenAddresses(final List<InetSocketAddress> listenAddresses) {
if (listenAddresses.size() == 2) {
final Set<InternetProtocolFamily> ipFamilies =
listenAddresses.stream()
.map(InetSocketAddress::getAddress)
.map(InternetProtocolFamily::of)
.collect(Collectors.toSet());
if (ipFamilies.size() != 2) {
throw new IllegalArgumentException(
String.format("Expected an IPv4 and an IPv6 address but only %s was set", ipFamilies));
}
}
}

private void createDefaults() {
newAddressHandler =
requireNonNullElseGet(
Expand All @@ -169,17 +209,27 @@ private void createDefaults() {
oldRecord.getTcpAddress().map(InetSocketAddress::getPort),
secretKey)));
schedulers = requireNonNullElseGet(schedulers, Schedulers::createDefault);
final InetSocketAddress serverListenAddress =
listenAddress
.or(localNodeRecord::getUdpAddress)
.orElseThrow(
() ->
new IllegalArgumentException(
"Local node record must contain an IP and UDP port"));
discoveryServer =
final List<InetSocketAddress> serverListenAddresses =
listenAddresses.orElseGet(
() -> {
final List<InetSocketAddress> localNodeRecordAddresses = new ArrayList<>();
localNodeRecord.getUdpAddress().ifPresent(localNodeRecordAddresses::add);
localNodeRecord.getUdp6Address().ifPresent(localNodeRecordAddresses::add);
if (localNodeRecordAddresses.isEmpty()) {
throw new IllegalArgumentException(
"Local node record must contain an IP and UDP port(s)");
}
return localNodeRecordAddresses;
});
discoveryServers =
requireNonNullElseGet(
discoveryServer,
() -> new NettyDiscoveryServerImpl(serverListenAddress, trafficReadLimit));
discoveryServers,
() ->
serverListenAddresses.stream()
.map(
serverListenAddress ->
new NettyDiscoveryServerImpl(serverListenAddress, trafficReadLimit))
.collect(Collectors.toList()));

localNodeRecordStore =
requireNonNullElseGet(
Expand Down Expand Up @@ -245,7 +295,7 @@ public DiscoverySystem build() {
DiscoveryManagerImpl buildDiscoveryManager() {
createDefaults();
return new DiscoveryManagerImpl(
discoveryServer,
discoveryServers,
nodeBucketStorage,
localNodeRecordStore,
secretKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package org.ethereum.beacon.discovery.network;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import org.ethereum.beacon.discovery.pipeline.Envelope;
import org.reactivestreams.Publisher;
Expand All @@ -14,6 +15,8 @@ public interface DiscoveryServer {

void stop();

InetSocketAddress getListenAddress();

/** Raw incoming packets stream */
Publisher<Envelope> getIncomingPackets();
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@

import io.netty.buffer.Unpooled;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.NioDatagramChannel;
import java.net.InetSocketAddress;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes;
Expand All @@ -17,17 +19,19 @@
/** Netty discovery UDP client */
public class NettyDiscoveryClientImpl implements DiscoveryClient {
private static final Logger LOG = LogManager.getLogger(NettyDiscoveryClientImpl.class);
private NioDatagramChannel channel;

private final Map<InternetProtocolFamily, NioDatagramChannel> channels;

/**
* Constructs UDP client using
*
* @param outgoingStream Stream of outgoing packets, client will forward them to the channel
* @param channel Nio channel
* @param channels must have either 1 entry (IPv4/IPv6) or 2 entries (IPv4 and IPv6)
*/
public NettyDiscoveryClientImpl(
Publisher<NetworkParcel> outgoingStream, NioDatagramChannel channel) {
this.channel = channel;
final Publisher<NetworkParcel> outgoingStream,
final Map<InternetProtocolFamily, NioDatagramChannel> channels) {
this.channels = channels;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd add throw if channels is empty.

Flux.from(outgoingStream)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we throw here if both are empty?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the design a bit. You can have a look.

.subscribe(
networkPacket ->
Expand All @@ -39,8 +43,16 @@ public NettyDiscoveryClientImpl(
public void stop() {}

@Override
public void send(Bytes data, InetSocketAddress destination) {
DatagramPacket packet = new DatagramPacket(Unpooled.copiedBuffer(data.toArray()), destination);
public void send(final Bytes data, final InetSocketAddress destination) {
final DatagramPacket packet =
new DatagramPacket(Unpooled.copiedBuffer(data.toArray()), destination);
final NioDatagramChannel channel =
channels.get(InternetProtocolFamily.of(destination.getAddress()));
if (channel == null) {
LOG.trace(
() -> String.format("Dropping packet %s because of IP version incompatibility", packet));
return;
}
LOG.trace(() -> String.format("Sending packet %s", packet));
channel.write(packet);
channel.flush();
Expand Down
Loading
Loading