Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrated code lifecycle: Fix an issue with concurrent build queue access #9876

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,9 @@ private BuildAgentInformation getUpdatedLocalBuildAgentInformation(BuildJobQueue
}

private List<BuildJobQueueItem> getProcessingJobsOfNode(String memberAddress) {
return processingJobs.values().stream().filter(job -> Objects.equals(job.buildAgent().memberAddress(), memberAddress)).toList();
// NOTE: we should not use streams with IMap, because it can be unstable, when many items are added at the same time and there is a slow network condition
List<BuildJobQueueItem> processingJobsList = new ArrayList<>(processingJobs.values());
return processingJobsList.stream().filter(job -> Objects.equals(job.buildAgent().memberAddress(), memberAddress)).toList();
krusche marked this conversation as resolved.
Show resolved Hide resolved
}

private void removeOfflineNodes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,36 +108,45 @@ public void pushDockerImageCleanupInfo() {
}
}

/**
* @return a copy of the queued build jobs as ArrayList
*/
public List<BuildJobQueueItem> getQueuedJobs() {
return queue.stream().toList();
// NOTE: we should not use streams with IQueue directly, because it can be unstable, when many items are added at the same time and there is a slow network condition
return new ArrayList<>(queue);
}

/**
* @return a copy of the processing jobs as ArrayList
*/
public List<BuildJobQueueItem> getProcessingJobs() {
return processingJobs.values().stream().toList();
// NOTE: we should not use streams with IMap, because it can be unstable, when many items are added at the same time and there is a slow network condition
return new ArrayList<>(processingJobs.values());
}

public List<BuildJobQueueItem> getQueuedJobsForCourse(long courseId) {
return queue.stream().filter(job -> job.courseId() == courseId).toList();
return getQueuedJobs().stream().filter(job -> job.courseId() == courseId).toList();
}

public List<BuildJobQueueItem> getProcessingJobsForCourse(long courseId) {
return processingJobs.values().stream().filter(job -> job.courseId() == courseId).toList();
return getProcessingJobs().stream().filter(job -> job.courseId() == courseId).toList();
}

public List<BuildJobQueueItem> getQueuedJobsForParticipation(long participationId) {
return queue.stream().filter(job -> job.participationId() == participationId).toList();
return getQueuedJobs().stream().filter(job -> job.participationId() == participationId).toList();
}

public List<BuildJobQueueItem> getProcessingJobsForParticipation(long participationId) {
return processingJobs.values().stream().filter(job -> job.participationId() == participationId).toList();
return getProcessingJobs().stream().filter(job -> job.participationId() == participationId).toList();
}

public List<BuildAgentInformation> getBuildAgentInformation() {
return buildAgentInformation.values().stream().toList();
// NOTE: we should not use streams with IMap, because it can be unstable, when many items are added at the same time and there is a slow network condition
return new ArrayList<>(buildAgentInformation.values());
}

public List<BuildAgentInformation> getBuildAgentInformationWithoutRecentBuildJobs() {
return buildAgentInformation.values().stream().map(agent -> new BuildAgentInformation(agent.buildAgent(), agent.maxNumberOfConcurrentBuildJobs(),
return getBuildAgentInformation().stream().map(agent -> new BuildAgentInformation(agent.buildAgent(), agent.maxNumberOfConcurrentBuildJobs(),
agent.numberOfCurrentBuildJobs(), agent.runningBuildJobs(), agent.status(), null, null)).toList();
}

Expand All @@ -156,9 +165,10 @@ public void resumeBuildAgent(String agent) {
*/
public void cancelBuildJob(String buildJobId) {
// Remove build job if it is queued
if (queue.stream().anyMatch(job -> Objects.equals(job.id(), buildJobId))) {
List<BuildJobQueueItem> queuedJobs = getQueuedJobs();
if (queuedJobs.stream().anyMatch(job -> Objects.equals(job.id(), buildJobId))) {
List<BuildJobQueueItem> toRemove = new ArrayList<>();
for (BuildJobQueueItem job : queue) {
for (BuildJobQueueItem job : queuedJobs) {
if (Objects.equals(job.id(), buildJobId)) {
toRemove.add(job);
}
Expand Down Expand Up @@ -197,7 +207,8 @@ public void cancelAllQueuedBuildJobs() {
* Cancel all running build jobs.
*/
public void cancelAllRunningBuildJobs() {
for (BuildJobQueueItem buildJob : processingJobs.values()) {
List<BuildJobQueueItem> runningJobs = getProcessingJobs();
for (BuildJobQueueItem buildJob : runningJobs) {
cancelBuildJob(buildJob.id());
}
}
Expand All @@ -208,7 +219,7 @@ public void cancelAllRunningBuildJobs() {
* @param agentName name of the agent
*/
public void cancelAllRunningBuildJobsForAgent(String agentName) {
processingJobs.values().stream().filter(job -> Objects.equals(job.buildAgent().name(), agentName)).forEach(job -> cancelBuildJob(job.id()));
getProcessingJobs().stream().filter(job -> Objects.equals(job.buildAgent().name(), agentName)).forEach(job -> cancelBuildJob(job.id()));
}

/**
Expand All @@ -217,8 +228,9 @@ public void cancelAllRunningBuildJobsForAgent(String agentName) {
* @param courseId id of the course
*/
public void cancelAllQueuedBuildJobsForCourse(long courseId) {
List<BuildJobQueueItem> queuedJobs = getQueuedJobs();
List<BuildJobQueueItem> toRemove = new ArrayList<>();
for (BuildJobQueueItem job : queue) {
for (BuildJobQueueItem job : queuedJobs) {
if (job.courseId() == courseId) {
toRemove.add(job);
}
Expand All @@ -232,7 +244,8 @@ public void cancelAllQueuedBuildJobsForCourse(long courseId) {
* @param courseId id of the course
*/
public void cancelAllRunningBuildJobsForCourse(long courseId) {
for (BuildJobQueueItem buildJob : processingJobs.values()) {
List<BuildJobQueueItem> runningJobs = getProcessingJobs();
for (BuildJobQueueItem buildJob : runningJobs) {
if (buildJob.courseId() == courseId) {
cancelBuildJob(buildJob.id());
}
Expand All @@ -246,14 +259,16 @@ public void cancelAllRunningBuildJobsForCourse(long courseId) {
*/
public void cancelAllJobsForParticipation(long participationId) {
List<BuildJobQueueItem> toRemove = new ArrayList<>();
for (BuildJobQueueItem queuedJob : queue) {
List<BuildJobQueueItem> queuedJobs = getQueuedJobs();
for (BuildJobQueueItem queuedJob : queuedJobs) {
if (queuedJob.participationId() == participationId) {
toRemove.add(queuedJob);
}
}
queue.removeAll(toRemove);

for (BuildJobQueueItem runningJob : processingJobs.values()) {
List<BuildJobQueueItem> runningJobs = getProcessingJobs();
for (BuildJobQueueItem runningJob : runningJobs) {
if (runningJob.participationId() == participationId) {
cancelBuildJob(runningJob.id());
}
Expand Down
Loading