Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate TTFB, RESP_TIMEOUT, introduce MAX_CONCURRENT_REQUESTS #8839

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,7 @@ public class NetworkConstants {
public static final int DEFAULT_SAFE_SLOTS_TO_IMPORT_OPTIMISTICALLY = 128;

public static final int NODE_ID_BITS = 256;

// https://github.com/ethereum/consensus-specs/pull/3767
public static final int MAX_CONCURRENT_REQUESTS = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public class ThrottlingTaskQueue {

private int inflightTaskCount = 0;

public static ThrottlingTaskQueue create(final int maximumConcurrentTasks) {
return new ThrottlingTaskQueue(maximumConcurrentTasks);
}

public static ThrottlingTaskQueue create(
final int maximumConcurrentTasks,
final MetricsSystem metricsSystem,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import tech.pegasys.teku.networking.p2p.peer.Peer;
import tech.pegasys.teku.networking.p2p.peer.PeerConnectedSubscriber;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.config.SpecConfig;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.metadata.MetadataMessageSchema;
import tech.pegasys.teku.spec.datastructures.state.Checkpoint;
Expand All @@ -51,6 +50,8 @@
public class Eth2PeerManager implements PeerLookup, PeerHandler {
private static final Logger LOG = LogManager.getLogger();

private static final Duration STATUS_RECEIVED_TIMEOUT = Duration.ofSeconds(10);

private final AsyncRunner asyncRunner;
private final RecentChainData recentChainData;
private final Eth2PeerFactory eth2PeerFactory;
Expand All @@ -66,7 +67,6 @@ public class Eth2PeerManager implements PeerLookup, PeerHandler {
private final int eth2RpcOutstandingPingThreshold;

private final Duration eth2StatusUpdateInterval;
private final SpecConfig specConfig;

Eth2PeerManager(
final Spec spec,
Expand Down Expand Up @@ -99,7 +99,6 @@ public class Eth2PeerManager implements PeerLookup, PeerHandler {
this.eth2RpcPingInterval = eth2RpcPingInterval;
this.eth2RpcOutstandingPingThreshold = eth2RpcOutstandingPingThreshold;
this.eth2StatusUpdateInterval = eth2StatusUpdateInterval;
this.specConfig = spec.getGenesisSpecConfig();
}

public static Eth2PeerManager create(
Expand Down Expand Up @@ -237,7 +236,7 @@ private void ensureStatusReceived(final Eth2Peer peer) {
.ifExceptionGetsHereRaiseABug();
}
},
Duration.ofSeconds(specConfig.getRespTimeout()))
STATUS_RECEIVED_TIMEOUT)
.finish(
() -> {},
error -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ public static BeaconChainMethods create(
final MetadataMessagesFactory metadataMessagesFactory,
final RpcEncoding rpcEncoding) {
return new BeaconChainMethods(
createStatus(spec, asyncRunner, statusMessageFactory, peerLookup, rpcEncoding),
createGoodBye(spec, asyncRunner, metricsSystem, peerLookup, rpcEncoding),
createStatus(asyncRunner, statusMessageFactory, peerLookup, rpcEncoding),
createGoodBye(asyncRunner, metricsSystem, peerLookup, rpcEncoding),
createBeaconBlocksByRoot(
spec, metricsSystem, asyncRunner, recentChainData, peerLookup, rpcEncoding),
createBeaconBlocksByRange(
Expand Down Expand Up @@ -144,11 +144,10 @@ public static BeaconChainMethods create(
rpcEncoding,
recentChainData),
createMetadata(spec, asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding),
createPing(spec, asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding));
createPing(asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding));
}

private static Eth2RpcMethod<StatusMessage, StatusMessage> createStatus(
final Spec spec,
final AsyncRunner asyncRunner,
final StatusMessageFactory statusMessageFactory,
final PeerLookup peerLookup,
Expand All @@ -165,12 +164,10 @@ private static Eth2RpcMethod<StatusMessage, StatusMessage> createStatus(
true,
contextCodec,
statusHandler,
peerLookup,
spec.getNetworkingConfig());
peerLookup);
}

private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
final Spec spec,
final AsyncRunner asyncRunner,
final MetricsSystem metricsSystem,
final PeerLookup peerLookup,
Expand All @@ -187,8 +184,7 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
false,
contextCodec,
goodbyeHandler,
peerLookup,
spec.getNetworkingConfig());
peerLookup);
}

private static Eth2RpcMethod<BeaconBlocksByRootRequestMessage, SignedBeaconBlock>
Expand Down Expand Up @@ -221,8 +217,7 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
expectResponseToRequest,
forkDigestContextCodec,
beaconBlocksByRootHandler,
peerLookup,
spec.getNetworkingConfig());
peerLookup);

return VersionedEth2RpcMethod.create(
rpcEncoding, requestType, expectResponseToRequest, List.of(v2Method));
Expand Down Expand Up @@ -259,8 +254,7 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
expectResponseToRequest,
forkDigestContextCodec,
beaconBlocksByRangeHandler,
peerLookup,
spec.getNetworkingConfig());
peerLookup);

return VersionedEth2RpcMethod.create(
rpcEncoding, requestType, expectResponseToRequest, List.of(v2Method));
Expand Down Expand Up @@ -299,8 +293,7 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
true,
forkDigestContextCodec,
blobSidecarsByRootHandler,
peerLookup,
spec.getNetworkingConfig()));
peerLookup));
}

private static Optional<Eth2RpcMethod<BlobSidecarsByRangeRequestMessage, BlobSidecar>>
Expand Down Expand Up @@ -336,8 +329,7 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
true,
forkDigestContextCodec,
blobSidecarsByRangeHandler,
peerLookup,
spec.getNetworkingConfig()));
peerLookup));
}

private static Eth2RpcMethod<EmptyMessage, MetadataMessage> createMetadata(
Expand Down Expand Up @@ -369,8 +361,7 @@ private static Eth2RpcMethod<EmptyMessage, MetadataMessage> createMetadata(
expectResponse,
phase0ContextCodec,
messageHandler,
peerLookup,
spec.getNetworkingConfig());
peerLookup);

if (spec.isMilestoneSupported(SpecMilestone.ALTAIR)) {
final SszSchema<MetadataMessage> altairMetadataSchema =
Expand All @@ -392,8 +383,7 @@ private static Eth2RpcMethod<EmptyMessage, MetadataMessage> createMetadata(
expectResponse,
altairContextCodec,
messageHandler,
peerLookup,
spec.getNetworkingConfig());
peerLookup);
return VersionedEth2RpcMethod.create(
rpcEncoding, requestType, expectResponse, List.of(v2Method, v1Method));
} else {
Expand All @@ -402,7 +392,6 @@ private static Eth2RpcMethod<EmptyMessage, MetadataMessage> createMetadata(
}

private static Eth2RpcMethod<PingMessage, PingMessage> createPing(
final Spec spec,
final AsyncRunner asyncRunner,
final MetadataMessagesFactory metadataMessagesFactory,
final PeerLookup peerLookup,
Expand All @@ -419,8 +408,7 @@ private static Eth2RpcMethod<PingMessage, PingMessage> createPing(
true,
contextCodec,
statusHandler,
peerLookup,
spec.getNetworkingConfig());
peerLookup);
}

public Collection<RpcMethod<?, ?, ?>> all() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
import tech.pegasys.teku.networking.p2p.rpc.RpcRequestHandler;
import tech.pegasys.teku.networking.p2p.rpc.RpcStream;
import tech.pegasys.teku.networking.p2p.rpc.StreamClosedException;
import tech.pegasys.teku.spec.config.NetworkingSpecConfig;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.RpcRequest;

public class Eth2IncomingRequestHandler<
TRequest extends RpcRequest & SszData, TResponse extends SszData>
implements RpcRequestHandler {
private static final Logger LOG = LogManager.getLogger();
private static final Duration RECEIVE_INCOMING_REQUEST_TIMEOUT = Duration.ofSeconds(10);

private final PeerLookup peerLookup;
private final LocalMessageHandler<TRequest, TResponse> localMessageHandler;
Expand All @@ -45,23 +45,20 @@ public class Eth2IncomingRequestHandler<
private final String protocolId;
private final AsyncRunner asyncRunner;
private final AtomicBoolean requestHandled = new AtomicBoolean(false);
private final Duration respTimeout;

public Eth2IncomingRequestHandler(
final String protocolId,
final RpcResponseEncoder<TResponse, ?> responseEncoder,
final RpcRequestDecoder<TRequest> requestDecoder,
final AsyncRunner asyncRunner,
final PeerLookup peerLookup,
final LocalMessageHandler<TRequest, TResponse> localMessageHandler,
final NetworkingSpecConfig networkingConfig) {
final LocalMessageHandler<TRequest, TResponse> localMessageHandler) {
this.protocolId = protocolId;
this.asyncRunner = asyncRunner;
this.peerLookup = peerLookup;
this.localMessageHandler = localMessageHandler;
this.responseEncoder = responseEncoder;
this.requestDecoder = requestDecoder;
this.respTimeout = Duration.ofSeconds(networkingConfig.getRespTimeout());
}

@Override
Expand Down Expand Up @@ -121,15 +118,14 @@ private void handleRequest(
}

private void ensureRequestReceivedWithinTimeLimit(final RpcStream stream) {
final Duration timeout = respTimeout;
asyncRunner
.getDelayedFuture(timeout)
.getDelayedFuture(RECEIVE_INCOMING_REQUEST_TIMEOUT)
.thenAccept(
(__) -> {
if (!requestHandled.get()) {
LOG.debug(
"Failed to receive incoming request data within {} sec for protocol {}. Close stream.",
timeout.toSeconds(),
RECEIVE_INCOMING_REQUEST_TIMEOUT.toSeconds(),
protocolId);
stream.closeAbruptly().ifExceptionGetsHereRaiseABug();
}
Expand Down
Loading
Loading