Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding in concurrent logic to help prevent ConcurrentModificationException #89

Open
wants to merge 2 commits into
base: transitclock-merge
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions transitclock/src/main/java/org/transitclock/avl/AvlClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@
package org.transitclock.avl;

import java.util.HashMap;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.transitclock.configData.AgencyConfig;
import org.transitclock.configData.AvlConfig;
import org.transitclock.configData.CoreConfig;
import org.transitclock.core.AvlProcessor;
import org.transitclock.db.structs.AvlReport;
import org.transitclock.logging.Markers;
import org.transitclock.utils.Time;

/**
Expand All @@ -43,7 +41,7 @@ public class AvlClient implements Runnable {
// List of current AVL reports by vehicle. Useful for determining last
// report so can filter out new report if the same as the old one.
// Keyed on vehicle ID.
private static HashMap<String, AvlReport> avlReports =
private static final HashMap<String, AvlReport> avlReports =
new HashMap<String, AvlReport>();

private static final Logger logger=
Expand Down Expand Up @@ -79,6 +77,8 @@ public AvlReport getAvlReport() {
public void run() {
// Put a try/catch around everything so that if unexpected exception
// occurs an e-mail is sent and the avl client thread isn't killed.
ReentrantLock l = new ReentrantLock();
l.lock();
try {
// If the data is bad throw it out
String errorMsg = avlReport.validateData();
Expand All @@ -89,7 +89,7 @@ public void run() {
}

// See if should filter out report
synchronized (avlReports) {

AvlReport previousReportForVehicle =
avlReports.get(avlReport.getVehicleId());

Expand Down Expand Up @@ -165,7 +165,7 @@ public void run() {
// filter
// the next one
avlReports.put(avlReport.getVehicleId(), avlReport);
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it the avlReports.put(...) is the problem here, I suggest extending the synchronized (avlReports) block instead. The Reentrant lock doesn't give you anything different as I understand it.


// Process the report
logger.info("Thread={} AvlClient processing AVL data {}",
Expand All @@ -180,6 +180,9 @@ public void run() {
// "For agencyId={} Exception {} for avlReport={}.",
// AgencyConfig.getAgencyId(), e.getMessage(), avlReport, e);
}
finally{
l.unlock();
}
}

}
12 changes: 10 additions & 2 deletions transitclock/src/main/java/org/transitclock/avl/AvlQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
*/
package org.transitclock.avl;

import java.util.HashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -47,8 +49,8 @@ public class AvlQueue extends ArrayBlockingQueue<Runnable> {

// For keeping track of the last AVL report for each vehicle. Used to
// determine if AVL report from queue is obsolete.
ConcurrentMap<String, AvlReport> avlDataPerVehicleMap =
new ConcurrentHashMap<String, AvlReport>();
HashMap<String, AvlReport> avlDataPerVehicleMap =
new HashMap<String, AvlReport>();

private static final long serialVersionUID = 6587642826604552096L;

Expand Down Expand Up @@ -76,9 +78,12 @@ public AvlQueue(int queueSize) {
private void addToAvlDataPerVehicleMap(Runnable runnable) {
if (!(runnable instanceof AvlClient))
throw new IllegalArgumentException("Runnable must be AvlClient.");
ReentrantLock l = new ReentrantLock();
l.lock();

AvlReport avlReport = ((AvlClient) runnable).getAvlReport();
avlDataPerVehicleMap.put(avlReport.getVehicleId(), avlReport);
l.unlock();
}

/**
Expand All @@ -92,6 +97,8 @@ private void addToAvlDataPerVehicleMap(Runnable runnable) {
private boolean isObsolete(Runnable runnableFromQueue) {
if (!(runnableFromQueue instanceof AvlClient))
throw new IllegalArgumentException("Runnable must be AvlClient.");
ReentrantLock l = new ReentrantLock();
l.lock();

AvlReport avlReportFromQueue =
((AvlClient) runnableFromQueue).getAvlReport();
Expand All @@ -110,6 +117,7 @@ private boolean isObsolete(Runnable runnableFromQueue) {
+ "is {}",
avlReportFromQueue, lastAvlReportForVehicle, size());
}
l.unlock();
return obsolete;
}

Expand Down
29 changes: 17 additions & 12 deletions transitclock/src/main/java/org/transitclock/core/AvlProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.transitclock.utils.Time;

import java.util.*;
import java.util.concurrent.locks.ReentrantLock;

/**
* This is a very important high-level class. It takes the AVL data and
Expand Down Expand Up @@ -1396,14 +1397,16 @@ private void determineAndSetRealTimeSchAdh(VehicleState vehicleState) {
private void lowLevelProcessAvlReport(AvlReport avlReport,
boolean recursiveCall) {
// Determine previous state of vehicle
ReentrantLock l = new ReentrantLock();
l.lock();
String vehicleId = avlReport.getVehicleId();
VehicleState vehicleState = VehicleStateManager.getInstance()
.getVehicleState(vehicleId);

// Since modifying the VehicleState should synchronize in case another
// thread simultaneously processes data for the same vehicle. This
// would be extremely rare but need to be safe.
synchronized (vehicleState) {

// Keep track of last AvlReport even if vehicle not predictable.
vehicleState.setAvlReport(avlReport);

Expand Down Expand Up @@ -1518,7 +1521,7 @@ private void lowLevelProcessAvlReport(AvlReport avlReport,
org.transitclock.db.structs.VehicleState dbVehicleState =
new org.transitclock.db.structs.VehicleState(vehicleState);
Core.getInstance().getDbLogger().add(dbVehicleState);
} // End of synchronizing on vehicleState }
l.unlock(); // End of synchronizing on vehicleState }
}

/**
Expand Down Expand Up @@ -1580,22 +1583,23 @@ public AvlReport getLastAvlReport() {
* @param avlReport
*/
public void cacheAvlReportWithoutProcessing(AvlReport avlReport) {
ReentrantLock l = new ReentrantLock();
l.lock();
VehicleState vehicleState =
VehicleStateManager.getInstance().getVehicleState(
avlReport.getVehicleId());

// Since modifying the VehicleState should synchronize in case another
// thread simultaneously processes data for the same vehicle. This
// would be extremely rare but need to be safe.
synchronized (vehicleState) {
// Update AVL report for cached VehicleState
vehicleState.setAvlReport(avlReport);
// Update AVL report for cached VehicleState
vehicleState.setAvlReport(avlReport);

// Let vehicle data cache know that the vehicle state was updated
// so that new IPC vehicle data will be created and cached and
// made available to the API.
VehicleDataCache.getInstance().updateVehicle(vehicleState);
}
// Let vehicle data cache know that the vehicle state was updated
// so that new IPC vehicle data will be created and cached and
// made available to the API.
VehicleDataCache.getInstance().updateVehicle(vehicleState);
l.unlock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again, if the updateVehicle(vehicleState) is at issue by being outside of the synchronized block, then consider moving it.

}

private boolean isCanceled(VehicleState vehicleState) {
Expand Down Expand Up @@ -1637,7 +1641,8 @@ private void logInvalidAssignment(VehicleState vehicleState) {
* The new AVL report to be processed
*/
public void processAvlReport(AvlReport avlReport) {
IntervalTimer timer = new IntervalTimer();
IntervalTimer timer = new IntervalTimer();
logger.error("rjasmin processing starting {}msec", System.currentTimeMillis());

// Handle special case where want to not use assignment from AVL
// report, most likely because want to test automatic assignment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -124,9 +125,10 @@ public TimeoutHandlerModule(String agencyId) {
public void storeAvlReport(AvlReport avlReport) {
// Synchronize map modifications since elsewhere the elements can be removed
// from the map.
synchronized (avlReportsMap) {
ReentrantLock l = new ReentrantLock();
l.lock();
avlReportsMap.put(avlReport.getVehicleId(), avlReport);
}
l.unlock();
}

/**
Expand Down Expand Up @@ -332,7 +334,8 @@ public void handlePossibleTimeouts() {

// Sync access to avlReportsMap since it can be simultaneously
// modified elsewhere
synchronized (avlReportsMap) {
ReentrantLock l = new ReentrantLock();
l.lock();
// Using an Iterator instead of for(AvlReport a : map.values())
// because removing elements while iterating. Way to do this without
// getting concurrent access exception is to use an Iterator.
Expand Down Expand Up @@ -367,7 +370,7 @@ public void handlePossibleTimeouts() {
}
}
}
}
l.unlock();
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Date;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -79,24 +80,21 @@ public void uncaughtException(Thread t, Throwable e) {
* @return
*/
private static String threadNameWithCounter(String name) {
synchronized (threadNameCountMap) {
Integer count = threadNameCountMap.get(name);
if (count == null) {
count = 1;
} else {
// Increment the count;
count++;
}
threadNameCountMap.put(name, count);
return name + "-" + count;
Integer count = threadNameCountMap.get(name);
if (count == null) {
count = 1;
} else {
// Increment the count;
count++;
}
threadNameCountMap.put(name, count);
return name + "-" + count;
}

@Override
public void run() {
logger.debug("Created NamedThread {}", getName());
try {
numAlive.incrementAndGet();
super.run();
} catch (Throwable t) {
// Log the problem but do so within a try/catch in case it is
Expand Down Expand Up @@ -141,7 +139,6 @@ public void run() {
System.exit(-1);
}
} finally {
numAlive.decrementAndGet();
logger.debug("Exiting NamedThread {}", getName());
}
}
Expand Down