Skip to content

Commit

Permalink
Development: Remove fenced locks in integrated code lifecycle (#9180)
Browse files Browse the repository at this point in the history
  • Loading branch information
BBesrour authored Aug 30, 2024
1 parent f726b93 commit b4a87d9
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 99 deletions.
3 changes: 1 addition & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ hibernate_version=6.4.9.Final
opensaml_version=4.3.2
jwt_version=0.12.6
jaxb_runtime_version=4.0.5
# TODO: we cannot update to 5.5.0 because we currently use the CP Subsystem for fenced locks, however CP Subsystem is only available to Enterprise customers
hazelcast_version=5.4.0
hazelcast_version=5.5.0
junit_version=5.10.2
mockito_version=5.13.0
fasterxml_version=2.17.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.hazelcast.collection.ItemEvent;
import com.hazelcast.collection.ItemListener;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cp.lock.FencedLock;
import com.hazelcast.map.IMap;

import de.tum.in.www1.artemis.domain.BuildJob;
Expand Down Expand Up @@ -73,8 +72,6 @@ public class LocalCIResultProcessingService {

private IMap<String, BuildAgentInformation> buildAgentInformation;

private FencedLock resultQueueLock;

private UUID listenerId;

public LocalCIResultProcessingService(@Qualifier("hazelcastInstance") HazelcastInstance hazelcastInstance, ProgrammingExerciseGradingService programmingExerciseGradingService,
Expand All @@ -97,7 +94,6 @@ public LocalCIResultProcessingService(@Qualifier("hazelcastInstance") HazelcastI
public void init() {
this.resultQueue = this.hazelcastInstance.getQueue("buildResultQueue");
this.buildAgentInformation = this.hazelcastInstance.getMap("buildAgentInformation");
this.resultQueueLock = this.hazelcastInstance.getCPSubsystem().getLock("resultQueueLock");
this.listenerId = resultQueue.addItemListener(new ResultQueueListener(), true);
}

Expand All @@ -112,9 +108,7 @@ public void removeListener() {
public void processResult() {

// set lock to prevent multiple nodes from processing the same build job
resultQueueLock.lock();
ResultQueueItem resultQueueItem = resultQueue.poll();
resultQueueLock.unlock();

if (resultQueueItem == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import com.hazelcast.collection.IQueue;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cp.lock.FencedLock;
import com.hazelcast.map.IMap;
import com.hazelcast.topic.ITopic;

Expand Down Expand Up @@ -66,11 +65,6 @@ public class SharedQueueManagementService {

private IMap<String, ZonedDateTime> dockerImageCleanupInfo;

/**
* Lock to prevent multiple nodes from processing the same build job.
*/
private FencedLock sharedLock;

private ITopic<String> canceledBuildJobsTopic;

public SharedQueueManagementService(BuildJobRepository buildJobRepository, @Qualifier("hazelcastInstance") HazelcastInstance hazelcastInstance, ProfileService profileService) {
Expand All @@ -86,7 +80,6 @@ public SharedQueueManagementService(BuildJobRepository buildJobRepository, @Qual
public void init() {
this.buildAgentInformation = this.hazelcastInstance.getMap("buildAgentInformation");
this.processingJobs = this.hazelcastInstance.getMap("processingJobs");
this.sharedLock = this.hazelcastInstance.getCPSubsystem().getLock("buildJobQueueLock");
this.queue = this.hazelcastInstance.getQueue("buildJobQueue");
this.canceledBuildJobsTopic = hazelcastInstance.getTopic("canceledBuildJobsTopic");
this.dockerImageCleanupInfo = this.hazelcastInstance.getMap("dockerImageCleanupInfo");
Expand Down Expand Up @@ -148,28 +141,22 @@ public List<BuildAgentInformation> getBuildAgentInformationWithoutRecentBuildJob
* @param buildJobId id of the build job to cancel
*/
public void cancelBuildJob(String buildJobId) {
sharedLock.lock();
try {
// Remove build job if it is queued
if (queue.stream().anyMatch(job -> Objects.equals(job.id(), buildJobId))) {
List<BuildJobQueueItem> toRemove = new ArrayList<>();
for (BuildJobQueueItem job : queue) {
if (Objects.equals(job.id(), buildJobId)) {
toRemove.add(job);
}
}
queue.removeAll(toRemove);
}
else {
// Cancel build job if it is currently being processed
BuildJobQueueItem buildJob = processingJobs.remove(buildJobId);
if (buildJob != null) {
triggerBuildJobCancellation(buildJobId);
// Remove build job if it is queued
if (queue.stream().anyMatch(job -> Objects.equals(job.id(), buildJobId))) {
List<BuildJobQueueItem> toRemove = new ArrayList<>();
for (BuildJobQueueItem job : queue) {
if (Objects.equals(job.id(), buildJobId)) {
toRemove.add(job);
}
}
queue.removeAll(toRemove);
}
finally {
sharedLock.unlock();
else {
// Cancel build job if it is currently being processed
BuildJobQueueItem buildJob = processingJobs.remove(buildJobId);
if (buildJob != null) {
triggerBuildJobCancellation(buildJobId);
}
}
}

Expand All @@ -188,30 +175,17 @@ private void triggerBuildJobCancellation(String buildJobId) {
* Cancel all queued build jobs.
*/
public void cancelAllQueuedBuildJobs() {
sharedLock.lock();
try {
log.debug("Cancelling all queued build jobs");
queue.clear();
}
finally {
sharedLock.unlock();
}
log.debug("Cancelling all queued build jobs");
queue.clear();
}

/**
* Cancel all running build jobs.
*/
public void cancelAllRunningBuildJobs() {
sharedLock.lock();
try {
for (BuildJobQueueItem buildJob : processingJobs.values()) {
cancelBuildJob(buildJob.id());
}
}
finally {
sharedLock.unlock();
for (BuildJobQueueItem buildJob : processingJobs.values()) {
cancelBuildJob(buildJob.id());
}

}

/**
Expand All @@ -220,13 +194,7 @@ public void cancelAllRunningBuildJobs() {
* @param agentName name of the agent
*/
public void cancelAllRunningBuildJobsForAgent(String agentName) {
sharedLock.lock();
try {
processingJobs.values().stream().filter(job -> Objects.equals(job.buildAgentAddress(), agentName)).forEach(job -> cancelBuildJob(job.id()));
}
finally {
sharedLock.unlock();
}
processingJobs.values().stream().filter(job -> Objects.equals(job.buildAgentAddress(), agentName)).forEach(job -> cancelBuildJob(job.id()));
}

/**
Expand All @@ -235,19 +203,13 @@ public void cancelAllRunningBuildJobsForAgent(String agentName) {
* @param courseId id of the course
*/
public void cancelAllQueuedBuildJobsForCourse(long courseId) {
sharedLock.lock();
try {
List<BuildJobQueueItem> toRemove = new ArrayList<>();
for (BuildJobQueueItem job : queue) {
if (job.courseId() == courseId) {
toRemove.add(job);
}
List<BuildJobQueueItem> toRemove = new ArrayList<>();
for (BuildJobQueueItem job : queue) {
if (job.courseId() == courseId) {
toRemove.add(job);
}
queue.removeAll(toRemove);
}
finally {
sharedLock.unlock();
}
queue.removeAll(toRemove);
}

/**
Expand All @@ -269,25 +231,19 @@ public void cancelAllRunningBuildJobsForCourse(long courseId) {
* @param participationId id of the participation
*/
public void cancelAllJobsForParticipation(long participationId) {
sharedLock.lock();
try {
List<BuildJobQueueItem> toRemove = new ArrayList<>();
for (BuildJobQueueItem queuedJob : queue) {
if (queuedJob.participationId() == participationId) {
toRemove.add(queuedJob);
}
List<BuildJobQueueItem> toRemove = new ArrayList<>();
for (BuildJobQueueItem queuedJob : queue) {
if (queuedJob.participationId() == participationId) {
toRemove.add(queuedJob);
}
queue.removeAll(toRemove);
}
queue.removeAll(toRemove);

for (BuildJobQueueItem runningJob : processingJobs.values()) {
if (runningJob.participationId() == participationId) {
cancelBuildJob(runningJob.id());
}
for (BuildJobQueueItem runningJob : processingJobs.values()) {
if (runningJob.participationId() == participationId) {
cancelBuildJob(runningJob.id());
}
}
finally {
sharedLock.unlock();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.hazelcast.collection.ItemEvent;
import com.hazelcast.collection.ItemListener;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cp.lock.FencedLock;
import com.hazelcast.map.IMap;

import de.tum.in.www1.artemis.domain.BuildLogEntry;
Expand Down Expand Up @@ -65,11 +64,6 @@ public class SharedQueueProcessingService {

private final BuildAgentSshKeyService buildAgentSSHKeyService;

/**
* Lock to prevent multiple nodes from processing the same build job.
*/
private FencedLock sharedLock;

private IQueue<BuildJobQueueItem> queue;

private IQueue<ResultQueueItem> resultQueue;
Expand Down Expand Up @@ -104,7 +98,6 @@ public SharedQueueProcessingService(@Qualifier("hazelcastInstance") HazelcastIns
public void init() {
this.buildAgentInformation = this.hazelcastInstance.getMap("buildAgentInformation");
this.processingJobs = this.hazelcastInstance.getMap("processingJobs");
this.sharedLock = this.hazelcastInstance.getCPSubsystem().getLock("buildJobQueueLock");
this.queue = this.hazelcastInstance.getQueue("buildJobQueue");
this.resultQueue = this.hazelcastInstance.getQueue("buildResultQueue");
this.listenerId = this.queue.addItemListener(new QueuedBuildJobItemListener(), true);
Expand Down Expand Up @@ -176,14 +169,8 @@ private void checkAvailabilityAndProcessNextBuild() {
return;
}

// Lock the queue to prevent multiple nodes from processing the same build job
sharedLock.lock();
try {
buildJob = addToProcessingJobs();
}
finally {
sharedLock.unlock();
}
buildJob = addToProcessingJobs();

processBuild(buildJob);
}
catch (RejectedExecutionException e) {
Expand Down

0 comments on commit b4a87d9

Please sign in to comment.