diff --git a/orchid/pom.xml b/orchid/pom.xml index f363830c06a2..a34c7d2edc39 100644 --- a/orchid/pom.xml +++ b/orchid/pom.xml @@ -114,6 +114,11 @@ 4.11 test + + com.google.guava + guava + 16.0.1 + diff --git a/orchid/src/com/subgraph/orchid/Threading.java b/orchid/src/com/subgraph/orchid/Threading.java new file mode 100644 index 000000000000..526fd3c5b56e --- /dev/null +++ b/orchid/src/com/subgraph/orchid/Threading.java @@ -0,0 +1,44 @@ +package com.subgraph.orchid; + +import com.google.common.util.concurrent.CycleDetectingLockFactory; + +import java.util.concurrent.locks.ReentrantLock; + +/** + * Created by android on 8/22/14. + */ +public class Threading { + static { + // Default policy goes here. If you want to change this, use one of the static methods before + // instantiating any orchid objects. The policy change will take effect only on new objects + // from that point onwards. + throwOnLockCycles(); + } + + private static CycleDetectingLockFactory.Policy policy; + public static CycleDetectingLockFactory factory; + + public static ReentrantLock lock(String name) { + return factory.newReentrantLock(name); + } + + public static void warnOnLockCycles() { + setPolicy(CycleDetectingLockFactory.Policies.WARN); + } + + public static void throwOnLockCycles() { + setPolicy(CycleDetectingLockFactory.Policies.THROW); + } + + public static void ignoreLockCycles() { + setPolicy(CycleDetectingLockFactory.Policies.DISABLED); + } + + public static void setPolicy(CycleDetectingLockFactory.Policy policy) { + Threading.policy = policy; + factory = CycleDetectingLockFactory.newInstance(policy); + } + + public static CycleDetectingLockFactory.Policy getPolicy() { + return policy; + } diff --git a/orchid/src/com/subgraph/orchid/circuits/CircuitIO.java b/orchid/src/com/subgraph/orchid/circuits/CircuitIO.java index 0834a3e1e1c4..11d765d4c167 100644 --- a/orchid/src/com/subgraph/orchid/circuits/CircuitIO.java +++ b/orchid/src/com/subgraph/orchid/circuits/CircuitIO.java @@ -9,6 +9,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; @@ -18,6 +19,7 @@ import com.subgraph.orchid.ConnectionIOException; import com.subgraph.orchid.RelayCell; import com.subgraph.orchid.Stream; +import com.subgraph.orchid.Threading; import com.subgraph.orchid.TorException; import com.subgraph.orchid.circuits.cells.CellImpl; import com.subgraph.orchid.circuits.cells.RelayCellImpl; @@ -36,11 +38,10 @@ public class CircuitIO implements DashboardRenderable { private final BlockingQueue relayCellResponseQueue; private final BlockingQueue controlCellResponseQueue; private final Map streamMap; - private final Object relaySendLock = new Object(); + private final ReentrantLock streamLock = Threading.lock("stream"); + private final ReentrantLock relaySendLock = Threading.lock("relaySend"); - /** LOCKING: streamMap */ private boolean isMarkedForClose; - /** LOCKING: streamMap */ private boolean isClosed; CircuitIO(CircuitImpl circuit, Connection connection, int circuitId) { @@ -171,7 +172,8 @@ private void processRelayDataCell(RelayCell cell) { } } - synchronized(streamMap) { + streamLock.lock(); + try { final StreamImpl stream = streamMap.get(cell.getStreamId()); // It's not unusual for the stream to not be found. For example, if a RELAY_CONNECTED arrives after // the client has stopped waiting for it, the stream will never be tracked and eventually the edge node @@ -179,6 +181,8 @@ private void processRelayDataCell(RelayCell cell) { if(stream != null) { stream.addInputCell(cell); } + } finally { + streamLock.unlock(); } } @@ -187,7 +191,8 @@ RelayCell createRelayCell(int relayCommand, int streamId, CircuitNode targetNode } void sendRelayCellTo(RelayCell cell, CircuitNode targetNode) { - synchronized(relaySendLock) { + relaySendLock.lock(); + try { logRelayCell("Sending: ", cell); cell.setLength(); targetNode.updateForwardDigest(cell); @@ -200,6 +205,8 @@ void sendRelayCellTo(RelayCell cell, CircuitNode targetNode) { targetNode.waitForSendWindowAndDecrement(); sendCell(cell); + } finally { + relaySendLock.unlock(); } } @@ -236,20 +243,26 @@ void sendCell(Cell cell) { void markForClose() { boolean shouldClose; - synchronized (streamMap) { + streamLock.lock(); + try { if(isMarkedForClose) { return; } isMarkedForClose = true; shouldClose = streamMap.isEmpty(); + } finally { + streamLock.unlock(); } if(shouldClose) closeCircuit(); } boolean isMarkedForClose() { - synchronized (streamMap) { + streamLock.lock(); + try { return isMarkedForClose; + } finally { + streamLock.unlock(); } } @@ -276,7 +289,8 @@ private void processCircuitSendme(RelayCell cell) { } void destroyCircuit() { - synchronized(streamMap) { + streamLock.lock(); + try { if(isClosed) { return; } @@ -287,31 +301,42 @@ void destroyCircuit() { s.close(); } isClosed = true; + } finally { + streamLock.unlock(); } } StreamImpl createNewStream(boolean autoclose) { - synchronized(streamMap) { + streamLock.lock(); + try { final int streamId = circuit.getStatus().nextStreamId(); final StreamImpl stream = new StreamImpl(circuit, circuit.getFinalCircuitNode(), streamId, autoclose); streamMap.put(streamId, stream); return stream; + } finally { + streamLock.unlock(); } } void removeStream(StreamImpl stream) { boolean shouldClose; - synchronized(streamMap) { + streamLock.lock(); + try { streamMap.remove(stream.getStreamId()); shouldClose = streamMap.isEmpty() && isMarkedForClose; + } finally { + streamLock.unlock(); } if(shouldClose) closeCircuit(); } List getActiveStreams() { - synchronized (streamMap) { + streamLock.lock(); + try { return new ArrayList(streamMap.values()); + } finally { + streamLock.unlock(); } } diff --git a/orchid/src/com/subgraph/orchid/circuits/CircuitManagerImpl.java b/orchid/src/com/subgraph/orchid/circuits/CircuitManagerImpl.java index cac1142d2c52..12b3ee7911f7 100644 --- a/orchid/src/com/subgraph/orchid/circuits/CircuitManagerImpl.java +++ b/orchid/src/com/subgraph/orchid/circuits/CircuitManagerImpl.java @@ -13,6 +13,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReentrantLock; import com.subgraph.orchid.Circuit; import com.subgraph.orchid.CircuitBuildHandler; @@ -29,6 +30,7 @@ import com.subgraph.orchid.Router; import com.subgraph.orchid.Stream; import com.subgraph.orchid.StreamConnectFailedException; +import com.subgraph.orchid.Threading; import com.subgraph.orchid.Tor; import com.subgraph.orchid.TorConfig; import com.subgraph.orchid.circuits.guards.EntryGuards; @@ -62,6 +64,7 @@ interface CircuitFilter { private final TorInitializationTracker initializationTracker; private final CircuitPathChooser pathChooser; private final HiddenServiceManager hiddenServiceManager; + private final ReentrantLock lock = Threading.lock("circuitManager"); public CircuitManagerImpl(TorConfig config, DirectoryDownloaderImpl directoryDownloader, Directory directory, ConnectionCache connectionCache, TorInitializationTracker initializationTracker) { this.config = config; @@ -87,13 +90,19 @@ public void startBuildingCircuits() { scheduledExecutor.scheduleAtFixedRate(circuitCreationTask, 0, 1000, TimeUnit.MILLISECONDS); } - public synchronized void stopBuildingCircuits(boolean killCircuits) { - scheduledExecutor.shutdownNow(); - if(killCircuits) { - List circuits = new ArrayList(activeCircuits); - for(CircuitImpl c: circuits) { - c.destroyCircuit(); + public void stopBuildingCircuits(boolean killCircuits) { + lock.lock(); + + try { + scheduledExecutor.shutdownNow(); + if (killCircuits) { + List circuits = new ArrayList(activeCircuits); + for (CircuitImpl c : circuits) { + c.destroyCircuit(); + } } + } finally { + lock.unlock(); } } @@ -114,8 +123,10 @@ void removeActiveCircuit(CircuitImpl circuit) { } } - synchronized int getActiveCircuitCount() { - return activeCircuits.size(); + int getActiveCircuitCount() { + synchronized (activeCircuits) { + return activeCircuits.size(); + } } Set getPendingCircuits() { @@ -126,17 +137,29 @@ public boolean filter(Circuit circuit) { }); } - synchronized int getPendingCircuitCount() { - return getPendingCircuits().size(); + int getPendingCircuitCount() { + lock.lock(); + + try { + return getPendingCircuits().size(); + } finally { + lock.unlock(); + } } Set getCircuitsByFilter(CircuitFilter filter) { final Set result = new HashSet(); + final Set circuits = new HashSet(); + synchronized (activeCircuits) { - for(CircuitImpl c: activeCircuits) { - if(filter == null || filter.filter(c)) { - result.add(c); - } + // the filter might lock additional objects, causing a deadlock, so don't + // call it inside the monitor + circuits.addAll(activeCircuits); + } + + for(CircuitImpl c: circuits) { + if(filter == null || filter.filter(c)) { + result.add(c); } } return result; diff --git a/orchid/src/com/subgraph/orchid/connections/ConnectionImpl.java b/orchid/src/com/subgraph/orchid/connections/ConnectionImpl.java index 3ca3b7294d93..1577f29f6771 100644 --- a/orchid/src/com/subgraph/orchid/connections/ConnectionImpl.java +++ b/orchid/src/com/subgraph/orchid/connections/ConnectionImpl.java @@ -14,6 +14,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; @@ -27,6 +28,7 @@ import com.subgraph.orchid.ConnectionIOException; import com.subgraph.orchid.ConnectionTimeoutException; import com.subgraph.orchid.Router; +import com.subgraph.orchid.Threading; import com.subgraph.orchid.Tor; import com.subgraph.orchid.TorConfig; import com.subgraph.orchid.TorException; @@ -61,9 +63,11 @@ public class ConnectionImpl implements Connection, DashboardRenderable { private boolean isConnected; private volatile boolean isClosed; private final Thread readCellsThread; - private final Object connectLock = new Object(); + private final ReentrantLock connectLock = Threading.lock("connect"); + private final ReentrantLock circuitsLock = Threading.lock("circuits"); + private final ReentrantLock outputLock = Threading.lock("output"); private final AtomicLong lastActivity = new AtomicLong(); - + public ConnectionImpl(TorConfig config, SSLSocket socket, Router router, TorInitializationTracker tracker, boolean isDirectoryConnection) { this.config = config; @@ -92,13 +96,16 @@ public boolean isClosed() { } public int bindCircuit(Circuit circuit) { - synchronized(circuitMap) { + circuitsLock.lock(); + try { while(circuitMap.containsKey(currentId)) incrementNextId(); final int id = currentId; incrementNextId(); circuitMap.put(id, circuit); return id; + } finally { + circuitsLock.unlock(); } } @@ -109,7 +116,8 @@ private void incrementNextId() { } void connect() throws ConnectionFailedException, ConnectionTimeoutException, ConnectionHandshakeException { - synchronized (connectLock) { + connectLock.lock(); + try { if(isConnected) { return; } @@ -128,6 +136,8 @@ void connect() throws ConnectionFailedException, ConnectionTimeoutException, Con throw new ConnectionFailedException(e.getMessage()); } isConnected = true; + } finally { + connectLock.unlock(); } } @@ -171,7 +181,8 @@ public void sendCell(Cell cell) throws ConnectionIOException { throw new ConnectionIOException("Cannot send cell because connection is not connected"); } updateLastActivity(); - synchronized(output) { + outputLock.lock(); + try { try { output.write(cell.getCellBytes()); } catch (IOException e) { @@ -179,6 +190,8 @@ public void sendCell(Cell cell) throws ConnectionIOException { closeSocket(); throw new ConnectionIOException(e.getClass().getName() + " : "+ e.getMessage()); } + } finally { + outputLock.unlock(); } } @@ -276,32 +289,41 @@ private void processCell(Cell cell) { } private void processRelayCell(Cell cell) { - synchronized(circuitMap) { + circuitsLock.lock(); + try { final Circuit circuit = circuitMap.get(cell.getCircuitId()); if(circuit == null) { logger.warning("Could not deliver relay cell for circuit id = "+ cell.getCircuitId() +" on connection "+ this +". Circuit not found"); return; } circuit.deliverRelayCell(cell); + } finally { + circuitsLock.unlock(); } } private void processControlCell(Cell cell) { - synchronized(circuitMap) { + circuitsLock.lock(); + try { final Circuit circuit = circuitMap.get(cell.getCircuitId()); if(circuit != null) { circuit.deliverControlCell(cell); } + } finally { + circuitsLock.unlock(); } } void idleCloseCheck() { - synchronized (circuitMap) { + circuitsLock.lock(); + try { final boolean needClose = (!isClosed && circuitMap.isEmpty() && getIdleMilliseconds() > CONNECTION_IDLE_TIMEOUT); if(needClose) { logger.fine("Closing connection to "+ this +" on idle timeout"); closeSocket(); - } + } + } finally { + circuitsLock.unlock(); } } @@ -317,8 +339,11 @@ private long getIdleMilliseconds() { } public void removeCircuit(Circuit circuit) { - synchronized(circuitMap) { + circuitsLock.lock(); + try { circuitMap.remove(circuit.getCircuitId()); + } finally { + circuitsLock.unlock(); } } @@ -328,8 +353,11 @@ public String toString() { public void dashboardRender(DashboardRenderer renderer, PrintWriter writer, int flags) throws IOException { final int circuitCount; - synchronized (circuitMap) { + circuitsLock.lock(); + try { circuitCount = circuitMap.size(); + } finally { + circuitsLock.unlock(); } if(circuitCount == 0 && (flags & DASHBOARD_CONNECTIONS_VERBOSE) == 0) { return;