From 0a538aaec2a9e19ebc0f3964a86eec929dd872fe Mon Sep 17 00:00:00 2001 From: rjasmin-camsys Date: Tue, 16 Jan 2024 11:04:40 -0500 Subject: [PATCH 1/2] Adding in concurrent logic to help prevent ConcurrentModificationException, also appears to have a very minor boon to performance. --- .../java/org/transitclock/avl/AvlClient.java | 9 ++++++-- .../org/transitclock/core/AvlProcessor.java | 21 ++++++++++--------- .../java/org/transitclock/core/Indices.java | 4 ++++ .../utils/threading/NamedThread.java | 10 +++++++-- 4 files changed, 30 insertions(+), 14 deletions(-) diff --git a/transitclock/src/main/java/org/transitclock/avl/AvlClient.java b/transitclock/src/main/java/org/transitclock/avl/AvlClient.java index 49c0881c1..bc55739f3 100644 --- a/transitclock/src/main/java/org/transitclock/avl/AvlClient.java +++ b/transitclock/src/main/java/org/transitclock/avl/AvlClient.java @@ -17,6 +17,7 @@ package org.transitclock.avl; import java.util.HashMap; +import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,6 +80,7 @@ public AvlReport getAvlReport() { public void run() { // Put a try/catch around everything so that if unexpected exception // occurs an e-mail is sent and the avl client thread isn't killed. + ReentrantLock l = new ReentrantLock(); try { // If the data is bad throw it out String errorMsg = avlReport.validateData(); @@ -89,7 +91,7 @@ public void run() { } // See if should filter out report - synchronized (avlReports) { + l.lock(); AvlReport previousReportForVehicle = avlReports.get(avlReport.getVehicleId()); @@ -165,7 +167,7 @@ public void run() { // filter // the next one avlReports.put(avlReport.getVehicleId(), avlReport); - } + // Process the report logger.info("Thread={} AvlClient processing AVL data {}", @@ -180,6 +182,9 @@ public void run() { // "For agencyId={} Exception {} for avlReport={}.", // AgencyConfig.getAgencyId(), e.getMessage(), avlReport, e); } + finally{ + l.unlock(); + } } } diff --git a/transitclock/src/main/java/org/transitclock/core/AvlProcessor.java b/transitclock/src/main/java/org/transitclock/core/AvlProcessor.java index c1d31c23c..6d9433f8d 100644 --- a/transitclock/src/main/java/org/transitclock/core/AvlProcessor.java +++ b/transitclock/src/main/java/org/transitclock/core/AvlProcessor.java @@ -42,6 +42,7 @@ import org.transitclock.utils.Time; import java.util.*; +import java.util.concurrent.locks.ReentrantLock; /** * This is a very important high-level class. It takes the AVL data and @@ -1583,19 +1584,19 @@ public void cacheAvlReportWithoutProcessing(AvlReport avlReport) { VehicleState vehicleState = VehicleStateManager.getInstance().getVehicleState( avlReport.getVehicleId()); - + ReentrantLock l = new ReentrantLock(); + l.lock(); // Since modifying the VehicleState should synchronize in case another // thread simultaneously processes data for the same vehicle. This // would be extremely rare but need to be safe. - synchronized (vehicleState) { - // Update AVL report for cached VehicleState - vehicleState.setAvlReport(avlReport); + // Update AVL report for cached VehicleState + vehicleState.setAvlReport(avlReport); - // Let vehicle data cache know that the vehicle state was updated - // so that new IPC vehicle data will be created and cached and - // made available to the API. - VehicleDataCache.getInstance().updateVehicle(vehicleState); - } + // Let vehicle data cache know that the vehicle state was updated + // so that new IPC vehicle data will be created and cached and + // made available to the API. + VehicleDataCache.getInstance().updateVehicle(vehicleState); + l.unlock(); } private boolean isCanceled(VehicleState vehicleState) { @@ -1637,7 +1638,7 @@ private void logInvalidAssignment(VehicleState vehicleState) { * The new AVL report to be processed */ public void processAvlReport(AvlReport avlReport) { - IntervalTimer timer = new IntervalTimer(); + IntervalTimer timer = new IntervalTimer(); // Handle special case where want to not use assignment from AVL // report, most likely because want to test automatic assignment diff --git a/transitclock/src/main/java/org/transitclock/core/Indices.java b/transitclock/src/main/java/org/transitclock/core/Indices.java index 71dfb7b0c..f0d7dc89f 100644 --- a/transitclock/src/main/java/org/transitclock/core/Indices.java +++ b/transitclock/src/main/java/org/transitclock/core/Indices.java @@ -18,6 +18,7 @@ import java.io.Serializable; import java.util.Objects; +import java.util.concurrent.locks.ReentrantLock; import org.transitclock.applications.Core; import org.transitclock.db.structs.ArrivalDeparture; @@ -196,6 +197,8 @@ public boolean isEarlierStopPathThan(Indices indices) { * @return The resulting Indices object */ public Indices increment(long epochTime) { + ReentrantLock l = new ReentrantLock(); + l.lock(); ++segmentIndex; if (segmentIndex >= block.numSegments(tripIndex, stopPathIndex)) { segmentIndex = 0; @@ -217,6 +220,7 @@ public Indices increment(long epochTime) { } } } + l.unlock(); return this; } diff --git a/transitclock/src/main/java/org/transitclock/utils/threading/NamedThread.java b/transitclock/src/main/java/org/transitclock/utils/threading/NamedThread.java index 715f99bfe..abe6f4fc4 100644 --- a/transitclock/src/main/java/org/transitclock/utils/threading/NamedThread.java +++ b/transitclock/src/main/java/org/transitclock/utils/threading/NamedThread.java @@ -19,6 +19,7 @@ import java.util.Date; import java.util.HashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,7 +80,8 @@ public void uncaughtException(Thread t, Throwable e) { * @return */ private static String threadNameWithCounter(String name) { - synchronized (threadNameCountMap) { + ReentrantLock l = new ReentrantLock(); + l.lock(); Integer count = threadNameCountMap.get(name); if (count == null) { count = 1; @@ -88,16 +90,19 @@ private static String threadNameWithCounter(String name) { count++; } threadNameCountMap.put(name, count); + l.unlock(); return name + "-" + count; - } } @Override public void run() { logger.debug("Created NamedThread {}", getName()); + ReentrantLock l = new ReentrantLock(); try { + l.lock(); numAlive.incrementAndGet(); super.run(); + } catch (Throwable t) { // Log the problem but do so within a try/catch in case it is // an OutOfMemoryError and need to exit even if get another @@ -141,6 +146,7 @@ public void run() { System.exit(-1); } } finally { + l.unlock(); numAlive.decrementAndGet(); logger.debug("Exiting NamedThread {}", getName()); } From 5d60fca1e7e3431ceb7bad197ad5ec88763e8365 Mon Sep 17 00:00:00 2001 From: rjasmin-camsys Date: Fri, 26 Jan 2024 12:39:35 -0500 Subject: [PATCH 2/2] Adding in additional threading logic to prevent concurrency issues. --- .../java/org/transitclock/avl/AvlClient.java | 8 +++--- .../java/org/transitclock/avl/AvlQueue.java | 12 +++++++-- .../org/transitclock/core/AvlProcessor.java | 12 ++++++--- .../java/org/transitclock/core/Indices.java | 4 --- .../core/TimeoutHandlerModule.java | 11 +++++--- .../utils/threading/NamedThread.java | 27 +++++++------------ 6 files changed, 37 insertions(+), 37 deletions(-) diff --git a/transitclock/src/main/java/org/transitclock/avl/AvlClient.java b/transitclock/src/main/java/org/transitclock/avl/AvlClient.java index bc55739f3..e5e9032bd 100644 --- a/transitclock/src/main/java/org/transitclock/avl/AvlClient.java +++ b/transitclock/src/main/java/org/transitclock/avl/AvlClient.java @@ -21,12 +21,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.transitclock.configData.AgencyConfig; import org.transitclock.configData.AvlConfig; -import org.transitclock.configData.CoreConfig; import org.transitclock.core.AvlProcessor; import org.transitclock.db.structs.AvlReport; -import org.transitclock.logging.Markers; import org.transitclock.utils.Time; /** @@ -44,7 +41,7 @@ public class AvlClient implements Runnable { // List of current AVL reports by vehicle. Useful for determining last // report so can filter out new report if the same as the old one. // Keyed on vehicle ID. - private static HashMap avlReports = + private static final HashMap avlReports = new HashMap(); private static final Logger logger= @@ -81,6 +78,7 @@ public void run() { // Put a try/catch around everything so that if unexpected exception // occurs an e-mail is sent and the avl client thread isn't killed. ReentrantLock l = new ReentrantLock(); + l.lock(); try { // If the data is bad throw it out String errorMsg = avlReport.validateData(); @@ -91,7 +89,7 @@ public void run() { } // See if should filter out report - l.lock(); + AvlReport previousReportForVehicle = avlReports.get(avlReport.getVehicleId()); diff --git a/transitclock/src/main/java/org/transitclock/avl/AvlQueue.java b/transitclock/src/main/java/org/transitclock/avl/AvlQueue.java index b4d04dc1b..2b655a31c 100644 --- a/transitclock/src/main/java/org/transitclock/avl/AvlQueue.java +++ b/transitclock/src/main/java/org/transitclock/avl/AvlQueue.java @@ -16,10 +16,12 @@ */ package org.transitclock.avl; +import java.util.HashMap; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,8 +49,8 @@ public class AvlQueue extends ArrayBlockingQueue { // For keeping track of the last AVL report for each vehicle. Used to // determine if AVL report from queue is obsolete. - ConcurrentMap avlDataPerVehicleMap = - new ConcurrentHashMap(); + HashMap avlDataPerVehicleMap = + new HashMap(); private static final long serialVersionUID = 6587642826604552096L; @@ -76,9 +78,12 @@ public AvlQueue(int queueSize) { private void addToAvlDataPerVehicleMap(Runnable runnable) { if (!(runnable instanceof AvlClient)) throw new IllegalArgumentException("Runnable must be AvlClient."); + ReentrantLock l = new ReentrantLock(); + l.lock(); AvlReport avlReport = ((AvlClient) runnable).getAvlReport(); avlDataPerVehicleMap.put(avlReport.getVehicleId(), avlReport); + l.unlock(); } /** @@ -92,6 +97,8 @@ private void addToAvlDataPerVehicleMap(Runnable runnable) { private boolean isObsolete(Runnable runnableFromQueue) { if (!(runnableFromQueue instanceof AvlClient)) throw new IllegalArgumentException("Runnable must be AvlClient."); + ReentrantLock l = new ReentrantLock(); + l.lock(); AvlReport avlReportFromQueue = ((AvlClient) runnableFromQueue).getAvlReport(); @@ -110,6 +117,7 @@ private boolean isObsolete(Runnable runnableFromQueue) { + "is {}", avlReportFromQueue, lastAvlReportForVehicle, size()); } + l.unlock(); return obsolete; } diff --git a/transitclock/src/main/java/org/transitclock/core/AvlProcessor.java b/transitclock/src/main/java/org/transitclock/core/AvlProcessor.java index 6d9433f8d..c6b01b586 100644 --- a/transitclock/src/main/java/org/transitclock/core/AvlProcessor.java +++ b/transitclock/src/main/java/org/transitclock/core/AvlProcessor.java @@ -1397,6 +1397,8 @@ private void determineAndSetRealTimeSchAdh(VehicleState vehicleState) { private void lowLevelProcessAvlReport(AvlReport avlReport, boolean recursiveCall) { // Determine previous state of vehicle + ReentrantLock l = new ReentrantLock(); + l.lock(); String vehicleId = avlReport.getVehicleId(); VehicleState vehicleState = VehicleStateManager.getInstance() .getVehicleState(vehicleId); @@ -1404,7 +1406,7 @@ private void lowLevelProcessAvlReport(AvlReport avlReport, // Since modifying the VehicleState should synchronize in case another // thread simultaneously processes data for the same vehicle. This // would be extremely rare but need to be safe. - synchronized (vehicleState) { + // Keep track of last AvlReport even if vehicle not predictable. vehicleState.setAvlReport(avlReport); @@ -1519,7 +1521,7 @@ private void lowLevelProcessAvlReport(AvlReport avlReport, org.transitclock.db.structs.VehicleState dbVehicleState = new org.transitclock.db.structs.VehicleState(vehicleState); Core.getInstance().getDbLogger().add(dbVehicleState); - } // End of synchronizing on vehicleState } + l.unlock(); // End of synchronizing on vehicleState } } /** @@ -1581,11 +1583,12 @@ public AvlReport getLastAvlReport() { * @param avlReport */ public void cacheAvlReportWithoutProcessing(AvlReport avlReport) { + ReentrantLock l = new ReentrantLock(); + l.lock(); VehicleState vehicleState = VehicleStateManager.getInstance().getVehicleState( avlReport.getVehicleId()); - ReentrantLock l = new ReentrantLock(); - l.lock(); + // Since modifying the VehicleState should synchronize in case another // thread simultaneously processes data for the same vehicle. This // would be extremely rare but need to be safe. @@ -1639,6 +1642,7 @@ private void logInvalidAssignment(VehicleState vehicleState) { */ public void processAvlReport(AvlReport avlReport) { IntervalTimer timer = new IntervalTimer(); + logger.error("rjasmin processing starting {}msec", System.currentTimeMillis()); // Handle special case where want to not use assignment from AVL // report, most likely because want to test automatic assignment diff --git a/transitclock/src/main/java/org/transitclock/core/Indices.java b/transitclock/src/main/java/org/transitclock/core/Indices.java index f0d7dc89f..71dfb7b0c 100644 --- a/transitclock/src/main/java/org/transitclock/core/Indices.java +++ b/transitclock/src/main/java/org/transitclock/core/Indices.java @@ -18,7 +18,6 @@ import java.io.Serializable; import java.util.Objects; -import java.util.concurrent.locks.ReentrantLock; import org.transitclock.applications.Core; import org.transitclock.db.structs.ArrivalDeparture; @@ -197,8 +196,6 @@ public boolean isEarlierStopPathThan(Indices indices) { * @return The resulting Indices object */ public Indices increment(long epochTime) { - ReentrantLock l = new ReentrantLock(); - l.lock(); ++segmentIndex; if (segmentIndex >= block.numSegments(tripIndex, stopPathIndex)) { segmentIndex = 0; @@ -220,7 +217,6 @@ public Indices increment(long epochTime) { } } } - l.unlock(); return this; } diff --git a/transitclock/src/main/java/org/transitclock/core/TimeoutHandlerModule.java b/transitclock/src/main/java/org/transitclock/core/TimeoutHandlerModule.java index 6407628b1..e3a3710d5 100644 --- a/transitclock/src/main/java/org/transitclock/core/TimeoutHandlerModule.java +++ b/transitclock/src/main/java/org/transitclock/core/TimeoutHandlerModule.java @@ -18,6 +18,7 @@ import java.util.HashMap; import java.util.Iterator; +import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -124,9 +125,10 @@ public TimeoutHandlerModule(String agencyId) { public void storeAvlReport(AvlReport avlReport) { // Synchronize map modifications since elsewhere the elements can be removed // from the map. - synchronized (avlReportsMap) { + ReentrantLock l = new ReentrantLock(); + l.lock(); avlReportsMap.put(avlReport.getVehicleId(), avlReport); - } + l.unlock(); } /** @@ -332,7 +334,8 @@ public void handlePossibleTimeouts() { // Sync access to avlReportsMap since it can be simultaneously // modified elsewhere - synchronized (avlReportsMap) { + ReentrantLock l = new ReentrantLock(); + l.lock(); // Using an Iterator instead of for(AvlReport a : map.values()) // because removing elements while iterating. Way to do this without // getting concurrent access exception is to use an Iterator. @@ -367,7 +370,7 @@ public void handlePossibleTimeouts() { } } } - } + l.unlock(); } /* diff --git a/transitclock/src/main/java/org/transitclock/utils/threading/NamedThread.java b/transitclock/src/main/java/org/transitclock/utils/threading/NamedThread.java index abe6f4fc4..6721f2f6d 100644 --- a/transitclock/src/main/java/org/transitclock/utils/threading/NamedThread.java +++ b/transitclock/src/main/java/org/transitclock/utils/threading/NamedThread.java @@ -80,29 +80,22 @@ public void uncaughtException(Thread t, Throwable e) { * @return */ private static String threadNameWithCounter(String name) { - ReentrantLock l = new ReentrantLock(); - l.lock(); - Integer count = threadNameCountMap.get(name); - if (count == null) { - count = 1; - } else { - // Increment the count; - count++; - } - threadNameCountMap.put(name, count); - l.unlock(); - return name + "-" + count; + Integer count = threadNameCountMap.get(name); + if (count == null) { + count = 1; + } else { + // Increment the count; + count++; + } + threadNameCountMap.put(name, count); + return name + "-" + count; } @Override public void run() { logger.debug("Created NamedThread {}", getName()); - ReentrantLock l = new ReentrantLock(); try { - l.lock(); - numAlive.incrementAndGet(); super.run(); - } catch (Throwable t) { // Log the problem but do so within a try/catch in case it is // an OutOfMemoryError and need to exit even if get another @@ -146,8 +139,6 @@ public void run() { System.exit(-1); } } finally { - l.unlock(); - numAlive.decrementAndGet(); logger.debug("Exiting NamedThread {}", getName()); } }