Skip to content

Commit

Permalink
feat(coinjoin): add tracking of transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
HashEngineering committed Jul 18, 2023
1 parent c50ce77 commit c236738
Show file tree
Hide file tree
Showing 16 changed files with 326 additions and 34 deletions.
2 changes: 2 additions & 0 deletions core/src/main/java/org/bitcoinj/coinjoin/CoinJoin.java
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ public static String getMessageByID(PoolMessage nMessageID) {
return "Inputs vs outputs size mismatch.";
case ERR_TIMEOUT:
return "Session has timed out.";
case ERR_CONNECTION_TIMEOUT:
return "Connection attempt has timed out (" + PendingDsaRequest.TIMEOUT + " ms).";
default:
return "Unknown response.";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
import org.bitcoinj.coinjoin.listeners.CoinJoinTransactionListener;
import org.bitcoinj.coinjoin.listeners.MixingCompleteListener;
import org.bitcoinj.coinjoin.listeners.MixingStartedListener;
import org.bitcoinj.coinjoin.listeners.SessionCompleteListener;
Expand Down Expand Up @@ -47,11 +48,9 @@
import java.util.Collections;
import java.util.Deque;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -96,6 +95,8 @@ public class CoinJoinClientManager {
= new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<ListenerRegistration<MixingCompleteListener>> mixingCompleteListeners
= new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<ListenerRegistration<CoinJoinTransactionListener>> transactionListeners
= new CopyOnWriteArrayList<>();

private boolean waitForAnotherBlock() {
if (context.masternodeSync.hasSyncFlag(MasternodeSync.SYNC_FLAGS.SYNC_GOVERNANCE) &&
Expand Down Expand Up @@ -253,12 +254,16 @@ public boolean doAutomaticDenominating(boolean dryRun) {
try {
if (deqSessions.size() < CoinJoinClientOptions.getSessions()) {
CoinJoinClientSession newSession = new CoinJoinClientSession(mixingWallet);
log.info("creating new session: {}: ", newSession.getId());
for (ListenerRegistration<SessionCompleteListener> listener : sessionCompleteListeners) {
newSession.addSessionCompleteListener(listener.executor, listener.listener);
}
for (ListenerRegistration<SessionStartedListener> listener : sessionStartedListeners) {
newSession.addSessionStartedListener(listener.executor, listener.listener);
}
for (ListenerRegistration<CoinJoinTransactionListener> listener : transactionListeners) {
newSession.addTransationListener (listener.executor, listener.listener);
}
deqSessions.addLast(newSession);
}
for (CoinJoinClientSession session: deqSessions) {
Expand Down Expand Up @@ -595,6 +600,37 @@ public void run() {
}
}

/**
* Adds an event listener object. Methods on this object are called when something interesting happens,
* like receiving money. Runs the listener methods in the user thread.
*/
public void addTransationListener (CoinJoinTransactionListener listener) {
addTransationListener (Threading.USER_THREAD, listener);
}

/**
* Adds an event listener object. Methods on this object are called when something interesting happens,
* like receiving money. The listener is executed by the given executor.
*/
public void addTransationListener (Executor executor, CoinJoinTransactionListener listener) {
// This is thread safe, so we don't need to take the lock.
transactionListeners.add(new ListenerRegistration<>(listener, executor));
for (CoinJoinClientSession session: deqSessions) {
session.addTransationListener (executor, listener);
}
}

/**
* Removes the given event listener object. Returns true if the listener was removed, false if that listener
* was never added.
*/
public boolean removeTransationListener(CoinJoinTransactionListener listener) {
for (CoinJoinClientSession session: deqSessions) {
session.removeTransactionListener(listener);
}
return ListenerRegistration.removeFromList(listener, transactionListeners);
}

public List<PoolStatus> getSessionsStatus() {
ArrayList<PoolStatus> sessionsStatus = Lists.newArrayList();
for (CoinJoinClientSession session : deqSessions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package org.bitcoinj.coinjoin;

import com.google.common.collect.Lists;
import org.bitcoinj.coinjoin.listeners.CoinJoinTransactionListener;
import org.bitcoinj.coinjoin.listeners.CoinJoinTransactionType;
import org.bitcoinj.coinjoin.listeners.SessionCompleteListener;
import org.bitcoinj.coinjoin.listeners.SessionStartedListener;
import org.bitcoinj.coinjoin.utils.CompactTallyItem;
Expand All @@ -26,6 +28,7 @@
import org.bitcoinj.coinjoin.utils.TransactionBuilderOutput;
import org.bitcoinj.core.Coin;
import org.bitcoinj.core.ECKey;
import org.bitcoinj.core.MasternodeAddress;
import org.bitcoinj.core.MasternodeSync;
import org.bitcoinj.core.Message;
import org.bitcoinj.core.NetworkParameters;
Expand Down Expand Up @@ -72,6 +75,7 @@
import static org.bitcoinj.coinjoin.CoinJoinConstants.COINJOIN_ENTRY_MAX_SIZE;
import static org.bitcoinj.coinjoin.CoinJoinConstants.COINJOIN_QUEUE_TIMEOUT;
import static org.bitcoinj.coinjoin.CoinJoinConstants.COINJOIN_SIGNING_TIMEOUT;
import static org.bitcoinj.coinjoin.PoolMessage.ERR_CONNECTION_TIMEOUT;
import static org.bitcoinj.coinjoin.PoolMessage.ERR_SESSION;
import static org.bitcoinj.coinjoin.PoolMessage.ERR_TIMEOUT;
import static org.bitcoinj.coinjoin.PoolMessage.MSG_POOL_MAX;
Expand Down Expand Up @@ -109,6 +113,8 @@ public class CoinJoinClientSession extends CoinJoinBaseSession {
= new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<ListenerRegistration<SessionCompleteListener>> sessionCompleteListeners
= new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<ListenerRegistration<CoinJoinTransactionListener>> transactionListeners
= new CopyOnWriteArrayList<>();

/// Create denominations
private boolean createDenominated(Coin balanceToDenominate) {
Expand Down Expand Up @@ -361,6 +367,8 @@ public int process(Coin amount) {

log.info("coinjoin: txid: {}", strResult);

queueCreateDenominationListeners(txBuilder.getTransaction(), CoinJoinTransactionType.CREATE_DENOMINATION);

return true;
}

Expand Down Expand Up @@ -588,7 +596,7 @@ private boolean joinExistingQueue(Coin balanceNeedsAnonymized) {
mixingWallet.getContext().coinJoinManager.addPendingMasternode(this);
setState(POOL_STATE_QUEUE);
timeLastSuccessfulStep.set(Utils.currentTimeSeconds());
log.info("coinjoin: pending connection (from queue): sessionDenom: {} ({}), addr={}",
log.info("coinjoin: join existing queue -> pending connection, sessionDenom: {} ({}), addr={}",
sessionDenom, CoinJoin.denominationToString(sessionDenom), dmn.getService());
setStatus(PoolStatus.CONNECTING);
return true;
Expand Down Expand Up @@ -643,7 +651,7 @@ private boolean startNewQueue(Coin balanceNeedsAnonymized) {
continue;
}

log.info("coinjoin: attempt {} connection to Masternode {}, {}", nTries, dmn.getService(), dmn.getProTxHash());
log.info("coinjoin: attempt {} connection to masternode {}, protx: {}", nTries + 1, dmn.getService(), dmn.getProTxHash());

// try to get a single random denom out of setAmounts
while (sessionDenom == 0) {
Expand Down Expand Up @@ -817,7 +825,7 @@ public boolean process(Peer peer) {
peer.sendMessage(entry);
return true;
}
})) {
}, true)) {
log.info("coinjoin: failed to send to {} CoinJoinEntry: {}", mixingMasternode.getService().getSocketAddress(), entry);
}
}
Expand Down Expand Up @@ -898,6 +906,7 @@ private void completedTransaction(PoolMessage messageID) {
lock.lock();
try {
setNull(); // this will also disconnect the masternode
setStatus(PoolStatus.COMPLETE);
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -1066,14 +1075,18 @@ protected void setNull() {
}
mixingMasternode = null;
pendingDsaRequest = null;
log.info("session zeroed out");
log.info("session zeroed out {}; {}", state, status);
super.setNull();
}

// internal session id
static int nextId = 0;
private final int id;

public int getId() {
return id;
}

public CoinJoinClientSession(WalletEx mixingWallet) {
super(mixingWallet.getContext());
this.mixingWallet = mixingWallet;
Expand Down Expand Up @@ -1371,7 +1384,7 @@ public boolean doAutomaticDenominating(boolean fDryRun) {
}
} else {
if (!isMyCollateralValid || !CoinJoin.isCollateralValid(txMyCollateral)) {
log.info("coinjoin: invalid collateral, recreating...");
log.info("coinjoin: invalid collateral, recreating... [id: {}] ", id);
if (!createCollateralTransaction(txMyCollateral, strReason)) {
log.info("coinjoin: create collateral error: {}", strReason);
return false;
Expand Down Expand Up @@ -1482,12 +1495,14 @@ public boolean process(Peer peer) {
log.info("sending {} to {}", pendingDsaRequest.getDsa(), peer);
return true;
}
});
}, false);

if (sentMessage) {
pendingDsaRequest = null;
} else if (pendingDsaRequest.isExpired()) {
log.info("coinjoin -- failed to connect to {}", pendingDsaRequest.getAddress());
log.info("coinjoin -- failed to connect to {}; reason: expired", pendingDsaRequest.getAddress());
setStatus(PoolStatus.CONNECTION_TIMEOUT);
queueSessionCompleteListeners(ERR_CONNECTION_TIMEOUT);
setNull();
}

Expand Down Expand Up @@ -1519,6 +1534,7 @@ public boolean checkTimeout() {

setState(POOL_STATE_ERROR);
queueSessionCompleteListeners(ERR_TIMEOUT);
setStatus(PoolStatus.TIMEOUT);
unlockCoins();
keyHolderStorage.returnAll();
timeLastSuccessfulStep.set(Utils.currentTimeSeconds());
Expand Down Expand Up @@ -1627,7 +1643,41 @@ protected void queueSessionCompleteListeners(PoolMessage message) {
registration.executor.execute(new Runnable() {
@Override
public void run() {
registration.listener.onSessionComplete(mixingWallet, getSessionID(), getSessionDenom(), message);
MasternodeAddress address = mixingMasternode.getService();
registration.listener.onSessionComplete(mixingWallet, getSessionID(), getSessionDenom(), message, address);
}
});
}
}

public void addTransationListener(CoinJoinTransactionListener listener) {
addTransationListener (Threading.USER_THREAD, listener);
}

/**
* Adds an event listener object. Methods on this object are called when something interesting happens,
* like receiving money. The listener is executed by the given executor.
*/
public void addTransationListener(Executor executor, CoinJoinTransactionListener listener) {
// This is thread safe, so we don't need to take the lock.
transactionListeners.add(new ListenerRegistration<>(listener, executor));
}

/**
* Removes the given event listener object. Returns true if the listener was removed, false if that listener
* was never added.
*/
public boolean removeTransactionListener(CoinJoinTransactionListener listener) {
return ListenerRegistration.removeFromList(listener, transactionListeners);
}

protected void queueCreateDenominationListeners(Transaction denominationTransaction, CoinJoinTransactionType type) {
//checkState(lock.isHeldByCurrentThread());
for (final ListenerRegistration<CoinJoinTransactionListener> registration : transactionListeners) {
registration.executor.execute(new Runnable() {
@Override
public void run() {
registration.listener.onTransactionProcessed(denominationTransaction, type);
}
});
}
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/bitcoinj/coinjoin/PoolMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public enum PoolMessage {
MSG_POOL_MAX(ERR_SIZE_MISMATCH.value),

// extra values for DASHJ Reporting
ERR_TIMEOUT(23);
ERR_TIMEOUT(23),
ERR_CONNECTION_TIMEOUT(24);

public final int value;

Expand Down
10 changes: 8 additions & 2 deletions core/src/main/java/org/bitcoinj/coinjoin/PoolStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ public enum PoolStatus {
WARMUP(0x0001),
IDLE(0x0002),
CONNECTING(0x0003),
MIXING(0x0004),
FINISHED(0x1005),
CONNECTED(0x0004),
MIXING(0x0005),
COMPLETE(0x0106),
FINISHED(0x1007),
TIMEOUT(0x0107),
CONNECTION_TIMEOUT(0x0108),
// Errors
ERR_NO_INPUTS(0x2100),
ERR_MASTERNODE_NOT_FOUND(0x2101),
Expand All @@ -35,6 +39,7 @@ public enum PoolStatus {
private static final int STOP = 0x1000;
private static final int ERROR = 0x2000;
private static final int WARNING = 0x4000;
private static final int COMPLETED = 0x0100;

final int value;
PoolStatus(int value) {
Expand All @@ -44,4 +49,5 @@ public enum PoolStatus {
public boolean isError() { return (value & ERROR) != 0; }
public boolean isWarning() { return (value & WARNING) != 0; }
public boolean shouldStop() { return (value & STOP) != 0; }
public boolean sessionCompleted() { return (value & COMPLETED) != 0; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2023 Dash Core Group
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bitcoinj.coinjoin.listeners;

import org.bitcoinj.core.Transaction;

public interface CoinJoinTransactionListener {
void onTransactionProcessed(Transaction denominationTx, CoinJoinTransactionType type);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2023 Dash Core Group
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bitcoinj.coinjoin.listeners;

public enum CoinJoinTransactionType {
CREATE_DENOMINATION,
CREATE_COLLATERAL,
MIXING_FEE,
COINJOIN,
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
package org.bitcoinj.coinjoin.listeners;

import org.bitcoinj.coinjoin.PoolMessage;
import org.bitcoinj.core.MasternodeAddress;
import org.bitcoinj.wallet.WalletEx;

public interface SessionCompleteListener {
void onSessionComplete(WalletEx wallet, int sessionId, int denomination, PoolMessage message);
void onSessionComplete(WalletEx wallet, int sessionId, int denomination, PoolMessage message, MasternodeAddress address);
}
Loading

0 comments on commit c236738

Please sign in to comment.