diff --git a/core/src/main/java/org/bitcoinj/coinjoin/CoinJoinClientManager.java b/core/src/main/java/org/bitcoinj/coinjoin/CoinJoinClientManager.java index 239bf165c..df4ea3b80 100644 --- a/core/src/main/java/org/bitcoinj/coinjoin/CoinJoinClientManager.java +++ b/core/src/main/java/org/bitcoinj/coinjoin/CoinJoinClientManager.java @@ -107,6 +107,10 @@ private boolean waitForAnotherBlock() { return cachedBlockHeight - cachedLastSuccessBlock < minBlocksToWait; } + public boolean isWaitingForNewBlock() { + return waitForAnotherBlock(); + } + // Make sure we have enough keys since last backup private boolean checkAutomaticBackup() { return CoinJoinClientOptions.isEnabled() && isMixing(); diff --git a/core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java b/core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java index 422347bfd..0dae82820 100644 --- a/core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java +++ b/core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java @@ -159,6 +159,7 @@ public void stop() { public void initMasternodeGroup(AbstractBlockChain blockChain) { this.blockChain = blockChain; masternodeGroup = new MasternodeGroup(context, blockChain); + masternodeGroup.setCoinJoinManager(this); } NewBestBlockListener newBestBlockListener = new NewBestBlockListener() { @@ -374,4 +375,8 @@ public void setRequestDecryptedKey(RequestDecryptedKey requestDecryptedKey) { public @Nullable ECKey requestDecryptKey(ECKey key) { return requestDecryptedKey != null ? requestDecryptedKey.requestDecryptedKey(key) : null; } + + public boolean isWaitingForNewBlock() { + return coinJoinClientManagers.values().stream().anyMatch(CoinJoinClientManager::isWaitingForNewBlock); + } } diff --git a/core/src/main/java/org/bitcoinj/coinjoin/utils/MasternodeGroup.java b/core/src/main/java/org/bitcoinj/coinjoin/utils/MasternodeGroup.java index 6ca2b1ad2..361f7dd4c 100644 --- a/core/src/main/java/org/bitcoinj/coinjoin/utils/MasternodeGroup.java +++ b/core/src/main/java/org/bitcoinj/coinjoin/utils/MasternodeGroup.java @@ -18,6 +18,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; import net.jcip.annotations.GuardedBy; import org.bitcoinj.coinjoin.CoinJoinClientOptions; import org.bitcoinj.coinjoin.CoinJoinClientSession; @@ -29,6 +30,10 @@ import org.bitcoinj.core.PeerAddress; import org.bitcoinj.core.PeerGroup; import org.bitcoinj.core.Sha256Hash; +import org.bitcoinj.core.StoredBlock; +import org.bitcoinj.core.Utils; +import org.bitcoinj.core.VerificationException; +import org.bitcoinj.core.listeners.NewBestBlockListener; import org.bitcoinj.evolution.Masternode; import org.bitcoinj.net.ClientConnectionManager; import org.bitcoinj.net.discovery.PeerDiscovery; @@ -45,17 +50,15 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; -import static java.lang.Math.max; import static java.lang.Math.min; -public class MasternodeGroup extends PeerGroup { +public class MasternodeGroup extends PeerGroup implements NewBestBlockListener { private static final Logger log = LoggerFactory.getLogger(MasternodeGroup.class); private final ReentrantLock pendingMasternodesLock = Threading.lock("pendingMasternodes"); @@ -90,6 +93,8 @@ public void shutdown() { } }; + private CoinJoinManager coinJoinManager; + /** * See {@link #MasternodeGroup(Context)} * @@ -118,6 +123,7 @@ public MasternodeGroup(Context context) { */ public MasternodeGroup(NetworkParameters params, @Nullable AbstractBlockChain chain) { super(params, chain); + init(); } /** @@ -129,6 +135,7 @@ public MasternodeGroup(NetworkParameters params, @Nullable AbstractBlockChain ch */ public MasternodeGroup(NetworkParameters params, @Nullable AbstractBlockChain chain, @Nullable AbstractBlockChain headerChain) { super(params, chain, headerChain); + init(); } /** @@ -178,7 +185,11 @@ public boolean addPendingMasternode(CoinJoinClientSession session) { } - private final ExponentialBackoff.Params masternodeBackoffParams = new ExponentialBackoff.Params(1000, 1.001f, 10 * 1000); + + // Exponential backoff for peers starts at 1 second and maxes at 5 seconds. + private final ExponentialBackoff.Params masternodeBackoffParams = new ExponentialBackoff.Params(1000, 1.001f, 5 * 1000); + // Tracks failures globally in case of a network failure. + @GuardedBy("lock") private final ExponentialBackoff masternodeGroupBackoff = new ExponentialBackoff(new ExponentialBackoff.Params(1000, 1.5f, 2 * 1001)); // Adds peerAddress to backoffMap map and inactives queue. // Returns true if it was added, false if it was already there. @@ -192,8 +203,8 @@ protected boolean addInactive(PeerAddress peerAddress, int priority) { // do not connect to another the same peer twice if (isNodeConnected(peerAddress) || isNodeConnected(peerAddress)) { - log.info("attempting to connect to the same peer again: {}", peerAddress); - return false; // do not connect to the same p + log.info("attempting to connect to the same masternode again: {}", peerAddress); + return false; // do not connect to the same masternode } backoffMap.put(peerAddress, new ExponentialBackoff(masternodeBackoffParams)); @@ -224,12 +235,21 @@ private void updateMaxConnections() { } public boolean isMasternodeOrDisconnectRequested(MasternodeAddress addr) { - return forPeer(addr, new ForPeer() { + boolean found = forPeer(addr, new ForPeer() { @Override public boolean process(Peer peer) { return true; } - }); + }, false); + + if (!found) { + for (Masternode mn : pendingClosingMasternodes) { + if (mn.getService().equals(addr)) { + found = true; + } + } + } + return found; } public boolean disconnectMasternode(Masternode mn) { @@ -264,7 +284,13 @@ protected void handlePeerDeath(final Peer peer, @Nullable Throwable exception) { masternode != null ? masternode.getService().getSocketAddress() : "not found in closing list"); if (masternode != null) { pendingMasternodesLock.lock(); + PeerAddress address = peer.getAddress(); try { + if (pendingClosingMasternodes.contains(masternode)) { + // if this is part of pendingClosingMasternodes, where we want to close the connection, + // we don't want to increase the backoff time + backoffMap.get(address).trackSuccess(); + } pendingClosingMasternodes.remove(masternode); pendingSessions.remove(masternodeMap.get(masternode.getProTxHash())); masternodeMap.remove(masternode.getProTxHash()); @@ -305,7 +331,7 @@ protected Peer connectTo(PeerAddress address, boolean incrementMaxConnections, i } Peer peer = super.connectTo(address, incrementMaxConnections, connectTimeoutMillis); - log.info("masternode[connected] {}: {}; {}", peer.getAddress().getSocketAddress(), session.getMixingMasternodeInfo().getProTxHash(), session); + log.info("masternode[connecting] {}: {}; {}", peer.getAddress().getSocketAddress(), session.getMixingMasternodeInfo().getProTxHash(), session); return peer; } @@ -371,6 +397,10 @@ public CoinJoinClientSession getMixingSession(Peer masternode) { return addressMap.get(new MasternodeAddress(masternode.getAddress().getSocketAddress())); } + public void setCoinJoinManager(CoinJoinManager coinJoinManager) { + this.coinJoinManager = coinJoinManager; + } + public interface ForPeer { boolean process(Peer peer); } @@ -418,4 +448,129 @@ protected boolean isMasternodeSession(PeerAddress address) { pendingMasternodesLock.unlock(); } } + + @Override + protected void triggerConnections() { + // Run on a background thread due to the need to potentially retry and back off in the background. + if (!executor.isShutdown()) + executor.execute(triggerMasternodeConnectionsJob); + } + + private final Runnable triggerMasternodeConnectionsJob = new Runnable() { + private boolean firstRun = true; + private final static long MIN_PEER_DISCOVERY_INTERVAL = 1000L; + + @Override + public void run() { + try { + go(); + } catch (Throwable e) { + log.error("Exception when trying to build connections", e); // The executor swallows exceptions :( + } + } + + public void go() { + if (!isRunning()) return; + + if (coinJoinManager.isWaitingForNewBlock()) + return; + + boolean doDiscovery = false; + long now = Utils.currentTimeMillis(); + lock.lock(); + try { +// // First run: try and use a local node if there is one, for the additional security it can provide. +// // But, not on Android as there are none for this platform: it could only be a malicious app trying +// // to hijack our traffic. +// if (!Utils.isAndroidRuntime() && useLocalhostPeerWhenPossible && maybeCheckForLocalhostPeer() && firstRun) { +// log.info("Localhost peer detected, trying to use it instead of P2P discovery"); +// maxConnections = 0; +// connectToLocalHost(); +// return; +// } + + boolean havePeerWeCanTry = !inactives.isEmpty() && backoffMap.get(inactives.peek()).getRetryTime() <= now; + doDiscovery = !havePeerWeCanTry; + } finally { + firstRun = false; + lock.unlock(); + } + + // Don't hold the lock across discovery as this process can be very slow. + boolean discoverySuccess = false; + if (doDiscovery) { + discoverySuccess = discoverPeers() > 0; + } + + long retryTime; + PeerAddress addrToTry; + lock.lock(); + try { + if (doDiscovery) { + // Require that we have enough connections, to consider this + // a success, or we just constantly test for new peers + if (discoverySuccess && countConnectedAndPendingPeers() >= getMaxConnections()) { + masternodeGroupBackoff.trackSuccess(); + } else { + masternodeGroupBackoff.trackFailure(); + } + } + // Inactives is sorted by backoffMap time. + if (inactives.isEmpty()) { + if (countConnectedAndPendingPeers() < getMaxConnections()) { + long interval = Math.max(masternodeGroupBackoff.getRetryTime() - now, MIN_PEER_DISCOVERY_INTERVAL); + log.info("Masternode discovery didn't provide us any more masternodes, will try again in " + + interval + "ms."); + executor.schedule(this, interval, TimeUnit.MILLISECONDS); + } else { + // We have enough peers and discovery provided no more, so just settle down. Most likely we + // were given a fixed set of addresses in some test scenario. + } + return; + } else { + do { + addrToTry = inactives.poll(); + } while (isIpv6Unreachable() && addrToTry.getAddr() instanceof Inet6Address); + retryTime = backoffMap.get(addrToTry).getRetryTime(); + } + retryTime = Math.max(retryTime, masternodeGroupBackoff.getRetryTime()); + if (retryTime > now) { + long delay = retryTime - now; + log.info("Waiting {} ms before next connect attempt to masternode {}", delay, addrToTry == null ? "" : "to " + addrToTry); + inactives.add(addrToTry); + executor.schedule(this, delay, TimeUnit.MILLISECONDS); + return; + } + connectTo(addrToTry, false, getConnectTimeoutMillis()); + } finally { + lock.unlock(); + } + if (countConnectedAndPendingPeers() < getMaxConnections()) { + executor.execute(this); // Try next peer immediately. + } + } + }; + + @Override + public void notifyNewBestBlock(StoredBlock block) throws VerificationException { + log.info("New block found, restarting masternode connections job"); + triggerConnections(); + } + + @Override + public ListenableFuture startAsync() { + if (chain != null) { + chain.addNewBestBlockListener(this); + } + + return super.startAsync(); + } + + @Override + public ListenableFuture stopAsync() { + if (chain != null) { + chain.removeNewBestBlockListener(this); + } + return super.stopAsync(); + } } diff --git a/core/src/main/java/org/bitcoinj/core/PeerGroup.java b/core/src/main/java/org/bitcoinj/core/PeerGroup.java index 94939947a..fe6a50e89 100644 --- a/core/src/main/java/org/bitcoinj/core/PeerGroup.java +++ b/core/src/main/java/org/bitcoinj/core/PeerGroup.java @@ -178,6 +178,10 @@ public class PeerGroup implements TransactionBroadcaster, GovernanceVoteBroadcas @GuardedBy("lock") private boolean useLocalhostPeerWhenPossible = true; @GuardedBy("lock") private boolean ipv6Unreachable = false; + protected boolean isIpv6Unreachable() { + return ipv6Unreachable; + } + @GuardedBy("lock") private long fastCatchupTimeSecs; private final CopyOnWriteArrayList wallets; private final CopyOnWriteArrayList peerFilterProviders; @@ -340,7 +344,10 @@ public void onPeerDisconnected(Peer peer, int peerCount) { /** The default timeout between when a connection attempt begins and version message exchange completes */ public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 5000; private volatile int vConnectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT_MILLIS; - + protected int getConnectTimeoutMillis() { + return vConnectTimeoutMillis; + } + /** Whether bloom filter support is enabled when using a non FullPrunedBlockchain*/ private volatile boolean vBloomFilteringEnabled = true; @@ -611,7 +618,7 @@ public void go() { } }; - private void triggerConnections() { + protected void triggerConnections() { // Run on a background thread due to the need to potentially retry and back off in the background. if (!executor.isShutdown()) executor.execute(triggerConnectionsJob); @@ -1169,7 +1176,7 @@ void waitForJobQueue() { Futures.getUnchecked(executor.submit(Runnables.doNothing())); } - private int countConnectedAndPendingPeers() { + protected int countConnectedAndPendingPeers() { lock.lock(); try { return peers.size() + pendingPeers.size();