Skip to content

Commit

Permalink
Merge pull request #23 from GeoCat/_sync
Browse files Browse the repository at this point in the history
replace method synchronization with object (global) synchronization
  • Loading branch information
davidblasby authored Sep 23, 2022
2 parents 2cd436b + e8d102a commit 0848f89
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 37 deletions.
37 changes: 20 additions & 17 deletions src/main/java/geocat/database/service/DatabaseUpdateService.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,25 +100,28 @@ public List<CSWEndPointDetectedEvent> createCSWEndPointDetectedEvents(CSWMetadat
return result;
}

static Object lockobject = new Object();
@Transactional(propagation = Propagation.NOT_SUPPORTED)
//synchronized so other threads cannot update while we are writing...
public synchronized void errorOccurred(Exchange exchange) {
Exception e = (Exception) exchange.getMessage().getHeader("exception");
if (e == null)
return;
String processId = (String) exchange.getMessage().getHeader("processID");
Optional<HarvestJob> _job = harvestJobRepo.findById(processId);
if (!_job.isPresent())
return; // cannot update database. Likely DB issue or very very early exception
HarvestJob job = _job.get();
if (job.getMessages() == null)
job.setMessages("");
String thisMessage = "\n--------------------------------------\n";
thisMessage += "WHEN:" + Instant.now().toString() + "\n\n";
thisMessage += convertToString(e);
thisMessage += "\n--------------------------------------\n";
job.setMessages(job.getMessages() + thisMessage);
HarvestJob j2 = harvestJobRepo.save(job);
public void errorOccurred(Exchange exchange) {
synchronized (lockobject) {
Exception e = (Exception) exchange.getMessage().getHeader("exception");
if (e == null)
return;
String processId = (String) exchange.getMessage().getHeader("processID");
Optional<HarvestJob> _job = harvestJobRepo.findById(processId);
if (!_job.isPresent())
return; // cannot update database. Likely DB issue or very very early exception
HarvestJob job = _job.get();
if (job.getMessages() == null)
job.setMessages("");
String thisMessage = "\n--------------------------------------\n";
thisMessage += "WHEN:" + Instant.now().toString() + "\n\n";
thisMessage += convertToString(e);
thisMessage += "\n--------------------------------------\n";
job.setMessages(job.getMessages() + thisMessage);
HarvestJob j2 = harvestJobRepo.save(job);
}
}


Expand Down
25 changes: 14 additions & 11 deletions src/main/java/geocat/database/service/HarvestJobService.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,21 @@ public HarvestJob updateHarvestJobStateInDB(String guid, HarvestJobState state)
return harvestJobRepo.save(job);
}

public synchronized WorkedDeterminedFinished determineIfWorkCompleted(String harvestId) {
HarvestJob harvestJob = harvestJobRepo.findById(harvestId).get();
if (!(harvestJob.getState() == HarvestJobState.DETERMINING_WORK))
return null; //already completed earlier
List<EndpointJob> outstandingJobs = endpointJobRepo.findByHarvestJobIdAndState(harvestId, EndpointJobState.DETERMINING_WORK);
boolean workCompleted = outstandingJobs.isEmpty();
if (workCompleted) {
//move state
updateHarvestJobStateInDB(harvestId, HarvestJobState.WORK_DETERMINED);
return new WorkedDeterminedFinished(harvestId);
static Object lockobject = new Object();
public WorkedDeterminedFinished determineIfWorkCompleted(String harvestId) {
synchronized (lockobject) {
HarvestJob harvestJob = harvestJobRepo.findById(harvestId).get();
if (!(harvestJob.getState() == HarvestJobState.DETERMINING_WORK))
return null; //already completed earlier
List<EndpointJob> outstandingJobs = endpointJobRepo.findByHarvestJobIdAndState(harvestId, EndpointJobState.DETERMINING_WORK);
boolean workCompleted = outstandingJobs.isEmpty();
if (workCompleted) {
//move state
updateHarvestJobStateInDB(harvestId, HarvestJobState.WORK_DETERMINED);
return new WorkedDeterminedFinished(harvestId);
}
return null;
}
return null;
}

public HarvestJob getById(String id) {
Expand Down
12 changes: 7 additions & 5 deletions src/main/java/geocat/service/QueueChooserService.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@ public String chooseQueue(String hint, int expectedNumberOfRecords) throws Excep
return chooseQueueByGroup(queueGroupsMap.get("parallel_" + parallellism));
}


public synchronized String chooseQueueByGroup(QueueGroupInfo groupInfo) {
QueueInfo result = groupInfo.currentQueueInfo();
groupInfo.useNextQueue();
return result.queueName();
static Object lockobject = new Object();
public String chooseQueueByGroup(QueueGroupInfo groupInfo) {
synchronized (lockobject) {
QueueInfo result = groupInfo.currentQueueInfo();
groupInfo.useNextQueue();
return result.queueName();
}
}
}
11 changes: 7 additions & 4 deletions src/main/java/geocat/service/QueueGroupInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ public QueueInfo currentQueueInfo() {
return new QueueInfo(this, nextQueueNumber);
}

public synchronized void useNextQueue() {
nextQueueNumber++;
if (nextQueueNumber >= numberOfQueues)
nextQueueNumber = 0;
static Object lockobject = new Object();
public void useNextQueue() {
synchronized (lockobject) {
nextQueueNumber++;
if (nextQueueNumber >= numberOfQueues)
nextQueueNumber = 0;
}
}

public QueueInfo queueInfo(int subQueueNumber) {
Expand Down

0 comments on commit 0848f89

Please sign in to comment.