Skip to content

Commit

Permalink
fix(coinjoin): improve coinjoin stability with new connection manager…
Browse files Browse the repository at this point in the history
… thread
  • Loading branch information
HashEngineering committed Jul 24, 2023
1 parent 7a2855d commit aa138f8
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ private boolean waitForAnotherBlock() {
return cachedBlockHeight - cachedLastSuccessBlock < minBlocksToWait;
}

public boolean isWaitingForNewBlock() {
return waitForAnotherBlock();
}

// Make sure we have enough keys since last backup
private boolean checkAutomaticBackup() {
return CoinJoinClientOptions.isEnabled() && isMixing();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ public void stop() {
public void initMasternodeGroup(AbstractBlockChain blockChain) {
this.blockChain = blockChain;
masternodeGroup = new MasternodeGroup(context, blockChain);
masternodeGroup.setCoinJoinManager(this);
}

NewBestBlockListener newBestBlockListener = new NewBestBlockListener() {
Expand Down Expand Up @@ -374,4 +375,8 @@ public void setRequestDecryptedKey(RequestDecryptedKey requestDecryptedKey) {
public @Nullable ECKey requestDecryptKey(ECKey key) {
return requestDecryptedKey != null ? requestDecryptedKey.requestDecryptedKey(key) : null;
}

public boolean isWaitingForNewBlock() {
return coinJoinClientManagers.values().stream().anyMatch(CoinJoinClientManager::isWaitingForNewBlock);
}
}
173 changes: 164 additions & 9 deletions core/src/main/java/org/bitcoinj/coinjoin/utils/MasternodeGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import net.jcip.annotations.GuardedBy;
import org.bitcoinj.coinjoin.CoinJoinClientOptions;
import org.bitcoinj.coinjoin.CoinJoinClientSession;
Expand All @@ -29,6 +30,10 @@
import org.bitcoinj.core.PeerAddress;
import org.bitcoinj.core.PeerGroup;
import org.bitcoinj.core.Sha256Hash;
import org.bitcoinj.core.StoredBlock;
import org.bitcoinj.core.Utils;
import org.bitcoinj.core.VerificationException;
import org.bitcoinj.core.listeners.NewBestBlockListener;
import org.bitcoinj.evolution.Masternode;
import org.bitcoinj.net.ClientConnectionManager;
import org.bitcoinj.net.discovery.PeerDiscovery;
Expand All @@ -45,17 +50,15 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import static java.lang.Math.max;
import static java.lang.Math.min;


public class MasternodeGroup extends PeerGroup {
public class MasternodeGroup extends PeerGroup implements NewBestBlockListener {
private static final Logger log = LoggerFactory.getLogger(MasternodeGroup.class);

private final ReentrantLock pendingMasternodesLock = Threading.lock("pendingMasternodes");
Expand Down Expand Up @@ -90,6 +93,8 @@ public void shutdown() {

}
};
private CoinJoinManager coinJoinManager;

/**
* See {@link #MasternodeGroup(Context)}
*
Expand Down Expand Up @@ -118,6 +123,7 @@ public MasternodeGroup(Context context) {
*/
public MasternodeGroup(NetworkParameters params, @Nullable AbstractBlockChain chain) {
super(params, chain);
init();
}

/**
Expand All @@ -129,6 +135,7 @@ public MasternodeGroup(NetworkParameters params, @Nullable AbstractBlockChain ch
*/
public MasternodeGroup(NetworkParameters params, @Nullable AbstractBlockChain chain, @Nullable AbstractBlockChain headerChain) {
super(params, chain, headerChain);
init();
}

/**
Expand Down Expand Up @@ -178,7 +185,11 @@ public boolean addPendingMasternode(CoinJoinClientSession session) {
}


private final ExponentialBackoff.Params masternodeBackoffParams = new ExponentialBackoff.Params(1000, 1.001f, 10 * 1000);

// Exponential backoff for peers starts at 1 second and maxes at 5 seconds.
private final ExponentialBackoff.Params masternodeBackoffParams = new ExponentialBackoff.Params(1000, 1.001f, 5 * 1000);
// Tracks failures globally in case of a network failure.
@GuardedBy("lock") private final ExponentialBackoff masternodeGroupBackoff = new ExponentialBackoff(new ExponentialBackoff.Params(1000, 1.5f, 2 * 1001));

// Adds peerAddress to backoffMap map and inactives queue.
// Returns true if it was added, false if it was already there.
Expand All @@ -192,8 +203,8 @@ protected boolean addInactive(PeerAddress peerAddress, int priority) {

// do not connect to another the same peer twice
if (isNodeConnected(peerAddress) || isNodeConnected(peerAddress)) {
log.info("attempting to connect to the same peer again: {}", peerAddress);
return false; // do not connect to the same p
log.info("attempting to connect to the same masternode again: {}", peerAddress);
return false; // do not connect to the same masternode
}

backoffMap.put(peerAddress, new ExponentialBackoff(masternodeBackoffParams));
Expand Down Expand Up @@ -224,12 +235,21 @@ private void updateMaxConnections() {
}

public boolean isMasternodeOrDisconnectRequested(MasternodeAddress addr) {
return forPeer(addr, new ForPeer() {
boolean found = forPeer(addr, new ForPeer() {
@Override
public boolean process(Peer peer) {
return true;
}
});
}, false);

if (!found) {
for (Masternode mn : pendingClosingMasternodes) {
if (mn.getService().equals(addr)) {
found = true;
}
}
}
return found;
}

public boolean disconnectMasternode(Masternode mn) {
Expand Down Expand Up @@ -264,7 +284,13 @@ protected void handlePeerDeath(final Peer peer, @Nullable Throwable exception) {
masternode != null ? masternode.getService().getSocketAddress() : "not found in closing list");
if (masternode != null) {
pendingMasternodesLock.lock();
PeerAddress address = peer.getAddress();
try {
if (pendingClosingMasternodes.contains(masternode)) {
// if this is part of pendingClosingMasternodes, where we want to close the connection,
// we don't want to increase the backoff time
backoffMap.get(address).trackSuccess();
}
pendingClosingMasternodes.remove(masternode);
pendingSessions.remove(masternodeMap.get(masternode.getProTxHash()));
masternodeMap.remove(masternode.getProTxHash());
Expand Down Expand Up @@ -305,7 +331,7 @@ protected Peer connectTo(PeerAddress address, boolean incrementMaxConnections, i
}

Peer peer = super.connectTo(address, incrementMaxConnections, connectTimeoutMillis);
log.info("masternode[connected] {}: {}; {}", peer.getAddress().getSocketAddress(), session.getMixingMasternodeInfo().getProTxHash(), session);
log.info("masternode[connecting] {}: {}; {}", peer.getAddress().getSocketAddress(), session.getMixingMasternodeInfo().getProTxHash(), session);
return peer;

}
Expand Down Expand Up @@ -371,6 +397,10 @@ public CoinJoinClientSession getMixingSession(Peer masternode) {
return addressMap.get(new MasternodeAddress(masternode.getAddress().getSocketAddress()));
}

public void setCoinJoinManager(CoinJoinManager coinJoinManager) {
this.coinJoinManager = coinJoinManager;
}

public interface ForPeer {
boolean process(Peer peer);
}
Expand Down Expand Up @@ -418,4 +448,129 @@ protected boolean isMasternodeSession(PeerAddress address) {
pendingMasternodesLock.unlock();
}
}

@Override
protected void triggerConnections() {
// Run on a background thread due to the need to potentially retry and back off in the background.
if (!executor.isShutdown())
executor.execute(triggerMasternodeConnectionsJob);
}

private final Runnable triggerMasternodeConnectionsJob = new Runnable() {
private boolean firstRun = true;
private final static long MIN_PEER_DISCOVERY_INTERVAL = 1000L;

@Override
public void run() {
try {
go();
} catch (Throwable e) {
log.error("Exception when trying to build connections", e); // The executor swallows exceptions :(
}
}

public void go() {
if (!isRunning()) return;

if (coinJoinManager.isWaitingForNewBlock())
return;

boolean doDiscovery = false;
long now = Utils.currentTimeMillis();
lock.lock();
try {
// // First run: try and use a local node if there is one, for the additional security it can provide.
// // But, not on Android as there are none for this platform: it could only be a malicious app trying
// // to hijack our traffic.
// if (!Utils.isAndroidRuntime() && useLocalhostPeerWhenPossible && maybeCheckForLocalhostPeer() && firstRun) {
// log.info("Localhost peer detected, trying to use it instead of P2P discovery");
// maxConnections = 0;
// connectToLocalHost();
// return;
// }

boolean havePeerWeCanTry = !inactives.isEmpty() && backoffMap.get(inactives.peek()).getRetryTime() <= now;
doDiscovery = !havePeerWeCanTry;
} finally {
firstRun = false;
lock.unlock();
}

// Don't hold the lock across discovery as this process can be very slow.
boolean discoverySuccess = false;
if (doDiscovery) {
discoverySuccess = discoverPeers() > 0;
}

long retryTime;
PeerAddress addrToTry;
lock.lock();
try {
if (doDiscovery) {
// Require that we have enough connections, to consider this
// a success, or we just constantly test for new peers
if (discoverySuccess && countConnectedAndPendingPeers() >= getMaxConnections()) {
masternodeGroupBackoff.trackSuccess();
} else {
masternodeGroupBackoff.trackFailure();
}
}
// Inactives is sorted by backoffMap time.
if (inactives.isEmpty()) {
if (countConnectedAndPendingPeers() < getMaxConnections()) {
long interval = Math.max(masternodeGroupBackoff.getRetryTime() - now, MIN_PEER_DISCOVERY_INTERVAL);
log.info("Masternode discovery didn't provide us any more masternodes, will try again in "
+ interval + "ms.");
executor.schedule(this, interval, TimeUnit.MILLISECONDS);
} else {
// We have enough peers and discovery provided no more, so just settle down. Most likely we
// were given a fixed set of addresses in some test scenario.
}
return;
} else {
do {
addrToTry = inactives.poll();
} while (isIpv6Unreachable() && addrToTry.getAddr() instanceof Inet6Address);
retryTime = backoffMap.get(addrToTry).getRetryTime();
}
retryTime = Math.max(retryTime, masternodeGroupBackoff.getRetryTime());
if (retryTime > now) {
long delay = retryTime - now;
log.info("Waiting {} ms before next connect attempt to masternode {}", delay, addrToTry == null ? "" : "to " + addrToTry);
inactives.add(addrToTry);
executor.schedule(this, delay, TimeUnit.MILLISECONDS);
return;
}
connectTo(addrToTry, false, getConnectTimeoutMillis());
} finally {
lock.unlock();
}
if (countConnectedAndPendingPeers() < getMaxConnections()) {
executor.execute(this); // Try next peer immediately.
}
}
};

@Override
public void notifyNewBestBlock(StoredBlock block) throws VerificationException {
log.info("New block found, restarting masternode connections job");
triggerConnections();
}

@Override
public ListenableFuture startAsync() {
if (chain != null) {
chain.addNewBestBlockListener(this);
}

return super.startAsync();
}

@Override
public ListenableFuture stopAsync() {
if (chain != null) {
chain.removeNewBestBlockListener(this);
}
return super.stopAsync();
}
}
13 changes: 10 additions & 3 deletions core/src/main/java/org/bitcoinj/core/PeerGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ public class PeerGroup implements TransactionBroadcaster, GovernanceVoteBroadcas
@GuardedBy("lock") private boolean useLocalhostPeerWhenPossible = true;
@GuardedBy("lock") private boolean ipv6Unreachable = false;

protected boolean isIpv6Unreachable() {
return ipv6Unreachable;
}

@GuardedBy("lock") private long fastCatchupTimeSecs;
private final CopyOnWriteArrayList<Wallet> wallets;
private final CopyOnWriteArrayList<PeerFilterProvider> peerFilterProviders;
Expand Down Expand Up @@ -340,7 +344,10 @@ public void onPeerDisconnected(Peer peer, int peerCount) {
/** The default timeout between when a connection attempt begins and version message exchange completes */
public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 5000;
private volatile int vConnectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT_MILLIS;

protected int getConnectTimeoutMillis() {
return vConnectTimeoutMillis;
}

/** Whether bloom filter support is enabled when using a non FullPrunedBlockchain*/
private volatile boolean vBloomFilteringEnabled = true;

Expand Down Expand Up @@ -611,7 +618,7 @@ public void go() {
}
};

private void triggerConnections() {
protected void triggerConnections() {
// Run on a background thread due to the need to potentially retry and back off in the background.
if (!executor.isShutdown())
executor.execute(triggerConnectionsJob);
Expand Down Expand Up @@ -1169,7 +1176,7 @@ void waitForJobQueue() {
Futures.getUnchecked(executor.submit(Runnables.doNothing()));
}

private int countConnectedAndPendingPeers() {
protected int countConnectedAndPendingPeers() {
lock.lock();
try {
return peers.size() + pendingPeers.size();
Expand Down

0 comments on commit aa138f8

Please sign in to comment.