diff --git a/.github/workflows/build_on_pr.yaml b/.github/workflows/build_on_pr.yaml
new file mode 100644
index 0000000..de30294
--- /dev/null
+++ b/.github/workflows/build_on_pr.yaml
@@ -0,0 +1,25 @@
+# This workflow uses actions that are not certified by GitHub.
+# They are provided by a third-party and are governed by
+# separate terms of service, privacy policy, and support
+# documentation.
+
+name: Main branch PR Build
+on:
+ pull_request:
+ branches:
+ - main
+
+jobs:
+ build:
+ runs-on: ubuntu-latest
+ name: Gradle Build
+ steps:
+ - name: Checkout repo
+ uses: actions/checkout@v3
+ - name: Set up Zulu JDK 11
+ uses: actions/setup-java@v3
+ with:
+ distribution: 'zulu'
+ java-version: '11'
+ - name: Build
+ run: ./gradlew clean build
diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
new file mode 100644
index 0000000..e705a9c
--- /dev/null
+++ b/.github/workflows/ci.yaml
@@ -0,0 +1,18 @@
+name: CI
+
+on: [ push, pull_request ]
+
+jobs:
+ build:
+ runs-on: ubuntu-latest
+ name: Gradle Build
+ steps:
+ - name: Checkout repo
+ uses: actions/checkout@v3
+ - name: Set up Zulu JDK 11
+ uses: actions/setup-java@v3
+ with:
+ distribution: 'zulu'
+ java-version: '11'
+ - name: Build
+ run: ./gradlew clean build
\ No newline at end of file
diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml
new file mode 100644
index 0000000..4461526
--- /dev/null
+++ b/.github/workflows/release.yaml
@@ -0,0 +1,77 @@
+# This workflow uses actions that are not certified by GitHub.
+# They are provided by a third-party and are governed by
+# separate terms of service, privacy policy, and support
+# documentation.
+
+name: Publish release to Maven Central
+on:
+ release:
+ types:
+ - released
+ - prereleased
+ push:
+ branches: [ documentation ]
+
+jobs:
+ publish:
+ runs-on: ubuntu-latest
+ environment: prod
+ name: Gradle Build and Publish
+ steps:
+ - name: Checkout repo
+ uses: actions/checkout@v3
+ - name: Set up Zulu JDK 11
+ uses: actions/setup-java@v3
+ with:
+ distribution: 'zulu'
+ java-version: '11'
+ - name: Publish
+ run: |
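+ # The release tag is expected to carry a leading "v": e.g. tag "v1.2.3"
+ # publishes version 1.2.3 (${VERSION:1} strips the first character).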
+ export VERSION=${{github.ref_name}}
+ export REL_VER=${VERSION:1}
+ echo "Release version is $REL_VER"
+ echo "RELEASE_VERSION=$REL_VER" >> $GITHUB_ENV
+ ./gradlew -x test publish -Pversion=$REL_VER -PmavenCentral -Pusername=${{ secrets.SONATYPE_USERNAME }} -Ppassword=${{ secrets.SONATYPE_PASSWORD }}
+ echo "Building UI"
+ ls -ltr server/build/libs
+ docker/build-ui.sh
+ echo "Done building UI"
+ - name: Login to Docker Hub Container Registry
+ uses: docker/login-action@v1
+ with:
+ username: ${{ secrets.DOCKERHUB_USERNAME }}
+ password: ${{ secrets.DOCKERHUB_TOKEN }}
+
+ - name: Set up QEMU
+ uses: docker/setup-qemu-action@v2
+
+ - name: Set up Docker Buildx
+ uses: docker/setup-buildx-action@v2
+
+ - name: Build and push Server
+ uses: docker/build-push-action@v3
+ with:
+ context: .
+ file: docker/DockerfileServer
+ push: true
+ platforms: linux/arm64,linux/amd64
+ tags: |
+ orkesio/orkes-conductor-community:latest
+ orkesio/orkes-conductor-community:${{ env.RELEASE_VERSION }}
+
+ - name: Build and push Standalone
+ uses: docker/build-push-action@v3
+ with:
+ context: .
+ file: docker/DockerfileStandalone
+ push: true
+ platforms: linux/arm64,linux/amd64
+ tags: |
+ orkesio/orkes-conductor-community-standalone:latest
+ orkesio/orkes-conductor-community-standalone:${{ env.RELEASE_VERSION }}
+ env:
+ ORG_GRADLE_PROJECT_signingKeyId: ${{ secrets.SIGNING_KEY_ID }}
+ ORG_GRADLE_PROJECT_signingKey: ${{ secrets.SIGNING_KEY }}
+ ORG_GRADLE_PROJECT_signingPassword: ${{ secrets.SIGNING_PASSWORD }}
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..03cd24b
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,29 @@
+# Compiled class file
+*.class
+
+# Log file
+*.log
+
+# BlueJ files
+*.ctxt
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.war
+*.nar
+*.ear
+*.zip
+*.tar.gz
+*.rar
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+.idea/
+.gradle/
+build/
+
+.DS_Store
+tmp/
+out/
diff --git a/LICENSE.txt b/LICENSE.txt
new file mode 100644
index 0000000..4204bd5
--- /dev/null
+++ b/LICENSE.txt
@@ -0,0 +1,207 @@
+ Orkes Community License Agreement
+
+PLEASE READ THIS ORKES COMMUNITY LICENSE AGREEMENT (“Agreement”). BY
+DOWNLOADING, INSTALLING, USING OR DISTRIBUTING THE SOFTWARE
+(DEFINED BELOW) (“Software”), YOU AND ANY ENTITY YOU REPRESENT (“Licensee”
+or “you”) AGREE TO BE BOUND BY THIS AGREEMENT WITH ORKES, INC., A
+DELAWARE CORPORATION (“Orkes”). IF AT ANY TIME YOU DO NOT AGREE TO
+ALL OF THE TERMS OF THIS AGREEMENT, THEN DO NOT DOWNLOAD, INSTALL,
+USE OR DISTRIBUTE THE SOFTWARE AND IMMEDIATELY CEASE ALL USE OF THE
+SOFTWARE.
+
+By agreeing to this Agreement, you represent that you have full power, capacity and authority to
+accept the terms of this Agreement. If you are accepting the terms of this Agreement on behalf
+of an employer or another entity, you and such employer or other entity represent that you have
+full legal authority to bind such employer or other entity to this Agreement.
+Orkes and Licensee agree to the following terms and conditions:
+
+ Article 1
+Definitions
+1.1 “Conductor Community Software” means the Conductor community software,
+sometimes referred to by the community as Netflix Conductor, contributed by the community
+under an Apache 2.0 license, which has been made available at
+https://github.com/Netflix/conductor.
+1.2 “Intellectual Property Rights” mean all legally protectable proprietary or intellectual
+property rights in the United States, and anywhere in the world, past, present and future,
+including without limitation any patent, copyright, or trade secret. For the purposes of this
+Agreement, Intellectual Property Rights do not include trademarks.
+1.3 “Source Code” means software in source code or human readable form.
+1.4 “Source Materials” means those portions of the Software furnished to Licensee by Orkes
+in Source Code.
+1.5 “Software” means the Orkes software licensed under this Agreement as may be identified
+at the following url: https://github.com/orkes-io/licenses/blob/main/community/SOFTWARE.txt
+
+ Article 2
+License Grant
+2.1 Software License. As of the Effective Date, subject to the terms and conditions of this
+Agreement, including, without limitation, Article 3 (License Restrictions and Intellectual
+Property Rights), Orkes grants Licensee a limited, non-exclusive, non-transferable, and non-
+sublicensable license, to:
+ (a) evaluate the Software in Licensee’s development and testing environments;
+ (b) use the Software internally for Licensee’s internal business purposes;
+ (c) modify and make derivative works of the Software;
+ (d) reproduce the Software; and
+ (e) distribute Software in binary or Source Code form.
+2.2 Conditions to License. The licenses under Article 2 (License Grants) are conditioned
+upon Licensee’s compliance with the terms of this Agreement including without limitation
+Article 3 (License Restrictions and Intellectual Property Rights).
+2.3 Third Party Intellectual Property. The Software may use, include or rely on third-party
+software or other Intellectual Property Rights (“Third-Party Intellectual Property”). Other than
+the Conductor Community Software included in the Software that is licensed under the Apache
+2.0 license, any Third Party Intellectual Property that Orkes provides to Licensee is for
+convenience only, and is not part of the Software and is not licensed hereunder. Licensee is
+solely responsible for procuring and complying with, and shall procure and comply with, any
+necessary license rights if Licensee uses any Third-Party Intellectual Property.
+
+ Article 3
+License Restrictions and Intellectual Property Rights
+3.1 Orkes Software Notice. Licensee shall not remove or modify any Orkes or third-party
+copyright or other proprietary notices on the Software. Additionally, Licensee shall provide the
+following notice on each copy of Software:
+THIS SOFTWARE IS MADE AVAILABLE BY ORKES, INC., A DELAWARE
+CORPORATION (“Orkes”) UNDER THE ORKES COMMUNITY LICENSE
+AGREEMENT (“Agreement”). BY DOWNLOADING, INSTALLING, USING
+OR DISTRIBUTING THE SOFTWARE, YOU AND ANY ENTITY YOU
+REPRESENT (“Licensee” or “you”) AGREE TO BE BOUND BY THE ORKES
+COMMUNITY LICENSE AGREEMENT AT ALL TIMES. IF YOU DO NOT
+AGREE TO ALL OF THE TERMS OF THE ORKES COMMUNITY LICENSE
+AGREEMENT AT ANY TIME, THEN DO NOT DOWNLOAD, INSTALL,
+USE OR DISTRIBUTE THE SOFTWARE AND YOU SHALL
+IMMEDIATELY CEASE ANY USE OF THE SOFTWARE.
+3.2 Restrictions. Licensee agrees that Licensee shall not: (i) use the Software outside of the
+scope of the license granted hereunder or in violation of any restrictions hereunder; or (ii) export
+or re-export the Software directly or indirectly in violation of the laws of the United States or any
+other jurisdiction.
+3.3 Competitive Products. Licensee shall not (i) use or provide the Software as a service for
+any third party (including as software-as-a-service, time-sharing or service bureau), (ii)
+otherwise provide the Software to a third party in competition with the Software or in
+competition with Orkes, (iii) use the Software (including any Source Materials) to develop any
+product, technology or service that competes with the Software or any Orkes product or service
+or (iv) allow its personnel who have access to the Software to develop any such competitive
+product, technology or service.
+3.4 Open Source Software. Licensee shall not use or combine the Source Materials (or the
+Software) with open-source software or other items in any manner which would subject the
+Software to any additional open source software terms and conditions. For clarity, the foregoing
+does not prohibit Licensee from combining and using the Software with Conductor code that is
+subject only to the Apache 2.0 license or subject to an Orkes community license.
+3.5 Ownership by Orkes. Orkes retains all Orkes’ Intellectual Property Rights covering or
+embodied in the Software, subject to the limited licenses granted to Licensee under this
+Agreement and any third-party rights in the Software.
+3.6 No Trademark License. Licensee acquires no right or license to any trademarks of Orkes
+hereunder.
+3.7 No Other Rights. All Intellectual Property Rights of Orkes not expressly granted to
+Licensee in this Agreement are expressly reserved by Orkes. Without limitation, Licensee
+receives no right or license, by implication, estoppel or otherwise, to any software, product,
+technology or Intellectual Property Rights not embodied in the Software, even if such other
+software, technology or Intellectual Property Rights are useful or necessary in connection with
+the Software. Licensee agrees not to claim, assert or assist in the claim or assertion of any such
+license or right disclaimed as provided above.
+
+ Article 4
+No Warranty
+THE SOFTWARE IS PROVIDED “AS-IS” WITHOUT ANY WARRANTY OF ANY KIND.
+TO THE MAXIMUM EXTENT PERMITTED BY LAW, ORKES DISCLAIMS ALL
+WARRANTIES, CONDITIONS AND REPRESENTATIONS (EXPRESS OR IMPLIED,
+ORAL OR WRITTEN) WITH RESPECT TO THE SOFTWARE, INCLUDING, WITHOUT
+LIMITATION, WARRANTIES OF MERCHANTABILITY OR FITNESS FOR A
+PARTICULAR PURPOSE.
+
+ Article 5
+Licensee Responsibility
+Licensee, and not Orkes, is solely responsible for any warranties and covenants Licensee makes in
+connection with the Licensee’s products and services as well as the Software and any results thereof, and
+any resulting claims from any customers or other third party. Without limiting the foregoing, Licensee is
+responsible for complying with applicable law in connection with use of the Software and verifying and
+validating the suitability and reliability of the Software for all of Licensee’s use thereof. Further,
+Licensee must take prudent steps to protect against failures when the Software or results thereof is
+incorporated in a system or application, including providing back-up and shut-down mechanisms.
+
+ Article 6
+Limitation of Liability
+6.1 Limitation of Liability. ORKES SHALL NOT BE LIABLE TO LICENSEE FOR ANY
+SPECIAL, INDIRECT, INCIDENTAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES,
+INCLUDING, WITHOUT LIMITATION, LOST PROFITS, BUSINESS INTERRUPTION OR
+LOSS OF INFORMATION, IN ANY WAY RELATED TO THIS AGREEMENT,
+REGARDLESS OF WHETHER SUCH PARTY WAS ADVISED OF THE POSSIBILITY OF
+ANY OF THE FOREGOING. IN NO EVENT SHALL THE TOTAL COLLECTIVE
+LIABILITY OF ORKES FOR ALL CLAIMS HEREUNDER OR IN ANY WAY RELATED
+TO THIS AGREEMENT EXCEED THE GREATER OF (I) TWENTY PERCENT (20%)
+OF THE AGGREGATE AMOUNTS PAID OR OWED BY LICENSEE UNDER THIS
+AGREEMENT IN THE PRECEDING TWELVE (12) MONTHS OR (II) ONE HUNDRED
+DOLLARS ($100).
+6.2 Allocation of Risk. The warranty disclaimer and limitations of liability set forth in this
+Agreement shall apply irrespective of any failure of the essential purpose of any limited remedy.
+Licensee acknowledges and agrees that, but for these provisions, Orkes would not have made the
+Software available to Licensee under the terms contemplated under this Agreement.
+6.3 Applicable Law. The warranty disclaimer and limitations of liability set forth in this
+Agreement shall not apply to the extent prohibited by law, in which case the disclaimer or
+limitation shall be modified to disclaim and/or limit in accordance with applicable law. Without
+limiting the foregoing, to the extent required by law, the foregoing limitations shall not apply to
+claims due to fraud, bodily injury or death.
+
+ Article 7
+Termination
+7.1 Term. This Agreement shall remain effective until terminated.
+7.2 Termination for Cause. Orkes may terminate this Agreement upon written notice to
+Licensee upon Licensee’s material breach of this Agreement, which breach is incurable or,
+if curable, remains uncured for thirty (30) days after notice to Licensee.
+7.3 Termination for Bankruptcy. If Licensee (a) becomes insolvent or bankrupt, (b) dissolves
+or ceases to conduct business in the ordinary course, (c) makes an assignment for the benefit of
+its creditors, (d) commences any insolvency, receivership, bankruptcy or other similar
+proceeding for the settlement of its debts or (e) has commenced against it any insolvency,
+receivership, bankruptcy or other similar proceeding for the settlement of its debts that is not
+dismissed within thirty (30) days after notice of such proceeding, then Orkes may terminate this
+Agreement immediately upon written notice to Licensee.
+7.4 Effect of Termination. If this Agreement is terminated or expires for any reason, all
+rights granted hereunder to Licensee shall terminate, and Licensee shall immediately cease all
+use of the Software. The provisions of Article 1 (Definitions), Article 3 (License Restrictions
+and Intellectual Property Rights), Article 4 (No Warranty), Article 6 (Limitation of Liability),
+Article 7 (Termination) and Article 8 (Miscellaneous) shall survive termination of this
+Agreement.
+
+ Article 8
+Miscellaneous
+8.1 Relationship of Parties. The Parties to this Agreement are independent contractors, and
+this Agreement shall not establish any relationship of partnership, joint venture, employment,
+franchise or agency between the Parties. Neither Party shall have the power to bind the other or
+incur obligations on the other’s behalf without the other Party’s prior written consent.
+8.2 Assignment. Licensee shall not have the right to assign this Agreement, in whole or in
+part, without Orkes’s prior written consent; assignment by operation of law or change of control
+of Licensee is prohibited. Orkes may assign this Agreement without consent. Any attempt to
+assign this Agreement, other than as permitted above, shall be null and void.
+8.3 Federal Acquisition. This provision applies to all acquisitions of the Software by or for
+the Federal Government, whether by any prime contractor or subcontractor and whether under
+any procurement contract, grant, cooperative agreement or other activity by or with the Federal
+Government. By accepting delivery of the Software, the Government agrees the Software
+qualifies as “commercial” computer software within the meaning of the acquisition regulations
+applicable to this procurement. The terms and conditions of this Agreement shall pertain to the
+Government’s use and disclosure of the software and shall supersede any conflicting contractual
+terms or conditions. If this Agreement fails to meet the Government’s needs or is inconsistent in
+any respect with Federal law, the Government agrees to return the Software, unused, to Orkes.
+8.4 Governing Law. This Agreement shall be governed by, and construed in accordance
+with, the laws of the State of California, U.S.A., applicable to contracts made in full and
+performed in the State of California, U.S.A., without reference to any conflict of law or choice of
+law principles that would cause the application of laws of any other jurisdiction. The United
+Nations Convention on Contracts for the International Sales of Goods shall not apply to this
+Agreement.
+8.5 Jurisdiction and Venue. Jurisdiction and venue for any dispute arising from or related to
+this Agreement shall be in the state and federal courts of Santa Clara County, California, USA,
+and each party hereby consents to the jurisdiction and venue of such courts.
+8.6 Language of Agreement. This Agreement is made in the English language only, and the
+English language version shall control in all respects. In the event that this Agreement is
+translated into another language, such translation shall not be binding upon the Parties.
+8.7 Severability. If any provision of this Agreement, or the application thereof, shall for any
+reason and to any extent be determined by a court of competent jurisdiction to be invalid or
+unenforceable under applicable law, a valid provision that most closely matches the intent of the
+original shall be substituted, and the remaining provisions of this Agreement shall be interpreted
+so as best to reasonably effect its original intent.
+8.8 Waiver. The failure by either Party to enforce any provision of this Agreement shall not
+constitute a waiver of future enforcement of that or any other provision.
+8.9 Entire Agreement. This Agreement contains the complete understanding and agreement
+of the parties and supersedes all prior or contemporaneous agreements or understandings, oral or
+written, relating to the subject matter herein. Any waiver, modification or amendment of any
+provision of this Agreement shall be effective only if in writing and signed by duly authorized
+representatives of the Parties. No inconsistent or additional terms or conditions in any document
+provided by Licensee, including any purchase orders, purchase agreements, requests for
+proposals, bills of lading or the like shall apply to this Agreement or the activities hereunder, and
+any such terms or conditions are hereby rejected.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..10031a0
--- /dev/null
+++ b/README.md
@@ -0,0 +1,83 @@
+# Orkes Conductor
+Orkes Conductor is a fully compatible distribution of Netflix Conductor, packaged with the Orkes certified stack.
+
+[![CI](https://github.com/orkes-io/orkes-conductor-community/actions/workflows/ci.yaml/badge.svg)](https://github.com/orkes-io/orkes-conductor-community/actions/workflows/ci.yaml)
+[![License](https://img.shields.io/badge/license-orkes%20community%20license-green)](https://github.com/orkes-io/licenses/blob/main/community/LICENSE.txt)
+
+
+```properties
+spring.datasource.username=postgres
+spring.datasource.password=postgres
+spring.datasource.hikari.maximum-pool-size=8
+spring.datasource.hikari.auto-commit=false
+```
+
+### Disable elasticsearch indexing
+```properties
+conductor.app.asyncIndexingEnabled=false
+conductor.indexing.enabled=false
+```
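+
+To try these settings locally, one option (illustrative: the jar path depends on your
+build output) is to put the overrides in a standard Spring Boot external config file:
+```shell
+# config/application.properties holds the property overrides shown above
+java -jar server/build/libs/orkes-conductor-server-boot.jar \
+  --spring.config.additional-location=file:./config/application.properties
+```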
diff --git a/archive/build.gradle b/archive/build.gradle
new file mode 100644
index 0000000..2332c73
--- /dev/null
+++ b/archive/build.gradle
@@ -0,0 +1,51 @@
+dependencies {
+
+ implementation "com.netflix.conductor:conductor-common:${versions.conductorfork}"
+ implementation "com.netflix.conductor:conductor-core:${versions.conductorfork}"
+ implementation ("com.netflix.conductor:conductor-postgres-persistence:${versions.conductorfork}")
+
+ implementation 'org.springframework.boot:spring-boot-starter'
+ implementation 'org.springframework.boot:spring-boot-starter-data-jdbc'
+ implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
+ implementation 'org.springframework.boot:spring-boot-starter-security'
+
+ implementation "com.fasterxml.jackson.core:jackson-databind"
+ implementation "com.fasterxml.jackson.core:jackson-core"
+
+ //Metrics
+ implementation "io.micrometer:micrometer-registry-prometheus:1.7.5"
+ implementation "io.micrometer:micrometer-core:1.7.5"
+ implementation "com.netflix.spectator:spectator-reg-micrometer:${versions.revSpectator}"
+ implementation "com.netflix.spectator:spectator-reg-metrics3:${versions.revSpectator}"
+ implementation "com.netflix.spectator:spectator-api:${versions.revSpectator}"
+
+ implementation 'org.springframework.retry:spring-retry'
+
+ //Flyway for postgres configuration
+ implementation "org.flywaydb:flyway-core"
+
+ //Cache
+ implementation "com.google.guava:guava:${versions.revGuava}"
+
+ //Lucene
+ implementation "org.apache.lucene:lucene-core:${versions.revLucene}"
+ implementation "org.apache.lucene:lucene-analyzers-common:${versions.revLucene}"
+
+ //spring
+ testImplementation 'org.springframework.boot:spring-boot-starter-test'
+ testImplementation 'org.springframework.security:spring-security-test'
+
+
+ testImplementation 'org.hamcrest:hamcrest'
+ testImplementation "org.awaitility:awaitility:3.1.6"
+
+ //postgres test container
+ testImplementation "org.testcontainers:postgresql:${versions.revTestContainer}"
+
+ //Fake data generator
+ testImplementation "com.github.javafaker:javafaker:1.0.2"
+}
+
+test {
+ useJUnitPlatform()
+}
\ No newline at end of file
diff --git a/archive/src/main/java/io/orkes/conductor/dao/archive/ArchiveDAO.java b/archive/src/main/java/io/orkes/conductor/dao/archive/ArchiveDAO.java
new file mode 100644
index 0000000..58bc968
--- /dev/null
+++ b/archive/src/main/java/io/orkes/conductor/dao/archive/ArchiveDAO.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2022 Orkes, Inc.
+ *
+ * Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/orkes-io/licenses/blob/main/community/LICENSE.txt
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.orkes.conductor.dao.archive;
+
+import java.util.List;
+
+import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
+import com.netflix.conductor.model.WorkflowModel;
+
+public interface ArchiveDAO {
+
+ // Workflow Methods
+
+ void createOrUpdateWorkflow(WorkflowModel workflow);
+
+ boolean removeWorkflow(String workflowId);
+
+ WorkflowModel getWorkflow(String workflowId, boolean includeTasks);
+
+ List<String> getWorkflowIdsByType(String workflowName, Long startTime, Long endTime);
+
+ List<String> getWorkflowIdsByCorrelationId(
+ String workflowName, String correlationId, boolean includeClosed, boolean includeTasks);
+
+ ScrollableSearchResult<String> searchWorkflows(
+ String query, String freeText, int start, int count);
+
+ List<TaskExecLog> getTaskExecutionLogs(String taskId);
+
+ void addTaskExecutionLogs(List<TaskExecLog> logs);
+}
diff --git a/archive/src/main/java/io/orkes/conductor/dao/archive/ArchivedExecutionDAO.java b/archive/src/main/java/io/orkes/conductor/dao/archive/ArchivedExecutionDAO.java
new file mode 100644
index 0000000..d73409e
--- /dev/null
+++ b/archive/src/main/java/io/orkes/conductor/dao/archive/ArchivedExecutionDAO.java
@@ -0,0 +1,348 @@
+/*
+ * Copyright 2022 Orkes, Inc.
+ *
+ * Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/orkes-io/licenses/blob/main/community/LICENSE.txt
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.orkes.conductor.dao.archive;
+
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import com.netflix.conductor.common.metadata.events.EventExecution;
+import com.netflix.conductor.common.run.Workflow;
+import com.netflix.conductor.dao.ExecutionDAO;
+import com.netflix.conductor.dao.QueueDAO;
+import com.netflix.conductor.model.TaskModel;
+import com.netflix.conductor.model.WorkflowModel;
+
+import io.orkes.conductor.dao.indexer.IndexWorker;
+import io.orkes.conductor.id.TimeBasedUUIDGenerator;
+import io.orkes.conductor.metrics.MetricsCollector;
+
+import lombok.extern.slf4j.Slf4j;
+
+import static io.orkes.conductor.dao.indexer.IndexWorker.INDEXER_QUEUE;
+
+@Slf4j
+public class ArchivedExecutionDAO implements ExecutionDAO {
+
+ private static final int OFFSET_TIME_SEC = 0;
+
+ private final ExecutionDAO primaryDAO;
+
+ private final ArchiveDAO archiveDAO;
+
+ private final QueueDAO queueDAO;
+
+ private final MetricsCollector metricsCollector;
+
+ private final Clock clock;
+
+ public ArchivedExecutionDAO(
+ ExecutionDAO primaryDAO,
+ ArchiveDAO archiveDAO,
+ QueueDAO queueDAO,
+ MetricsCollector metricsCollector) {
+ this.primaryDAO = primaryDAO;
+ this.archiveDAO = archiveDAO;
+ this.queueDAO = queueDAO;
+ this.metricsCollector = metricsCollector;
+ this.clock = Clock.systemDefaultZone();
+ log.info(
+ "Initialized {} as Execution DAO with {} as primary DAO",
+ ArchivedExecutionDAO.class.getSimpleName(),
+ primaryDAO.getClass().getSimpleName());
+ }
+
+ ////////////////////////////////////////////////////////////////////////
+ // Delegate to Primary DAO //
+ ////////////////////////////////////////////////////////////////////////
+ @Override
+ public List<TaskModel> getPendingTasksByWorkflow(String taskName, String workflowId) {
+ return primaryDAO.getPendingTasksByWorkflow(taskName, workflowId);
+ }
+
+ @Override
+ public List<TaskModel> createTasks(List<TaskModel> tasks) {
+ return metricsCollector
+ .getTimer("create_tasks_dao")
+ .record(() -> primaryDAO.createTasks(tasks));
+ }
+
+ @Override
+ public void updateTask(TaskModel task) {
+ metricsCollector
+ .getTimer("update_task_dao", "taskType", task.getTaskDefName())
+ .record(
+ () -> {
+ primaryDAO.updateTask(task);
+ if (task.getStatus().isTerminal()) {
+ metricsCollector.recordTaskComplete(task);
+ }
+ });
+ }
+
+ @Override
+ public boolean exceedsInProgressLimit(TaskModel task) {
+ return primaryDAO.exceedsInProgressLimit(task);
+ }
+
+ @Override
+ public boolean removeTask(String taskId) {
+ return primaryDAO.removeTask(taskId);
+ }
+
+ @Override
+ public List<TaskModel> getPendingTasksForTaskType(String taskType) {
+ return primaryDAO.getPendingTasksForTaskType(taskType);
+ }
+
+ @Override
+ public void removeFromPendingWorkflow(String workflowType, String workflowId) {
+ primaryDAO.removeFromPendingWorkflow(workflowType, workflowId);
+ }
+
+ @Override
+ public List<String> getRunningWorkflowIds(String workflowName, int version) {
+ return primaryDAO.getRunningWorkflowIds(workflowName, version);
+ }
+
+ @Override
+ public List<WorkflowModel> getPendingWorkflowsByType(String workflowName, int version) {
+ return primaryDAO.getPendingWorkflowsByType(workflowName, version);
+ }
+
+ @Override
+ public long getPendingWorkflowCount(String workflowName) {
+ return primaryDAO.getPendingWorkflowCount(workflowName);
+ }
+
+ @Override
+ public List<TaskModel> getTasks(String taskType, String startKey, int count) {
+ // This method is only intended to show pending tasks
+ return primaryDAO.getTasks(taskType, startKey, count);
+ }
+
+ @Override
+ public long getInProgressTaskCount(String taskDefName) {
+ return primaryDAO.getInProgressTaskCount(taskDefName);
+ }
+
+ @Override
+ public boolean canSearchAcrossWorkflows() {
+ return true;
+ }
+
+ ////////////////////////////////////////////////////////////////////////
+ // Hybrid Mode //
+ ////////////////////////////////////////////////////////////////////////
+
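+ // Reads consult the primary (hot) store first and fall back to the archive;
+ // writes go to the primary store and are queued for asynchronous indexing
+ // into the archive (see queueForIndexing below).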
+ @Override
+ public String updateWorkflow(WorkflowModel workflow) {
+ return metricsCollector
+ .getTimer("update_workflow_dao", "workflowName", workflow.getWorkflowName())
+ .record(
+ () -> {
+ workflow.setUpdatedTime(System.currentTimeMillis());
+ String id = primaryDAO.updateWorkflow(workflow);
+ queueForIndexing(workflow, false);
+ if (workflow.getStatus().isTerminal()) {
+ metricsCollector.recordWorkflowComplete(workflow);
+ }
+ return id;
+ });
+ }
+
+ @Override
+ public TaskModel getTask(String taskId) {
+ return metricsCollector
+ .getTimer("get_task_dao")
+ .record(
+ () -> {
+ TaskModel task = primaryDAO.getTask(taskId);
+ return task;
+ });
+ }
+
+ @Override
+ public List<TaskModel> getTasks(List<String> taskIds) {
+ return metricsCollector
+ .getTimer("get_tasks_dao")
+ .record(() -> primaryDAO.getTasks(taskIds));
+ }
+
+ @Override
+ public List<TaskModel> getTasksForWorkflow(String workflowId) {
+ return metricsCollector
+ .getTimer("get_tasks_for_workflow_dao")
+ .record(
+ () -> {
+ List<TaskModel> tasks = primaryDAO.getTasksForWorkflow(workflowId);
+ if (tasks == null || tasks.isEmpty()) {
+ tasks = archiveDAO.getWorkflow(workflowId, true).getTasks();
+ }
+ return tasks;
+ });
+ }
+
+ @Override
+ public String createWorkflow(WorkflowModel workflow) {
+ // Workflow UUIDs are time based, and we want the workflow's create time to match
+ // the time encoded in the UUID.
+ // The reason is that the create time is used for partitioning, and
+ // this lets us derive the create time from the workflow id alone
+ long time = TimeBasedUUIDGenerator.getDate(workflow.getWorkflowId());
+ workflow.setCreateTime(time);
+
+ return metricsCollector
+ .getTimer("create_workflow_dao", "workflowName", workflow.getWorkflowName())
+ .record(
+ () -> {
+ workflow.setUpdatedTime(System.currentTimeMillis());
+ String workflowId = primaryDAO.createWorkflow(workflow);
+ queueForIndexing(workflow, true);
+ return workflowId;
+ });
+ }
+
+ @Override
+ public boolean removeWorkflow(String workflowId) {
+ boolean removed = primaryDAO.removeWorkflow(workflowId);
+ if (!removed) {
+ removed = archiveDAO.removeWorkflow(workflowId);
+ }
+ return removed;
+ }
+
+ @Override
+ public boolean removeWorkflowWithExpiry(String workflowId, int ttlSeconds) {
+ return primaryDAO.removeWorkflowWithExpiry(workflowId, ttlSeconds);
+ }
+
+ @Override
+ public WorkflowModel getWorkflow(String workflowId) {
+ return getWorkflow(workflowId, false);
+ }
+
+ @Override
+ public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) {
+ WorkflowModel workflow = primaryDAO.getWorkflow(workflowId, includeTasks);
+ if (workflow == null) {
+ log.debug("Not found in primary dao, going to archive {}", workflowId);
+ workflow =
+ metricsCollector
+ .getTimer("get_workflow_archive_dao", "includeTasks", "" + includeTasks)
+ .record(() -> archiveDAO.getWorkflow(workflowId, includeTasks));
+ }
+ return workflow;
+ }
+
+ @Override
+ public List<WorkflowModel> getWorkflowsByType(
+ String workflowName, Long startTime, Long endTime) {
+ List<WorkflowModel> workflows = new ArrayList<>();
+ List<String> workflowIds =
+ archiveDAO.getWorkflowIdsByType(workflowName, startTime, endTime);
+ for (String workflowId : workflowIds) {
+ workflows.add(getWorkflow(workflowId));
+ }
+
+ return workflows;
+ }
+
+ @Override
+ public List<WorkflowModel> getWorkflowsByCorrelationId(
+ String workflowName, String correlationId, boolean includeTasks) {
+ List<String> ids =
+ archiveDAO.getWorkflowIdsByCorrelationId(
+ workflowName, correlationId, false, includeTasks);
+ return ids.stream()
+ .map(id -> getWorkflow(id, includeTasks))
+ .filter(wf -> wf != null)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public boolean addEventExecution(EventExecution eventExecution) {
+ boolean added = primaryDAO.addEventExecution(eventExecution);
+ return added;
+ }
+
+ @Override
+ public void updateEventExecution(EventExecution eventExecution) {
+ primaryDAO.updateEventExecution(eventExecution);
+ }
+
+ @Override
+ public void removeEventExecution(EventExecution eventExecution) {
+ primaryDAO.removeEventExecution(eventExecution);
+ }
+
+ private void queueForIndexing(WorkflowModel workflow, boolean created) {
+
+ if (!created && !workflow.getStatus().isTerminal()) {
+ // Do nothing! We only index the workflow once it's created and once it's completed
+ return;
+ }
+ String messageId =
+ IndexWorker.WORKFLOW_ID_PREFIX
+ + workflow.getWorkflowId()
+ + ":"
+ + workflow.getStatus();
+ long offsetTime = OFFSET_TIME_SEC;
+
+ if (workflow.getStatus().isTerminal()) {
+ // Move ahead of the queue
+
+ // Below is how the score is calculated for pushing the message to the sorted set
+ // double score = Long.valueOf(clock.millis() + message.getTimeout()).doubleValue() +
+ // priority;
+
+ // Making the time to be negative pushes the message at the beginning of the queue
+ // Reducing the current time by 1000 seconds ensures any mismatches do not cause the
+ // score to be negative
+ // A negative score is allowed, but when querying the messages, the min score is set to 0
+ offsetTime = -1 * ((clock.millis() / 1000) - 1000);
+ }
+
+ if (created && !workflow.getStatus().isTerminal()) {
+ // If this is a newly created workflow that is not yet completed,
+ // we add a random delay to index
+ // Adding a delay ensures two things:
+ // 1. If the workflow completes in the next 1-2 seconds, the completed status will
+ // remove the pending
+ // workflow indexing --> see the block below
+ // 2. Probability that multiple parallel threads/workers pick up the same workflow id
+ // reduces,
+ // avoiding database row lock contention
+
+ int delayInSeconds = Math.max(1, new Random().nextInt(10));
+ offsetTime = delayInSeconds;
+ }
+
+ queueDAO.push(INDEXER_QUEUE, messageId, offsetTime);
+
+ if (workflow.getStatus().isTerminal()) {
+ // Remove any previous message, so we can avoid indexing it twice
+ messageId =
+ IndexWorker.WORKFLOW_ID_PREFIX
+ + workflow.getWorkflowId()
+ + ":"
+ + Workflow.WorkflowStatus.RUNNING.toString();
+ queueDAO.ack(INDEXER_QUEUE, messageId);
+ }
+ }
+}
diff --git a/archive/src/main/java/io/orkes/conductor/dao/archive/ArchivedIndexDAO.java b/archive/src/main/java/io/orkes/conductor/dao/archive/ArchivedIndexDAO.java
new file mode 100644
index 0000000..6a2ac24
--- /dev/null
+++ b/archive/src/main/java/io/orkes/conductor/dao/archive/ArchivedIndexDAO.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2022 Orkes, Inc.
+ *
+ * Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/orkes-io/licenses/blob/main/community/LICENSE.txt
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.orkes.conductor.dao.archive;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Primary;
+import org.springframework.stereotype.Component;
+
+import com.netflix.conductor.common.metadata.events.EventExecution;
+import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
+import com.netflix.conductor.common.run.SearchResult;
+import com.netflix.conductor.common.run.TaskSummary;
+import com.netflix.conductor.common.run.WorkflowSummary;
+import com.netflix.conductor.core.events.queue.Message;
+import com.netflix.conductor.dao.IndexDAO;
+
+import lombok.extern.slf4j.Slf4j;
+
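+/**
+ * An IndexDAO implementation backed by the archive store. Activated by setting
+ * conductor.archive.db.enabled=true, in which case it takes precedence (as the
+ * {@code @Primary} bean) over the default Elasticsearch indexing.
+ */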
+@Component
+@ConditionalOnProperty(name = "conductor.archive.db.enabled", havingValue = "true")
+@Primary
+@Slf4j
+public class ArchivedIndexDAO implements IndexDAO {
+
+ private final ArchiveDAO archiveDAO;
+
+ public ArchivedIndexDAO(ArchiveDAO archiveDAO) {
+ this.archiveDAO = archiveDAO;
+ }
+
+ @Override
+ public void setup() throws Exception {}
+
+ @Override
+ public void indexWorkflow(WorkflowSummary workflow) {}
+
+ @Override
+ public CompletableFuture<Void> asyncIndexWorkflow(WorkflowSummary workflow) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public void indexTask(TaskSummary task) {
+ return;
+ }
+
+ @Override
+ public CompletableFuture<Void> asyncIndexTask(TaskSummary task) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public SearchResult<String> searchWorkflows(
+ String query, String freeText, int start, int count, List<String> sort) {
+ return archiveDAO.searchWorkflows(query, freeText, start, count);
+ }
+
+ @Override
+ public SearchResult<String> searchTasks(
+ String query, String freeText, int start, int count, List<String> sort) {
+ throw new UnsupportedOperationException("Task search is not supported in this environment");
+ }
+
+ @Override
+ public void removeWorkflow(String workflowId) {
+ archiveDAO.removeWorkflow(workflowId);
+ }
+
+ @Override
+ public CompletableFuture<Void> asyncRemoveWorkflow(String workflowId) {
+ archiveDAO.removeWorkflow(workflowId);
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public void updateWorkflow(String workflowInstanceId, String[] keys, Object[] values) {}
+
+ @Override
+ public CompletableFuture<Void> asyncUpdateWorkflow(
+ String workflowInstanceId, String[] keys, Object[] values) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public String get(String workflowInstanceId, String key) {
+ return null;
+ }
+
+ @Override
+ public void addTaskExecutionLogs(List<TaskExecLog> logs) {
+ archiveDAO.addTaskExecutionLogs(logs);
+ }
+
+ @Override
+ public CompletableFuture<Void> asyncAddTaskExecutionLogs(List<TaskExecLog> logs) {
+ archiveDAO.addTaskExecutionLogs(logs);
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public List<TaskExecLog> getTaskExecutionLogs(String taskId) {
+ return archiveDAO.getTaskExecutionLogs(taskId);
+ }
+
+ @Override
+ public void addEventExecution(EventExecution eventExecution) {}
+
+ @Override
+ public List<EventExecution> getEventExecutions(String event) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> asyncAddEventExecution(EventExecution eventExecution) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public void addMessage(String queue, Message msg) {}
+
+ @Override
+ public CompletableFuture<Void> asyncAddMessage(String queue, Message message) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public List<Message> getMessages(String queue) {
+ return null;
+ }
+
+ @Override
+ public List<String> searchArchivableWorkflows(String indexName, long archiveTtlDays) {
+ throw new UnsupportedOperationException("You do not need to use this! :)");
+ }
+
+ public long getWorkflowCount(String query, String freeText) {
+ return 0;
+ }
+}
diff --git a/archive/src/main/java/io/orkes/conductor/dao/archive/DocumentStoreDAO.java b/archive/src/main/java/io/orkes/conductor/dao/archive/DocumentStoreDAO.java
new file mode 100644
index 0000000..4c45f17
--- /dev/null
+++ b/archive/src/main/java/io/orkes/conductor/dao/archive/DocumentStoreDAO.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2022 Orkes, Inc.
+ *
+ * Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/orkes-io/licenses/blob/main/community/LICENSE.txt
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.orkes.conductor.dao.archive;
+
+import com.netflix.conductor.model.WorkflowModel;
+
+/**
+ * Document store is used to store completed workflow JSON data.
+ *
+ * @see io.orkes.conductor.dao.postgres.archive.PostgresArchiveDAO
+ */
+public interface DocumentStoreDAO {
+
+ void createOrUpdateWorkflow(WorkflowModel workflow);
+
+ boolean removeWorkflow(String workflowId);
+
+ WorkflowModel getWorkflow(String workflowId, boolean includeTasks);
+}
diff --git a/archive/src/main/java/io/orkes/conductor/dao/archive/ScrollableSearchResult.java b/archive/src/main/java/io/orkes/conductor/dao/archive/ScrollableSearchResult.java
new file mode 100644
index 0000000..c0dfe6b
--- /dev/null
+++ b/archive/src/main/java/io/orkes/conductor/dao/archive/ScrollableSearchResult.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2022 Orkes, Inc.
+ *
+ * Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/orkes-io/licenses/blob/main/community/LICENSE.txt
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.orkes.conductor.dao.archive;
+
+import java.util.List;
+
+import com.netflix.conductor.common.run.SearchResult;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+@NoArgsConstructor
+public class ScrollableSearchResult<T> extends SearchResult<T> {
+
+ private String queryId;
+
+ public ScrollableSearchResult(List<T> results, String queryId) {
+ super(0, results);
+ this.queryId = queryId;
+ }
+
+ // With ScrollableSearchResult this will always be zero and it's confusing from an API client's
+ // perspective.
+ // That's why it's ignored.
+ @Override
+ @JsonIgnore
+ public long getTotalHits() {
+ return super.getTotalHits();
+ }
+}
diff --git a/archive/src/main/java/io/orkes/conductor/dao/indexer/IndexValueExtractor.java b/archive/src/main/java/io/orkes/conductor/dao/indexer/IndexValueExtractor.java
new file mode 100644
index 0000000..e2518bc
--- /dev/null
+++ b/archive/src/main/java/io/orkes/conductor/dao/indexer/IndexValueExtractor.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2022 Orkes, Inc.
+ *
+ * Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/orkes-io/licenses/blob/main/community/LICENSE.txt
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.orkes.conductor.dao.indexer;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.lucene.analysis.en.EnglishAnalyzer;
+
+import com.netflix.conductor.model.TaskModel;
+import com.netflix.conductor.model.WorkflowModel;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class IndexValueExtractor {
+
+ private static final String splitWords = "\"|,|;|\\s|,";
+ private static final String replaceWords = "\"|,|;|\\s|,|:";
+ private static final Collection stopWords = EnglishAnalyzer.getDefaultStopSet();
+
+ public static Collection<String> getIndexWords(
+ WorkflowModel workflow, int maxWords, int maxWordLength) {
+
+ try {
+
+ List<String> words = getIndexWords(workflow);
+ return words.stream()
+ .flatMap(value -> Arrays.asList(value.split(splitWords)).stream())
+ .filter(word -> word.length() < maxWordLength)
+ .filter(word -> word.length() > 2)
+ .filter(word -> !word.trim().isBlank())
+ .map(word -> word.toLowerCase().trim().replaceAll(replaceWords + "+$", ""))
+ .filter(word -> !stopWords.contains(word))
+ .limit(maxWords)
+ .collect(Collectors.toSet());
+
+ } catch (Exception e) {
+ log.warn("Error serializing input/output map to text: " + e.getMessage(), e);
+ return new ArrayList<>();
+ }
+ }
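+
+ // Illustrative example (hypothetical values): a workflow whose input contains
+ // {"customer": "John Doe", "orderId": "ORD-1234"} yields index words such as
+ // "john", "doe" and "ord-1234": words are lower-cased, punctuation is stripped,
+ // and stop words and words shorter than 3 characters are dropped.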
+
+ private static List<String> getIndexWords(WorkflowModel workflow) {
+ List<String> words = new ArrayList<>();
+ append(words, workflow.getCorrelationId());
+ append(words, workflow.getInput());
+ append(words, workflow.getOutput());
+ append(words, workflow.getReasonForIncompletion());
+ append(words, workflow.getVariables());
+
+ for (TaskModel task : workflow.getTasks()) {
+ append(words, task.getOutputData());
+ }
+ return words;
+ }
+
+ private static void append(List<String> words, Object value) {
+ if (value instanceof String || value instanceof Number) {
+ if (value != null) words.add(value.toString());
+ } else if (value instanceof List) {
+ List<?> values = (List<?>) value;
+ for (Object valueObj : values) {
+ if (valueObj != null) words.add(valueObj.toString());
+ }
+ } else if (value instanceof Map) {
+ append(words, (Map<String, Object>) value);
+ }
+ }
+
+ private static void append(List<String> words, Map<String, Object> map) {
+
+ map.values()
+ .forEach(
+ value -> {
+ if (value instanceof String || value instanceof Number) {
+ if (value != null) words.add(value.toString());
+ } else if (value instanceof Map) {
+ append(words, (Map<String, Object>) value);
+ } else if (value instanceof List) {
+ List<?> values = (List<?>) value;
+ for (Object valueObj : values) {
+ if (valueObj != null) words.add(valueObj.toString());
+ }
+ }
+ });
+ }
+}
diff --git a/archive/src/main/java/io/orkes/conductor/dao/indexer/IndexWorker.java b/archive/src/main/java/io/orkes/conductor/dao/indexer/IndexWorker.java
new file mode 100644
index 0000000..9e0c45e
--- /dev/null
+++ b/archive/src/main/java/io/orkes/conductor/dao/indexer/IndexWorker.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2022 Orkes, Inc.
+ *
+ * Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/orkes-io/licenses/blob/main/community/LICENSE.txt
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.orkes.conductor.dao.indexer;
+
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+import com.netflix.conductor.dao.ExecutionDAO;
+import com.netflix.conductor.dao.QueueDAO;
+import com.netflix.conductor.metrics.Monitors;
+import com.netflix.conductor.model.WorkflowModel;
+
+import io.orkes.conductor.dao.archive.ArchiveDAO;
+import io.orkes.conductor.metrics.MetricsCollector;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Component
+@ConditionalOnProperty(name = "conductor.archive.db.enabled", havingValue = "true")
+public class IndexWorker {
+
+ public static final String INDEXER_QUEUE = "_index_queue";
+
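+ // Messages on this queue (pushed by ArchivedExecutionDAO.queueForIndexing) have the
+ // form "<prefix><id>:<status>", e.g. "wf:<workflowId>:COMPLETED"; pollAndExecute()
+ // strips the prefix and the trailing ":<status>" to recover the id.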
+ public static final String WORKFLOW_ID_PREFIX = "wf:";
+
+ public static final String EVENT_ID_PREFIX = "e:";
+
+ private final QueueDAO queueDAO;
+
+ private final ArchiveDAO archiveDAO;
+
+ private final ExecutionDAO primaryExecDAO;
+
+ private final MetricsCollector metricsCollector;
+
+ private ScheduledExecutorService executorService;
+
+ private int pollBatchSize;
+
+ private int ttlSeconds = 30;
+
+ public IndexWorker(
+ QueueDAO queueDAO,
+ ArchiveDAO execDAO,
+ ExecutionDAO primaryExecDAO,
+ IndexWorkerProperties properties,
+ MetricsCollector metricsCollector) {
+ this.queueDAO = queueDAO;
+ this.archiveDAO = execDAO;
+ this.primaryExecDAO = primaryExecDAO;
+ this.pollBatchSize = properties.getPollBatchSize();
+ this.metricsCollector = metricsCollector;
+
+ int threadCount = properties.getThreadCount();
+ int pollingInterval = properties.getPollingInterval();
+ if (threadCount > 0) {
+ this.executorService =
+ Executors.newScheduledThreadPool(
+ threadCount,
+ new ThreadFactoryBuilder().setNameFormat("indexer-thread-%d").build());
+
+ for (int i = 0; i < threadCount; i++) {
+ this.executorService.scheduleWithFixedDelay(
+ () -> {
+ try {
+ pollAndExecute();
+ } catch (Throwable t) {
+ log.error(t.getMessage(), t);
+ }
+ },
+ 10,
+ pollingInterval,
+ TimeUnit.MILLISECONDS);
+ }
+ }
+
+ log.info(
+ "IndexWorker::INIT with primaryExecDAO = {}, threadCount = {}, pollingInterval = {} and pollBatchSize = {}",
+ primaryExecDAO,
+ threadCount,
+ pollingInterval,
+ pollBatchSize);
+ }
+
+ private void pollAndExecute() {
+
+ try {
+
+ List<String> ids = queueDAO.pop(INDEXER_QUEUE, pollBatchSize, 1000); // 1 second
+
+ for (String id : ids) {
+ if (id.startsWith(WORKFLOW_ID_PREFIX)) {
+ String workflowId = id.substring(WORKFLOW_ID_PREFIX.length());
+ workflowId = workflowId.substring(0, workflowId.lastIndexOf(':'));
+ indexWorkflow(workflowId);
+ queueDAO.ack(INDEXER_QUEUE, id);
+ } else if (id.startsWith(EVENT_ID_PREFIX)) {
+ String eventId = id.substring(EVENT_ID_PREFIX.length());
+ eventId = eventId.substring(0, eventId.lastIndexOf(':'));
+ indexEvent(eventId);
+ queueDAO.ack(INDEXER_QUEUE, id);
+ }
+ }
+
+ } catch (Throwable e) {
+ Monitors.error(IndexWorker.class.getSimpleName(), "pollAndExecute");
+ log.error(e.getMessage(), e);
+ } finally {
+ Monitors.recordQueueDepth(INDEXER_QUEUE, queueDAO.getSize(INDEXER_QUEUE), "");
+ }
+ }
+
+ private void indexWorkflow(String workflowId) {
+ WorkflowModel workflow = primaryExecDAO.getWorkflow(workflowId, true);
+ if (workflow == null) {
+ log.warn("Cannot find workflow in the primary DAO: {}", workflowId);
+ return;
+ }
+
+ metricsCollector
+ .getTimer("archive_workflow", "workflowName", "" + workflow.getWorkflowName())
+ .record(() -> archiveDAO.createOrUpdateWorkflow(workflow));
+
+ if (workflow.getStatus().isTerminal()) {
+ primaryExecDAO.removeWorkflowWithExpiry(workflowId, ttlSeconds);
+ }
+ }
+
+ private void indexEvent(String eventId) {
+ log.trace("Indexing event {}", eventId);
+ // Do nothing for now
+ }
+}
diff --git a/archive/src/main/java/io/orkes/conductor/dao/indexer/IndexWorkerProperties.java b/archive/src/main/java/io/orkes/conductor/dao/indexer/IndexWorkerProperties.java
new file mode 100644
index 0000000..7811821
--- /dev/null
+++ b/archive/src/main/java/io/orkes/conductor/dao/indexer/IndexWorkerProperties.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2022 Orkes, Inc.
+ *
+ * Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/orkes-io/licenses/blob/main/community/LICENSE.txt
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.orkes.conductor.dao.indexer;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
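+/**
+ * Tuning knobs for the archive index worker. Example overrides (the property names
+ * follow the conductor.archive.db.indexer prefix; values shown are the defaults):
+ *
+ * <pre>
+ * conductor.archive.db.indexer.threadCount=3
+ * conductor.archive.db.indexer.pollingInterval=10
+ * conductor.archive.db.indexer.pollBatchSize=10
+ * </pre>
+ */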
+@Configuration
+@ConfigurationProperties("conductor.archive.db.indexer")
+public class IndexWorkerProperties {
+
+ private int threadCount = 3;
+
+ private int pollingInterval = 10; // in milliseconds
+
+ private int pollBatchSize = 10;
+
+ public int getThreadCount() {
+ return threadCount;
+ }
+
+ public void setThreadCount(int threadCount) {
+ this.threadCount = threadCount;
+ }
+
+ public int getPollingInterval() {
+ return pollingInterval;
+ }
+
+ public void setPollingInterval(int pollingInterval) {
+ this.pollingInterval = pollingInterval;
+ }
+
+ public int getPollBatchSize() {
+ return pollBatchSize;
+ }
+
+ public void setPollBatchSize(int pollBatchSize) {
+ this.pollBatchSize = pollBatchSize;
+ }
+}
diff --git a/archive/src/main/java/io/orkes/conductor/dao/indexer/TaskIndex.java b/archive/src/main/java/io/orkes/conductor/dao/indexer/TaskIndex.java
new file mode 100644
index 0000000..4395a14
--- /dev/null
+++ b/archive/src/main/java/io/orkes/conductor/dao/indexer/TaskIndex.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2022 Orkes, Inc.
+ *
+ * Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/orkes-io/licenses/blob/main/community/LICENSE.txt
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.orkes.conductor.dao.indexer;
+
+import java.util.List;
+import java.util.Map;
+
+import com.netflix.conductor.model.TaskModel;
+
+public class TaskIndex {
+
+ private TaskModel task;
+
+ private int maxValueLen;
+
+ public TaskIndex(TaskModel task, int maxValueLen) {
+ this.task = task;
+ this.maxValueLen = maxValueLen;
+ }
+
+ public String toIndexString() {
+ StringBuilder sb = new StringBuilder();
+ append(sb, task.getTaskType());
+ append(sb, task.getTaskId());
+ append(sb, task.getDomain());
+ append(sb, task.getReferenceTaskName());
+ append(sb, task.getStatus().toString());
+ append(sb, task.getExternalInputPayloadStoragePath());
+ append(sb, task.getExternalOutputPayloadStoragePath());
+ append(sb, task.getInputData());
+ append(sb, task.getOutputData());
+ append(sb, task.getReasonForIncompletion());
+ append(sb, task.getWorkerId());
+
+ return sb.toString();
+ }
+
+ @Override
+ public String toString() {
+ return toIndexString();
+ }
+
+ private String toString(Map<String, Object> map) {
+ StringBuilder sb = new StringBuilder();
+ append(sb, map);
+ return sb.toString();
+ }
+
+ private void append(StringBuilder sb, Object value) {
+ if (value instanceof String || value instanceof Number) {
+ sb.append(" ");
+ sb.append(value.toString());
+ } else if (value instanceof Map) {
+ sb.append(" ");
+ append(sb, (Map<String, Object>) value);
+ } else if (value instanceof List) {
+ List<?> values = (List<?>) value;
+ for (Object valueObj : values) {
+ sb.append(" ");
+ append(sb, valueObj);
+ }
+ }
+ }
+
+ private void append(StringBuilder sb, Map<String, Object> map) {
+
+ map.entrySet()
+ .forEach(
+ entry -> {
+ String key = entry.getKey();
+ Object value = entry.getValue();
+ sb.append(" ");
+ sb.append(key);
+ if (value instanceof String) {
+ sb.append(" ");
+ String valueString = value.toString();
+ sb.append(
+ valueString.substring(
+ 0, Math.min(valueString.length(), maxValueLen)));
+ } else if (value instanceof Number) {
+ sb.append(" ");
+ sb.append(value.toString());
+ } else if (value instanceof Map) {
+ sb.append(" ");
+ append(sb, (Map<String, Object>) value);
+ } else if (value instanceof List) {
+ List<?> values = (List<?>) value;
+ for (Object valueObj : values) {
+ sb.append(" ");
+ append(sb, valueObj);
+ }
+ }
+ });
+ }
+}
diff --git a/archive/src/main/java/io/orkes/conductor/dao/indexer/WorkflowIndex.java b/archive/src/main/java/io/orkes/conductor/dao/indexer/WorkflowIndex.java
new file mode 100644
index 0000000..32efcfb
--- /dev/null
+++ b/archive/src/main/java/io/orkes/conductor/dao/indexer/WorkflowIndex.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2022 Orkes, Inc.
+ *
+ * Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/orkes-io/licenses/blob/main/community/LICENSE.txt
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.orkes.conductor.dao.indexer;
+
+import java.util.Collection;
+
+import com.netflix.conductor.model.WorkflowModel;
+
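+/**
+ * Extracts the index words for a workflow, delegating to
+ * {@link IndexValueExtractor} with limits on the number of words (maxWords)
+ * and the length of each word (maxWordLength).
+ */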
+public class WorkflowIndex {
+
+ private WorkflowModel workflow;
+ private int maxWords;
+ private int maxWordLength;
+
+ public WorkflowIndex(WorkflowModel workflow, int maxWords, int maxWordLength) {
+ this.workflow = workflow;
+ this.maxWords = maxWords;
+ this.maxWordLength = maxWordLength;
+ }
+
+ public Collection<String> toIndexWords() {
+ return IndexValueExtractor.getIndexWords(workflow, maxWords, maxWordLength);
+ }
+
+ @Override
+ public String toString() {
+ return toIndexWords().toString();
+ }
+}
diff --git a/archive/src/main/java/io/orkes/conductor/dao/postgres/archive/PostgresArchiveDAO.java b/archive/src/main/java/io/orkes/conductor/dao/postgres/archive/PostgresArchiveDAO.java
new file mode 100644
index 0000000..1c4624e
--- /dev/null
+++ b/archive/src/main/java/io/orkes/conductor/dao/postgres/archive/PostgresArchiveDAO.java
@@ -0,0 +1,417 @@
+/*
+ * Copyright 2022 Orkes, Inc.
+ *
+ * Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/orkes-io/licenses/blob/main/community/LICENSE.txt
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.orkes.conductor.dao.postgres.archive;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import javax.sql.DataSource;
+
+import org.apache.logging.log4j.util.Strings;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.retry.support.RetryTemplate;
+
+import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
+import com.netflix.conductor.common.run.SearchResult;
+import com.netflix.conductor.model.WorkflowModel;
+import com.netflix.conductor.postgres.dao.PostgresBaseDAO;
+
+import io.orkes.conductor.dao.archive.ArchiveDAO;
+import io.orkes.conductor.dao.archive.DocumentStoreDAO;
+import io.orkes.conductor.dao.archive.ScrollableSearchResult;
+import io.orkes.conductor.dao.indexer.WorkflowIndex;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+
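+/**
+ * Archive DAO backed by Postgres: terminal workflows are stored as JSON in the
+ * workflow_archive table alongside an index_data text[] column that backs
+ * free-text search through a GIN index. Search queries can be routed to a
+ * dedicated datasource (e.g. a read replica) to keep them off the primary pool.
+ */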
+@Slf4j
+public class PostgresArchiveDAO extends PostgresBaseDAO implements ArchiveDAO, DocumentStoreDAO {
+
+ private static final String GET_WORKFLOW =
+ "SELECT json_data FROM workflow_archive WHERE workflow_id = ? FOR SHARE SKIP LOCKED";
+
+ private static final String REMOVE_WORKFLOW =
+ "DELETE FROM workflow_archive WHERE workflow_id = ?";
+
+ private final DataSource searchDatasource;
+
+ private static final String TABLE_NAME = "workflow_archive";
+
+ public PostgresArchiveDAO(
+ ObjectMapper objectMapper,
+ DataSource dataSource,
+ @Qualifier("searchDatasource") DataSource searchDatasource) {
+ super(RetryTemplate.defaultInstance(), objectMapper, dataSource);
+ this.searchDatasource = searchDatasource;
+
+ log.info("Using {} as search datasource", searchDatasource);
+
+ try (Connection conn = searchDatasource.getConnection()) {
+ log.info("Using {} as search datasource", conn.getMetaData().getURL());
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+
+ public void createOrUpdateWorkflow(WorkflowModel workflow) {
+
+ String INSERT_OR_UPDATE_LATEST =
+ "INSERT INTO "
+ + TABLE_NAME
+ + " as wf"
+ + "(workflow_id, created_on, modified_on, correlation_id, workflow_name, status, index_data, created_by, json_data) "
+ + "VALUES "
+ + "(?, ?, ?, ?, ?, ?, ?, ?, ?) "
+ + "ON CONFLICT (workflow_id) DO "
+ + "UPDATE SET modified_on = ?, status = ?, index_data = ?, json_data = ? "
+ + "WHERE wf.modified_on < ? ;";
+
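+ // Upsert that keeps only the newest copy: the trailing "WHERE wf.modified_on < ?"
+ // prevents an out-of-order write from clobbering a more recent row.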
+ try (Connection connection = super.dataSource.getConnection()) {
+ connection.setAutoCommit(true);
+ PreparedStatement statement = connection.prepareStatement(INSERT_OR_UPDATE_LATEST);
+
+ WorkflowIndex index = new WorkflowIndex(workflow, 200, 50);
+ Collection<String> indexData = index.toIndexWords();
+
+ int indx = 1;
+ long updatedTime = workflow.getUpdatedTime() == null ? 0 : workflow.getUpdatedTime();
+
+ // Insert values
+
+ statement.setString(indx++, workflow.getWorkflowId());
+ statement.setLong(indx++, workflow.getCreateTime());
+ statement.setLong(indx++, updatedTime);
+ statement.setString(indx++, workflow.getCorrelationId());
+ statement.setString(indx++, workflow.getWorkflowName());
+ statement.setString(indx++, workflow.getStatus().toString());
+ statement.setArray(
+ indx++, connection.createArrayOf("text", indexData.toArray(new String[0])));
+ statement.setString(indx++, workflow.getCreatedBy());
+
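+ // The full JSON document is persisted only once the workflow is terminal;
+ // until then json_data stays null and getWorkflow returns null for the row.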
+ String workflowJson = null;
+ if (workflow.getStatus().isTerminal()) {
+ workflowJson = objectMapper.writeValueAsString(workflow);
+ }
+ statement.setString(indx++, workflowJson);
+ // Update values
+ statement.setLong(indx++, updatedTime);
+ statement.setString(indx++, workflow.getStatus().toString());
+ statement.setArray(
+ indx++, connection.createArrayOf("text", indexData.toArray(new String[0])));
+ statement.setString(indx++, workflowJson);
+ statement.setLong(indx, updatedTime);
+
+ statement.executeUpdate();
+
+ } catch (Exception e) {
+ log.error(
+ "Error updating workflow {} - {}", workflow.getWorkflowId(), e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public boolean removeWorkflow(String workflowId) {
+ boolean removed = false;
+ WorkflowModel workflow = this.getWorkflow(workflowId, true);
+ if (workflow != null) {
+ withTransaction(
+ connection ->
+ execute(
+ connection,
+ REMOVE_WORKFLOW,
+ q -> q.addParameter(workflowId).executeDelete()));
+ removed = true;
+ }
+ return removed;
+ }
+
+ @Override
+ public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) {
+ try (Connection connection = super.dataSource.getConnection()) {
+ PreparedStatement statement = connection.prepareStatement(GET_WORKFLOW);
+ statement.setString(1, workflowId);
+ ResultSet rs = statement.executeQuery();
+ if (rs.next()) {
+ byte[] json = rs.getBytes("json_data");
+ if (json == null || json.length == 0) {
+ return null;
+ }
+ return objectMapper.readValue(json, WorkflowModel.class);
+ }
+ } catch (Exception e) {
+ log.error("Error reading workflow - " + e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+
+ @Override
+ public List<String> getWorkflowIdsByType(String workflowName, Long startTime, Long endTime) {
+ String query =
+ "workflowType IN ("
+ + workflowName
+ + ") AND startTime>"
+ + startTime
+ + " AND startTime< "
+ + endTime;
+ ScrollableSearchResult result = searchWorkflows(query, "*", 0, 100_000);
+ return result.getResults();
+ }
+
+ @Override
+ public List<String> getWorkflowIdsByCorrelationId(
+ String workflowName,
+ String correlationId,
+ boolean includeClosed,
+ boolean includeTasks) {
+ String query =
+ "workflowType = '"
+ + workflowName.trim()
+ + "' AND correlationId = '"
+ + correlationId.trim()
+ + "'";
+ if (!includeClosed) {
+ query += " AND status IN (" + WorkflowModel.Status.RUNNING + ")";
+ }
+ ScrollableSearchResult<String> result = searchWorkflows(query, null, 0, 100_000);
+ return result.getResults();
+ }
+
+ // Search
+ public ScrollableSearchResult<String> searchWorkflows(
+ String query, String freeText, int start, int count) {
+
+ if (query == null) query = "";
+ if (freeText == null) freeText = "";
+
+ log.debug(
+ "search. query = {}, fulltext={}, limit={}, start= {}",
+ query,
+ freeText,
+ count,
+ start);
+
+ SearchQuery parsedQuery = SearchQuery.parse(query);
+ SearchResult<String> results =
+ search(TABLE_NAME, parsedQuery, freeText.trim(), count, start);
+ ScrollableSearchResult<String> scrollableSearchResult = new ScrollableSearchResult<>();
+ scrollableSearchResult.setResults(results.getResults());
+ return scrollableSearchResult;
+ }
+
+ private SearchResult<String> search(
+ String tableName, SearchQuery query, String freeText, int limit, int start) {
+
+ List<String> workflowNames = query.getWorkflowNames();
+ List<String> workflowIds = query.getWorkflowIds();
+ List<String> correlationIds = query.getCorrelationIds();
+ List<String> statuses = query.getStatuses();
+ long startTime = query.getFromStartTime();
+ long endTime = query.getToStartTime();
+ if (endTime == 0) {
+ endTime = System.currentTimeMillis();
+ }
+
+ boolean search = false;
+
+ // Task specific
+ List<String> taskIds = query.getTaskIds();
+ List<String> taskTypes = query.getTaskTypes();
+
+ String WHERE_CLAUSE = "from " + tableName + " archive ";
+
+ WHERE_CLAUSE += " WHERE 1=1 ";
+
+ String SELECT_QUERY = "select workflow_id, created_on ";
+
+ String JOINER = " AND ";
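+ // NOTE: clauses must be appended in the same order the parameters are bound
+ // to the PreparedStatement below.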
+ if (workflowNames != null && !workflowNames.isEmpty()) {
+ WHERE_CLAUSE += JOINER + "archive.workflow_name = ANY (?) ";
+ }
+
+ if (taskTypes != null && !taskTypes.isEmpty()) {
+ WHERE_CLAUSE += JOINER + "task_type = ANY (?) ";
+ }
+
+ if (workflowIds != null && !workflowIds.isEmpty()) {
+ WHERE_CLAUSE += JOINER + "workflow_id = ANY (?) ";
+ JOINER = " AND ";
+ }
+
+ if (taskIds != null && !taskIds.isEmpty()) {
+ WHERE_CLAUSE += JOINER + "task_id = ANY (?) ";
+ JOINER = " AND ";
+ }
+
+ if (statuses != null && !statuses.isEmpty()) {
+ WHERE_CLAUSE += JOINER + "status = ANY (?) ";
+ JOINER = " AND ";
+ }
+
+ if (correlationIds != null && !correlationIds.isEmpty()) {
+ WHERE_CLAUSE += JOINER + "correlation_id = ANY (?) ";
+ JOINER = " AND ";
+ }
+
+ if (startTime > 0) {
+ WHERE_CLAUSE += JOINER + "created_on BETWEEN ? AND ? ";
+ JOINER = " AND ";
+ }
+
+ if (Strings.isNotBlank(freeText) && !"*".equals(freeText)) {
+ search = true;
+ WHERE_CLAUSE += JOINER + " index_data @> ? ";
+ }
+
+ String SEARCH_QUERY =
+ SELECT_QUERY
+ + " "
+ + WHERE_CLAUSE
+ + " order by created_on desc limit "
+ + limit
+ + " offset "
+ + start;
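+ // Free-text searches filter inside a subquery capped at 2,000,000 rows and
+ // order outside it, presumably to bound the sort after the GIN index scan.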
+ if (search) {
+ SEARCH_QUERY =
+ "select a.workflow_id, a.created_on from ("
+ + SELECT_QUERY
+ + " "
+ + WHERE_CLAUSE
+ + " limit 2000000 offset 0) a order by a.created_on desc limit "
+ + limit
+ + " offset "
+ + start;
+ }
+
+ log.debug(SEARCH_QUERY);
+
+ SearchResult result = new SearchResult<>();
+ result.setResults(new ArrayList<>());
+
+ try (Connection conn = searchDatasource.getConnection()) {
+ PreparedStatement pstmt = conn.prepareStatement(SEARCH_QUERY);
+ int indx = 1;
+ if (workflowNames != null && !workflowNames.isEmpty()) {
+ pstmt.setArray(
+ indx++,
+ conn.createArrayOf("VARCHAR", workflowNames.toArray(new String[0])));
+ }
+ if (taskTypes != null && !taskTypes.isEmpty()) {
+ pstmt.setArray(
+ indx++, conn.createArrayOf("VARCHAR", taskTypes.toArray(new String[0])));
+ }
+
+ if (workflowIds != null && !workflowIds.isEmpty()) {
+ pstmt.setArray(
+ indx++, conn.createArrayOf("VARCHAR", workflowIds.toArray(new String[0])));
+ }
+
+ if (taskIds != null && !taskIds.isEmpty()) {
+ pstmt.setArray(
+ indx++, conn.createArrayOf("VARCHAR", taskIds.toArray(new String[0])));
+ }
+
+ if (statuses != null && !statuses.isEmpty()) {
+ pstmt.setArray(
+ indx++, conn.createArrayOf("VARCHAR", statuses.toArray(new String[0])));
+ }
+
+ if (correlationIds != null && !correlationIds.isEmpty()) {
+ pstmt.setArray(
+ indx++,
+ conn.createArrayOf("VARCHAR", correlationIds.toArray(new String[0])));
+ }
+
+ if (startTime > 0) {
+ pstmt.setLong(indx++, startTime);
+ pstmt.setLong(indx++, endTime);
+ }
+
+ if (Strings.isNotBlank(freeText) && !"*".equals(freeText)) {
+ String[] textArray = freeText.toLowerCase().split(" ");
+ pstmt.setArray(indx++, conn.createArrayOf("text", textArray));
+ }
+
+ result.setTotalHits(0);
+ long countStart = System.currentTimeMillis();
+ ResultSet rs = pstmt.executeQuery();
+ log.debug(
+ "search query took {} ms to execute",
+ (System.currentTimeMillis() - countStart));
+ while (rs.next()) {
+ String workflowId = rs.getString("workflow_id");
+ result.getResults().add(workflowId);
+ }
+
+ } catch (SQLException sqlException) {
+ log.error(sqlException.getMessage(), sqlException);
+ throw new RuntimeException(sqlException);
+ }
+
+ return result;
+ }
+
+ // Private Methods
+ @Override
+ public List<TaskExecLog> getTaskExecutionLogs(String taskId) {
+ String GET_TASK = "SELECT seq, log, created_on FROM task_logs WHERE task_id = ?";
+ return queryWithTransaction(
+ GET_TASK,
+ q -> {
+ List<TaskExecLog> taskExecLogs =
+ q.addParameter(taskId)
+ .executeAndFetch(
+ resultSet -> {
+ List logs = new ArrayList<>();
+ while (resultSet.next()) {
+ TaskExecLog log = new TaskExecLog();
+ log.setTaskId(taskId);
+ log.setLog(resultSet.getString(2));
+ log.setCreatedTime(resultSet.getLong(3));
+ logs.add(log);
+ }
+ return logs;
+ });
+ return taskExecLogs;
+ });
+ }
+
+ @Override
+ public void addTaskExecutionLogs(List<TaskExecLog> logs) {
+ String INSERT_STMT = "INSERT INTO task_logs (task_id, log, created_on) values(?,?,?)";
+ for (TaskExecLog taskExecLog : logs) {
+ withTransaction(
+ tx -> {
+ execute(
+ tx,
+ INSERT_STMT,
+ q ->
+ q
+
+ // Insert values
+ .addParameter(taskExecLog.getTaskId())
+ .addParameter(taskExecLog.getLog())
+ .addParameter(taskExecLog.getCreatedTime())
+
+ // execute
+ .executeUpdate());
+ });
+ }
+ }
+}
diff --git a/archive/src/main/java/io/orkes/conductor/dao/postgres/archive/PostgresArchiveDAOConfiguration.java b/archive/src/main/java/io/orkes/conductor/dao/postgres/archive/PostgresArchiveDAOConfiguration.java
new file mode 100644
index 0000000..409ec2c
--- /dev/null
+++ b/archive/src/main/java/io/orkes/conductor/dao/postgres/archive/PostgresArchiveDAOConfiguration.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2022 Orkes, Inc.
+ *
+ * Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/orkes-io/licenses/blob/main/community/LICENSE.txt
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.orkes.conductor.dao.postgres.archive;
+
+import javax.sql.DataSource;
+
+import org.apache.logging.log4j.util.Strings;
+import org.flywaydb.core.Flyway;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.autoconfigure.flyway.FlywayConfigurationCustomizer;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.*;
+import org.springframework.core.env.Environment;
+
+import com.netflix.conductor.dao.ExecutionDAO;
+import com.netflix.conductor.dao.QueueDAO;
+import com.netflix.conductor.postgres.config.PostgresProperties;
+
+import io.orkes.conductor.dao.archive.ArchiveDAO;
+import io.orkes.conductor.dao.archive.ArchivedExecutionDAO;
+import io.orkes.conductor.metrics.MetricsCollector;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import lombok.extern.slf4j.Slf4j;
+
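+/**
+ * Wires the Postgres archive when conductor.archive.db.type=postgres. A sketch
+ * of the relevant properties, with illustrative values only:
+ *
+ * <pre>
+ * conductor.archive.db.type=postgres
+ * conductor.archive.db.enabled=true
+ * spring.search-datasource.url=jdbc:postgresql://search-replica:5432/conductor
+ * spring.search-datasource.username=conductor
+ * spring.search-datasource.password=secret
+ * spring.search-datasource.hikari.maximum-pool-size=10
+ * </pre>
+ *
+ * When no search-datasource URL is set, the primary datasource is reused.
+ */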
+@Slf4j
+@Configuration(proxyBeanMethods = false)
+@EnableConfigurationProperties({PostgresProperties.class})
+@Import(DataSourceAutoConfiguration.class)
+@ConditionalOnProperty(name = "conductor.archive.db.type", havingValue = "postgres")
+public class PostgresArchiveDAOConfiguration {
+
+ private final DataSource dataSource;
+
+ private final MetricsCollector metricsCollector;
+
+ private final ExecutionDAO primaryExecutionDAO;
+
+ private final QueueDAO dynoQueueDAO;
+
+ private final Environment environment;
+
+ private final ObjectMapper objectMapper;
+
+ public PostgresArchiveDAOConfiguration(
+ ObjectMapper objectMapper,
+ Environment environment,
+ DataSource dataSource,
+ ExecutionDAO primaryExecutionDAO,
+ QueueDAO dynoQueueDAO,
+ MetricsCollector metricsCollector) {
+
+ this.objectMapper = objectMapper;
+ this.environment = environment;
+ this.dataSource = dataSource;
+ this.primaryExecutionDAO = primaryExecutionDAO;
+ this.dynoQueueDAO = dynoQueueDAO;
+ this.metricsCollector = metricsCollector;
+ }
+
+ @Bean
+ @Primary
+ @ConditionalOnProperty(name = "conductor.archive.db.enabled", havingValue = "true")
+ public ExecutionDAO getExecutionDAO(ArchiveDAO archiveDAO) {
+ return new ArchivedExecutionDAO(
+ primaryExecutionDAO, archiveDAO, dynoQueueDAO, metricsCollector);
+ }
+
+ @Bean(initMethod = "migrate", name = "flyway")
+ @PostConstruct
+ public Flyway flywayForPrimaryDb() {
+ return Flyway.configure()
+ .locations(
+ "classpath:db/migration_postgres",
+ "classpath:db/migration_archive_postgres")
+ .schemas("public")
+ .dataSource(dataSource)
+ .baselineOnMigrate(true)
+ .mixed(true)
+ .load();
+ }
+
+ @Bean(name = "flywayInitializer")
+ public FlywayConfigurationCustomizer flywayConfigurationCustomizer() {
+ // override the default location.
+ return configuration ->
+ configuration.locations(
+ "classpath:db/migration_postgres",
+ "classpath:db/migration_archive_postgres");
+ }
+
+ @Bean
+ @Qualifier("searchDatasource")
+ public DataSource searchDatasource(DataSource defaultDatasource) {
+ String url = environment.getProperty("spring.search-datasource.url");
+ String user = environment.getProperty("spring.search-datasource.username");
+ String password = environment.getProperty("spring.search-datasource.password");
+ String maxPoolSizeString =
+ environment.getProperty("spring.search-datasource.hikari.maximum-pool-size");
+
+ if (Strings.isEmpty(url)) {
+ return defaultDatasource;
+ }
+ log.info("Configuring searchDatasource with {}", url);
+
+ int maxPoolSize = 10;
+ if (Strings.isNotEmpty(maxPoolSizeString)) {
+ try {
+ maxPoolSize = Integer.parseInt(maxPoolSizeString);
+ } catch (Exception e) {
+ log.warn("Invalid maximum-pool-size '{}', using default {}", maxPoolSizeString, maxPoolSize);
+ }
+ }
+ HikariConfig config = new HikariConfig();
+ config.setJdbcUrl(url);
+ config.setAutoCommit(true);
+ config.setUsername(user);
+ config.setPassword(password);
+ config.setMaximumPoolSize(maxPoolSize);
+
+ HikariDataSource hikariDataSource = new HikariDataSource(config);
+ return hikariDataSource;
+ }
+
+ @Bean
+ @DependsOn({"flyway", "flywayInitializer"})
+ @ConditionalOnProperty(value = "conductor.archive.db.type", havingValue = "postgres")
+ public PostgresArchiveDAO getPostgresArchiveDAO(
+ @Qualifier("searchDatasource") DataSource searchDatasource) {
+ return new PostgresArchiveDAO(objectMapper, dataSource, searchDatasource);
+ }
+}
diff --git a/archive/src/main/java/io/orkes/conductor/dao/postgres/archive/SearchQuery.java b/archive/src/main/java/io/orkes/conductor/dao/postgres/archive/SearchQuery.java
new file mode 100644
index 0000000..aafef30
--- /dev/null
+++ b/archive/src/main/java/io/orkes/conductor/dao/postgres/archive/SearchQuery.java
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2022 Orkes, Inc.
+ *
+ * Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/orkes-io/licenses/blob/main/community/LICENSE.txt
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.orkes.conductor.dao.postgres.archive;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
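+/**
+ * Minimal parser for the conductor search query syntax, e.g.
+ * {@code workflowType IN (order_flow) AND status = 'COMPLETED' AND startTime > 1650000000000}.
+ * Supported fields: workflowId, correlationId, taskId, workflowType, taskType,
+ * status, plus startTime with the {@code >} and {@code <} comparators.
+ */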
+public class SearchQuery {
+
+ private List<String> workflowNames;
+
+ private List<String> taskTypes;
+
+ private List<String> workflowIds;
+
+ private List<String> correlationIds;
+
+ private List<String> taskIds;
+
+ private List<String> statuses;
+
+ private long fromStartTime;
+
+ private long toStartTime;
+
+ private SearchQuery() {}
+
+ public static SearchQuery parse(String query) {
+ SearchQuery searchQuery = new SearchQuery();
+
+ String[] values = query.split(" AND ");
+ for (String value : values) {
+ value = value.trim();
+ if (value.startsWith("workflowId")) {
+ searchQuery.workflowIds = getValues(value);
+ }
+ if (value.startsWith("correlationId")) {
+ searchQuery.correlationIds = getValues(value);
+ } else if (value.startsWith("taskId")) {
+ searchQuery.taskIds = getValues(value);
+ } else if (value.startsWith("workflowType")) {
+ searchQuery.workflowNames = getValues(value);
+ } else if (value.startsWith("taskType")) {
+ searchQuery.taskTypes = getValues(value);
+ } else if (value.startsWith("status")) {
+ searchQuery.statuses = getValues(value);
+ } else if (value.startsWith("startTime")) {
+ if (value.contains(">")) {
+ String[] kv = value.split(">");
+ if (kv.length > 1) {
+ try {
+ searchQuery.fromStartTime = Long.parseLong(kv[1].trim());
+ } catch (Exception e) {
+ // not a valid epoch-millis value; keep the default
+ }
+ }
+ } else if (value.contains("<")) {
+ String[] kv = value.split("<");
+ if (kv.length > 1) {
+ try {
+ searchQuery.toStartTime = Long.parseLong(kv[1].trim());
+ } catch (Exception e) {
+ // not a valid epoch-millis value; keep the default
+ }
+ }
+ }
+ }
+ }
+ return searchQuery;
+ }
+
+ private static List<String> getValues(String keyValue) {
+ List<String> values = new ArrayList<>();
+ if (keyValue.contains("=")) {
+ String[] kv = keyValue.split("=");
+ if (kv.length > 1) {
+ String value = kv[1].trim();
+ // remove quotes from the start and end
+ value = value.substring(1, value.length() - 1);
+ return Arrays.asList(value.trim());
+ }
+ } else if (keyValue.contains(" IN ")) {
+ String[] kv = keyValue.split(" IN ");
+ if (kv.length > 1) {
+ // strip the surrounding parentheses from the trimmed list before splitting
+ String inList = kv[1].trim();
+ String[] inValues = inList.substring(1, inList.length() - 1).split(",");
+ for (String inValue : inValues) {
+ values.add(inValue.trim());
+ }
+ }
+ }
+ return values;
+ }
+
+ public List<String> getWorkflowNames() {
+ return workflowNames;
+ }
+
+ public List<String> getWorkflowIds() {
+ return workflowIds;
+ }
+
+ public List<String> getCorrelationIds() {
+ return correlationIds;
+ }
+
+ public List<String> getStatuses() {
+ return statuses;
+ }
+
+ public long getFromStartTime() {
+ return fromStartTime;
+ }
+
+ public long getToStartTime() {
+ if (toStartTime == 0) {
+ toStartTime = System.currentTimeMillis();
+ }
+ return toStartTime;
+ }
+
+ public List<String> getTaskTypes() {
+ return taskTypes;
+ }
+
+ public List<String> getTaskIds() {
+ return taskIds;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ SearchQuery that = (SearchQuery) o;
+ return fromStartTime == that.fromStartTime
+ && toStartTime == that.toStartTime
+ && Objects.equals(workflowNames, that.workflowNames)
+ && Objects.equals(workflowIds, that.workflowIds)
+ && Objects.equals(statuses, that.statuses);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(workflowNames, workflowIds, statuses, fromStartTime, toStartTime);
+ }
+}
diff --git a/archive/src/main/java/io/orkes/conductor/id/TimeBasedUUIDGenerator.java b/archive/src/main/java/io/orkes/conductor/id/TimeBasedUUIDGenerator.java
new file mode 100644
index 0000000..eb27b38
--- /dev/null
+++ b/archive/src/main/java/io/orkes/conductor/id/TimeBasedUUIDGenerator.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2022 Orkes, Inc.
+ *
+ * Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/orkes-io/licenses/blob/main/community/LICENSE.txt
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.orkes.conductor.id;
+
+import java.time.LocalDate;
+import java.util.Calendar;
+import java.util.TimeZone;
+import java.util.UUID;
+
+import org.apache.logging.log4j.core.util.UuidUtil;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+import com.netflix.conductor.core.utils.IDGenerator;
+
+import lombok.extern.slf4j.Slf4j;
+
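+/**
+ * Generates time-based (version 1) UUIDs, enabled with
+ * conductor.id.generator=time_based, so a workflow's creation time can be
+ * recovered from its id alone (see {@link #getDate(String)}).
+ */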
+@Component
+@ConditionalOnProperty(name = "conductor.id.generator", havingValue = "time_based")
+@Slf4j
+public class TimeBasedUUIDGenerator extends IDGenerator {
+
+ private static final LocalDate JAN_1_2020 = LocalDate.of(2020, 1, 1);
+
+ private static final int uuidLength = UUID.randomUUID().toString().length();
+
+ private static Calendar uuidEpoch = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+
+ private static final long epochMillis;
+
+ static {
+ uuidEpoch.clear();
+ uuidEpoch.set(1582, 9, 15, 0, 0, 0); // 15 Oct 1582: start of the Gregorian calendar, the UUID v1 epoch
+ epochMillis = uuidEpoch.getTime().getTime();
+ }
+
+ public TimeBasedUUIDGenerator() {
+ log.info("Using TimeBasedUUIDGenerator to generate Ids");
+ }
+
+ @Override
+ public String generate() {
+ UUID uuid = UuidUtil.getTimeBasedUuid();
+ return uuid.toString();
+ }
+
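+ /**
+ * Recovers epoch millis from a version 1 UUID: the v1 timestamp counts
+ * 100-nanosecond intervals since 1582-10-15, so dividing by 10,000 gives
+ * milliseconds, and adding the epoch offset shifts to Unix time.
+ * Returns 0 for non-v1 UUIDs.
+ */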
+ public static long getDate(String id) {
+ UUID uuid = UUID.fromString(id);
+ if (uuid.version() != 1) {
+ return 0;
+ }
+ long time = (uuid.timestamp() / 10000L) + epochMillis;
+ return time;
+ }
+}
diff --git a/archive/src/main/java/io/orkes/conductor/metrics/MetricsCollector.java b/archive/src/main/java/io/orkes/conductor/metrics/MetricsCollector.java
new file mode 100644
index 0000000..cd00980
--- /dev/null
+++ b/archive/src/main/java/io/orkes/conductor/metrics/MetricsCollector.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2022 Orkes, Inc.
+ *
+ * Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/orkes-io/licenses/blob/main/community/LICENSE.txt
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.orkes.conductor.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Component;
+
+import com.netflix.conductor.model.TaskModel;
+import com.netflix.conductor.model.WorkflowModel;
+import com.netflix.spectator.api.CompositeRegistry;
+import com.netflix.spectator.api.Spectator;
+import com.netflix.spectator.micrometer.MicrometerRegistry;
+
+import io.micrometer.core.instrument.*;
+import io.micrometer.core.instrument.config.MeterFilter;
+import io.micrometer.prometheus.PrometheusRenameFilter;
+import lombok.extern.slf4j.Slf4j;
+
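+/**
+ * Central meter factory: bridges Conductor's Spectator global registry into
+ * Micrometer, caches one counter/timer/gauge per (name, tags) pair, and can
+ * drop per-workflow/per-task labels (conductor.metrics.skipLabels=true) to
+ * limit time-series cardinality.
+ */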
+@Slf4j
+@Component
+public class MetricsCollector {
+
+ private static final CompositeRegistry spectatorRegistry = Spectator.globalRegistry();
+
+ private final MeterRegistry meterRegistry;
+
+ private static final Map, Counter>> counters = new ConcurrentHashMap<>();
+
+ private static final Map, Timer>> timers = new ConcurrentHashMap<>();
+
+ private static final Map, Gauge>> gauges = new ConcurrentHashMap<>();
+
+ private static double[] percentiles = new double[] {0.5, 0.75, 0.90, 0.95, 0.99};
+
+ private final boolean skipLabels;
+
+ public MetricsCollector(MeterRegistry meterRegistry, Environment env) {
+
+ Boolean skipLabels = env.getProperty("conductor.metrics.skipLabels", Boolean.class);
+ if (skipLabels == null) {
+ this.skipLabels = false;
+ } else {
+ this.skipLabels = skipLabels;
+ }
+
+ this.meterRegistry = meterRegistry;
+ final MicrometerRegistry metricsRegistry = new MicrometerRegistry(this.meterRegistry);
+
+ this.meterRegistry
+ .config()
+ .meterFilter(new PrometheusRenameFilter())
+ .meterFilter(MeterFilter.denyNameStartsWith("task_queue_wait"))
+ .meterFilter(MeterFilter.denyNameStartsWith("dao_payload_size"))
+ .meterFilter(MeterFilter.denyNameStartsWith("dao_requests"))
+ .meterFilter(MeterFilter.denyNameStartsWith("workflow_execution"))
+ .meterFilter(MeterFilter.denyNameStartsWith("tasks_in_workflow"))
+ .meterFilter(MeterFilter.denyNameStartsWith("task_pending_time"))
+ .meterFilter(MeterFilter.denyNameStartsWith("workflow_start_success"))
+ .meterFilter(MeterFilter.denyNameStartsWith("task_execution"));
+
+ if (this.skipLabels) {
+ this.meterRegistry
+ .config()
+ .meterFilter(MeterFilter.denyNameStartsWith("workflow_failure"))
+ .meterFilter(MeterFilter.denyNameStartsWith("workflow_running"));
+ }
+
+ spectatorRegistry.add(metricsRegistry);
+
+ log.info("skipLabels: {}", this.skipLabels);
+ }
+
+ public Timer getTimer(String name, String... additionalTags) {
+ List<Tag> tags = tags(additionalTags);
+ return timers.computeIfAbsent(name, s -> new ConcurrentHashMap<>())
+ .computeIfAbsent(
+ tags,
+ t -> {
+ Timer.Builder timerBuilder =
+ Timer.builder(name)
+ .description(name)
+ .publishPercentiles(percentiles);
+ if (!this.skipLabels) {
+ timerBuilder = timerBuilder.tags(tags);
+ }
+ return timerBuilder.register(meterRegistry);
+ });
+ }
+
+ public Counter getCounter(String name, String... additionalTags) {
+ List<Tag> tags = tags(additionalTags);
+ return counters.computeIfAbsent(name, s -> new ConcurrentHashMap<>())
+ .computeIfAbsent(
+ tags,
+ t -> {
+ Counter.Builder builder = Counter.builder(name).description(name);
+ if (!this.skipLabels) {
+ builder = builder.tags(tags);
+ }
+ return builder.register(meterRegistry);
+ });
+ }
+
+ public void recordGauge(String name, Number value, String... additionalTags) {
+ List<Tag> tags = tags(additionalTags);
+ gauges.computeIfAbsent(name, s -> new ConcurrentHashMap<>())
+ .computeIfAbsent(
+ tags,
+ t -> {
+ Gauge.Builder> builder =
+ Gauge.builder(name, () -> value);
+
+ if (!this.skipLabels) {
+ builder = builder.tags(tags);
+ }
+ return builder.register(meterRegistry);
+ });
+ }
+
+ private void gauge(String name, Number value, String... additionalTags) {
+ Gauge.builder(name, () -> value).register(meterRegistry);
+ }
+
+ public void recordWorkflowComplete(WorkflowModel workflow) {
+ String workflowName = workflow.getWorkflowName();
+ if (skipLabels) {
+ workflowName = "None";
+ }
+ long duration = workflow.getEndTime() - workflow.getCreateTime();
+ getTimer(
+ "workflow_completed",
+ "workflowName",
+ workflowName,
+ "status",
+ workflow.getStatus().toString())
+ .record(duration, TimeUnit.MILLISECONDS);
+ }
+
+ public void recordTaskComplete(TaskModel task) {
+ String taskType = task.getTaskDefName();
+ if (skipLabels) {
+ taskType = "None";
+ }
+ getTimer("task_completed", "taskType", taskType, "status", task.getStatus().toString())
+ .record((task.getEndTime() - task.getStartTime()), TimeUnit.MILLISECONDS);
+ }
+
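+ /** Turns alternating key/value varargs into Micrometer tags, skipping empty values. */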
+ private static List<Tag> tags(String... additionalTags) {
+ List<Tag> tags = new ArrayList<>();
+ for (int j = 0; j < additionalTags.length - 1; j += 2) {
+ String tk = additionalTags[j];
+ String tv = "" + additionalTags[j + 1];
+ if (!tv.isEmpty()) {
+ tags.add(Tag.of(tk, tv));
+ }
+ }
+ return tags;
+ }
+}
diff --git a/archive/src/main/resources/db/migration_archive_postgres/V99__initial_schema.sql b/archive/src/main/resources/db/migration_archive_postgres/V99__initial_schema.sql
new file mode 100644
index 0000000..64a5bb5
--- /dev/null
+++ b/archive/src/main/resources/db/migration_archive_postgres/V99__initial_schema.sql
@@ -0,0 +1,26 @@
+-- Workflow
+CREATE TABLE workflow_archive (
+ workflow_id varchar(255) NOT NULL,
+ created_on bigint,
+ modified_on bigint,
+ created_by varchar(255),
+ correlation_id varchar(255),
+ workflow_name varchar(255),
+ status varchar(255),
+ json_data TEXT,
+ index_data text[],
+ PRIMARY KEY (workflow_id)
+) with (autovacuum_vacuum_scale_factor = 0.0, autovacuum_vacuum_threshold = 10000);
+
+CREATE INDEX workflow_archive_workflow_name_index ON workflow_archive (workflow_name, status, correlation_id, created_on);
+CREATE INDEX workflow_archive_search_index ON workflow_archive USING gin(index_data);
+CREATE INDEX workflow_archive_created_on_index ON workflow_archive (created_on desc);
+
+-- Task Logs
+CREATE TABLE task_logs (
+ task_id varchar(255) NOT NULL,
+ seq bigserial,
+ log TEXT,
+ created_on bigint,
+ PRIMARY KEY (task_id, seq)
+);
\ No newline at end of file
diff --git a/archive/src/test/java/io/orkes/conductor/dao/archive/TestScheduler.java b/archive/src/test/java/io/orkes/conductor/dao/archive/TestScheduler.java
new file mode 100644
index 0000000..eabb005
--- /dev/null
+++ b/archive/src/test/java/io/orkes/conductor/dao/archive/TestScheduler.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2022 Orkes, Inc.
+ *
+ * Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/orkes-io/licenses/blob/main/community/LICENSE.txt
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.orkes.conductor.dao.archive;
+
+import java.text.DecimalFormat;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.springframework.boot.web.client.RestTemplateBuilder;
+import org.springframework.http.*;
+import org.springframework.web.client.RestClientException;
+import org.springframework.web.client.RestTemplate;
+
+import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest;
+
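+/**
+ * Manual load driver (run via main, not JUnit): creates a burst of workflow
+ * schedules against a test cluster to exercise the scheduler and archive path.
+ */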
+public class TestScheduler {
+
+ public static void main(String[] args) throws InterruptedException {
+ RestTemplate rt =
+ new RestTemplateBuilder()
+ .setReadTimeout(Duration.ofMillis(7000))
+ .setConnectTimeout(Duration.ofMillis(7000))
+ .build();
+
+ StartWorkflowRequest request = new StartWorkflowRequest();
+ request.setName("first_flow_all_simple");
+ DecimalFormat df = new DecimalFormat("000");
+ ExecutorService es = Executors.newFixedThreadPool(100);
+ int start = 1;
+ int maxSet = 150;
+ String host = "https://perf6.conductorworkflow.net/";
+ CountDownLatch latch = new CountDownLatch(maxSet);
+ HttpHeaders headers = new HttpHeaders();
+ headers.put(
+ "X-Authorization",
+ Collections.singletonList(
+ "eyJhbGciOiJkaXIiLCJlbmMiOiJBMjU2R0NNIiwiaXNzIjoiaHR0cHM6Ly9vcmtlcy10ZXN0LnVzLmF1dGgwLmNvbS8ifQ..Hhotod4LBfYqEZBc.AFl2Xa_AYk4NaWJzakRjuMx_wpmj-RSLaf7RzzFZxdo35jW3fXzA7RQ8pOevieLWZKgMRYPc6FWdyoFbIZXFUYZcQPUk83UzsigXQSB4g1PUUslmxv6betHQtBNt0PYaNLzwI5PF7QkzNWEdeIPa2b-IsTXniagk_fFeTRJmMdPmfqDDqGmU6kd1v3M12JOqVp5hNXGJirErz-9tB8uOySPXFVbiTCbz8mk_JA-B4LTUWzPzdyE6J7QqSHmsqjQZfkPNpCTYnEF958xLn1x3vZ2K9d84YYSYQTPU_ce3lZJeI3RbfoOp2fQL6KWzPIcPujRh.NdgKCYUDzhIZzfHinmMQdg"));
+ headers.setContentType(MediaType.APPLICATION_JSON);
+
+ for (int i = 0; i < 60; i++) {
+ int finalI = i;
+ es.submit(
+ () -> {
+ try {
+ for (int j = start; j <= maxSet; j++) {
+ Map sp = new HashMap<>();
+ sp.put(
+ "name",
+ String.format(
+ "test_second_%s_%s",
+ df.format(j), df.format(finalI)));
+ System.out.println(
+ String.format(
+ "test_second_%s_%s",
+ df.format(j), df.format(finalI)));
+
+ // rt.exchange("https://perf6.conductorworkflow.net/api/scheduler/schedules/" + sp.get("name"), HttpMethod.DELETE, new HttpEntity<>(headers), Void.class);
+
+ sp.put("cronExpression", String.format("%s * * ? * *", finalI));
+ sp.put("startWorkflowRequest", request);
+ HttpEntity