diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java index 97c3387fde78..986b2de95b94 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java @@ -13,15 +13,21 @@ import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.processor.ProgressTracker; import com.dotcms.util.IntegrationTestInitService; +import com.dotmarketing.business.APILocator; import com.dotmarketing.common.db.DotConnect; import com.dotmarketing.exception.DotDataException; +import com.dotmarketing.util.Config; import com.dotmarketing.util.Logger; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.sql.Timestamp; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -63,6 +69,9 @@ public class JobQueueManagerAPIIntegrationTest extends com.dotcms.Junit5WeldBase static void setUp() throws Exception { // Initialize the test environment IntegrationTestInitService.getInstance().init(); + + Config.setProperty("JOB_ABANDONMENT_DETECTION_INTERVAL_MINUTES", "1"); + Config.setProperty("JOB_ABANDONMENT_THRESHOLD_MINUTES", "2"); } /** @@ -73,10 +82,15 @@ static void setUp() throws Exception { */ @AfterAll void cleanUp() throws Exception { - if(null != jobQueueManagerAPI) { - jobQueueManagerAPI.close(); - } + clearJobs(); + + Config.setProperty("JOB_ABANDONMENT_DETECTION_INTERVAL_MINUTES", "5"); + Config.setProperty("JOB_ABANDONMENT_THRESHOLD_MINUTES", "30"); + + if (null != jobQueueManagerAPI) { + jobQueueManagerAPI.close(); + } } @BeforeEach @@ -450,6 +464,127 @@ void test_CombinedScenarios() throws Exception { }); } + /** + * Tests the abandoned job detection functionality. + * Given Scenario: A job exists in RUNNING state with an old timestamp + * ExpectedResult: The job is detected as abandoned, marked accordingly and retried successfully + */ + @Test + @Order(7) + void test_AbandonedJobDetection() throws Exception { + + final String jobId = UUID.randomUUID().toString(); + final String queueName = "abandonedQueue"; + final Map parameters = Collections.singletonMap("test", "value"); + final String serverId = APILocator.getServerAPI().readServerId(); + final LocalDateTime oldTimestamp = LocalDateTime.now().minusMinutes(5); + + // Create a job directly in the database in RUNNING state to simulate an abandoned job + DotConnect dc = new DotConnect(); + + // Insert into job table + dc.setSQL("INSERT INTO job (id, queue_name, state, parameters, created_at, updated_at, started_at, execution_node) VALUES (?, ?, ?, ?::jsonb, ?, ?, ?, ?)") + .addParam(jobId) + .addParam(queueName) + .addParam(JobState.RUNNING.name()) + .addParam(new ObjectMapper().writeValueAsString(parameters)) + .addParam(Timestamp.valueOf(oldTimestamp)) + .addParam(Timestamp.valueOf(oldTimestamp)) + .addParam(Timestamp.valueOf(oldTimestamp)) + .addParam(serverId) + .loadResult(); + + // Insert into job_queue table + dc.setSQL("INSERT INTO job_queue (id, queue_name, state, created_at) VALUES (?, ?, ?, ?)") + .addParam(jobId) + .addParam(queueName) + .addParam(JobState.RUNNING.name()) + .addParam(Timestamp.valueOf(oldTimestamp)) + .loadResult(); + + // Insert initial state into job_history + dc.setSQL("INSERT INTO job_history (id, job_id, state, execution_node, created_at) VALUES (?, ?, ?, ?, ?)") + .addParam(UUID.randomUUID().toString()) + .addParam(jobId) + .addParam(JobState.RUNNING.name()) + .addParam(serverId) + .addParam(Timestamp.valueOf(oldTimestamp)) + .loadResult(); + + // Verify the job was created in RUNNING state + Job initialJob = jobQueueManagerAPI.getJob(jobId); + assertEquals(JobState.RUNNING, initialJob.state(), + "Job should be in RUNNING state initially"); + + // Start job queue manager if not started + if (!jobQueueManagerAPI.isStarted()) { + jobQueueManagerAPI.start(); + jobQueueManagerAPI.awaitStart(5, TimeUnit.SECONDS); + } + + // Register a processor for the abandoned job + jobQueueManagerAPI.registerProcessor(queueName, AbbandonedJobProcessor.class); + + // The job should be marked as abandoned + CountDownLatch latch = new CountDownLatch(1); + jobQueueManagerAPI.watchJob(jobId, job -> { + if (job.state() == JobState.ABANDONED) { + latch.countDown(); + } + }); + + boolean abandoned = latch.await(3, TimeUnit.MINUTES); + assertTrue(abandoned, "Job should be marked as abandoned within timeout period"); + + // Verify the abandoned job state and error details + Job abandonedJob = jobQueueManagerAPI.getJob(jobId); + assertEquals(JobState.ABANDONED, abandonedJob.state(), + "Job should be in ABANDONED state"); + assertTrue(abandonedJob.result().isPresent(), + "Abandoned job should have a result"); + assertTrue(abandonedJob.result().get().errorDetail().isPresent(), + "Abandoned job should have error details"); + assertTrue(abandonedJob.result().get().errorDetail().get().message() + .contains("abandoned due to no updates"), + "Error message should indicate abandonment"); + + // Verify the job was put back in queue for retry and completed successfully + Awaitility.await().atMost(15, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + Job job = jobQueueManagerAPI.getJob(jobId); + assertEquals(JobState.COMPLETED, job.state(), + "Job should be in COMPLETED state"); + }); + + // Verify job history contains the state transitions + dc.setSQL("SELECT state FROM job_history WHERE job_id = ? ORDER BY created_at") + .addParam(jobId); + List> history = dc.loadObjectResults(); + + assertFalse(history.isEmpty(), "Job should have history records"); + assertEquals(JobState.RUNNING.name(), history.get(0).get("state"), + "First state should be RUNNING"); + assertEquals(JobState.ABANDONED.name(), history.get(1).get("state"), + "Second state should be ABANDONED"); + assertEquals(JobState.RUNNING.name(), history.get(2).get("state"), + "Third state should be RUNNING"); + assertEquals(JobState.COMPLETED.name(), history.get(3).get("state"), + "Latest state should be COMPLETED"); + } + + static class AbbandonedJobProcessor implements JobProcessor { + + @Override + public void process(Job job) { + } + + @Override + public Map getResultMetadata(Job job) { + return Collections.emptyMap(); + } + } + static class ProgressTrackingJobProcessor implements JobProcessor { @Override public void process(Job job) {