From 8f2ccfa7e29e6b52a56229daad8baaaf9d50f2e4 Mon Sep 17 00:00:00 2001 From: Jonathan Gamba Date: Fri, 11 Oct 2024 14:22:14 -0600 Subject: [PATCH] chore (Core): Created upgrade task for Postgres job queue Tables (#30325) This pull request introduces a new task to create the necessary tables and indexes for the Postgres Job Queue implementation. It includes the implementation of the task, updates to the task locator, and the addition of corresponding tests. ### Implementation of Postgres Job Queue Tables * [`dotCMS/src/main/java/com/dotmarketing/startup/runonce/Task241009CreatePostgresJobQueueTables.java`](diffhunk://#diff-770c772049f1e161c67da6e326cb57b8f5c6ca1cf22bea61d8d79d8f94b4f4f2R1-R134): Added a new startup task to create `job_queue`, `job`, and `job_history` tables along with their associated indexes. ### Task Locator Update * [`dotCMS/src/main/java/com/dotmarketing/util/TaskLocatorUtil.java`](diffhunk://#diff-67a18c9d18fc6ddfd57f180a2155c32429cd137f8492419e77d4956e137f7a8bR352): Updated the `getStartupRunOnceTaskClasses` method to include `Task241009CreatePostgresJobQueueTables`. ### Test Suite Update * [`dotcms-integration/src/test/java/com/dotcms/MainSuite2b.java`](diffhunk://#diff-3a7b8388a34da3ff66e6b26e2fa9c4062c81f7c33f94d5df5ff30d92de8185acL329-R330): Added `Task241009CreatePostgresJobQueueTablesTest` to the test suite. ### New Test Class * [`dotcms-integration/src/test/java/com/dotmarketing/startup/runonce/Task241009CreatePostgresJobQueueTablesTest.java`](diffhunk://#diff-9e16296a2c0c0a5ffed890515d398b5f10be73915db863fc3c3d56e73d7fb44fR1-R105): Added a new test class to ensure the job queue tables are properly created by the upgrade task. --- ...ask241009CreatePostgresJobQueueTables.java | 182 ++++++++++++++++++ .../dotmarketing/util/TaskLocatorUtil.java | 1 + .../src/test/java/com/dotcms/MainSuite2b.java | 4 +- ...41009CreatePostgresJobQueueTablesTest.java | 176 +++++++++++++++++ 4 files changed, 362 insertions(+), 1 deletion(-) create mode 100644 dotCMS/src/main/java/com/dotmarketing/startup/runonce/Task241009CreatePostgresJobQueueTables.java create mode 100644 dotcms-integration/src/test/java/com/dotmarketing/startup/runonce/Task241009CreatePostgresJobQueueTablesTest.java diff --git a/dotCMS/src/main/java/com/dotmarketing/startup/runonce/Task241009CreatePostgresJobQueueTables.java b/dotCMS/src/main/java/com/dotmarketing/startup/runonce/Task241009CreatePostgresJobQueueTables.java new file mode 100644 index 000000000000..df1d33198c28 --- /dev/null +++ b/dotCMS/src/main/java/com/dotmarketing/startup/runonce/Task241009CreatePostgresJobQueueTables.java @@ -0,0 +1,182 @@ +package com.dotmarketing.startup.runonce; + +import com.dotmarketing.common.db.DotConnect; +import com.dotmarketing.common.db.DotDatabaseMetaData; +import com.dotmarketing.db.DbConnectionFactory; +import com.dotmarketing.exception.DotDataException; +import com.dotmarketing.exception.DotRuntimeException; +import com.dotmarketing.startup.StartupTask; +import com.dotmarketing.util.Logger; +import java.sql.Connection; +import java.sql.SQLException; + +/** + * Upgrade task to create the necessary tables and indexes for the Postgres Job Queue + * implementation. This task creates the job_queue, job, and job_history tables along with their + * associated indexes. + */ +public class Task241009CreatePostgresJobQueueTables implements StartupTask { + + @Override + public boolean forceRun() { + return true; + } + + /** + * Executes the upgrade task, creating the necessary tables and indexes for the Job Queue. + * + * @throws DotDataException if a data access error occurs. + * @throws DotRuntimeException if a runtime error occurs. + */ + @Override + public void executeUpgrade() throws DotDataException, DotRuntimeException { + + final Connection connection = DbConnectionFactory.getConnection(); + final DotDatabaseMetaData databaseMetaData = new DotDatabaseMetaData(); + + try { + + // job_queue table + if (!databaseMetaData.tableExists(connection, "job_queue")) { + createJobQueueTable(); + createJobQueueTableIndexes(); + } + + // job table + if (!databaseMetaData.tableExists(connection, "job")) { + createJobTable(); + createJobTableIndexes(); + } + + // job_history table + if (!databaseMetaData.tableExists(connection, "job_history")) { + createJobHistoryTable(); + createJobHistoryTableIndexes(); + } + } catch (SQLException e) { + Logger.error("Error creating job queue tables", e); + throw new DotRuntimeException(e.getMessage(), e); + } + } + + /** + * Creates the job_queue table. + * + * @throws DotDataException if an error occurs while creating the table. + */ + private void createJobQueueTable() throws DotDataException { + try { + new DotConnect().executeStatement( + "CREATE TABLE job_queue (" + + "id VARCHAR(255) PRIMARY KEY, " + + "queue_name VARCHAR(255) NOT NULL, " + + "state VARCHAR(50) NOT NULL, " + + "priority INTEGER DEFAULT 0, " + + "created_at timestamptz NOT NULL)" + ); + } catch (Exception ex) { + throw new DotDataException(ex.getMessage(), ex); + } + } + + /** + * Creates the necessary indexes for the job_queue table. + * + * @throws DotDataException if an error occurs while creating the indexes. + */ + private void createJobQueueTableIndexes() throws DotDataException { + try { + new DotConnect().executeStatement( + "CREATE INDEX idx_job_queue_status ON job_queue (state)"); + new DotConnect().executeStatement( + "CREATE INDEX idx_job_queue_priority_created_at ON job_queue (priority DESC, created_at ASC)"); + } catch (Exception ex) { + throw new DotDataException(ex.getMessage(), ex); + } + } + + /** + * Creates the job table. + * + * @throws DotDataException if an error occurs while creating the table. + */ + private void createJobTable() throws DotDataException { + try { + new DotConnect().executeStatement( + "CREATE TABLE job (" + + "id VARCHAR(255) PRIMARY KEY, " + + "queue_name VARCHAR(255) NOT NULL, " + + "state VARCHAR(50) NOT NULL, " + + "parameters JSONB NOT NULL, " + + "result JSONB, " + + "progress FLOAT DEFAULT 0, " + + "created_at timestamptz NOT NULL, " + + "updated_at timestamptz NOT NULL, " + + "started_at timestamptz, " + + "completed_at timestamptz, " + + "execution_node VARCHAR(255), " + + "retry_count INTEGER DEFAULT 0)" + ); + } catch (Exception ex) { + throw new DotDataException(ex.getMessage(), ex); + } + } + + /** + * Creates the necessary indexes for the job table. + * + * @throws DotDataException if an error occurs while creating the indexes. + */ + private void createJobTableIndexes() throws DotDataException { + try { + new DotConnect().executeStatement( + "CREATE INDEX idx_job_parameters ON job USING GIN (parameters)"); + new DotConnect().executeStatement( + "CREATE INDEX idx_job_result ON job USING GIN (result)"); + new DotConnect().executeStatement("CREATE INDEX idx_job_status ON job (state)"); + new DotConnect().executeStatement( + "CREATE INDEX idx_job_created_at ON job (created_at)"); + } catch (Exception ex) { + throw new DotDataException(ex.getMessage(), ex); + } + } + + /** + * Creates the job_history table. + * + * @throws DotDataException if an error occurs while creating the table. + */ + private void createJobHistoryTable() throws DotDataException { + try { + new DotConnect().executeStatement( + "CREATE TABLE job_history (" + + "id VARCHAR(255) PRIMARY KEY, " + + "job_id VARCHAR(255) NOT NULL, " + + "state VARCHAR(50) NOT NULL, " + + "execution_node VARCHAR(255), " + + "created_at timestamptz NOT NULL, " + + "result JSONB, " + + "FOREIGN KEY (job_id) REFERENCES job (id))" + ); + } catch (Exception ex) { + throw new DotDataException(ex.getMessage(), ex); + } + } + + /** + * Creates the necessary indexes for the job_history table. + * + * @throws DotDataException if an error occurs while creating the indexes. + */ + private void createJobHistoryTableIndexes() throws DotDataException { + try { + new DotConnect().executeStatement( + "CREATE INDEX idx_job_history_job_id ON job_history (job_id)"); + new DotConnect().executeStatement( + "CREATE INDEX idx_job_history_job_id_state ON job_history (job_id, state)"); + } catch (Exception ex) { + throw new DotDataException(ex.getMessage(), ex); + } + } + +} \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotmarketing/util/TaskLocatorUtil.java b/dotCMS/src/main/java/com/dotmarketing/util/TaskLocatorUtil.java index 74e6a4959a08..0863430c1676 100644 --- a/dotCMS/src/main/java/com/dotmarketing/util/TaskLocatorUtil.java +++ b/dotCMS/src/main/java/com/dotmarketing/util/TaskLocatorUtil.java @@ -349,6 +349,7 @@ public static List> getStartupRunOnceTaskClasses() { .add(Task240530AddDotAIPortletToLayout.class) .add(Task240606AddVariableColumnToWorkflow.class) .add(Task241013RemoveFullPathLcColumnFromIdentifier.class) + .add(Task241009CreatePostgresJobQueueTables.class) .build(); return ret.stream().sorted(classNameComparator).collect(Collectors.toList()); } diff --git a/dotcms-integration/src/test/java/com/dotcms/MainSuite2b.java b/dotcms-integration/src/test/java/com/dotcms/MainSuite2b.java index 82c4062672e1..9e0f3090ba21 100644 --- a/dotcms-integration/src/test/java/com/dotcms/MainSuite2b.java +++ b/dotcms-integration/src/test/java/com/dotcms/MainSuite2b.java @@ -177,6 +177,7 @@ import com.dotmarketing.startup.runonce.Task240513UpdateContentTypesSystemFieldTest; import com.dotmarketing.startup.runonce.Task240530AddDotAIPortletToLayoutTest; import com.dotmarketing.startup.runonce.Task240606AddVariableColumnToWorkflowTest; +import com.dotmarketing.startup.runonce.Task241009CreatePostgresJobQueueTablesTest; import com.dotmarketing.startup.runonce.Task241013RemoveFullPathLcColumnFromIdentifierTest; import com.dotmarketing.util.ConfigUtilsTest; import com.dotmarketing.util.ITConfigTest; @@ -383,7 +384,8 @@ ConfigUtilsTest.class, SimpleInjectionIT.class, LegacyJSONObjectRenderTest.class, - Task241013RemoveFullPathLcColumnFromIdentifierTest.class + Task241013RemoveFullPathLcColumnFromIdentifierTest.class, + Task241009CreatePostgresJobQueueTablesTest.class }) public class MainSuite2b { diff --git a/dotcms-integration/src/test/java/com/dotmarketing/startup/runonce/Task241009CreatePostgresJobQueueTablesTest.java b/dotcms-integration/src/test/java/com/dotmarketing/startup/runonce/Task241009CreatePostgresJobQueueTablesTest.java new file mode 100644 index 000000000000..6f68e59edd57 --- /dev/null +++ b/dotcms-integration/src/test/java/com/dotmarketing/startup/runonce/Task241009CreatePostgresJobQueueTablesTest.java @@ -0,0 +1,176 @@ +package com.dotmarketing.startup.runonce; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import com.dotcms.IntegrationTestBase; +import com.dotcms.util.IntegrationTestInitService; +import com.dotmarketing.common.db.DotDatabaseMetaData; +import com.dotmarketing.db.DbConnectionFactory; +import com.dotmarketing.db.LocalTransaction; +import com.dotmarketing.exception.DotDataException; +import com.dotmarketing.exception.DotRuntimeException; +import com.dotmarketing.exception.DotSecurityException; +import com.dotmarketing.util.Logger; +import java.sql.Connection; +import java.sql.SQLException; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test class for {@link Task241009CreatePostgresJobQueueTables}. + *

+ * This test class ensures that the job queue tables are properly created by the upgrade task. + *

+ * The test first drops the tables if they exist, then executes the upgrade task and validates that + * the tables were successfully created. + */ +public class Task241009CreatePostgresJobQueueTablesTest extends IntegrationTestBase { + + /** + * Initializes the test environment and ensures the job queue tables do not exist. + * + * @throws Exception if an error occurs during initialization. + */ + @BeforeClass + public static void setup() throws Exception { + + // Setting web app environment + IntegrationTestInitService.getInstance().init(); + } + + /** + * Drops the job queue tables if they exist. + */ + private void dropTablesIfExist() { + + try { + + final Connection connection = DbConnectionFactory.getConnection(); + final DotDatabaseMetaData databaseMetaData = new DotDatabaseMetaData(); + + if (databaseMetaData.tableExists(connection, "job_queue")) { + databaseMetaData.dropTable(connection, "job_queue"); + } + if (databaseMetaData.tableExists(connection, "job_history")) { + databaseMetaData.dropTable(connection, "job_history"); + } + if (databaseMetaData.tableExists(connection, "job")) { + databaseMetaData.dropTable(connection, "job"); + } + + } catch (Exception e) { + throw new DotRuntimeException(e); + } + } + + /** + * Method to test {@link Task241009CreatePostgresJobQueueTables#executeUpgrade()} and + * {@link Task241009CreatePostgresJobQueueTables#forceRun()}. + *

+ * Given Scenario: The job queue tables do not exist. + *

+ * Expected Result: The job queue tables will be created after running the upgrade task. + * + * @throws SQLException if a SQL error occurs. + * @throws DotDataException if a data access error occurs. + * @throws DotSecurityException if a security error occurs. + */ + @Test + public void executeTaskUpgrade() throws SQLException, DotDataException, DotSecurityException { + + try { + // First, ensure the tables do not exist + LocalTransaction.wrap(() -> { + try { + dropTablesIfExist(); + } catch (Exception e) { + throw new DotRuntimeException(e); + } + }); + + // Running the upgrade task and validating the tables were created + executeUpgradeAndValidate(); + } finally { + DbConnectionFactory.closeSilently(); + } + } + + /** + * Method to test {@link Task241009CreatePostgresJobQueueTables#executeUpgrade()} and + * {@link Task241009CreatePostgresJobQueueTables#forceRun()}. + *

+ * Given Scenario: The job queue tables do not exist, and the upgrade task is run twice. + *

+ * Expected Result: The job queue tables will be created after running the upgrade task the + * first time and should not fail when running the upgrade task again. + * + * @throws SQLException if a SQL error occurs. + * @throws DotDataException if a data access error occurs. + * @throws DotSecurityException if a security error occurs. + */ + @Test + public void executeTaskUpgradeTwice() + throws SQLException, DotDataException, DotSecurityException { + + try { + // First, ensure the tables do not exist + LocalTransaction.wrap(() -> { + try { + dropTablesIfExist(); + } catch (Exception e) { + throw new DotRuntimeException(e); + } + }); + + // Run the upgrade task for the first time, it should create the tables + executeUpgradeAndValidate(); + + // Run the upgrade task again, should not fail + LocalTransaction.wrap(() -> { + try { + final var task = new Task241009CreatePostgresJobQueueTables(); + task.executeUpgrade(); + } catch (Exception e) { + final var message = "The upgrade task should not fail when the tables already exist"; + Logger.error(message, e); + Assert.fail(message); + } + }); + } finally { + DbConnectionFactory.closeSilently(); + } + } + + /** + * Executes the upgrade task and validates the job queue tables were created. + */ + private static void executeUpgradeAndValidate() + throws SQLException, DotDataException, DotSecurityException { + + final var task = new Task241009CreatePostgresJobQueueTables(); + final Connection connection = DbConnectionFactory.getConnection(); + final DotDatabaseMetaData databaseMetaData = new DotDatabaseMetaData(); + + // Ensure the tables do not exist before the upgrade + assertFalse(databaseMetaData.tableExists(connection, "job_queue")); + assertFalse(databaseMetaData.tableExists(connection, "job")); + assertFalse(databaseMetaData.tableExists(connection, "job_history")); + + assertTrue(task.forceRun()); + LocalTransaction.wrap(() -> { + try { + task.executeUpgrade(); + } catch (Exception e) { + throw new DotRuntimeException(e); + } + }); + + // Validate the tables were created after the upgrade + assertTrue(databaseMetaData.tableExists(connection, "job_queue")); + assertTrue(databaseMetaData.tableExists(connection, "job")); + assertTrue(databaseMetaData.tableExists(connection, "job_history")); + } + +} \ No newline at end of file