Skip to content

Commit

Permalink
#30367 Integration test for testing the abandoned job detection functionality.
Browse files Browse the repository at this point in the history
  • Loading branch information
jgambarios committed Nov 19, 2024
1 parent 3491712 commit 43f8cd3
Showing 1 changed file with 138 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,21 @@
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.processor.ProgressTracker;
import com.dotcms.util.IntegrationTestInitService;
import com.dotmarketing.business.APILocator;
import com.dotmarketing.common.db.DotConnect;
import com.dotmarketing.exception.DotDataException;
import com.dotmarketing.util.Config;
import com.dotmarketing.util.Logger;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -63,6 +69,9 @@ public class JobQueueManagerAPIIntegrationTest extends com.dotcms.Junit5WeldBase
static void setUp() throws Exception {
    // The integration-test environment has to be initialized before any
    // configuration is overridden below.
    IntegrationTestInitService.getInstance().init();

    // Abandonment settings used for the whole test class (values are in
    // minutes, per the property names). cleanUp() overwrites them again
    // once all the tests have run.
    final String detectionIntervalMinutes = "1";
    final String abandonmentThresholdMinutes = "2";
    Config.setProperty("JOB_ABANDONMENT_DETECTION_INTERVAL_MINUTES", detectionIntervalMinutes);
    Config.setProperty("JOB_ABANDONMENT_THRESHOLD_MINUTES", abandonmentThresholdMinutes);
}

/**
Expand All @@ -73,10 +82,15 @@ static void setUp() throws Exception {
*/
@AfterAll
void cleanUp() throws Exception {
    try {
        // Remove the jobs created by the tests while the manager is still open.
        clearJobs();
    } finally {
        // Always put the abandonment settings back, even if clearing the jobs
        // failed, so the overrides applied in setUp() do not leak into other
        // test classes.
        // NOTE(review): "5" and "30" are presumably the production defaults — confirm.
        Config.setProperty("JOB_ABANDONMENT_DETECTION_INTERVAL_MINUTES", "5");
        Config.setProperty("JOB_ABANDONMENT_THRESHOLD_MINUTES", "30");

        // Close the manager exactly once, as the very last step.
        if (null != jobQueueManagerAPI) {
            jobQueueManagerAPI.close();
        }
    }
}

@BeforeEach
Expand Down Expand Up @@ -450,6 +464,127 @@ void test_CombinedScenarios() throws Exception {
});
}

/**
 * Tests the abandoned job detection functionality.
 * Given Scenario: A job exists in RUNNING state with an old timestamp
 * ExpectedResult: The job is detected as abandoned, marked accordingly and retried successfully
 *
 * @throws Exception if seeding the database, starting the job queue manager or waiting for the
 *                   state transitions fails
 */
@Test
@Order(7)
void test_AbandonedJobDetection() throws Exception {

    final String jobId = UUID.randomUUID().toString();
    final String queueName = "abandonedQueue";
    final Map<String, Object> parameters = Collections.singletonMap("test", "value");
    final String serverId = APILocator.getServerAPI().readServerId();
    // Five minutes in the past, i.e. older than the 2-minute abandonment
    // threshold configured in setUp().
    final LocalDateTime oldTimestamp = LocalDateTime.now().minusMinutes(5);
    final Timestamp staleTimestamp = Timestamp.valueOf(oldTimestamp);

    // Create a job directly in the database in RUNNING state to simulate an abandoned job
    final DotConnect dc = new DotConnect();

    // Insert into job table
    dc.setSQL("INSERT INTO job (id, queue_name, state, parameters, created_at, updated_at, started_at, execution_node) VALUES (?, ?, ?, ?::jsonb, ?, ?, ?, ?)")
            .addParam(jobId)
            .addParam(queueName)
            .addParam(JobState.RUNNING.name())
            .addParam(new ObjectMapper().writeValueAsString(parameters))
            .addParam(staleTimestamp)
            .addParam(staleTimestamp)
            .addParam(staleTimestamp)
            .addParam(serverId)
            .loadResult();

    // Insert into job_queue table
    dc.setSQL("INSERT INTO job_queue (id, queue_name, state, created_at) VALUES (?, ?, ?, ?)")
            .addParam(jobId)
            .addParam(queueName)
            .addParam(JobState.RUNNING.name())
            .addParam(staleTimestamp)
            .loadResult();

    // Insert initial state into job_history
    dc.setSQL("INSERT INTO job_history (id, job_id, state, execution_node, created_at) VALUES (?, ?, ?, ?, ?)")
            .addParam(UUID.randomUUID().toString())
            .addParam(jobId)
            .addParam(JobState.RUNNING.name())
            .addParam(serverId)
            .addParam(staleTimestamp)
            .loadResult();

    // Verify the job was created in RUNNING state
    final Job initialJob = jobQueueManagerAPI.getJob(jobId);
    assertEquals(JobState.RUNNING, initialJob.state(),
            "Job should be in RUNNING state initially");

    // Start job queue manager if not started
    if (!jobQueueManagerAPI.isStarted()) {
        jobQueueManagerAPI.start();
        jobQueueManagerAPI.awaitStart(5, TimeUnit.SECONDS);
    }

    // Register a processor for the abandoned job
    jobQueueManagerAPI.registerProcessor(queueName, AbbandonedJobProcessor.class);

    // The job should be marked as abandoned; the watcher releases the latch
    // the first time it observes the ABANDONED state.
    final CountDownLatch latch = new CountDownLatch(1);
    jobQueueManagerAPI.watchJob(jobId, job -> {
        if (job.state() == JobState.ABANDONED) {
            latch.countDown();
        }
    });

    final boolean abandoned = latch.await(3, TimeUnit.MINUTES);
    assertTrue(abandoned, "Job should be marked as abandoned within timeout period");

    // Verify the abandoned job state and error details
    // NOTE(review): the job is re-read here after the watcher fired; if the
    // retry can start before this read the state may already have moved on —
    // confirm against the queue manager's retry timing.
    final Job abandonedJob = jobQueueManagerAPI.getJob(jobId);
    assertEquals(JobState.ABANDONED, abandonedJob.state(),
            "Job should be in ABANDONED state");
    assertTrue(abandonedJob.result().isPresent(),
            "Abandoned job should have a result");
    assertTrue(abandonedJob.result().get().errorDetail().isPresent(),
            "Abandoned job should have error details");
    assertTrue(abandonedJob.result().get().errorDetail().get().message()
                    .contains("abandoned due to no updates"),
            "Error message should indicate abandonment");

    // Verify the job was put back in queue for retry and completed successfully
    Awaitility.await().atMost(15, TimeUnit.SECONDS)
            .pollInterval(100, TimeUnit.MILLISECONDS)
            .untilAsserted(() -> {
                final Job job = jobQueueManagerAPI.getJob(jobId);
                assertEquals(JobState.COMPLETED, job.state(),
                        "Job should be in COMPLETED state");
            });

    // Verify job history contains the state transitions
    dc.setSQL("SELECT state FROM job_history WHERE job_id = ? ORDER BY created_at")
            .addParam(jobId);
    final List<Map<String, Object>> history = dc.loadObjectResults();

    assertFalse(history.isEmpty(), "Job should have history records");
    // Guard the positional lookups below so that a short history produces a
    // readable assertion failure instead of an IndexOutOfBoundsException.
    assertTrue(history.size() >= 4,
            "Job history should contain at least 4 state transitions but had " + history.size());
    assertEquals(JobState.RUNNING.name(), history.get(0).get("state"),
            "First state should be RUNNING");
    assertEquals(JobState.ABANDONED.name(), history.get(1).get("state"),
            "Second state should be ABANDONED");
    assertEquals(JobState.RUNNING.name(), history.get(2).get("state"),
            "Third state should be RUNNING");
    assertEquals(JobState.COMPLETED.name(), history.get(3).get("state"),
            "Latest state should be COMPLETED");
}

/**
 * Minimal {@link JobProcessor} registered by {@code test_AbandonedJobDetection} for the
 * "abandonedQueue" queue. It performs no work and reports no result metadata.
 */
static class AbbandonedJobProcessor implements JobProcessor {

    /**
     * Intentionally does nothing.
     *
     * @param job the job handed to this processor; it is not inspected
     */
    @Override
    public void process(final Job job) {
        // No-op on purpose.
    }

    /**
     * Reports that there is no metadata for the given job.
     *
     * @param job the job whose metadata is requested; it is not inspected
     * @return an empty map
     */
    @Override
    public Map<String, Object> getResultMetadata(final Job job) {
        final Map<String, Object> noMetadata = Collections.emptyMap();
        return noMetadata;
    }
}

static class ProgressTrackingJobProcessor implements JobProcessor {
@Override
public void process(Job job) {
Expand Down

0 comments on commit 43f8cd3

Please sign in to comment.