Skip to content

Commit

Permalink
Add cycle detecting locks, fix Orchid deadlock 2
Browse files Browse the repository at this point in the history
subgraph/Orchid#10

Conflicts:
	orchid/src/com/subgraph/orchid/circuits/CircuitIO.java
  • Loading branch information
devrandom committed Aug 23, 2014
1 parent a012b83 commit 5bd4a71
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 36 deletions.
5 changes: 5 additions & 0 deletions orchid/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>
</dependency>
</dependencies>

</project>
45 changes: 45 additions & 0 deletions orchid/src/com/subgraph/orchid/Threading.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.subgraph.orchid;

import com.google.common.util.concurrent.CycleDetectingLockFactory;

import java.util.concurrent.locks.ReentrantLock;

/**
* Created by android on 8/22/14.
*/
public class Threading {
static {
// Default policy goes here. If you want to change this, use one of the static methods before
// instantiating any orchid objects. The policy change will take effect only on new objects
// from that point onwards.
throwOnLockCycles();
}

private static CycleDetectingLockFactory.Policy policy;
public static CycleDetectingLockFactory factory;

public static ReentrantLock lock(String name) {
return factory.newReentrantLock(name);
}

public static void warnOnLockCycles() {
setPolicy(CycleDetectingLockFactory.Policies.WARN);
}

public static void throwOnLockCycles() {
setPolicy(CycleDetectingLockFactory.Policies.THROW);
}

public static void ignoreLockCycles() {
setPolicy(CycleDetectingLockFactory.Policies.DISABLED);
}

public static void setPolicy(CycleDetectingLockFactory.Policy policy) {
Threading.policy = policy;
factory = CycleDetectingLockFactory.newInstance(policy);
}

public static CycleDetectingLockFactory.Policy getPolicy() {
return policy;
}
}
47 changes: 36 additions & 11 deletions orchid/src/com/subgraph/orchid/circuits/CircuitIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -18,6 +19,7 @@
import com.subgraph.orchid.ConnectionIOException;
import com.subgraph.orchid.RelayCell;
import com.subgraph.orchid.Stream;
import com.subgraph.orchid.Threading;
import com.subgraph.orchid.TorException;
import com.subgraph.orchid.circuits.cells.CellImpl;
import com.subgraph.orchid.circuits.cells.RelayCellImpl;
Expand All @@ -36,11 +38,10 @@ public class CircuitIO implements DashboardRenderable {
private final BlockingQueue<RelayCell> relayCellResponseQueue;
private final BlockingQueue<Cell> controlCellResponseQueue;
private final Map<Integer, StreamImpl> streamMap;
private final Object relaySendLock = new Object();
private final ReentrantLock streamLock = Threading.lock("stream");
private final ReentrantLock relaySendLock = Threading.lock("relaySend");

/** LOCKING: streamMap */
private boolean isMarkedForClose;
/** LOCKING: streamMap */
private boolean isClosed;

CircuitIO(CircuitImpl circuit, Connection connection, int circuitId) {
Expand Down Expand Up @@ -171,14 +172,17 @@ private void processRelayDataCell(RelayCell cell) {
}
}

synchronized(streamMap) {
streamLock.lock();
try {
final StreamImpl stream = streamMap.get(cell.getStreamId());
// It's not unusual for the stream to not be found. For example, if a RELAY_CONNECTED arrives after
// the client has stopped waiting for it, the stream will never be tracked and eventually the edge node
// will send a RELAY_END for this stream.
if(stream != null) {
stream.addInputCell(cell);
}
} finally {
streamLock.unlock();
}
}

Expand All @@ -187,7 +191,8 @@ RelayCell createRelayCell(int relayCommand, int streamId, CircuitNode targetNode
}

void sendRelayCellTo(RelayCell cell, CircuitNode targetNode) {
synchronized(relaySendLock) {
relaySendLock.lock();
try {
logRelayCell("Sending: ", cell);
cell.setLength();
targetNode.updateForwardDigest(cell);
Expand All @@ -200,6 +205,8 @@ void sendRelayCellTo(RelayCell cell, CircuitNode targetNode) {
targetNode.waitForSendWindowAndDecrement();

sendCell(cell);
} finally {
relaySendLock.unlock();
}
}

Expand Down Expand Up @@ -236,20 +243,26 @@ void sendCell(Cell cell) {

void markForClose() {
boolean shouldClose;
synchronized (streamMap) {
streamLock.lock();
try {
if(isMarkedForClose) {
return;
}
isMarkedForClose = true;
shouldClose = streamMap.isEmpty();
} finally {
streamLock.unlock();
}
if(shouldClose)
closeCircuit();
}

boolean isMarkedForClose() {
synchronized (streamMap) {
streamLock.lock();
try {
return isMarkedForClose;
} finally {
streamLock.unlock();
}
}

Expand All @@ -276,7 +289,8 @@ private void processCircuitSendme(RelayCell cell) {
}

void destroyCircuit() {
synchronized(streamMap) {
streamLock.lock();
try {
if(isClosed) {
return;
}
Expand All @@ -287,31 +301,42 @@ void destroyCircuit() {
s.close();
}
isClosed = true;
} finally {
streamLock.unlock();
}
}

StreamImpl createNewStream(boolean autoclose) {
synchronized(streamMap) {
streamLock.lock();
try {
final int streamId = circuit.getStatus().nextStreamId();
final StreamImpl stream = new StreamImpl(circuit, circuit.getFinalCircuitNode(), streamId, autoclose);
streamMap.put(streamId, stream);
return stream;
} finally {
streamLock.unlock();
}
}

void removeStream(StreamImpl stream) {
boolean shouldClose;
synchronized(streamMap) {
streamLock.lock();
try {
streamMap.remove(stream.getStreamId());
shouldClose = streamMap.isEmpty() && isMarkedForClose;
} finally {
streamLock.unlock();
}
if(shouldClose)
closeCircuit();
}

List<Stream> getActiveStreams() {
synchronized (streamMap) {
streamLock.lock();
try {
return new ArrayList<Stream>(streamMap.values());
} finally {
streamLock.unlock();
}
}

Expand Down
51 changes: 37 additions & 14 deletions orchid/src/com/subgraph/orchid/circuits/CircuitManagerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;

import com.subgraph.orchid.Circuit;
import com.subgraph.orchid.CircuitBuildHandler;
Expand All @@ -29,6 +30,7 @@
import com.subgraph.orchid.Router;
import com.subgraph.orchid.Stream;
import com.subgraph.orchid.StreamConnectFailedException;
import com.subgraph.orchid.Threading;
import com.subgraph.orchid.Tor;
import com.subgraph.orchid.TorConfig;
import com.subgraph.orchid.circuits.guards.EntryGuards;
Expand Down Expand Up @@ -62,6 +64,7 @@ interface CircuitFilter {
private final TorInitializationTracker initializationTracker;
private final CircuitPathChooser pathChooser;
private final HiddenServiceManager hiddenServiceManager;
private final ReentrantLock lock = Threading.lock("circuitManager");

public CircuitManagerImpl(TorConfig config, DirectoryDownloaderImpl directoryDownloader, Directory directory, ConnectionCache connectionCache, TorInitializationTracker initializationTracker) {
this.config = config;
Expand All @@ -87,13 +90,19 @@ public void startBuildingCircuits() {
scheduledExecutor.scheduleAtFixedRate(circuitCreationTask, 0, 1000, TimeUnit.MILLISECONDS);
}

public synchronized void stopBuildingCircuits(boolean killCircuits) {
scheduledExecutor.shutdownNow();
if(killCircuits) {
List<CircuitImpl> circuits = new ArrayList<CircuitImpl>(activeCircuits);
for(CircuitImpl c: circuits) {
c.destroyCircuit();
public void stopBuildingCircuits(boolean killCircuits) {
lock.lock();

try {
scheduledExecutor.shutdownNow();
if (killCircuits) {
List<CircuitImpl> circuits = new ArrayList<CircuitImpl>(activeCircuits);
for (CircuitImpl c : circuits) {
c.destroyCircuit();
}
}
} finally {
lock.unlock();
}
}

Expand All @@ -114,8 +123,10 @@ void removeActiveCircuit(CircuitImpl circuit) {
}
}

synchronized int getActiveCircuitCount() {
return activeCircuits.size();
int getActiveCircuitCount() {
synchronized (activeCircuits) {
return activeCircuits.size();
}
}

Set<Circuit> getPendingCircuits() {
Expand All @@ -126,17 +137,29 @@ public boolean filter(Circuit circuit) {
});
}

synchronized int getPendingCircuitCount() {
return getPendingCircuits().size();
int getPendingCircuitCount() {
lock.lock();

try {
return getPendingCircuits().size();
} finally {
lock.unlock();
}
}

Set<Circuit> getCircuitsByFilter(CircuitFilter filter) {
final Set<Circuit> result = new HashSet<Circuit>();
final Set<CircuitImpl> circuits = new HashSet<CircuitImpl>();

synchronized (activeCircuits) {
for(CircuitImpl c: activeCircuits) {
if(filter == null || filter.filter(c)) {
result.add(c);
}
// the filter might lock additional objects, causing a deadlock, so don't
// call it inside the monitor
circuits.addAll(activeCircuits);
}

for(CircuitImpl c: circuits) {
if(filter == null || filter.filter(c)) {
result.add(c);
}
}
return result;
Expand Down
Loading

0 comments on commit 5bd4a71

Please sign in to comment.