Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iris: Improve code quality #9494

Closed
wants to merge 12 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import de.tum.cit.aet.artemis.atlas.domain.competency.CompetencyTaxonomy;
import de.tum.cit.aet.artemis.core.domain.Course;
import de.tum.cit.aet.artemis.core.domain.User;
import de.tum.cit.aet.artemis.iris.service.pyris.PyrisJobService;
import de.tum.cit.aet.artemis.iris.service.pyris.PyrisPipelineService;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.competency.PyrisCompetencyExtractionPipelineExecutionDTO;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.competency.PyrisCompetencyRecommendationDTO;
Expand All @@ -27,12 +26,9 @@ public class IrisCompetencyGenerationService {

private final IrisWebsocketService websocketService;

private final PyrisJobService pyrisJobService;

public IrisCompetencyGenerationService(PyrisPipelineService pyrisPipelineService, IrisWebsocketService websocketService, PyrisJobService pyrisJobService) {
public IrisCompetencyGenerationService(PyrisPipelineService pyrisPipelineService, IrisWebsocketService websocketService) {
this.pyrisPipelineService = pyrisPipelineService;
this.websocketService = websocketService;
this.pyrisJobService = pyrisJobService;
}
MichaelOwenDyer marked this conversation as resolved.
Show resolved Hide resolved

/**
Expand All @@ -48,7 +44,7 @@ public void executeCompetencyExtractionPipeline(User user, Course course, String
pyrisPipelineService.executePipeline(
"competency-extraction",
"default",
pyrisJobService.createTokenForJob(token -> new CompetencyExtractionJob(token, course.getId(), user.getLogin())),
jobId -> new CompetencyExtractionJob(jobId, course.getId(), user.getLogin()),
MichaelOwenDyer marked this conversation as resolved.
Show resolved Hide resolved
executionDto -> new PyrisCompetencyExtractionPipelineExecutionDTO(executionDto, courseDescription, currentCompetencies, CompetencyTaxonomy.values(), 5),
stages -> websocketService.send(user.getLogin(), websocketTopic(course.getId()), new PyrisCompetencyStatusUpdateDTO(stages, null))
);
MichaelOwenDyer marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import de.tum.cit.aet.artemis.core.exception.AccessForbiddenException;
import de.tum.cit.aet.artemis.core.exception.ConflictException;
import de.tum.cit.aet.artemis.iris.service.pyris.job.CourseChatJob;
import de.tum.cit.aet.artemis.iris.service.pyris.job.ExerciseChatJob;
import de.tum.cit.aet.artemis.iris.service.pyris.job.IngestionWebhookJob;
import de.tum.cit.aet.artemis.iris.service.pyris.job.PyrisJob;

Expand Down Expand Up @@ -63,33 +61,20 @@ public void init() {
}

/**
* Creates a token for an arbitrary job, runs the provided function with the token as an argument,
* and stores the job in the job map.
* Creates a new job token, runs the provided function with the token as an argument,
* and registers the resulting job in Hazelcast.
* The job token is then returned for later reference.
*
* @param tokenToJobFunction the function to run with the token
* @return the generated token
*/
public String createTokenForJob(Function<String, PyrisJob> tokenToJobFunction) {
public String registerJob(Function<String, PyrisJob> tokenToJobFunction) {
var token = generateJobIdToken();
var job = tokenToJobFunction.apply(token);
jobMap.put(token, job);
return token;
MichaelOwenDyer marked this conversation as resolved.
Show resolved Hide resolved
}

MichaelOwenDyer marked this conversation as resolved.
Show resolved Hide resolved
public String addExerciseChatJob(Long courseId, Long exerciseId, Long sessionId) {
var token = generateJobIdToken();
var job = new ExerciseChatJob(token, courseId, exerciseId, sessionId);
jobMap.put(token, job);
return token;
}

public String addCourseChatJob(Long courseId, Long sessionId) {
var token = generateJobIdToken();
var job = new CourseChatJob(token, courseId, sessionId);
jobMap.put(token, job);
return token;
}

/**
* Adds a new ingestion webhook job to the job map with a timeout.
*
Expand Down Expand Up @@ -129,7 +114,7 @@ public PyrisJob getJob(String token) {
* 2. Retrieves the PyrisJob object associated with the provided token.
* 3. Throws an AccessForbiddenException if the token is invalid or not provided.
* <p>
* The token was previously generated via {@link #createTokenForJob(Function)}
* The token was previously generated via {@link #registerJob(Function)}
MichaelOwenDyer marked this conversation as resolved.
Show resolved Hide resolved
*
* @param request the HttpServletRequest object representing the incoming request
* @param jobClass the class of the PyrisJob object to cast the retrieved job to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,7 @@

import static de.tum.cit.aet.artemis.core.config.Constants.PROFILE_IRIS;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

Expand All @@ -17,27 +12,11 @@
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;

import de.tum.cit.aet.artemis.atlas.domain.competency.CompetencyJol;
import de.tum.cit.aet.artemis.atlas.dto.CompetencyJolDTO;
import de.tum.cit.aet.artemis.core.domain.Course;
import de.tum.cit.aet.artemis.core.repository.CourseRepository;
import de.tum.cit.aet.artemis.exercise.domain.participation.StudentParticipation;
import de.tum.cit.aet.artemis.exercise.repository.StudentParticipationRepository;
import de.tum.cit.aet.artemis.exercise.service.LearningMetricsService;
import de.tum.cit.aet.artemis.iris.domain.session.IrisCourseChatSession;
import de.tum.cit.aet.artemis.iris.domain.session.IrisExerciseChatSession;
import de.tum.cit.aet.artemis.iris.exception.IrisException;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.PyrisPipelineExecutionDTO;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.PyrisPipelineExecutionSettingsDTO;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.chat.course.PyrisCourseChatPipelineExecutionDTO;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.chat.exercise.PyrisExerciseChatPipelineExecutionDTO;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.data.PyrisCourseDTO;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.data.PyrisExtendedCourseDTO;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.data.PyrisUserDTO;
import de.tum.cit.aet.artemis.iris.service.pyris.dto.status.PyrisStageDTO;
import de.tum.cit.aet.artemis.iris.service.websocket.IrisChatWebsocketService;
import de.tum.cit.aet.artemis.programming.domain.ProgrammingExercise;
import de.tum.cit.aet.artemis.programming.domain.ProgrammingSubmission;
import de.tum.cit.aet.artemis.iris.service.pyris.job.PyrisJob;

/**
* Service responsible for executing the various Pyris pipelines in a type-safe manner.
Expand All @@ -53,34 +32,18 @@ public class PyrisPipelineService {

private final PyrisJobService pyrisJobService;

private final PyrisDTOService pyrisDTOService;

private final IrisChatWebsocketService irisChatWebsocketService;

private final CourseRepository courseRepository;

private final StudentParticipationRepository studentParticipationRepository;

private final LearningMetricsService learningMetricsService;

@Value("${server.url}")
private String artemisBaseUrl;

public PyrisPipelineService(PyrisConnectorService pyrisConnectorService, PyrisJobService pyrisJobService, PyrisDTOService pyrisDTOService,
IrisChatWebsocketService irisChatWebsocketService, CourseRepository courseRepository, LearningMetricsService learningMetricsService,
StudentParticipationRepository studentParticipationRepository) {
public PyrisPipelineService(PyrisConnectorService pyrisConnectorService, PyrisJobService pyrisJobService) {
this.pyrisConnectorService = pyrisConnectorService;
this.pyrisJobService = pyrisJobService;
this.pyrisDTOService = pyrisDTOService;
this.irisChatWebsocketService = irisChatWebsocketService;
this.courseRepository = courseRepository;
this.learningMetricsService = learningMetricsService;
this.studentParticipationRepository = studentParticipationRepository;
}

/**
* Executes a pipeline on Pyris, identified by the given name and variant.
* The pipeline execution is tracked by a unique job token, which must be provided by the caller.
* The pipeline execution is tracked by a unique job token, which is created for each new job automatically.
* The caller must provide a mapper function to take this job token and produce a {@code PyrisJob} object to be registered.
MichaelOwenDyer marked this conversation as resolved.
Show resolved Hide resolved
MichaelOwenDyer marked this conversation as resolved.
Show resolved Hide resolved
MichaelOwenDyer marked this conversation as resolved.
Show resolved Hide resolved
* The caller must additionally provide a mapper function to create the concrete DTO type for this pipeline from the base DTO.
* The status of the pipeline execution is updated via a consumer that accepts a list of stages. This method will
* call the consumer with the initial stages of the pipeline execution. Later stages will be sent back from Pyris,
Expand All @@ -89,11 +52,12 @@ public PyrisPipelineService(PyrisConnectorService pyrisConnectorService, PyrisJo
*
* @param name the name of the pipeline to be executed
* @param variant the variant of the pipeline
* @param jobToken a unique job token for tracking the pipeline execution
* @param dtoMapper a function to create the concrete DTO type for this pipeline from the base DTO
* @param statusUpdater a consumer to update the status of the pipeline execution
* @param jobFunction a function from job token to job. Creates a new {@code PyrisJob} which will be registered in Hazelcast
* @param dtoMapper a function to create the concrete DTO type for this pipeline from the base execution DTO
* @param statusUpdater a consumer of stages to send status updates while the pipeline is being prepared
*/
public void executePipeline(String name, String variant, String jobToken, Function<PyrisPipelineExecutionDTO, Object> dtoMapper, Consumer<List<PyrisStageDTO>> statusUpdater) {
public void executePipeline(String name, String variant, Function<String, PyrisJob> jobFunction, Function<PyrisPipelineExecutionDTO, Object> dtoMapper,
Consumer<List<PyrisStageDTO>> statusUpdater) {
MichaelOwenDyer marked this conversation as resolved.
Show resolved Hide resolved
// Define the preparation stages of pipeline execution with their initial states
// There will be more stages added in Pyris later
var preparing = new PyrisStageDTO("Preparing", 10, null, null);
Expand All @@ -102,6 +66,8 @@ public void executePipeline(String name, String variant, String jobToken, Functi
// Send initial status update indicating that the preparation stage is in progress
statusUpdater.accept(List.of(preparing.inProgress(), executing.notStarted()));

String jobToken = pyrisJobService.registerJob(jobFunction);
MichaelOwenDyer marked this conversation as resolved.
Show resolved Hide resolved

MichaelOwenDyer marked this conversation as resolved.
Show resolved Hide resolved
var baseDto = new PyrisPipelineExecutionDTO(new PyrisPipelineExecutionSettingsDTO(jobToken, List.of(), artemisBaseUrl), List.of(preparing.done()));
var pipelineDto = dtoMapper.apply(baseDto);

Expand All @@ -123,106 +89,4 @@ public void executePipeline(String name, String variant, String jobToken, Functi
statusUpdater.accept(List.of(preparing.error("An internal error occurred"), executing.notStarted()));
}
}

/**
* Execute the exercise chat pipeline for the given session.
* It provides specific data for the exercise chat pipeline, including:
* - The latest submission of the student
* - The programming exercise
* - The course the exercise is part of
* <p>
*
* @param variant the variant of the pipeline
* @param latestSubmission the latest submission of the student
* @param exercise the programming exercise
* @param session the chat session
* @see PyrisPipelineService#executePipeline for more details on the pipeline execution process.
*/
public void executeExerciseChatPipeline(String variant, Optional<ProgrammingSubmission> latestSubmission, ProgrammingExercise exercise, IrisExerciseChatSession session) {
// @formatter:off
executePipeline(
"tutor-chat", // TODO: Rename this to 'exercise-chat' with next breaking Pyris version
variant,
pyrisJobService.addExerciseChatJob(exercise.getCourseViaExerciseGroupOrCourseMember().getId(), exercise.getId(), session.getId()),
executionDto -> {
var course = exercise.getCourseViaExerciseGroupOrCourseMember();
return new PyrisExerciseChatPipelineExecutionDTO(
latestSubmission.map(pyrisDTOService::toPyrisSubmissionDTO).orElse(null),
pyrisDTOService.toPyrisProgrammingExerciseDTO(exercise),
new PyrisCourseDTO(course),
pyrisDTOService.toPyrisMessageDTOList(session.getMessages()),
new PyrisUserDTO(session.getUser()),
executionDto.settings(),
executionDto.initialStages()
);
},
stages -> irisChatWebsocketService.sendStatusUpdate(session, stages)
);
// @formatter:on
}

/**
* Execute the course chat pipeline for the given session.
* It provides specific data for the course chat pipeline, including:
* - The full course with the participation of the student
* - The metrics of the student in the course
* - The competency JoL if this is due to a JoL set event
* <p>
*
* @param variant the variant of the pipeline
* @param session the chat session
* @param competencyJol if this is due to a JoL set event, this must be the newly created competencyJoL
* @see PyrisPipelineService#executePipeline for more details on the pipeline execution process.
*/
public void executeCourseChatPipeline(String variant, IrisCourseChatSession session, CompetencyJol competencyJol) {
// @formatter:off
var courseId = session.getCourse().getId();
var studentId = session.getUser().getId();
executePipeline(
"course-chat",
variant,
pyrisJobService.addCourseChatJob(courseId, session.getId()),
executionDto -> {
var fullCourse = loadCourseWithParticipationOfStudent(courseId, studentId);
return new PyrisCourseChatPipelineExecutionDTO(
PyrisExtendedCourseDTO.of(fullCourse),
learningMetricsService.getStudentCourseMetrics(session.getUser().getId(), courseId),
competencyJol == null ? null : CompetencyJolDTO.of(competencyJol),
pyrisDTOService.toPyrisMessageDTOList(session.getMessages()),
new PyrisUserDTO(session.getUser()),
executionDto.settings(), // flatten the execution dto here
executionDto.initialStages()
);
},
stages -> irisChatWebsocketService.sendStatusUpdate(session, stages)
);
// @formatter:on
}

/**
* Load the course with the participation of the student and set the participations on the exercises.
* <p>
* Spring Boot 3 does not support conditional left joins, so we have to load the participations separately.
*
* @param courseId the id of the course
* @param studentId the id of the student
*/
private Course loadCourseWithParticipationOfStudent(long courseId, long studentId) {
Course course = courseRepository.findWithEagerExercisesAndLecturesAndAttachmentsAndLectureUnitsAndCompetenciesAndExamsById(courseId).orElseThrow();
List<StudentParticipation> participations = studentParticipationRepository.findByStudentIdAndIndividualExercisesWithEagerSubmissionsResultIgnoreTestRuns(studentId,
course.getExercises());

Map<Long, Set<StudentParticipation>> participationMap = new HashMap<>();
for (StudentParticipation participation : participations) {
Long exerciseId = participation.getExercise().getId();
participationMap.computeIfAbsent(exerciseId, k -> new HashSet<>()).add(participation);
}

course.getExercises().forEach(exercise -> {
Set<StudentParticipation> exerciseParticipations = participationMap.getOrDefault(exercise.getId(), Set.of());
exercise.setStudentParticipations(exerciseParticipations);
});

return course;
}
}
Loading
Loading