Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support listening both IPv4 and IPv6 + NodeRecord extension #178

Merged
merged 14 commits into from
Jun 25, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import static org.ethereum.beacon.discovery.util.Utils.RECOVERABLE_ERRORS_PREDICATE;

import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.socket.nio.NioDatagramChannel;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -63,7 +65,7 @@ public class DiscoveryManagerImpl implements DiscoveryManager {
private static final Logger LOG = LogManager.getLogger();

private final ReplayProcessor<NetworkParcel> outgoingMessages = ReplayProcessor.cacheLast();
private final NettyDiscoveryServer discoveryServer;
private final List<NettyDiscoveryServer> discoveryServers;
private final Pipeline incomingPipeline = new PipelineImpl();
private final Pipeline outgoingPipeline = new PipelineImpl();
private final LocalNodeRecordStore localNodeRecordStore;
Expand All @@ -72,7 +74,7 @@ public class DiscoveryManagerImpl implements DiscoveryManager {
private final NodeSessionManager nodeSessionManager;

public DiscoveryManagerImpl(
final NettyDiscoveryServer discoveryServer,
final List<NettyDiscoveryServer> discoveryServers,
final KBuckets nodeBucketStorage,
final LocalNodeRecordStore localNodeRecordStore,
final SecretKey homeNodeSecretKey,
Expand All @@ -86,7 +88,7 @@ public DiscoveryManagerImpl(
this.addressAccessPolicy = addressAccessPolicy;
final NodeRecord homeNodeRecord = localNodeRecordStore.getLocalNodeRecord();

this.discoveryServer = discoveryServer;
this.discoveryServers = discoveryServers;
nodeSessionManager =
new NodeSessionManager(
localNodeRecordStore,
Expand Down Expand Up @@ -140,16 +142,22 @@ private void requestUpdatedEnr(final NodeRecord record) {
public CompletableFuture<Void> start() {
incomingPipeline.build();
outgoingPipeline.build();
Flux.from(discoveryServer.getIncomingPackets())
final Publisher<?>[] incomingPacketsSources =
discoveryServers.stream()
.map(NettyDiscoveryServer::getIncomingPackets)
.toArray(Publisher<?>[]::new);
Flux.concat(incomingPacketsSources)
.doOnNext(incomingPipeline::push)
.onErrorContinue(
RECOVERABLE_ERRORS_PREDICATE,
(err, msg) -> LOG.debug("Error while processing message: " + err))
(err, msg) -> LOG.debug("Error while processing message: " + msg, err))
.subscribe();
return discoveryServer
.start()
.thenAccept(
channel -> discoveryClient = new NettyDiscoveryClientImpl(outgoingMessages, channel));
final List<NioDatagramChannel> channels = new CopyOnWriteArrayList<>();
return CompletableFuture.allOf(
discoveryServers.stream()
.map(discoveryServer -> discoveryServer.start().thenAccept(channels::add))
.toArray(CompletableFuture<?>[]::new))
.thenRun(() -> discoveryClient = new NettyDiscoveryClientImpl(outgoingMessages, channels));
}

@Override
Expand All @@ -158,7 +166,7 @@ public void stop() {
if (client != null) {
client.stop();
}
discoveryServer.stop();
discoveryServers.forEach(NettyDiscoveryServer::stop);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import java.net.InetSocketAddress;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -44,7 +46,7 @@ public class DiscoverySystemBuilder {

private static final AtomicInteger COUNTER = new AtomicInteger();
private List<NodeRecord> bootnodes = Collections.emptyList();
private Optional<InetSocketAddress> listenAddress = Optional.empty();
private Optional<List<InetSocketAddress>> listenAddresses = Optional.empty();
private NodeRecord localNodeRecord;
private SecretKey secretKey;
private NodeRecordFactory nodeRecordFactory = NodeRecordFactory.DEFAULT;
Expand All @@ -56,8 +58,8 @@ public class DiscoverySystemBuilder {
private Duration lifeCheckInterval = DiscoveryTaskManager.DEFAULT_LIVE_CHECK_INTERVAL;
private int trafficReadLimit = 250000; // bytes per sec
private TalkHandler talkHandler = TalkHandler.NOOP;
private NettyDiscoveryServer discoveryServer = null;
private ExternalAddressSelector externalAddressSelector = null;
private List<NettyDiscoveryServer> discoveryServers;
private ExternalAddressSelector externalAddressSelector;
private AddressAccessPolicy addressAccessPolicy = AddressAccessPolicy.ALLOW_ALL;
private final Clock clock = Clock.systemUTC();
private final LivenessChecker livenessChecker = new LivenessChecker(clock);
Expand All @@ -73,7 +75,12 @@ public DiscoverySystemBuilder localNodeRecord(final NodeRecord localNodeRecord)
}

public DiscoverySystemBuilder listen(final String listenAddress, final int listenPort) {
this.listenAddress = Optional.of(new InetSocketAddress(listenAddress, listenPort));
this.listenAddresses = Optional.of(List.of(new InetSocketAddress(listenAddress, listenPort)));
return this;
}

public DiscoverySystemBuilder listen(final InetSocketAddress... listenAddresses) {
this.listenAddresses = Optional.of(Arrays.asList(listenAddresses));
return this;
}

Expand All @@ -82,7 +89,7 @@ public DiscoverySystemBuilder secretKey(final SecretKey secretKey) {
return this;
}

public DiscoverySystemBuilder nodeRecordFactory(NodeRecordFactory nodeRecordFactory) {
public DiscoverySystemBuilder nodeRecordFactory(final NodeRecordFactory nodeRecordFactory) {
this.nodeRecordFactory = nodeRecordFactory;
return this;
}
Expand Down Expand Up @@ -142,7 +149,12 @@ public DiscoverySystemBuilder talkHandler(final TalkHandler talkHandler) {
}

public DiscoverySystemBuilder discoveryServer(final NettyDiscoveryServer discoveryServer) {
this.discoveryServer = discoveryServer;
this.discoveryServers = List.of(discoveryServer);
return this;
}

public DiscoverySystemBuilder discoveryServers(final NettyDiscoveryServer... discoveryServers) {
this.discoveryServers = Arrays.asList(discoveryServers);
return this;
}

Expand All @@ -169,17 +181,27 @@ private void createDefaults() {
oldRecord.getTcpAddress().map(InetSocketAddress::getPort),
secretKey)));
schedulers = requireNonNullElseGet(schedulers, Schedulers::createDefault);
final InetSocketAddress serverListenAddress =
listenAddress
.or(localNodeRecord::getUdpAddress)
.orElseThrow(
() ->
new IllegalArgumentException(
"Local node record must contain an IP and UDP port"));
discoveryServer =
final List<InetSocketAddress> serverListenAddresses =
listenAddresses.orElseGet(
() -> {
final List<InetSocketAddress> localNodeRecordAddresses = new ArrayList<>();
localNodeRecord.getUdpAddress().ifPresent(localNodeRecordAddresses::add);
localNodeRecord.getUdp6Address().ifPresent(localNodeRecordAddresses::add);
if (localNodeRecordAddresses.isEmpty()) {
throw new IllegalArgumentException(
"Local node record must contain an IP and UDP port(s)");
}
return localNodeRecordAddresses;
});
discoveryServers =
requireNonNullElseGet(
discoveryServer,
() -> new NettyDiscoveryServerImpl(serverListenAddress, trafficReadLimit));
discoveryServers,
() ->
serverListenAddresses.stream()
.map(
serverListenAddress ->
new NettyDiscoveryServerImpl(serverListenAddress, trafficReadLimit))
.collect(Collectors.toList()));

localNodeRecordStore =
requireNonNullElseGet(
Expand Down Expand Up @@ -245,7 +267,7 @@ public DiscoverySystem build() {
DiscoveryManagerImpl buildDiscoveryManager() {
createDefaults();
return new DiscoveryManagerImpl(
discoveryServer,
discoveryServers,
nodeBucketStorage,
localNodeRecordStore,
secretKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import java.net.Inet6Address;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes;
Expand All @@ -17,17 +20,20 @@
/** Netty discovery UDP client */
public class NettyDiscoveryClientImpl implements DiscoveryClient {
private static final Logger LOG = LogManager.getLogger(NettyDiscoveryClientImpl.class);
private NioDatagramChannel channel;

private final Optional<NioDatagramChannel> ip4Channel;
private final Optional<NioDatagramChannel> ip6Channel;

/**
* Constructs UDP client using
*
* @param outgoingStream Stream of outgoing packets, client will forward them to the channel
* @param channel Nio channel
* @param channels must be either 1 (IPv4/IPv6) or 2 (IPv4 and IPv6)
*/
public NettyDiscoveryClientImpl(
Publisher<NetworkParcel> outgoingStream, NioDatagramChannel channel) {
this.channel = channel;
final Publisher<NetworkParcel> outgoingStream, final List<NioDatagramChannel> channels) {
this.ip4Channel = channels.stream().filter(channel -> !isChannelIPv6(channel)).findFirst();
this.ip6Channel = channels.stream().filter(this::isChannelIPv6).findFirst();
Flux.from(outgoingStream)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we throw here if both are empty?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the design a bit. You can have a look.

.subscribe(
networkPacket ->
Expand All @@ -39,10 +45,21 @@ public NettyDiscoveryClientImpl(
public void stop() {}

@Override
public void send(Bytes data, InetSocketAddress destination) {
DatagramPacket packet = new DatagramPacket(Unpooled.copiedBuffer(data.toArray()), destination);
LOG.trace(() -> String.format("Sending packet %s", packet));
public void send(final Bytes data, final InetSocketAddress destination) {
final DatagramPacket packet =
new DatagramPacket(Unpooled.copiedBuffer(data.toArray()), destination);
final NioDatagramChannel channel;
if (destination.getAddress() instanceof Inet6Address) {
channel = ip6Channel.orElseThrow();
} else {
channel = ip4Channel.orElseThrow();
}
LOG.trace(() -> String.format("Sending packet %s from %s", packet, channel.localAddress()));
channel.write(packet);
channel.flush();
}

private boolean isChannelIPv6(final NioDatagramChannel channel) {
return channel.localAddress().getAddress() instanceof Inet6Address;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public CompletableFuture<NioDatagramChannel> start() {
LOG.info("Starting discovery server on UDP port {}", listenAddress.getPort());
if (!listen.compareAndSet(false, true)) {
return CompletableFuture.failedFuture(
new IllegalStateException("Attempted to start an already started server"));
new IllegalStateException(
"Attempted to start an already started server on port " + listenAddress.getPort()));
StefanBratanov marked this conversation as resolved.
Show resolved Hide resolved
}
nioGroup = new NioEventLoopGroup(1);
return startServer(nioGroup);
Expand Down Expand Up @@ -86,14 +87,15 @@ public void initChannel(NioDatagramChannel ch) {
.addListener(
closeFuture -> {
if (!listen.get()) {
LOG.info("Shutting down discovery server");
LOG.info(
"Shutting down discovery server on port {}", listenAddress.getPort());
group.shutdownGracefully();
return;
}
LOG.error(
"Discovery server closed. Trying to restore after "
+ RECREATION_TIMEOUT
+ " milliseconds delay",
String.format(
"Discovery server on port %d has been closed. Trying to restore after %d milliseconds delay",
listenAddress.getPort(), RECREATION_TIMEOUT),
closeFuture.cause());
Thread.sleep(RECREATION_TIMEOUT);
startServer(group);
Expand All @@ -111,12 +113,12 @@ public Publisher<Envelope> getIncomingPackets() {
@Override
public void stop() {
if (listen.compareAndSet(true, false)) {
LOG.info("Stopping discovery server");
LOG.info("Stopping discovery server on UDP port {}", listenAddress.getPort());
if (channel != null) {
try {
channel.close().sync();
} catch (InterruptedException ex) {
LOG.error("Failed to stop discovery server", ex);
LOG.error("Failed to stop discovery server on port " + listenAddress.getPort(), ex);
}
if (nioGroup != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,12 @@ default boolean isValid(NodeRecord nodeRecord) {

Optional<InetSocketAddress> getUdpAddress(NodeRecord nodeRecord);

Optional<InetSocketAddress> getUdp6Address(NodeRecord nodeRecord);

Optional<InetSocketAddress> getTcpAddress(NodeRecord nodeRecord);

Optional<InetSocketAddress> getTcp6Address(NodeRecord nodeRecord);

NodeRecord createWithNewAddress(
NodeRecord nodeRecord,
InetSocketAddress newAddress,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,23 @@ public void sign(final NodeRecord nodeRecord, final SecretKey secretKey) {

@Override
public Optional<InetSocketAddress> getUdpAddress(final NodeRecord nodeRecord) {
return addressFromFields(nodeRecord, EnrField.IP_V4, EnrField.UDP)
.or(() -> addressFromFields(nodeRecord, EnrField.IP_V6, EnrField.UDP_V6))
return addressFromFields(nodeRecord, EnrField.IP_V4, EnrField.UDP);
}

@Override
public Optional<InetSocketAddress> getUdp6Address(NodeRecord nodeRecord) {
return addressFromFields(nodeRecord, EnrField.IP_V6, EnrField.UDP_V6)
.or(() -> addressFromFields(nodeRecord, EnrField.IP_V6, EnrField.UDP));
}

@Override
public Optional<InetSocketAddress> getTcpAddress(final NodeRecord nodeRecord) {
return addressFromFields(nodeRecord, EnrField.IP_V4, EnrField.TCP)
.or(() -> addressFromFields(nodeRecord, EnrField.IP_V6, EnrField.TCP_V6))
return addressFromFields(nodeRecord, EnrField.IP_V4, EnrField.TCP);
}

@Override
public Optional<InetSocketAddress> getTcp6Address(NodeRecord nodeRecord) {
return addressFromFields(nodeRecord, EnrField.IP_V6, EnrField.TCP_V6)
.or(() -> addressFromFields(nodeRecord, EnrField.IP_V6, EnrField.TCP));
}

Expand Down
12 changes: 12 additions & 0 deletions src/main/java/org/ethereum/beacon/discovery/schema/NodeRecord.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,18 @@ public Optional<InetSocketAddress> getTcpAddress() {
return identitySchemaInterpreter.getTcpAddress(this);
}

public Optional<InetSocketAddress> getTcp6Address() {
return identitySchemaInterpreter.getTcp6Address(this);
}

public Optional<InetSocketAddress> getUdpAddress() {
return identitySchemaInterpreter.getUdpAddress(this);
}

public Optional<InetSocketAddress> getUdp6Address() {
return identitySchemaInterpreter.getUdp6Address(this);
}

public NodeRecord withNewAddress(
final InetSocketAddress newUdpAddress,
final Optional<Integer> newTcpPort,
Expand All @@ -242,6 +250,10 @@ public String toString() {
+ getUdpAddress()
+ ", tcpAddress="
+ getTcpAddress()
+ ", udp6Address="
+ getUdp6Address()
+ ", tcp6Address="
+ getTcp6Address()
+ ", asBase64="
+ this.asBase64()
+ ", nodeId="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.Clock;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -68,8 +69,9 @@ public void test() throws Exception {
new ExpirationSchedulerFactory(Executors.newSingleThreadScheduledExecutor());
DiscoveryManagerImpl discoveryManager1 =
new DiscoveryManagerImpl(
new NettyDiscoveryServerImpl(
nodeRecord1.getUdpAddress().get(), TEST_TRAFFIC_READ_LIMIT),
List.of(
new NettyDiscoveryServerImpl(
nodeRecord1.getUdpAddress().get(), TEST_TRAFFIC_READ_LIMIT)),
nodeBucketStorage1,
new LocalNodeRecordStore(
nodeRecord1,
Expand All @@ -86,8 +88,9 @@ public void test() throws Exception {
livenessChecker1.setPinger(discoveryManager1::ping);
DiscoveryManagerImpl discoveryManager2 =
new DiscoveryManagerImpl(
new NettyDiscoveryServerImpl(
nodeRecord2.getUdpAddress().get(), TEST_TRAFFIC_READ_LIMIT),
List.of(
new NettyDiscoveryServerImpl(
nodeRecord2.getUdpAddress().get(), TEST_TRAFFIC_READ_LIMIT)),
nodeBucketStorage2,
new LocalNodeRecordStore(
nodeRecord2,
Expand Down
Loading
Loading