From e8d102a36777b9a56e91f4f8732e7e2949b46848 Mon Sep 17 00:00:00 2001 From: "david.blasby" Date: Thu, 22 Sep 2022 18:56:12 -0700 Subject: [PATCH] replace method synchorinization with object (global) synchronization --- .../service/DatabaseUpdateService.java | 37 ++++++++++--------- .../database/service/HarvestJobService.java | 25 +++++++------ .../geocat/service/QueueChooserService.java | 12 +++--- .../java/geocat/service/QueueGroupInfo.java | 11 ++++-- 4 files changed, 48 insertions(+), 37 deletions(-) diff --git a/src/main/java/geocat/database/service/DatabaseUpdateService.java b/src/main/java/geocat/database/service/DatabaseUpdateService.java index c0bf9dd..a795dd8 100644 --- a/src/main/java/geocat/database/service/DatabaseUpdateService.java +++ b/src/main/java/geocat/database/service/DatabaseUpdateService.java @@ -100,25 +100,28 @@ public List createCSWEndPointDetectedEvents(CSWMetadat return result; } + static Object lockobject = new Object(); @Transactional(propagation = Propagation.NOT_SUPPORTED) //synchronized so other threads cannot update while we are writing... - public synchronized void errorOccurred(Exchange exchange) { - Exception e = (Exception) exchange.getMessage().getHeader("exception"); - if (e == null) - return; - String processId = (String) exchange.getMessage().getHeader("processID"); - Optional _job = harvestJobRepo.findById(processId); - if (!_job.isPresent()) - return; // cannot update database. Likely DB issue or very very early exception - HarvestJob job = _job.get(); - if (job.getMessages() == null) - job.setMessages(""); - String thisMessage = "\n--------------------------------------\n"; - thisMessage += "WHEN:" + Instant.now().toString() + "\n\n"; - thisMessage += convertToString(e); - thisMessage += "\n--------------------------------------\n"; - job.setMessages(job.getMessages() + thisMessage); - HarvestJob j2 = harvestJobRepo.save(job); + public void errorOccurred(Exchange exchange) { + synchronized (lockobject) { + Exception e = (Exception) exchange.getMessage().getHeader("exception"); + if (e == null) + return; + String processId = (String) exchange.getMessage().getHeader("processID"); + Optional _job = harvestJobRepo.findById(processId); + if (!_job.isPresent()) + return; // cannot update database. Likely DB issue or very very early exception + HarvestJob job = _job.get(); + if (job.getMessages() == null) + job.setMessages(""); + String thisMessage = "\n--------------------------------------\n"; + thisMessage += "WHEN:" + Instant.now().toString() + "\n\n"; + thisMessage += convertToString(e); + thisMessage += "\n--------------------------------------\n"; + job.setMessages(job.getMessages() + thisMessage); + HarvestJob j2 = harvestJobRepo.save(job); + } } diff --git a/src/main/java/geocat/database/service/HarvestJobService.java b/src/main/java/geocat/database/service/HarvestJobService.java index 702b0b5..ad34c7d 100644 --- a/src/main/java/geocat/database/service/HarvestJobService.java +++ b/src/main/java/geocat/database/service/HarvestJobService.java @@ -60,18 +60,21 @@ public HarvestJob updateHarvestJobStateInDB(String guid, HarvestJobState state) return harvestJobRepo.save(job); } - public synchronized WorkedDeterminedFinished determineIfWorkCompleted(String harvestId) { - HarvestJob harvestJob = harvestJobRepo.findById(harvestId).get(); - if (!(harvestJob.getState() == HarvestJobState.DETERMINING_WORK)) - return null; //already completed earlier - List outstandingJobs = endpointJobRepo.findByHarvestJobIdAndState(harvestId, EndpointJobState.DETERMINING_WORK); - boolean workCompleted = outstandingJobs.isEmpty(); - if (workCompleted) { - //move state - updateHarvestJobStateInDB(harvestId, HarvestJobState.WORK_DETERMINED); - return new WorkedDeterminedFinished(harvestId); + static Object lockobject = new Object(); + public WorkedDeterminedFinished determineIfWorkCompleted(String harvestId) { + synchronized (lockobject) { + HarvestJob harvestJob = harvestJobRepo.findById(harvestId).get(); + if (!(harvestJob.getState() == HarvestJobState.DETERMINING_WORK)) + return null; //already completed earlier + List outstandingJobs = endpointJobRepo.findByHarvestJobIdAndState(harvestId, EndpointJobState.DETERMINING_WORK); + boolean workCompleted = outstandingJobs.isEmpty(); + if (workCompleted) { + //move state + updateHarvestJobStateInDB(harvestId, HarvestJobState.WORK_DETERMINED); + return new WorkedDeterminedFinished(harvestId); + } + return null; } - return null; } public HarvestJob getById(String id) { diff --git a/src/main/java/geocat/service/QueueChooserService.java b/src/main/java/geocat/service/QueueChooserService.java index 304dd0a..97aa090 100644 --- a/src/main/java/geocat/service/QueueChooserService.java +++ b/src/main/java/geocat/service/QueueChooserService.java @@ -72,10 +72,12 @@ public String chooseQueue(String hint, int expectedNumberOfRecords) throws Excep return chooseQueueByGroup(queueGroupsMap.get("parallel_" + parallellism)); } - - public synchronized String chooseQueueByGroup(QueueGroupInfo groupInfo) { - QueueInfo result = groupInfo.currentQueueInfo(); - groupInfo.useNextQueue(); - return result.queueName(); + static Object lockobject = new Object(); + public String chooseQueueByGroup(QueueGroupInfo groupInfo) { + synchronized (lockobject) { + QueueInfo result = groupInfo.currentQueueInfo(); + groupInfo.useNextQueue(); + return result.queueName(); + } } } diff --git a/src/main/java/geocat/service/QueueGroupInfo.java b/src/main/java/geocat/service/QueueGroupInfo.java index 44f50bd..ba1ced5 100644 --- a/src/main/java/geocat/service/QueueGroupInfo.java +++ b/src/main/java/geocat/service/QueueGroupInfo.java @@ -17,10 +17,13 @@ public QueueInfo currentQueueInfo() { return new QueueInfo(this, nextQueueNumber); } - public synchronized void useNextQueue() { - nextQueueNumber++; - if (nextQueueNumber >= numberOfQueues) - nextQueueNumber = 0; + static Object lockobject = new Object(); + public void useNextQueue() { + synchronized (lockobject) { + nextQueueNumber++; + if (nextQueueNumber >= numberOfQueues) + nextQueueNumber = 0; + } } public QueueInfo queueInfo(int subQueueNumber) {