Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding in concurrent logic to help prevent ConcurrentModificationException #89

Open
wants to merge 2 commits into
base: transitclock-merge
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.transitclock.avl;

import java.util.HashMap;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -79,6 +80,7 @@ public AvlReport getAvlReport() {
public void run() {
// Put a try/catch around everything so that if unexpected exception
// occurs an e-mail is sent and the avl client thread isn't killed.
ReentrantLock l = new ReentrantLock();
try {
// If the data is bad throw it out
String errorMsg = avlReport.validateData();
Expand All @@ -89,7 +91,7 @@ public void run() {
}

// See if should filter out report
synchronized (avlReports) {
l.lock();
AvlReport previousReportForVehicle =
avlReports.get(avlReport.getVehicleId());

Expand Down Expand Up @@ -165,7 +167,7 @@ public void run() {
// filter
// the next one
avlReports.put(avlReport.getVehicleId(), avlReport);
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it the avlReports.put(...) is the problem here, I suggest extending the synchronized (avlReports) block instead. The Reentrant lock doesn't give you anything different as I understand it.


// Process the report
logger.info("Thread={} AvlClient processing AVL data {}",
Expand All @@ -180,6 +182,9 @@ public void run() {
// "For agencyId={} Exception {} for avlReport={}.",
// AgencyConfig.getAgencyId(), e.getMessage(), avlReport, e);
}
finally{
l.unlock();
}
}

}
21 changes: 11 additions & 10 deletions transitclock/src/main/java/org/transitclock/core/AvlProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.transitclock.utils.Time;

import java.util.*;
import java.util.concurrent.locks.ReentrantLock;

/**
* This is a very important high-level class. It takes the AVL data and
Expand Down Expand Up @@ -1583,19 +1584,19 @@ public void cacheAvlReportWithoutProcessing(AvlReport avlReport) {
VehicleState vehicleState =
VehicleStateManager.getInstance().getVehicleState(
avlReport.getVehicleId());

ReentrantLock l = new ReentrantLock();
l.lock();
// Since modifying the VehicleState should synchronize in case another
// thread simultaneously processes data for the same vehicle. This
// would be extremely rare but need to be safe.
synchronized (vehicleState) {
// Update AVL report for cached VehicleState
vehicleState.setAvlReport(avlReport);
// Update AVL report for cached VehicleState
vehicleState.setAvlReport(avlReport);

// Let vehicle data cache know that the vehicle state was updated
// so that new IPC vehicle data will be created and cached and
// made available to the API.
VehicleDataCache.getInstance().updateVehicle(vehicleState);
}
// Let vehicle data cache know that the vehicle state was updated
// so that new IPC vehicle data will be created and cached and
// made available to the API.
VehicleDataCache.getInstance().updateVehicle(vehicleState);
l.unlock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again, if the updateVehicle(vehicleState) is at issue by being outside of the synchronized block, then consider moving it.

}

private boolean isCanceled(VehicleState vehicleState) {
Expand Down Expand Up @@ -1637,7 +1638,7 @@ private void logInvalidAssignment(VehicleState vehicleState) {
* The new AVL report to be processed
*/
public void processAvlReport(AvlReport avlReport) {
IntervalTimer timer = new IntervalTimer();
IntervalTimer timer = new IntervalTimer();

// Handle special case where want to not use assignment from AVL
// report, most likely because want to test automatic assignment
Expand Down
4 changes: 4 additions & 0 deletions transitclock/src/main/java/org/transitclock/core/Indices.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;

import org.transitclock.applications.Core;
import org.transitclock.db.structs.ArrivalDeparture;
Expand Down Expand Up @@ -196,6 +197,8 @@ public boolean isEarlierStopPathThan(Indices indices) {
* @return The resulting Indices object
*/
public Indices increment(long epochTime) {
ReentrantLock l = new ReentrantLock();
l.lock();
++segmentIndex;
if (segmentIndex >= block.numSegments(tripIndex, stopPathIndex)) {
segmentIndex = 0;
Expand All @@ -217,6 +220,7 @@ public Indices increment(long epochTime) {
}
}
}
l.unlock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why a lock is needed here?


return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Date;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -79,7 +80,8 @@ public void uncaughtException(Thread t, Throwable e) {
* @return
*/
private static String threadNameWithCounter(String name) {
synchronized (threadNameCountMap) {
ReentrantLock l = new ReentrantLock();
l.lock();
Integer count = threadNameCountMap.get(name);
if (count == null) {
count = 1;
Expand All @@ -88,16 +90,19 @@ private static String threadNameWithCounter(String name) {
count++;
}
threadNameCountMap.put(name, count);
l.unlock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the threadNameCountMap.put is causing the issue, consider moving it inside the synchronized block.

return name + "-" + count;
}
}

@Override
public void run() {
logger.debug("Created NamedThread {}", getName());
ReentrantLock l = new ReentrantLock();
try {
l.lock();
numAlive.incrementAndGet();
super.run();

} catch (Throwable t) {
// Log the problem but do so within a try/catch in case it is
// an OutOfMemoryError and need to exit even if get another
Expand Down Expand Up @@ -141,6 +146,7 @@ public void run() {
System.exit(-1);
}
} finally {
l.unlock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, you really need to convince me this is necessary. Locking an entire thread seems completely unreasonable. I suspect you don't understand how this thread is used.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can remove this and retest.

numAlive.decrementAndGet();
logger.debug("Exiting NamedThread {}", getName());
}
Expand Down