7311: add peertask foundation code #7628

Open
wants to merge 60 commits into
base: main
Changes from 5 commits
4b80016
7311: Add PeerTask system for use in future PRs
Matilda-Clerke Sep 17, 2024
a8d5a9f
7311: Clean up some warnings
Matilda-Clerke Sep 17, 2024
e4be5c0
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Sep 18, 2024
5859444
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Sep 19, 2024
c335cbe
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Sep 20, 2024
08c66fd
7311: Reduce timeout in PeerTaskRequestSender to 5s
Matilda-Clerke Sep 20, 2024
049cae2
7311: Refactor PeerManager to be an interface
Matilda-Clerke Sep 20, 2024
5afba63
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Sep 20, 2024
f2ac53e
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Sep 23, 2024
e901fdf
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Sep 24, 2024
6e349e1
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Sep 25, 2024
ad86ae6
7311: Rename PeerManager to PeerSelector
Matilda-Clerke Sep 25, 2024
38f04ab
7311: Reword PeerSelector javadoc to avoid implementation details
Matilda-Clerke Sep 25, 2024
6de3fb3
7311: Use ConcurrentHashMap in DefaultPeerSelector
Matilda-Clerke Sep 25, 2024
da9cd43
7311: Reword trace log in DefaultPeerSelector
Matilda-Clerke Sep 25, 2024
ce7d245
7311: Remove unused imports
Matilda-Clerke Sep 25, 2024
c9eb22e
7311: Use a 1 second delay between retries in PeerTaskExecutor to mat…
Matilda-Clerke Sep 25, 2024
e2fda73
7311: Add testGetPeerButNoPeerMatchesFilter to DefaultPeerSelectorTest
Matilda-Clerke Sep 25, 2024
608fece
7311: Add testGetPeerButNoPeerMatchesFilter to DefaultPeerSelectorTest
Matilda-Clerke Sep 25, 2024
2d07800
7311: spotless
Matilda-Clerke Sep 25, 2024
ad26297
7311: Fix MetricsAcceptanceTest
Matilda-Clerke Sep 20, 2024
96c8030
7311: Fix MetricsAcceptanceTest
Matilda-Clerke Sep 20, 2024
b0f2ed0
7311: Modify PeerTaskExecutor metric to include response time from peer
Matilda-Clerke Sep 26, 2024
598b519
7311: Use SubProtocol instead of subprotocol name string in PeerTask
Matilda-Clerke Sep 26, 2024
bc25b16
7311: rename timing context to ignored to prevent intellij warnings
Matilda-Clerke Sep 26, 2024
e31bb70
7311: Use constants for number of retries
Matilda-Clerke Sep 26, 2024
41923d3
7311: Convert PeerTaskExecutorResult to a record
Matilda-Clerke Sep 26, 2024
720f94e
7311: Rename PeerTaskBehavior to PeerTaskRetryBehavior
Matilda-Clerke Sep 29, 2024
7d845b3
7311: Move peer selection logic to PeerSelector
Matilda-Clerke Sep 30, 2024
50c26f1
7311: spotless
Matilda-Clerke Sep 30, 2024
b7c0c95
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Sep 30, 2024
a81855d
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Sep 30, 2024
8718102
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 3, 2024
e63f473
7311: Make changes as discussed in walkthrough meeting
Matilda-Clerke Oct 3, 2024
d1847f2
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 4, 2024
6d2cb95
7311: Rename getPeerTaskBehavior to getPeerTaskRetryBehavior
Matilda-Clerke Oct 6, 2024
d84520a
7311: Rename getPeerTaskBehavior to getPeerTaskRetryBehavior
Matilda-Clerke Oct 6, 2024
77ed748
Merge remote-tracking branch 'origin/7311-add-peertask-foundation-cod…
Matilda-Clerke Oct 6, 2024
0896e31
7311: Rework PeerTaskExecutor retry system to be 0-based
Matilda-Clerke Oct 6, 2024
5f924c4
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 6, 2024
2865625
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 7, 2024
82cedb0
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 7, 2024
bdd96ba
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 8, 2024
c047f42
7311: Remove unused async methods in PeerTaskExecutor
Matilda-Clerke Oct 8, 2024
5aa6b0b
7311: Return Optional<EthPeer> in PeerSelector.getPeer and utilise ex…
Matilda-Clerke Oct 8, 2024
8becdb3
7311: Redo getPeer again to include hasAvailableRequestCapacity check
Matilda-Clerke Oct 8, 2024
8186a77
7311: Rework getPeer again to use LEAST_TO_MOST_BUSY comparator
Matilda-Clerke Oct 8, 2024
37b0ec2
7311: Import PeerNotConnected class instead of using fully qualified …
Matilda-Clerke Oct 8, 2024
545fd5c
7311: Change to specifying retry counts in PeerTask instead of behavi…
Matilda-Clerke Oct 9, 2024
4f544f4
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 9, 2024
1c268b7
7311: Add additional metrics to PeerTaskExecutor
Matilda-Clerke Oct 10, 2024
b06f38b
7311: Add Predicate to PeerTask to check for partial success
Matilda-Clerke Oct 10, 2024
3c12d3d
7311: Fix incorrect name on isPartialSuccessTest
Matilda-Clerke Oct 10, 2024
b1c47ae
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 10, 2024
d66dd3a
7311: Add partialSuccessCounter and inflightRequestGauge in PeerTaskE…
Matilda-Clerke Oct 11, 2024
fa22e93
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 11, 2024
a3f5d4a
7311: Also filter by whether a peer is fully validated
Matilda-Clerke Oct 11, 2024
3a68980
7311: Fix up inflight requests gauge in PeerTaskExecutor
Matilda-Clerke Oct 11, 2024
c422bc5
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 11, 2024
56c1f9d
7311: Update plugin api hash
Matilda-Clerke Oct 11, 2024
@@ -0,0 +1,26 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;

public class InvalidPeerTaskResponseException extends Exception {

public InvalidPeerTaskResponseException() {
super();
}

public InvalidPeerTaskResponseException(final Throwable cause) {
super(cause);
}
}
@@ -0,0 +1,17 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;

public class NoAvailablePeerException extends Exception {}
Contributor

Thanks for the descriptive comments!

@@ -0,0 +1,64 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;

import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.p2p.peers.PeerId;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** "Manages" the EthPeers for the PeerTaskExecutor */
public class PeerManager {
Contributor

Looking at the code a bit I think that PeerManager could just be an interface with one method: getPeer()
Different implementations could give us the behaviour that we are looking for, like how many retries, retry the same peer/other peers.
This would also allow us to pick peers when they are needed, e.g. to make sure that these peers don't have too many open requests.

Contributor Author

An interface to allow easy swapping of different implementations is fine, but I'm not sure how we'd implement retries in the peer manager... the peer manager is completely separated from the PeerTasks and has no idea what tasks are being executed and whether they succeed or fail.
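
A minimal sketch of the single-method interface suggested in this thread. `Peer`, `PeerSource` and `HighestReputationPeerSource` are hypothetical names used so the example compiles on its own; `Peer` stands in for `EthPeer`, and the `Optional` return type is an assumption borrowed from a later commit title in this PR rather than from the code shown here.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Hypothetical stand-in for EthPeer; only the fields the sketch needs.
record Peer(String id, int reputation) {}

// Single-method interface: callers describe the peer they need via a filter.
interface PeerSource {
  Optional<Peer> getPeer(Predicate<Peer> filter);
}

// One possible implementation: the highest-reputation peer that matches.
class HighestReputationPeerSource implements PeerSource {
  private final Map<String, Peer> peersById = new ConcurrentHashMap<>();

  void addPeer(final Peer peer) {
    peersById.put(peer.id(), peer);
  }

  void removePeer(final String id) {
    peersById.remove(id);
  }

  @Override
  public Optional<Peer> getPeer(final Predicate<Peer> filter) {
    return peersById.values().stream()
        .filter(filter)
        .max(Comparator.comparingInt(Peer::reputation));
  }
}
```

Retry policy stays outside this interface in the sketch: the caller decides whether to ask for another peer, which matches the author's point that the peer manager knows nothing about task outcomes.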

private static final Logger LOG = LoggerFactory.getLogger(PeerManager.class);

// use a synchronized map to ensure the map is never modified by multiple threads at once
private final Map<PeerId, EthPeer> ethPeersByPeerId =
Collections.synchronizedMap(new HashMap<>());

/**
* Gets the highest reputation peer matching the supplied filter
Contributor

This feels like an implementation detail of the DefaultPeerManager...if the interface requires it to be highest reputation should we rename the method to getHighestReputationPeer?

Contributor

...or move the comment to DefaultPeerManager

...or merge PeerManager with DefaultPeerManager :)

Contributor Author

That's fair. Originally, there was no interface, so the javadoc described the implementation. I'll reword it

*
* @param filter a filter to match prospective peers with
* @return the highest reputation peer matching the supplied filter
* @throws NoAvailablePeerException If there are no suitable peers
*/
public EthPeer getPeer(final Predicate<EthPeer> filter) throws NoAvailablePeerException {
LOG.trace("Getting peer from pool of {} peers", ethPeersByPeerId.size());
return ethPeersByPeerId.values().stream()
.filter(filter)
.max(Comparator.naturalOrder())
.orElseThrow(NoAvailablePeerException::new);
}

public Optional<EthPeer> getPeerByPeerId(final PeerId peerId) {
return Optional.ofNullable(ethPeersByPeerId.get(peerId));
}

public void addPeer(final EthPeer ethPeer) {
ethPeersByPeerId.put(ethPeer.getConnection().getPeer(), ethPeer);
}

public void removePeer(final PeerId peerId) {
ethPeersByPeerId.remove(peerId);
}
}
@@ -0,0 +1,63 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;

import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;

import java.util.Collection;

/**
* Represents a task to be executed on an EthPeer by the PeerTaskExecutor
*
* @param <T> The type of the result of this PeerTask
*/
public interface PeerTask<T> {
/**
* Returns the SubProtocol used for this PeerTask
*
* @return the SubProtocol used for this PeerTask
*/
String getSubProtocol();
Contributor

Can the stronger SubProtocol type be used here? Not sure how much of a hassle it is in practice though.

Contributor Author

Iirc I looked at using the SubProtocol type and decided against it. I can't remember why, so I'll take another look.


/**
* Gets the minimum required block number for a peer to have to successfully execute this task
*
* @return the minimum required block number for a peer to have to successfully execute this task
*/
long getRequiredBlockNumber();
Contributor

nit: might be nice to combine these into a "prerequisites" object. Feels like details leaking into the interface.

Contributor

Just a musing...what if we pass a peer into the task's prerequisite filter?

e.g. "here's my set of peers sorted by rep, choose the first one that matches my task's criteria."

Currently feels like peer selection logic is spread out across the Manager, Executor and Task
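
The musing above can be sketched as follows, assuming the task bundles its prerequisites into a single predicate and the selector hands over peers already sorted by reputation. `CandidatePeer`, `TaskWithPrerequisites` and `PrerequisiteSelection` are hypothetical names for illustration only and do not appear in the PR.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical stand-in for EthPeer.
record CandidatePeer(String id, int reputation, long chainHeight, boolean supportsProtocol) {}

// Hypothetical slice of a task interface: all prerequisites live in one predicate.
interface TaskWithPrerequisites {
  Predicate<CandidatePeer> peerRequirement();
}

final class PrerequisiteSelection {
  private PrerequisiteSelection() {}

  // Expects peers sorted by reputation, best first; returns the first one the task accepts.
  static Optional<CandidatePeer> selectFor(
      final TaskWithPrerequisites task, final List<CandidatePeer> peersByReputation) {
    return peersByReputation.stream().filter(task.peerRequirement()).findFirst();
  }
}
```

With this shape, block height and sub-protocol checks no longer need separate getters on the task interface, at the cost of making the requirement opaque to the selector.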


/**
* Gets the request data to send to the EthPeer
*
* @return the request data to send to the EthPeer
*/
MessageData getRequestMessage();

/**
* Parses the MessageData response from the EthPeer
*
* @param messageData the response MessageData to be parsed
* @return a T built from the response MessageData
* @throws InvalidPeerTaskResponseException if the response messageData is invalid
*/
T parseResponse(MessageData messageData) throws InvalidPeerTaskResponseException;

/**
* Gets the Collection of behaviors this task is expected to exhibit in the PeerTaskExecutor
*
* @return the Collection of behaviors this task is expected to exhibit in the PeerTaskExecutor
*/
Collection<PeerTaskBehavior> getPeerTaskBehaviors();
Contributor

Think a set would make more sense, don't think it makes sense to have duplicate behaviours

Contributor Author

Yep true
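
A small sketch of the Set-based alternative, using `java.util.EnumSet`. The enum is redeclared here under the hypothetical name `RetryBehavior` so the example compiles alone, and `BehaviorSets` is an illustrative helper rather than anything in the PR.

```java
import java.util.EnumSet;
import java.util.Set;

// Mirrors the two values of the enum in this diff; redeclared so the sketch is self-contained.
enum RetryBehavior {
  RETRY_WITH_SAME_PEER,
  RETRY_WITH_OTHER_PEERS
}

final class BehaviorSets {
  private BehaviorSets() {}

  // No retries at all is simply the empty set.
  static Set<RetryBehavior> noRetries() {
    return EnumSet.noneOf(RetryBehavior.class);
  }

  // Duplicates collapse, so listing a behaviour twice has no effect.
  static Set<RetryBehavior> of(final RetryBehavior first, final RetryBehavior... rest) {
    return EnumSet.of(first, rest);
  }
}
```

A `Set` return type also documents that membership, not order or multiplicity, is what the executor checks with `contains`.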

}
@@ -0,0 +1,20 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;

public enum PeerTaskBehavior {
Contributor

Suggested change
- public enum PeerTaskBehavior {
+ public enum PeerTaskRetryBehavior {

RETRY_WITH_SAME_PEER,
RETRY_WITH_OTHER_PEERS
Contributor

Should this also include the non-retry single try behaviour if the intent is to include all the current peer behaviours? Otherwise, like @siladu mentioned, this should be the PeerTaskRetryBehavior

Contributor Author

Non-retry behaviour is just the absence of retry behaviours.

I wanted to leave this as PeerTaskBehavior to allow for other behaviors to be added later. They aren't strictly just for describing desired retry behavior, but there aren't currently any other behaviors.

Contributor

I think if the goal is for the code to be more readable, I would go with more specific/descriptive naming and let whoever may (or may not) add more behaviours in later worry about the appropriate name then.

Contributor Author

Alright, that makes sense

}
@@ -0,0 +1,157 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;

import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
import org.hyperledger.besu.plugin.services.metrics.OperationTimer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Manages the execution of PeerTasks, respecting their PeerTaskBehavior */
public class PeerTaskExecutor {
private static final long[] WAIT_TIME_BEFORE_RETRY = {0, 20000, 5000};
Contributor

What's the thinking behind these retry timings? Warrants a comment I think.

Contributor Author

The idea was to have a progressive back-off timer on retries. As discussed elsewhere, it would be better to stay consistent with the existing implementation (which appears to use a 1 second delay) and make this sort of change later, so I'll change that shortly.

Contributor
@siladu, Sep 25, 2024

progressive back-off timer on retries

0 -> 20 seconds -> 5 seconds? Should it be {0, 5000, 20000} instead?

but yeh, think we should avoid changing any functionality as much as possible (or do it in both old and new if it's a known improvement)

Contributor Author

Oh, it was using the retry countdown to select it. First retry, the countdown would be 2, so WAIT_TIME_BEFORE_RETRY[triesRemaining] would be 5000
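
The countdown indexing described above can be replayed in isolation. This sketch copies the array values and the `--triesRemaining > 0` loop condition from the diff, but records the delay instead of sleeping; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class RetryDelayTrace {
  // Same values as WAIT_TIME_BEFORE_RETRY in the diff, in milliseconds.
  static final long[] WAIT_TIME_BEFORE_RETRY = {0, 20000, 5000};

  private RetryDelayTrace() {}

  // Replays the do/while countdown for a task that never succeeds and records
  // the delay that would be slept before each retry (no actual sleeping).
  static List<Long> delaysFor(final int initialTries) {
    final List<Long> delays = new ArrayList<>();
    int triesRemaining = initialTries;
    do {
      // an attempt would run here and fail
    } while (--triesRemaining > 0 && delays.add(WAIT_TIME_BEFORE_RETRY[triesRemaining]));
    return delays;
  }
}
```

`RetryDelayTrace.delaysFor(3)` returns `[5000, 20000]`: the first retry reads index 2 and the second reads index 1, so the back-off does grow even though the array literal looks reversed. Index 0 is never read.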


private final PeerManager peerManager;
private final PeerTaskRequestSender requestSender;
private final Supplier<ProtocolSpec> protocolSpecSupplier;
private final LabelledMetric<OperationTimer> requestTimer;

public PeerTaskExecutor(
final PeerManager peerManager,
final PeerTaskRequestSender requestSender,
final Supplier<ProtocolSpec> protocolSpecSupplier,
final MetricsSystem metricsSystem) {
this.peerManager = peerManager;
this.requestSender = requestSender;
this.protocolSpecSupplier = protocolSpecSupplier;
requestTimer =
metricsSystem.createLabelledTimer(
BesuMetricCategory.PEERS, "Peer Task Executor Request Time", "", "Task Class Name");
}

public <T> PeerTaskExecutorResult<T> execute(final PeerTask<T> peerTask) {
PeerTaskExecutorResult<T> executorResult;
int triesRemaining =
Contributor

this could be handled by the peer manager

Contributor Author

I'm not sure I understand how that would work. The peer manager doesn't have any connection to the tasks being run

peerTask.getPeerTaskBehaviors().contains(PeerTaskBehavior.RETRY_WITH_OTHER_PEERS) ? 3 : 1;
Contributor

Can a constant be created for the number of retries? Also, we may want to make the number of retries configurable, as I think we use a different number of retries in various places.

Contributor Author

Added constants for now. I think I'll hold off on making them configurable until I get a better idea of how/when/why different values are used.

final Collection<EthPeer> usedEthPeers = new ArrayList<>();
do {
EthPeer peer;
try {
peer =
peerManager.getPeer(
(candidatePeer) ->
isPeerUnused(candidatePeer, usedEthPeers)
&& (protocolSpecSupplier.get().isPoS()
|| isPeerHeightHighEnough(
candidatePeer, peerTask.getRequiredBlockNumber()))
&& isPeerProtocolSuitable(candidatePeer, peerTask.getSubProtocol()));
Contributor

Maybe it's just a naming thing, but I feel like peer selection logic shouldn't be within the Executor.

My mental model for an executor is it should handle executing the task for the given peer, and maybe retrying.

Contributor

Also struggling to find test coverage for this logic.

Contributor

Agree with @siladu here; the peer selection would more cleanly belong in the PeerSelector

Contributor Author

So, the idea here is that the calling code supplies a Predicate to effectively tell the PeerSelector what kind of peer is needed. I suppose if we move the logic to PeerSelector we can just expose additional getPeer methods for different use cases.

usedEthPeers.add(peer);
executorResult = executeAgainstPeer(peerTask, peer);
} catch (NoAvailablePeerException e) {
executorResult =
Contributor

In the previous code, we would have an additional wait of 5 seconds if there were no peers. Is this still needed? @pinges what do you think?

Contributor Author

Yeah I noticed that. Seems very weird to me that we'd just sit and wait for a peer, but only for 5 seconds. Imo, we should just let the task/pipeline fail so it can be retried later.

new PeerTaskExecutorResult<>(null, PeerTaskExecutorResponseCode.NO_PEER_AVAILABLE);
}
} while (--triesRemaining > 0
&& executorResult.getResponseCode() != PeerTaskExecutorResponseCode.SUCCESS);

return executorResult;
}

public <T> CompletableFuture<PeerTaskExecutorResult<T>> executeAsync(final PeerTask<T> peerTask) {
Contributor
@siladu, Sep 24, 2024

unused, better IMO to add it in as needed in case future PRs don't end up using it.

Contributor Author

Fair enough, it's easy to remove it and bring it back later. What I'm finding is that the parallelisation seems to work well when done over a whole set of actions anyway.

return CompletableFuture.supplyAsync(() -> execute(peerTask));
}

public <T> PeerTaskExecutorResult<T> executeAgainstPeer(
Contributor

private?

Contributor Author

It could be private for our existing uses, but afaik, there are some places that the calling code definitely cares about which peer it's requesting data from (particularly around pivot blocks, I think)

final PeerTask<T> peerTask, final EthPeer peer) {
MessageData requestMessageData = peerTask.getRequestMessage();
PeerTaskExecutorResult<T> executorResult;
int triesRemaining =
peerTask.getPeerTaskBehaviors().contains(PeerTaskBehavior.RETRY_WITH_SAME_PEER) ? 3 : 1;
Contributor

Bit unsure about this behaviour. If you specify RETRY_WITH_SAME_PEER and RETRY_WITH_OTHER_PEERS what would happen? It would select a peer and try with that peer up to 3 times and if that fails repeat the process up to 3 times. So it could potentially be trying the request up to 9 times?

Contributor Author

Yes, that's correct
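
The worst case confirmed above follows from the two nested countdown loops. This sketch reproduces only the loop structure, assuming every attempt fails with a retryable result (not PEER_DISCONNECTED) and that enough distinct peers are available; the class, method and constant names are hypothetical.

```java
final class AttemptCount {
  static final int TRIES_WITH_OTHER_PEERS = 3;
  static final int TRIES_WITH_SAME_PEER = 3;

  private AttemptCount() {}

  // Counts requests sent when every attempt fails with a retryable result.
  static int worstCaseAttempts(final boolean retryOtherPeers, final boolean retrySamePeer) {
    int attempts = 0;
    int peerTriesRemaining = retryOtherPeers ? TRIES_WITH_OTHER_PEERS : 1;
    do {
      // a new peer would be selected here
      int sameTriesRemaining = retrySamePeer ? TRIES_WITH_SAME_PEER : 1;
      do {
        attempts++; // one request sent to the currently selected peer
      } while (--sameTriesRemaining > 0);
    } while (--peerTriesRemaining > 0);
    return attempts;
  }
}
```

With both behaviours set the count is 3 x 3 = 9; with only one of them it is 3, and with neither it is 1.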

do {
try {

MessageData responseMessageData;
try (final OperationTimer.TimingContext timingContext =
Contributor

If you rename timingContext to ignored, IntelliJ won't give a warning about an unused variable here.

requestTimer.labels(peerTask.getClass().getSimpleName()).startTimer()) {
responseMessageData =
requestSender.sendRequest(peerTask.getSubProtocol(), requestMessageData, peer);
}
T result = peerTask.parseResponse(responseMessageData);
peer.recordUsefulResponse();
executorResult = new PeerTaskExecutorResult<>(result, PeerTaskExecutorResponseCode.SUCCESS);

} catch (PeerConnection.PeerNotConnected e) {
Matilda-Clerke marked this conversation as resolved.
executorResult =
new PeerTaskExecutorResult<>(null, PeerTaskExecutorResponseCode.PEER_DISCONNECTED);

} catch (InterruptedException | TimeoutException e) {
peer.recordRequestTimeout(requestMessageData.getCode());
executorResult = new PeerTaskExecutorResult<>(null, PeerTaskExecutorResponseCode.TIMEOUT);

} catch (InvalidPeerTaskResponseException e) {
peer.recordUselessResponse(e.getMessage());
executorResult =
new PeerTaskExecutorResult<>(null, PeerTaskExecutorResponseCode.INVALID_RESPONSE);

} catch (ExecutionException e) {
executorResult =
new PeerTaskExecutorResult<>(null, PeerTaskExecutorResponseCode.INTERNAL_SERVER_ERROR);
}
} while (--triesRemaining > 0
&& executorResult.getResponseCode() != PeerTaskExecutorResponseCode.SUCCESS
&& executorResult.getResponseCode() != PeerTaskExecutorResponseCode.PEER_DISCONNECTED
Contributor

If you don't care about which peer is used you would try with another peer ...

Contributor Author

This is in the executeAgainstPeer() method, so at this point in the code we do care which peer is used. If the calling code doesn't care which peer a task is run against, it can call the execute() method instead, which contains the logic for peer switching.

&& sleepBetweenRetries(WAIT_TIME_BEFORE_RETRY[triesRemaining]));
Contributor

could be handled by the peer manager

Contributor Author
@Matilda-Clerke, Sep 20, 2024

I'm not sure how exactly. The peer manager doesn't (and shouldn't) know anything about whether a peer is requested for a retry or for a first attempt.

Contributor

Why do we need to sleep between retries?

Contributor Author

We don't strictly need to sleep between retries, but if a peer is unable to respond before timing out, it's likely just busy or we're experiencing network issues, so waiting a moment before retrying can allow things a chance to improve.

Contributor
@siladu, Sep 25, 2024

My understanding was this is a refactor so I think we should match the existing behaviour. Not sure how the retry timing worked before, possibly it is 1 second? https://github.com/hyperledger/besu/blob/main/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java#L154

If we can later improve the peering performance with different timeouts that would be a good experiment to test out once the refactor is stable.

Instead of Thread.sleep, is there a reason not to use ethScheduler.scheduleFutureTask as before? I think we will lose the ethScheduler metrics if we just use vanilla CompletableFutures.

Contributor Author

@Matilda-Clerke Matilda-Clerke Sep 25, 2024

> Instead of Thread.sleep, is there a reason not to use ethScheduler.scheduleFutureTask as before?

Mainly because this particular method shouldn't be concerned with scheduling tasks. Its singular concern is to implement task execution with optional retries.

In addition, methods in the EthScheduler class seem to operate on either Runnables or EthTasks, which doesn't combine well with the entirely different paradigm at play with PeerTasks and the PeerTaskExecutor. A better solution would be to ensure we have similar metrics set up in the PeerTaskExecutor.

Contributor

@siladu siladu Sep 25, 2024

> Mainly because this particular method shouldn't be concerned with scheduling tasks.

If we use it, EthScheduler would be the one concerned with scheduling tasks, IMO. Anyway, isn't this an implementation detail? Implementing the retries with a wait time in the PeerTaskExecutor is equivalent to scheduling a future task with a wait time, isn't it?

I think we need to maintain the metrics as they are, that's quite a big loss in functionality otherwise. Would much rather we use EthScheduler since a number of maintainers are trying to centralise thread execution here so we can reuse the metrics and testing support in a consistent way across Besu.

Contributor Author

Alright, I'll see if I can do a bit of rework to utilise EthScheduler to capture some metrics.
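
For readers following the thread above, here is a minimal sketch of the general idea of scheduling a delayed retry rather than blocking a thread with Thread.sleep. It uses only JDK classes (CompletableFuture.delayedExecutor, Java 9+) and is not the EthScheduler API, so it does nothing for the metrics concern raised above. The class name, the method name, and the boolean "attempt" supplier are hypothetical stand-ins, not code from this PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch: retry an attempt with a delay, without sleeping on the calling thread.
final class DelayedRetrySketch {

  /**
   * Runs the attempt asynchronously. If it reports failure and tries remain, the next
   * attempt is scheduled after waitMillis using a JDK delayed executor.
   *
   * @param attempt hypothetical stand-in for a peer task attempt; returns true on success
   * @param triesRemaining total number of attempts still allowed, including this one
   * @param waitMillis delay between attempts in milliseconds
   */
  static CompletableFuture<Boolean> attemptWithRetries(
      final Supplier<Boolean> attempt, final int triesRemaining, final long waitMillis) {
    return CompletableFuture.supplyAsync(attempt)
        .thenCompose(
            succeeded -> {
              if (succeeded || triesRemaining <= 1) {
                // Either we are done, or there are no tries left.
                return CompletableFuture.completedFuture(succeeded);
              }
              // The delayed executor starts the no-op only after the wait has elapsed,
              // so no thread is parked in Thread.sleep in the meantime.
              final Executor delayed =
                  CompletableFuture.delayedExecutor(waitMillis, TimeUnit.MILLISECONDS);
              return CompletableFuture.runAsync(() -> {}, delayed)
                  .thenCompose(
                      ignored -> attemptWithRetries(attempt, triesRemaining - 1, waitMillis));
            });
  }
}
```

Calling `DelayedRetrySketch.attemptWithRetries(task, 3, 1000L).join()` would block only the caller that chooses to join. A scheduler that records metrics could presumably be swapped in for the JDK executors here, but that wiring is outside what this sketch shows.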


return executorResult;
}

public <T> CompletableFuture<PeerTaskExecutorResult<T>> executeAgainstPeerAsync(
Contributor

unused

final PeerTask<T> peerTask, final EthPeer peer) {
return CompletableFuture.supplyAsync(() -> executeAgainstPeer(peerTask, peer));
Matilda-Clerke marked this conversation as resolved.
}

private boolean sleepBetweenRetries(final long sleepTime) {
try {
Thread.sleep(sleepTime);
return true;
} catch (InterruptedException e) {
return false;
}
}
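
One detail worth noting about the sleepBetweenRetries helper above: it catches InterruptedException and returns false, which clears the thread's interrupt status. A common Java idiom is to restore the flag so that code further up the stack can still see the interruption. The sketch below shows that idiom in isolation; the wrapper class name is hypothetical, and this is a suggestion rather than what the PR does.

```java
// Hypothetical sketch: same helper, but preserving the interrupt status.
final class InterruptAwareSleepSketch {

  /** Returns true if the full sleep completed, false if the thread was interrupted. */
  static boolean sleepBetweenRetries(final long sleepTime) {
    try {
      Thread.sleep(sleepTime);
      return true;
    } catch (InterruptedException e) {
      // Thread.sleep clears the interrupt flag when it throws; set it again so callers
      // (for example an executor shutting down) can still observe the interruption.
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

With this variant, a retry loop that stops when the helper returns false also leaves the thread marked as interrupted, so an enclosing task can exit promptly instead of carrying on unaware.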

private static boolean isPeerUnused(
Contributor

should be handled by the peer manager

Contributor Author

These methods are here because they are used in the predicate specifically to request peers for the peer task executor. They may not be applicable to all calls to peerManager.getPeer.

They certainly could be moved to PeerManager, but that would couple the peer manager more tightly to this one specific use case. Keeping the two separate is what keeps this code simple compared with the old system.

Contributor

This method doesn't need to be static

final EthPeer ethPeer, final Collection<EthPeer> usedEthPeers) {
return !usedEthPeers.contains(ethPeer);
}

private static boolean isPeerHeightHighEnough(final EthPeer ethPeer, final long requiredHeight) {
Contributor

should be handled by the peer manager

Contributor

This method doesn't need to be static

return ethPeer.chainState().getEstimatedHeight() >= requiredHeight;
}

private static boolean isPeerProtocolSuitable(final EthPeer ethPeer, final String protocol) {
Contributor

should be handled by the peer manager

Contributor

This method doesn't need to be static

return ethPeer.getProtocolName().equals(protocol);
}
}
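
To illustrate the separation argued for in the thread above (the executor builds the filter, the peer manager only applies it), here is a minimal, self-contained sketch. The Peer class is a simplified, hypothetical stand-in for EthPeer, and getPeer is a hypothetical stand-in for peerManager.getPeer; neither is the actual Besu code.

```java
import java.util.Collection;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical sketch: the caller composes the predicate, the selector just applies it.
final class PeerFilterSketch {

  /** Simplified, hypothetical stand-in for EthPeer. */
  static final class Peer {
    final String id;
    final String protocolName;
    final long estimatedHeight;

    Peer(final String id, final String protocolName, final long estimatedHeight) {
      this.id = id;
      this.protocolName = protocolName;
      this.estimatedHeight = estimatedHeight;
    }
  }

  /** Mirrors the three checks in the diff: unused, high enough, protocol matches. */
  static Predicate<Peer> suitableFor(
      final String protocol, final long requiredHeight, final Collection<Peer> usedPeers) {
    final Predicate<Peer> unused = peer -> !usedPeers.contains(peer);
    final Predicate<Peer> highEnough = peer -> peer.estimatedHeight >= requiredHeight;
    final Predicate<Peer> protocolMatches = peer -> peer.protocolName.equals(protocol);
    return unused.and(highEnough).and(protocolMatches);
  }

  /** Hypothetical selector: it knows nothing about retries or tasks, only the filter. */
  static Optional<Peer> getPeer(final Collection<Peer> connected, final Predicate<Peer> filter) {
    return connected.stream().filter(filter).findFirst();
  }
}
```

Under this arrangement, a different caller can pass a different predicate to the same selector, which is the decoupling the author describes; moving the checks into the selector would instead fix them for every caller.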
@@ -0,0 +1,24 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;

public enum PeerTaskExecutorResponseCode {
SUCCESS,
NO_PEER_AVAILABLE,
PEER_DISCONNECTED,
INTERNAL_SERVER_ERROR,
TIMEOUT,
INVALID_RESPONSE
}