-
Notifications
You must be signed in to change notification settings - Fork 467
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore (Core): Created upgrade task for Postgres job queue Tables (#30325
) This pull request introduces a new task to create the necessary tables and indexes for the Postgres Job Queue implementation. It includes the implementation of the task, updates to the task locator, and the addition of corresponding tests. ### Implementation of Postgres Job Queue Tables * [`dotCMS/src/main/java/com/dotmarketing/startup/runonce/Task241009CreatePostgresJobQueueTables.java`](diffhunk://#diff-770c772049f1e161c67da6e326cb57b8f5c6ca1cf22bea61d8d79d8f94b4f4f2R1-R134): Added a new startup task to create `job_queue`, `job`, and `job_history` tables along with their associated indexes. ### Task Locator Update * [`dotCMS/src/main/java/com/dotmarketing/util/TaskLocatorUtil.java`](diffhunk://#diff-67a18c9d18fc6ddfd57f180a2155c32429cd137f8492419e77d4956e137f7a8bR352): Updated the `getStartupRunOnceTaskClasses` method to include `Task241009CreatePostgresJobQueueTables`. ### Test Suite Update * [`dotcms-integration/src/test/java/com/dotcms/MainSuite2b.java`](diffhunk://#diff-3a7b8388a34da3ff66e6b26e2fa9c4062c81f7c33f94d5df5ff30d92de8185acL329-R330): Added `Task241009CreatePostgresJobQueueTablesTest` to the test suite. ### New Test Class * [`dotcms-integration/src/test/java/com/dotmarketing/startup/runonce/Task241009CreatePostgresJobQueueTablesTest.java`](diffhunk://#diff-9e16296a2c0c0a5ffed890515d398b5f10be73915db863fc3c3d56e73d7fb44fR1-R105): Added a new test class to ensure the job queue tables are properly created by the upgrade task.
- Loading branch information
1 parent
183afba
commit 8f2ccfa
Showing
4 changed files
with
362 additions
and
1 deletion.
There are no files selected for viewing
182 changes: 182 additions & 0 deletions
182
...rc/main/java/com/dotmarketing/startup/runonce/Task241009CreatePostgresJobQueueTables.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
package com.dotmarketing.startup.runonce; | ||
|
||
import com.dotmarketing.common.db.DotConnect; | ||
import com.dotmarketing.common.db.DotDatabaseMetaData; | ||
import com.dotmarketing.db.DbConnectionFactory; | ||
import com.dotmarketing.exception.DotDataException; | ||
import com.dotmarketing.exception.DotRuntimeException; | ||
import com.dotmarketing.startup.StartupTask; | ||
import com.dotmarketing.util.Logger; | ||
import java.sql.Connection; | ||
import java.sql.SQLException; | ||
|
||
/** | ||
* Upgrade task to create the necessary tables and indexes for the Postgres Job Queue | ||
* implementation. This task creates the job_queue, job, and job_history tables along with their | ||
* associated indexes. | ||
*/ | ||
public class Task241009CreatePostgresJobQueueTables implements StartupTask { | ||
|
||
@Override | ||
public boolean forceRun() { | ||
return true; | ||
} | ||
|
||
/** | ||
* Executes the upgrade task, creating the necessary tables and indexes for the Job Queue. | ||
* | ||
* @throws DotDataException if a data access error occurs. | ||
* @throws DotRuntimeException if a runtime error occurs. | ||
*/ | ||
@Override | ||
public void executeUpgrade() throws DotDataException, DotRuntimeException { | ||
|
||
final Connection connection = DbConnectionFactory.getConnection(); | ||
final DotDatabaseMetaData databaseMetaData = new DotDatabaseMetaData(); | ||
|
||
try { | ||
|
||
// job_queue table | ||
if (!databaseMetaData.tableExists(connection, "job_queue")) { | ||
createJobQueueTable(); | ||
createJobQueueTableIndexes(); | ||
} | ||
|
||
// job table | ||
if (!databaseMetaData.tableExists(connection, "job")) { | ||
createJobTable(); | ||
createJobTableIndexes(); | ||
} | ||
|
||
// job_history table | ||
if (!databaseMetaData.tableExists(connection, "job_history")) { | ||
createJobHistoryTable(); | ||
createJobHistoryTableIndexes(); | ||
} | ||
} catch (SQLException e) { | ||
Logger.error("Error creating job queue tables", e); | ||
throw new DotRuntimeException(e.getMessage(), e); | ||
} | ||
} | ||
|
||
/** | ||
* Creates the job_queue table. | ||
* | ||
* @throws DotDataException if an error occurs while creating the table. | ||
*/ | ||
private void createJobQueueTable() throws DotDataException { | ||
try { | ||
new DotConnect().executeStatement( | ||
"CREATE TABLE job_queue (" + | ||
"id VARCHAR(255) PRIMARY KEY, " + | ||
"queue_name VARCHAR(255) NOT NULL, " + | ||
"state VARCHAR(50) NOT NULL, " + | ||
"priority INTEGER DEFAULT 0, " + | ||
"created_at timestamptz NOT NULL)" | ||
); | ||
} catch (Exception ex) { | ||
throw new DotDataException(ex.getMessage(), ex); | ||
} | ||
} | ||
|
||
/** | ||
* Creates the necessary indexes for the job_queue table. | ||
* | ||
* @throws DotDataException if an error occurs while creating the indexes. | ||
*/ | ||
private void createJobQueueTableIndexes() throws DotDataException { | ||
try { | ||
new DotConnect().executeStatement( | ||
"CREATE INDEX idx_job_queue_status ON job_queue (state)"); | ||
new DotConnect().executeStatement( | ||
"CREATE INDEX idx_job_queue_priority_created_at ON job_queue (priority DESC, created_at ASC)"); | ||
} catch (Exception ex) { | ||
throw new DotDataException(ex.getMessage(), ex); | ||
} | ||
} | ||
|
||
/** | ||
* Creates the job table. | ||
* | ||
* @throws DotDataException if an error occurs while creating the table. | ||
*/ | ||
private void createJobTable() throws DotDataException { | ||
try { | ||
new DotConnect().executeStatement( | ||
"CREATE TABLE job (" + | ||
"id VARCHAR(255) PRIMARY KEY, " + | ||
"queue_name VARCHAR(255) NOT NULL, " + | ||
"state VARCHAR(50) NOT NULL, " + | ||
"parameters JSONB NOT NULL, " + | ||
"result JSONB, " + | ||
"progress FLOAT DEFAULT 0, " + | ||
"created_at timestamptz NOT NULL, " + | ||
"updated_at timestamptz NOT NULL, " + | ||
"started_at timestamptz, " + | ||
"completed_at timestamptz, " + | ||
"execution_node VARCHAR(255), " + | ||
"retry_count INTEGER DEFAULT 0)" | ||
); | ||
} catch (Exception ex) { | ||
throw new DotDataException(ex.getMessage(), ex); | ||
} | ||
} | ||
|
||
/** | ||
* Creates the necessary indexes for the job table. | ||
* | ||
* @throws DotDataException if an error occurs while creating the indexes. | ||
*/ | ||
private void createJobTableIndexes() throws DotDataException { | ||
try { | ||
new DotConnect().executeStatement( | ||
"CREATE INDEX idx_job_parameters ON job USING GIN (parameters)"); | ||
new DotConnect().executeStatement( | ||
"CREATE INDEX idx_job_result ON job USING GIN (result)"); | ||
new DotConnect().executeStatement("CREATE INDEX idx_job_status ON job (state)"); | ||
new DotConnect().executeStatement( | ||
"CREATE INDEX idx_job_created_at ON job (created_at)"); | ||
} catch (Exception ex) { | ||
throw new DotDataException(ex.getMessage(), ex); | ||
} | ||
} | ||
|
||
/** | ||
* Creates the job_history table. | ||
* | ||
* @throws DotDataException if an error occurs while creating the table. | ||
*/ | ||
private void createJobHistoryTable() throws DotDataException { | ||
try { | ||
new DotConnect().executeStatement( | ||
"CREATE TABLE job_history (" + | ||
"id VARCHAR(255) PRIMARY KEY, " + | ||
"job_id VARCHAR(255) NOT NULL, " + | ||
"state VARCHAR(50) NOT NULL, " + | ||
"execution_node VARCHAR(255), " + | ||
"created_at timestamptz NOT NULL, " + | ||
"result JSONB, " + | ||
"FOREIGN KEY (job_id) REFERENCES job (id))" | ||
); | ||
} catch (Exception ex) { | ||
throw new DotDataException(ex.getMessage(), ex); | ||
} | ||
} | ||
|
||
/** | ||
* Creates the necessary indexes for the job_history table. | ||
* | ||
* @throws DotDataException if an error occurs while creating the indexes. | ||
*/ | ||
private void createJobHistoryTableIndexes() throws DotDataException { | ||
try { | ||
new DotConnect().executeStatement( | ||
"CREATE INDEX idx_job_history_job_id ON job_history (job_id)"); | ||
new DotConnect().executeStatement( | ||
"CREATE INDEX idx_job_history_job_id_state ON job_history (job_id, state)"); | ||
} catch (Exception ex) { | ||
throw new DotDataException(ex.getMessage(), ex); | ||
} | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
176 changes: 176 additions & 0 deletions
176
...est/java/com/dotmarketing/startup/runonce/Task241009CreatePostgresJobQueueTablesTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
package com.dotmarketing.startup.runonce; | ||
|
||
import static org.junit.Assert.assertFalse; | ||
import static org.junit.Assert.assertTrue; | ||
|
||
import com.dotcms.IntegrationTestBase; | ||
import com.dotcms.util.IntegrationTestInitService; | ||
import com.dotmarketing.common.db.DotDatabaseMetaData; | ||
import com.dotmarketing.db.DbConnectionFactory; | ||
import com.dotmarketing.db.LocalTransaction; | ||
import com.dotmarketing.exception.DotDataException; | ||
import com.dotmarketing.exception.DotRuntimeException; | ||
import com.dotmarketing.exception.DotSecurityException; | ||
import com.dotmarketing.util.Logger; | ||
import java.sql.Connection; | ||
import java.sql.SQLException; | ||
import org.junit.Assert; | ||
import org.junit.BeforeClass; | ||
import org.junit.Test; | ||
|
||
/** | ||
* Test class for {@link Task241009CreatePostgresJobQueueTables}. | ||
* <p> | ||
* This test class ensures that the job queue tables are properly created by the upgrade task. | ||
* <p> | ||
* The test first drops the tables if they exist, then executes the upgrade task and validates that | ||
* the tables were successfully created. | ||
*/ | ||
public class Task241009CreatePostgresJobQueueTablesTest extends IntegrationTestBase { | ||
|
||
/** | ||
* Initializes the test environment and ensures the job queue tables do not exist. | ||
* | ||
* @throws Exception if an error occurs during initialization. | ||
*/ | ||
@BeforeClass | ||
public static void setup() throws Exception { | ||
|
||
// Setting web app environment | ||
IntegrationTestInitService.getInstance().init(); | ||
} | ||
|
||
/** | ||
* Drops the job queue tables if they exist. | ||
*/ | ||
private void dropTablesIfExist() { | ||
|
||
try { | ||
|
||
final Connection connection = DbConnectionFactory.getConnection(); | ||
final DotDatabaseMetaData databaseMetaData = new DotDatabaseMetaData(); | ||
|
||
if (databaseMetaData.tableExists(connection, "job_queue")) { | ||
databaseMetaData.dropTable(connection, "job_queue"); | ||
} | ||
if (databaseMetaData.tableExists(connection, "job_history")) { | ||
databaseMetaData.dropTable(connection, "job_history"); | ||
} | ||
if (databaseMetaData.tableExists(connection, "job")) { | ||
databaseMetaData.dropTable(connection, "job"); | ||
} | ||
|
||
} catch (Exception e) { | ||
throw new DotRuntimeException(e); | ||
} | ||
} | ||
|
||
/** | ||
* Method to test {@link Task241009CreatePostgresJobQueueTables#executeUpgrade()} and | ||
* {@link Task241009CreatePostgresJobQueueTables#forceRun()}. | ||
* <p> | ||
* Given Scenario: The job queue tables do not exist. | ||
* <p> | ||
* Expected Result: The job queue tables will be created after running the upgrade task. | ||
* | ||
* @throws SQLException if a SQL error occurs. | ||
* @throws DotDataException if a data access error occurs. | ||
* @throws DotSecurityException if a security error occurs. | ||
*/ | ||
@Test | ||
public void executeTaskUpgrade() throws SQLException, DotDataException, DotSecurityException { | ||
|
||
try { | ||
// First, ensure the tables do not exist | ||
LocalTransaction.wrap(() -> { | ||
try { | ||
dropTablesIfExist(); | ||
} catch (Exception e) { | ||
throw new DotRuntimeException(e); | ||
} | ||
}); | ||
|
||
// Running the upgrade task and validating the tables were created | ||
executeUpgradeAndValidate(); | ||
} finally { | ||
DbConnectionFactory.closeSilently(); | ||
} | ||
} | ||
|
||
/** | ||
* Method to test {@link Task241009CreatePostgresJobQueueTables#executeUpgrade()} and | ||
* {@link Task241009CreatePostgresJobQueueTables#forceRun()}. | ||
* <p> | ||
* Given Scenario: The job queue tables do not exist, and the upgrade task is run twice. | ||
* <p> | ||
* Expected Result: The job queue tables will be created after running the upgrade task the | ||
* first time and should not fail when running the upgrade task again. | ||
* | ||
* @throws SQLException if a SQL error occurs. | ||
* @throws DotDataException if a data access error occurs. | ||
* @throws DotSecurityException if a security error occurs. | ||
*/ | ||
@Test | ||
public void executeTaskUpgradeTwice() | ||
throws SQLException, DotDataException, DotSecurityException { | ||
|
||
try { | ||
// First, ensure the tables do not exist | ||
LocalTransaction.wrap(() -> { | ||
try { | ||
dropTablesIfExist(); | ||
} catch (Exception e) { | ||
throw new DotRuntimeException(e); | ||
} | ||
}); | ||
|
||
// Run the upgrade task for the first time, it should create the tables | ||
executeUpgradeAndValidate(); | ||
|
||
// Run the upgrade task again, should not fail | ||
LocalTransaction.wrap(() -> { | ||
try { | ||
final var task = new Task241009CreatePostgresJobQueueTables(); | ||
task.executeUpgrade(); | ||
} catch (Exception e) { | ||
final var message = "The upgrade task should not fail when the tables already exist"; | ||
Logger.error(message, e); | ||
Assert.fail(message); | ||
} | ||
}); | ||
} finally { | ||
DbConnectionFactory.closeSilently(); | ||
} | ||
} | ||
|
||
/** | ||
* Executes the upgrade task and validates the job queue tables were created. | ||
*/ | ||
private static void executeUpgradeAndValidate() | ||
throws SQLException, DotDataException, DotSecurityException { | ||
|
||
final var task = new Task241009CreatePostgresJobQueueTables(); | ||
final Connection connection = DbConnectionFactory.getConnection(); | ||
final DotDatabaseMetaData databaseMetaData = new DotDatabaseMetaData(); | ||
|
||
// Ensure the tables do not exist before the upgrade | ||
assertFalse(databaseMetaData.tableExists(connection, "job_queue")); | ||
assertFalse(databaseMetaData.tableExists(connection, "job")); | ||
assertFalse(databaseMetaData.tableExists(connection, "job_history")); | ||
|
||
assertTrue(task.forceRun()); | ||
LocalTransaction.wrap(() -> { | ||
try { | ||
task.executeUpgrade(); | ||
} catch (Exception e) { | ||
throw new DotRuntimeException(e); | ||
} | ||
}); | ||
|
||
// Validate the tables were created after the upgrade | ||
assertTrue(databaseMetaData.tableExists(connection, "job_queue")); | ||
assertTrue(databaseMetaData.tableExists(connection, "job")); | ||
assertTrue(databaseMetaData.tableExists(connection, "job_history")); | ||
} | ||
|
||
} |