diff --git a/README.md b/README.md index 1a66ba7..966bc67 100644 --- a/README.md +++ b/README.md @@ -71,8 +71,8 @@ docker pull orkesio/orkes-conductor-community:latest * **Group:** `io.orkes.conductor` * **Artifacts:** `orkes-conductor-community-{server,persistence,archive}` -| Artifact | Gradle | -|-------------|-----------------------------------------------------------------------------------| +| Artifact | Gradle | +|-------------|-------------------------------------------------------------------------------------| | server | `implementation 'io.orkes.conductor:orkes-conductor-community-server:VERSION'` | | persistence | `implementation 'io.orkes.conductor:orkes-conductor-community-persistence:VERSION'` | | archive | `implementation 'io.orkes.conductor:orkes-conductor-community-archive:VERSION'` | @@ -88,7 +88,7 @@ Use GitHub issue tracking for filing issues and Discussion Forum for any other q [Orkes](http://orkes.io) development team creates and maintains the Orkes-Conductor releases. ## License -Copyright 2022 Orkes, Inc +Copyright 2023 Orkes, Inc Licensed under Orkes Community License. You may obtain a copy of the License at: ``` diff --git a/archive/build.gradle b/archive/build.gradle index 60b4d64..1d7631f 100644 --- a/archive/build.gradle +++ b/archive/build.gradle @@ -43,7 +43,8 @@ dependencies { testImplementation "org.testcontainers:postgresql:${versions.revTestContainer}" //Fake data generator - testImplementation "com.github.javafaker:javafaker:1.0.2" + testImplementation ('com.github.javafaker:javafaker:1.0.2') { exclude module: 'snakeyaml' } +// testImplementation group: 'org.yaml', name: 'snakeyaml', version: '2.2' } test { diff --git a/archive/src/main/java/io/orkes/conductor/dao/archive/ArchivedIndexDAO.java b/archive/src/main/java/io/orkes/conductor/dao/archive/ArchivedIndexDAO.java index 6a2ac24..a612d6b 100644 --- a/archive/src/main/java/io/orkes/conductor/dao/archive/ArchivedIndexDAO.java +++ b/archive/src/main/java/io/orkes/conductor/dao/archive/ArchivedIndexDAO.java @@ -13,7 +13,9 @@ package io.orkes.conductor.dao.archive; import java.util.List; +import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Primary; @@ -26,6 +28,7 @@ import com.netflix.conductor.common.run.WorkflowSummary; import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.dao.IndexDAO; +import com.netflix.conductor.model.WorkflowModel; import lombok.extern.slf4j.Slf4j; @@ -68,6 +71,53 @@ public SearchResult searchWorkflows( return archiveDAO.searchWorkflows(query, freeText, start, count); } + @Override + public SearchResult searchWorkflowSummary( + String query, String freeText, int start, int count, List sort) { + ScrollableSearchResult results = + archiveDAO.searchWorkflows(query, freeText, start, count); + List workflowSummaryList = + results.getResults().stream() + .map(wfId -> archiveDAO.getWorkflow(wfId, false)) + .filter(Objects::nonNull) + .map(this::convertToWorkflowSummary) + .collect(Collectors.toList()); + return new SearchResult<>(results.getTotalHits(), workflowSummaryList); + } + + private WorkflowSummary convertToWorkflowSummary(WorkflowModel wfModel) { + return new WorkflowSummary(wfModel.toWorkflow()); + } + + @Override + public CompletableFuture asyncRemoveTask(String workflowId, String taskId) { + log.debug("Task index is not maintained in this environment"); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture asyncUpdateTask( + String workflowId, String taskId, String[] keys, Object[] values) { + log.debug("Task index is not maintained in this environment"); + return CompletableFuture.completedFuture(null); + } + + @Override + public void updateTask(String workflowId, String taskId, String[] keys, Object[] values) { + throw new UnsupportedOperationException("Task index is not maintained in this environment"); + } + + @Override + public void removeTask(String workflowId, String taskId) { + throw new UnsupportedOperationException("Task index is not maintained in this environment"); + } + + @Override + public SearchResult searchTaskSummary( + String query, String freeText, int start, int count, List sort) { + throw new UnsupportedOperationException("Task search is not supported in this environment"); + } + @Override public SearchResult searchTasks( String query, String freeText, int start, int count, List sort) { diff --git a/archive/src/test/java/io/orkes/conductor/dao/postgres/PostgresDAOTestUtil.java b/archive/src/test/java/io/orkes/conductor/dao/postgres/PostgresDAOTestUtil.java index 6fa204d..cd4ca9a 100644 --- a/archive/src/test/java/io/orkes/conductor/dao/postgres/PostgresDAOTestUtil.java +++ b/archive/src/test/java/io/orkes/conductor/dao/postgres/PostgresDAOTestUtil.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Orkes, Inc. + * Copyright 2023 Orkes, Inc. *

* Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at diff --git a/build.gradle b/build.gradle index 518799d..da0220e 100644 --- a/build.gradle +++ b/build.gradle @@ -15,20 +15,22 @@ ext { springBootVersion = '2.5.6' versions = [ - revConductor : '3.10.7', + revConductor : '3.13.8', revTestContainer : '1.17.2', - revGuava : '30.0-jre', + revGuava : '32.0.0-jre', log4j : '2.17.1', - revJedis : '3.3.0', + revJedis : '3.8.0', revMockServerClient : '5.12.0', revCommonsLang : '3.12.0', revLombok : '1.18.24', revLucene : '7.7.3', revSpectator : '0.122.0', - revOpenapi : '1.6.11', - revAwsSdk : '1.12.153', - revProtoBuf : '3.13.0', + revJsonPath : '2.8.0', + revOpenapi : '1.7.+', + revAwsSdk : '1.12.549', + revProtoBuf : '3.16.3', revRarefiedRedis : '0.0.17', + revOrkesProtos : '0.9.2', revOrkesQueues : '1.0.6' ] } @@ -73,6 +75,16 @@ subprojects { } dependencies { + + implementation('net.minidev:json-smart') { + version { + strictly '2.4.10' + } + } + + implementation 'com.amazonaws:aws-java-sdk-s3:1.12.548' + implementation "redis.clients:jedis:${versions.revJedis}" + implementation "org.apache.logging.log4j:log4j-core:${versions.log4j}!!" implementation "org.apache.logging.log4j:log4j-api:${versions.log4j}!!" implementation "org.apache.logging.log4j:log4j-slf4j-impl:${versions.log4j}!!" @@ -86,6 +98,25 @@ subprojects { implementation "org.apache.commons:commons-lang3:${versions.revCommonsLang}" } + allprojects { + configurations.all { + resolutionStrategy.eachDependency { DependencyResolveDetails details -> + if (details.requested.group == 'com.fasterxml.jackson.core') { + details.useVersion '2.15.2' + } + if (details.requested.group == 'com.fasterxml.jackson.dataformat') { + details.useVersion '2.15.2' + } + if (details.requested.group == 'org.yaml') { + details.useVersion '2.2' + } + if (details.requested.group == 'io.netty' && details.requested.version == '4.1.70.Final') { + details.useVersion "4.1.94.Final" + } + } + } + } + dependencyManagement { imports { mavenBom(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES) diff --git a/docker/DockerfileStandalone b/docker/DockerfileStandalone index 8cc2aa2..6055c38 100644 --- a/docker/DockerfileStandalone +++ b/docker/DockerfileStandalone @@ -1,4 +1,4 @@ -FROM alpine:3.16.2 +FROM alpine:3.18.3 MAINTAINER Orkes Inc # Install software required to run conductor stack diff --git a/persistence/src/main/java/com/netflix/conductor/redis/config/AnyRedisCondition.java b/persistence/src/main/java/com/netflix/conductor/redis/config/AnyRedisCondition.java index 0789d72..70d1d47 100644 --- a/persistence/src/main/java/com/netflix/conductor/redis/config/AnyRedisCondition.java +++ b/persistence/src/main/java/com/netflix/conductor/redis/config/AnyRedisCondition.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Orkes, Inc. + * Copyright 2023 Orkes, Inc. *

* Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at diff --git a/persistence/src/main/java/com/netflix/conductor/redis/config/InMemoryRedisConfiguration.java b/persistence/src/main/java/com/netflix/conductor/redis/config/InMemoryRedisConfiguration.java index 8913bc2..444951a 100644 --- a/persistence/src/main/java/com/netflix/conductor/redis/config/InMemoryRedisConfiguration.java +++ b/persistence/src/main/java/com/netflix/conductor/redis/config/InMemoryRedisConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Orkes, Inc. + * Copyright 2023 Orkes, Inc. *

* Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -45,6 +45,7 @@ public JedisMock jedisMock() { @Bean public JedisCommands jedisCommands() { + //noinspection SpringConfigurationProxyMethods return new JedisStandalone(jedisPool()); } @@ -62,6 +63,7 @@ public Jedis getResource() { @Bean public OrkesJedisProxy OrkesJedisProxy() { System.out.println("OrkesJedisProxy created"); + //noinspection SpringConfigurationProxyMethods return new OrkesJedisProxy(jedisPool()); } } diff --git a/persistence/src/main/java/com/netflix/conductor/redis/config/RedisClusterConfiguration.java b/persistence/src/main/java/com/netflix/conductor/redis/config/RedisClusterConfiguration.java index 1f5105b..d7dc9bc 100644 --- a/persistence/src/main/java/com/netflix/conductor/redis/config/RedisClusterConfiguration.java +++ b/persistence/src/main/java/com/netflix/conductor/redis/config/RedisClusterConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Orkes, Inc. + * Copyright 2023 Orkes, Inc. *

* Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -28,6 +28,7 @@ import com.netflix.dyno.connectionpool.Host; import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Jedis; import redis.clients.jedis.Protocol; @Configuration(proxyBeanMethods = false) @@ -41,7 +42,7 @@ public class RedisClusterConfiguration { @Bean public JedisCluster getJedisCluster(RedisProperties properties) { - GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig<>(); + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig<>(); genericObjectPoolConfig.setMaxTotal(properties.getMaxConnectionsPerHost()); ConfigurationHostSupplier hostSupplier = new ConfigurationHostSupplier(properties); Set hosts = diff --git a/persistence/src/main/java/com/netflix/conductor/redis/config/RedisSentinelConfiguration.java b/persistence/src/main/java/com/netflix/conductor/redis/config/RedisSentinelConfiguration.java index 029ede8..6fb02a2 100644 --- a/persistence/src/main/java/com/netflix/conductor/redis/config/RedisSentinelConfiguration.java +++ b/persistence/src/main/java/com/netflix/conductor/redis/config/RedisSentinelConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Orkes, Inc. + * Copyright 2023 Orkes, Inc. *

* Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -27,6 +27,7 @@ import com.netflix.conductor.redis.jedis.JedisSentinel; import com.netflix.dyno.connectionpool.Host; +import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisSentinelPool; import redis.clients.jedis.Protocol; @@ -38,7 +39,7 @@ public class RedisSentinelConfiguration { @Bean protected JedisSentinel getJedisSentinel(RedisProperties properties) { - GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig<>(); + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig<>(); genericObjectPoolConfig.setMinIdle(properties.getMinIdleConnections()); genericObjectPoolConfig.setMaxIdle(properties.getMaxIdleConnections()); genericObjectPoolConfig.setMaxTotal(properties.getMaxConnectionsPerHost()); diff --git a/persistence/src/main/java/com/netflix/conductor/redis/config/RedisStandaloneConfiguration.java b/persistence/src/main/java/com/netflix/conductor/redis/config/RedisStandaloneConfiguration.java index 5f61fdd..e5b8624 100644 --- a/persistence/src/main/java/com/netflix/conductor/redis/config/RedisStandaloneConfiguration.java +++ b/persistence/src/main/java/com/netflix/conductor/redis/config/RedisStandaloneConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Orkes, Inc. + * Copyright 2023 Orkes, Inc. *

* Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at diff --git a/persistence/src/main/java/com/netflix/conductor/redis/dao/RedisMetadataDAO.java b/persistence/src/main/java/com/netflix/conductor/redis/dao/RedisMetadataDAO.java index c5f5572..ca9a796 100644 --- a/persistence/src/main/java/com/netflix/conductor/redis/dao/RedisMetadataDAO.java +++ b/persistence/src/main/java/com/netflix/conductor/redis/dao/RedisMetadataDAO.java @@ -283,6 +283,27 @@ public List getAllWorkflowDefs() { return workflows; } + @Override + public List getAllWorkflowDefsLatestVersions() { + List workflows = new LinkedList<>(); + + // Get all definitions latest versions from WORKFLOW_DEF_NAMES + recordRedisDaoRequests("getAllWorkflowLatestVersionsDefs"); + Set wfNames = orkesJedisProxy.smembers(nsKey(WORKFLOW_DEF_NAMES)); + int size = 0; + // Place all workflows into the Priority Queue. The PQ will allow us to grab the latest + // version of the workflows. + for (String wfName : wfNames) { + WorkflowDef def = getLatestWorkflowDef(wfName).orElse(null); + if (def != null) { + workflows.add(def); + size += def.toString().length(); + } + } + recordRedisDaoPayloadSize("getAllWorkflowLatestVersionsDefs", size, "n/a", "n/a"); + return workflows; + } + private void _createOrUpdate(WorkflowDef workflowDef) { if (isNull(workflowDef.getUpdateTime())) { workflowDef.setUpdateTime(System.currentTimeMillis()); diff --git a/persistence/src/main/java/com/netflix/conductor/redis/jedis/JedisCluster.java b/persistence/src/main/java/com/netflix/conductor/redis/jedis/JedisCluster.java index fc989bf..e3226e9 100644 --- a/persistence/src/main/java/com/netflix/conductor/redis/jedis/JedisCluster.java +++ b/persistence/src/main/java/com/netflix/conductor/redis/jedis/JedisCluster.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Orkes, Inc. + * Copyright 2023 Orkes, Inc. *

* Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -19,26 +19,11 @@ import java.util.Set; import java.util.stream.Collectors; -import redis.clients.jedis.BitPosParams; -import redis.clients.jedis.GeoCoordinate; -import redis.clients.jedis.GeoRadiusResponse; -import redis.clients.jedis.GeoUnit; -import redis.clients.jedis.ListPosition; -import redis.clients.jedis.ScanParams; -import redis.clients.jedis.ScanResult; -import redis.clients.jedis.SortingParams; -import redis.clients.jedis.StreamConsumersInfo; -import redis.clients.jedis.StreamEntry; -import redis.clients.jedis.StreamEntryID; -import redis.clients.jedis.StreamGroupInfo; -import redis.clients.jedis.StreamInfo; -import redis.clients.jedis.StreamPendingEntry; -import redis.clients.jedis.Tuple; +import redis.clients.jedis.*; import redis.clients.jedis.commands.JedisCommands; -import redis.clients.jedis.params.GeoRadiusParam; -import redis.clients.jedis.params.SetParams; -import redis.clients.jedis.params.ZAddParams; -import redis.clients.jedis.params.ZIncrByParams; +import redis.clients.jedis.params.*; +import redis.clients.jedis.resps.KeyedListElement; +import redis.clients.jedis.resps.LCSMatchResult; public class JedisCluster implements JedisCommands { @@ -950,4 +935,204 @@ public List xinfoGroup(String key) { public List xinfoConsumers(String key, String group) { return null; } + + @Override + public String getDel(String key) { + return jedisCluster.getDel(key); + } + + @Override + public String getEx(String key, GetExParams params) { + return jedisCluster.getEx(key, params); + } + + @Override + public String restore(String key, long ttl, byte[] serializedValue) { + return jedisCluster.restore(key, ttl, serializedValue); + } + + @Override + public String restoreReplace(String key, long ttl, byte[] serializedValue) { + throw new UnsupportedOperationException(); + } + + @Override + public String restore(String key, long ttl, byte[] serializedValue, RestoreParams params) { + return jedisCluster.restore(key, ttl, serializedValue, params); + } + + @Override + public Long expire(String key, long seconds) { + return jedisCluster.expire(key, seconds); + } + + @Override + public String setex(String key, long seconds, String value) { + return jedisCluster.setex(key, seconds, value); + } + + @Override + public String hrandfield(String key) { + return jedisCluster.hrandfield(key); + } + + @Override + public List hrandfield(String key, long count) { + return jedisCluster.hrandfield(key, count); + } + + @Override + public Map hrandfieldWithValues(String key, long count) { + return jedisCluster.hrandfieldWithValues(key, count); + } + + @Override + public List lpop(String key, int count) { + return jedisCluster.lpop(key, count); + } + + @Override + public Long lpos(String key, String element) { + return jedisCluster.lpos(key, element); + } + + @Override + public Long lpos(String key, String element, LPosParams params) { + return jedisCluster.lpos(key, element, params); + } + + @Override + public List lpos(String key, String element, LPosParams params, long count) { + return jedisCluster.lpos(key, element, params, count); + } + + @Override + public List rpop(String key, int count) { + return jedisCluster.rpop(key, count); + } + + @Override + public List smismember(String key, String... members) { + return jedisCluster.smismember(key, members); + } + + @Override + public Double zaddIncr(String key, double score, String member, ZAddParams params) { + return jedisCluster.zaddIncr(key, score, member, params); + } + + @Override + public String zrandmember(String key) { + return jedisCluster.zrandmember(key); + } + + @Override + public Set zrandmember(String key, long count) { + return jedisCluster.zrandmember(key, count); + } + + @Override + public Set zrandmemberWithScores(String key, long count) { + return jedisCluster.zrandmemberWithScores(key, count); + } + + @Override + public List zmscore(String key, String... members) { + return null; + } + + @Override + public KeyedListElement blpop(double timeout, String key) { + return null; + } + + @Override + public KeyedListElement brpop(double timeout, String key) { + return null; + } + + @Override + public Long geoadd( + String key, GeoAddParams params, Map memberCoordinateMap) { + return null; + } + + @Override + public StreamEntryID xadd(String key, Map hash, XAddParams params) { + return null; + } + + @Override + public List xrange(String key, StreamEntryID start, StreamEntryID end) { + return null; + } + + @Override + public List xrevrange(String key, StreamEntryID end, StreamEntryID start) { + return null; + } + + @Override + public StreamPendingSummary xpending(String key, String groupname) { + return null; + } + + @Override + public List xpending(String key, String groupname, XPendingParams params) { + return null; + } + + @Override + public long xtrim(String key, XTrimParams params) { + return 0; + } + + @Override + public List xclaim( + String key, + String group, + String consumername, + long minIdleTime, + XClaimParams params, + StreamEntryID... ids) { + return null; + } + + @Override + public List xclaimJustId( + String key, + String group, + String consumername, + long minIdleTime, + XClaimParams params, + StreamEntryID... ids) { + return null; + } + + @Override + public Entry> xautoclaim( + String key, + String group, + String consumerName, + long minIdleTime, + StreamEntryID start, + XAutoClaimParams params) { + return null; + } + + @Override + public Entry> xautoclaimJustId( + String key, + String group, + String consumerName, + long minIdleTime, + StreamEntryID start, + XAutoClaimParams params) { + return null; + } + + @Override + public LCSMatchResult strAlgoLCSStrings(String strA, String strB, StrAlgoLCSParams params) { + return null; + } } diff --git a/persistence/src/main/java/com/netflix/conductor/redis/jedis/JedisMock.java b/persistence/src/main/java/com/netflix/conductor/redis/jedis/JedisMock.java index 73f534c..b67f2ab 100644 --- a/persistence/src/main/java/com/netflix/conductor/redis/jedis/JedisMock.java +++ b/persistence/src/main/java/com/netflix/conductor/redis/jedis/JedisMock.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Orkes, Inc. + * Copyright 2023 Orkes, Inc. *

* Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at diff --git a/persistence/src/main/java/com/netflix/conductor/redis/jedis/JedisSentinel.java b/persistence/src/main/java/com/netflix/conductor/redis/jedis/JedisSentinel.java index 7e305b0..10cf04d 100644 --- a/persistence/src/main/java/com/netflix/conductor/redis/jedis/JedisSentinel.java +++ b/persistence/src/main/java/com/netflix/conductor/redis/jedis/JedisSentinel.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Orkes, Inc. + * Copyright 2023 Orkes, Inc. *

* Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -17,28 +17,11 @@ import java.util.Map.Entry; import java.util.Set; -import redis.clients.jedis.BitPosParams; -import redis.clients.jedis.GeoCoordinate; -import redis.clients.jedis.GeoRadiusResponse; -import redis.clients.jedis.GeoUnit; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPoolAbstract; -import redis.clients.jedis.ListPosition; -import redis.clients.jedis.ScanParams; -import redis.clients.jedis.ScanResult; -import redis.clients.jedis.SortingParams; -import redis.clients.jedis.StreamConsumersInfo; -import redis.clients.jedis.StreamEntry; -import redis.clients.jedis.StreamEntryID; -import redis.clients.jedis.StreamGroupInfo; -import redis.clients.jedis.StreamInfo; -import redis.clients.jedis.StreamPendingEntry; -import redis.clients.jedis.Tuple; +import redis.clients.jedis.*; import redis.clients.jedis.commands.JedisCommands; -import redis.clients.jedis.params.GeoRadiusParam; -import redis.clients.jedis.params.SetParams; -import redis.clients.jedis.params.ZAddParams; -import redis.clients.jedis.params.ZIncrByParams; +import redis.clients.jedis.params.*; +import redis.clients.jedis.resps.KeyedListElement; +import redis.clients.jedis.resps.LCSMatchResult; public class JedisSentinel implements JedisCommands { @@ -69,6 +52,269 @@ public String get(String key) { } } + @Override + public String getDel(String key) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.getDel(key); + } + } + + @Override + public String getEx(String key, GetExParams params) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.getEx(key, params); + } + } + + @Override + public String restore(String key, long ttl, byte[] serializedValue) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.restore(key, ttl, serializedValue); + } + } + + @Override + public String restore(String key, long ttl, byte[] serializedValue, RestoreParams params) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.restore(key, ttl, serializedValue, params); + } + } + + @Override + public Long expire(String key, long seconds) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.expire(key, seconds); + } + } + + @Override + public String hrandfield(String key) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.hrandfield(key); + } + } + + @Override + public List hrandfield(String key, long count) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.hrandfield(key, count); + } + } + + @Override + public Map hrandfieldWithValues(String key, long count) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.hrandfieldWithValues(key, count); + } + } + + @Override + public List lpop(String key, int count) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.lpop(key, count); + } + } + + @Override + public Long lpos(String key, String element) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.lpos(key, element); + } + } + + @Override + public Long lpos(String key, String element, LPosParams params) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.lpos(key, element, params); + } + } + + @Override + public List lpos(String key, String element, LPosParams params, long count) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.lpos(key, element, params, count); + } + } + + @Override + public String setex(String key, long seconds, String value) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.setex(key, seconds, value); + } + } + + @Override + public List rpop(String key, int count) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.rpop(key, count); + } + } + + @Override + public List smismember(String key, String... members) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.smismember(key, members); + } + } + + @Override + public Double zaddIncr(String key, double score, String member, ZAddParams params) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.zaddIncr(key, score, member, params); + } + } + + @Override + public String zrandmember(String key) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.zrandmember(key); + } + } + + @Override + public Set zrandmember(String key, long count) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.zrandmember(key, count); + } + } + + @Override + public Set zrandmemberWithScores(String key, long count) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.zrandmemberWithScores(key, count); + } + } + + @Override + public List zmscore(String key, String... members) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.zmscore(key, members); + } + } + + @Override + public KeyedListElement blpop(double timeout, String key) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.blpop(timeout, key); + } + } + + @Override + public KeyedListElement brpop(double timeout, String key) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.brpop(timeout, key); + } + } + + @Override + public Long geoadd( + String key, GeoAddParams params, Map memberCoordinateMap) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.geoadd(key, params, memberCoordinateMap); + } + } + + @Override + public StreamEntryID xadd(String key, Map hash, XAddParams params) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.xadd(key, hash, params); + } + } + + @Override + public List xrange(String key, StreamEntryID start, StreamEntryID end) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.xrange(key, start, end); + } + } + + @Override + public List xrevrange(String key, StreamEntryID end, StreamEntryID start) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.xrevrange(key, end, start); + } + } + + @Override + public StreamPendingSummary xpending(String key, String groupname) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.xpending(key, groupname); + } + } + + @Override + public List xpending(String key, String groupname, XPendingParams params) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.xpending(key, groupname, params); + } + } + + @Override + public long xtrim(String key, XTrimParams params) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.xtrim(key, params); + } + } + + @Override + public List xclaim( + String key, + String group, + String consumername, + long minIdleTime, + XClaimParams params, + StreamEntryID... ids) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.xclaim(key, group, consumername, minIdleTime, params, ids); + } + } + + @Override + public List xclaimJustId( + String key, + String group, + String consumername, + long minIdleTime, + XClaimParams params, + StreamEntryID... ids) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.xclaimJustId(key, group, consumername, minIdleTime, params, ids); + } + } + + @Override + public Entry> xautoclaim( + String key, + String group, + String consumerName, + long minIdleTime, + StreamEntryID start, + XAutoClaimParams params) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.xautoclaim(key, group, consumerName, minIdleTime, start, params); + } + } + + @Override + public Entry> xautoclaimJustId( + String key, + String group, + String consumerName, + long minIdleTime, + StreamEntryID start, + XAutoClaimParams params) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.xautoclaimJustId(key, group, consumerName, minIdleTime, start, params); + } + } + + @Override + public LCSMatchResult strAlgoLCSStrings(String strA, String strB, StrAlgoLCSParams params) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.strAlgoLCSStrings(strA, strB, params); + } + } + @Override public Boolean exists(String key) { try (Jedis jedis = jedisPool.getResource()) { @@ -1273,4 +1519,11 @@ public List xinfoConsumers(String key, String group) { return jedis.xinfoConsumers(key, group); } } + + @Override + public String restoreReplace(String key, long ttl, byte[] serializedValue) { + try (Jedis jedis = jedisPool.getResource()) { + return jedis.restoreReplace(key, ttl, serializedValue); + } + } } diff --git a/persistence/src/main/java/com/netflix/conductor/redis/jedis/JedisStandalone.java b/persistence/src/main/java/com/netflix/conductor/redis/jedis/JedisStandalone.java index 12c360a..aefa546 100644 --- a/persistence/src/main/java/com/netflix/conductor/redis/jedis/JedisStandalone.java +++ b/persistence/src/main/java/com/netflix/conductor/redis/jedis/JedisStandalone.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Orkes, Inc. + * Copyright 2023 Orkes, Inc. *

* Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -20,28 +20,11 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; -import redis.clients.jedis.BitPosParams; -import redis.clients.jedis.GeoCoordinate; -import redis.clients.jedis.GeoRadiusResponse; -import redis.clients.jedis.GeoUnit; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; -import redis.clients.jedis.ListPosition; -import redis.clients.jedis.ScanParams; -import redis.clients.jedis.ScanResult; -import redis.clients.jedis.SortingParams; -import redis.clients.jedis.StreamConsumersInfo; -import redis.clients.jedis.StreamEntry; -import redis.clients.jedis.StreamEntryID; -import redis.clients.jedis.StreamGroupInfo; -import redis.clients.jedis.StreamInfo; -import redis.clients.jedis.StreamPendingEntry; -import redis.clients.jedis.Tuple; +import redis.clients.jedis.*; import redis.clients.jedis.commands.JedisCommands; -import redis.clients.jedis.params.GeoRadiusParam; -import redis.clients.jedis.params.SetParams; -import redis.clients.jedis.params.ZAddParams; -import redis.clients.jedis.params.ZIncrByParams; +import redis.clients.jedis.params.*; +import redis.clients.jedis.resps.KeyedListElement; +import redis.clients.jedis.resps.LCSMatchResult; /** A {@link JedisCommands} implementation that delegates to {@link JedisPool}. */ @Component @@ -480,6 +463,212 @@ public Tuple zpopmin(String key) { return executeInJedis(jedis -> jedis.zpopmin(key)); } + @Override + public String getDel(String key) { + return executeInJedis(jedis -> jedis.getDel(key)); + } + + @Override + public String getEx(String key, GetExParams params) { + return executeInJedis(jedis -> jedis.getEx(key, params)); + } + + @Override + public String restore(String key, long ttl, byte[] serializedValue) { + return executeInJedis(jedis -> jedis.restore(key, ttl, serializedValue)); + } + + @Override + public String restoreReplace(String key, long ttl, byte[] serializedValue) { + return executeInJedis(jedis -> jedis.restoreReplace(key, ttl, serializedValue)); + } + + @Override + public String restore(String key, long ttl, byte[] serializedValue, RestoreParams params) { + return executeInJedis(jedis -> jedis.restore(key, ttl, serializedValue, params)); + } + + @Override + public Long expire(String key, long seconds) { + return executeInJedis(jedis -> jedis.expire(key, seconds)); + } + + @Override + public String setex(String key, long seconds, String value) { + return executeInJedis(jedis -> jedis.setex(key, seconds, value)); + } + + @Override + public String hrandfield(String key) { + return executeInJedis(jedis -> jedis.hrandfield(key)); + } + + @Override + public List hrandfield(String key, long count) { + return executeInJedis(jedis -> jedis.hrandfield(key, count)); + } + + @Override + public Map hrandfieldWithValues(String key, long count) { + return executeInJedis(jedis -> jedis.hrandfieldWithValues(key, count)); + } + + @Override + public List lpop(String key, int count) { + return executeInJedis(jedis -> jedis.lpop(key, count)); + } + + @Override + public Long lpos(String key, String element) { + return executeInJedis(jedis -> jedis.lpos(key, element)); + } + + @Override + public Long lpos(String key, String element, LPosParams params) { + return executeInJedis(jedis -> jedis.lpos(key, element, params)); + } + + @Override + public List lpos(String key, String element, LPosParams params, long count) { + return executeInJedis(jedis -> jedis.lpos(key, element, params, count)); + } + + @Override + public List rpop(String key, int count) { + return executeInJedis(jedis -> jedis.rpop(key, count)); + } + + @Override + public List smismember(String key, String... members) { + return executeInJedis(jedis -> jedis.smismember(key, members)); + } + + @Override + public Double zaddIncr(String key, double score, String member, ZAddParams params) { + return executeInJedis(jedis -> jedis.zaddIncr(key, score, member, params)); + } + + @Override + public String zrandmember(String key) { + return executeInJedis(jedis -> jedis.zrandmember(key)); + } + + @Override + public Set zrandmember(String key, long count) { + return executeInJedis(jedis -> jedis.zrandmember(key, count)); + } + + @Override + public Set zrandmemberWithScores(String key, long count) { + return executeInJedis(jedis -> jedis.zrandmemberWithScores(key, count)); + } + + @Override + public List zmscore(String key, String... members) { + return executeInJedis(jedis -> jedis.zmscore(key, members)); + } + + @Override + public KeyedListElement blpop(double timeout, String key) { + return executeInJedis(jedis -> jedis.blpop(timeout, key)); + } + + @Override + public KeyedListElement brpop(double timeout, String key) { + return executeInJedis(jedis -> jedis.brpop(timeout, key)); + } + + @Override + public Long geoadd( + String key, GeoAddParams params, Map memberCoordinateMap) { + return executeInJedis(jedis -> jedis.geoadd(key, params, memberCoordinateMap)); + } + + @Override + public StreamEntryID xadd(String key, Map hash, XAddParams params) { + return executeInJedis(jedis -> jedis.xadd(key, hash, params)); + } + + @Override + public List xrange(String key, StreamEntryID start, StreamEntryID end) { + return executeInJedis(jedis -> jedis.xrange(key, start, end)); + } + + @Override + public List xrevrange(String key, StreamEntryID end, StreamEntryID start) { + return executeInJedis(jedis -> jedis.xrevrange(key, end, start)); + } + + @Override + public StreamPendingSummary xpending(String key, String groupname) { + return executeInJedis(jedis -> jedis.xpending(key, groupname)); + } + + @Override + public List xpending(String key, String groupname, XPendingParams params) { + return executeInJedis(jedis -> jedis.xpending(key, groupname, params)); + } + + @Override + public long xtrim(String key, XTrimParams params) { + return executeInJedis(jedis -> jedis.xtrim(key, params)); + } + + @Override + public List xclaim( + String key, + String group, + String consumername, + long minIdleTime, + XClaimParams params, + StreamEntryID... ids) { + return executeInJedis( + jedis -> jedis.xclaim(key, group, consumername, minIdleTime, params, ids)); + } + + @Override + public List xclaimJustId( + String key, + String group, + String consumername, + long minIdleTime, + XClaimParams params, + StreamEntryID... ids) { + return executeInJedis( + jedis -> jedis.xclaimJustId(key, group, consumername, minIdleTime, params, ids)); + } + + @Override + public Map.Entry> xautoclaim( + String key, + String group, + String consumerName, + long minIdleTime, + StreamEntryID start, + XAutoClaimParams params) { + return executeInJedis( + jedis -> jedis.xautoclaim(key, group, consumerName, minIdleTime, start, params)); + } + + @Override + public Map.Entry> xautoclaimJustId( + String key, + String group, + String consumerName, + long minIdleTime, + StreamEntryID start, + XAutoClaimParams params) { + return executeInJedis( + jedis -> + jedis.xautoclaimJustId( + key, group, consumerName, minIdleTime, start, params)); + } + + @Override + public LCSMatchResult strAlgoLCSStrings(String strA, String strB, StrAlgoLCSParams params) { + return executeInJedis(jedis -> jedis.strAlgoLCSStrings(strA, strB, params)); + } + @Override public Set zpopmin(String key, int count) { return executeInJedis(jedis -> jedis.zpopmin(key, count)); diff --git a/persistence/src/main/java/com/netflix/conductor/redis/jedis/OrkesJedisProxy.java b/persistence/src/main/java/com/netflix/conductor/redis/jedis/OrkesJedisProxy.java index 451222b..6a78f32 100644 --- a/persistence/src/main/java/com/netflix/conductor/redis/jedis/OrkesJedisProxy.java +++ b/persistence/src/main/java/com/netflix/conductor/redis/jedis/OrkesJedisProxy.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Orkes, Inc. + * Copyright 2023 Orkes, Inc. *

* Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at diff --git a/persistence/src/test/java/com/netflix/conductor/dao/PollDataDAOTest.java b/persistence/src/test/java/com/netflix/conductor/dao/PollDataDAOTest.java index 5906ccc..fc8b0ca 100644 --- a/persistence/src/test/java/com/netflix/conductor/dao/PollDataDAOTest.java +++ b/persistence/src/test/java/com/netflix/conductor/dao/PollDataDAOTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Orkes, Inc. + * Copyright 2023 Orkes, Inc. *

* Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at diff --git a/persistence/src/test/java/com/netflix/conductor/redis/dao/BaseDynoDAOTest.java b/persistence/src/test/java/com/netflix/conductor/redis/dao/BaseDynoDAOTest.java index af699c4..d63da18 100644 --- a/persistence/src/test/java/com/netflix/conductor/redis/dao/BaseDynoDAOTest.java +++ b/persistence/src/test/java/com/netflix/conductor/redis/dao/BaseDynoDAOTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Orkes, Inc. + * Copyright 2023 Orkes, Inc. *

* Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at diff --git a/persistence/src/test/java/com/netflix/conductor/redis/jedis/ConfigurationHostSupplierTest.java b/persistence/src/test/java/com/netflix/conductor/redis/jedis/ConfigurationHostSupplierTest.java index e7c8126..5a03837 100644 --- a/persistence/src/test/java/com/netflix/conductor/redis/jedis/ConfigurationHostSupplierTest.java +++ b/persistence/src/test/java/com/netflix/conductor/redis/jedis/ConfigurationHostSupplierTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Orkes, Inc. + * Copyright 2023 Orkes, Inc. *

* Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at diff --git a/persistence/src/test/java/com/netflix/conductor/redis/jedis/JedisClusterTest.java b/persistence/src/test/java/com/netflix/conductor/redis/jedis/JedisClusterTest.java index 8467113..ca5e0f0 100644 --- a/persistence/src/test/java/com/netflix/conductor/redis/jedis/JedisClusterTest.java +++ b/persistence/src/test/java/com/netflix/conductor/redis/jedis/JedisClusterTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Orkes, Inc. + * Copyright 2023 Orkes, Inc. *

* Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at diff --git a/persistence/src/test/java/com/netflix/conductor/redis/jedis/JedisSentinelTest.java b/persistence/src/test/java/com/netflix/conductor/redis/jedis/JedisSentinelTest.java index 7ea0fcc..345bffe 100644 --- a/persistence/src/test/java/com/netflix/conductor/redis/jedis/JedisSentinelTest.java +++ b/persistence/src/test/java/com/netflix/conductor/redis/jedis/JedisSentinelTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Orkes, Inc. + * Copyright 2023 Orkes, Inc. *

* Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at diff --git a/server/build.gradle b/server/build.gradle index 2c9caa8..53f3984 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -26,10 +26,7 @@ dependencies { implementation project(":orkes-conductor-archive") implementation project(":orkes-conductor-persistence") - implementation 'io.orkes.conductor:orkes-conductor-common-protos:0.9.2' - - //aws - implementation "com.amazonaws:aws-java-sdk-core:${versions.revAwsSdk}" + implementation "io.orkes.conductor:orkes-conductor-common-protos:${versions.revOrkesProtos}" implementation "org.springdoc:springdoc-openapi-ui:${versions.revOpenapi}" @@ -53,7 +50,7 @@ dependencies { implementation "io.micrometer:micrometer-registry-prometheus:1.7.5" implementation "io.micrometer:micrometer-core:1.8.0" - implementation "com.jayway.jsonpath:json-path:2.4.0" + implementation "com.jayway.jsonpath:json-path:${versions.revJsonPath}" testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'org.springframework.security:spring-security-test' diff --git a/server/src/main/java/com/netflix/conductor/core/execution/OrkesWorkflowExecutor.java b/server/src/main/java/com/netflix/conductor/core/execution/OrkesWorkflowExecutor.java index b58b8c3..827f41d 100644 --- a/server/src/main/java/com/netflix/conductor/core/execution/OrkesWorkflowExecutor.java +++ b/server/src/main/java/com/netflix/conductor/core/execution/OrkesWorkflowExecutor.java @@ -21,6 +21,7 @@ import java.util.concurrent.*; import java.util.stream.Collectors; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Component; @@ -33,6 +34,7 @@ import com.netflix.conductor.core.exception.TerminateWorkflowException; import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry; import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask; +import com.netflix.conductor.core.listener.TaskStatusListener; import com.netflix.conductor.core.listener.WorkflowStatusListener; import com.netflix.conductor.core.metadata.MetadataMapperService; import com.netflix.conductor.core.utils.IDGenerator; @@ -73,25 +75,29 @@ public OrkesWorkflowExecutor( QueueDAO queueDAO, MetadataMapperService metadataMapperService, WorkflowStatusListener workflowStatusListener, + TaskStatusListener taskStatusListener, ExecutionDAOFacade executionDAOFacade, ConductorProperties properties, ExecutionLockService executionLockService, @Lazy SystemTaskRegistry systemTaskRegistry, ParametersUtils parametersUtils, IDGenerator idGenerator, - RedisExecutionDAO executionDAO) { + RedisExecutionDAO executionDAO, + ApplicationEventPublisher applicationEventPublisher) { super( deciderService, metadataDAO, queueDAO, metadataMapperService, workflowStatusListener, + taskStatusListener, executionDAOFacade, properties, executionLockService, systemTaskRegistry, parametersUtils, - idGenerator); + idGenerator, + applicationEventPublisher); this.queueDAO = queueDAO; this.orkesExecutionDAOFacade = executionDAOFacade; diff --git a/server/src/main/java/io/orkes/conductor/OrkesConductorApplication.java b/server/src/main/java/io/orkes/conductor/OrkesConductorApplication.java index 8041104..dfe77c2 100644 --- a/server/src/main/java/io/orkes/conductor/OrkesConductorApplication.java +++ b/server/src/main/java/io/orkes/conductor/OrkesConductorApplication.java @@ -90,7 +90,7 @@ public OpenApiCustomiser openApiCustomiser(Environment environment) { @Bean public OpenAPI openAPI() { - log.info("openAPI Configuration...."); + log.info("OpenAPI Configuration...."); return new OpenAPI() .info( new Info() diff --git a/server/src/main/java/io/orkes/conductor/config/AWSCredentialsConfiguration.java b/server/src/main/java/io/orkes/conductor/config/AWSCredentialsConfiguration.java deleted file mode 100644 index d942669..0000000 --- a/server/src/main/java/io/orkes/conductor/config/AWSCredentialsConfiguration.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2022 Orkes, Inc. - *

- * Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * https://github.com/orkes-io/licenses/blob/main/community/LICENSE.txt - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package io.orkes.conductor.config; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; - -@Configuration -public class AWSCredentialsConfiguration { - - @Bean - AWSCredentialsProvider createAWSCredentialsProvider() { - return new DefaultAWSCredentialsProviderChain(); - } -} diff --git a/server/src/main/java/io/orkes/conductor/execution/tasks/OrkesRestTemplateProvider.java b/server/src/main/java/io/orkes/conductor/execution/tasks/OrkesRestTemplateProvider.java index 4ec1e19..4895edb 100644 --- a/server/src/main/java/io/orkes/conductor/execution/tasks/OrkesRestTemplateProvider.java +++ b/server/src/main/java/io/orkes/conductor/execution/tasks/OrkesRestTemplateProvider.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Orkes, Inc. + * Copyright 2023 Orkes, Inc. *

* Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at diff --git a/server/src/main/java/io/orkes/conductor/execution/tasks/SubWorkflowSync.java b/server/src/main/java/io/orkes/conductor/execution/tasks/SubWorkflowSync.java index ccf99cf..17293e7 100644 --- a/server/src/main/java/io/orkes/conductor/execution/tasks/SubWorkflowSync.java +++ b/server/src/main/java/io/orkes/conductor/execution/tasks/SubWorkflowSync.java @@ -17,6 +17,7 @@ import com.netflix.conductor.core.execution.WorkflowExecutor; import com.netflix.conductor.core.execution.tasks.SubWorkflow; import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask; +import com.netflix.conductor.core.operation.StartWorkflowOperation; import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; @@ -30,15 +31,13 @@ public class SubWorkflowSync extends WorkflowSystemTask { private final SubWorkflow subWorkflow; - private final ObjectMapper objectMapper; - public SubWorkflowSync(ObjectMapper objectMapper) { + public SubWorkflowSync( + ObjectMapper objectMapper, StartWorkflowOperation startWorkflowOperation) { super(TASK_TYPE_SUB_WORKFLOW); - this.subWorkflow = new SubWorkflow(objectMapper); - this.objectMapper = objectMapper; + this.subWorkflow = new SubWorkflow(objectMapper, startWorkflowOperation); } - @SuppressWarnings("unchecked") @Override public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) { subWorkflow.start(workflow, task, workflowExecutor); diff --git a/server/src/main/java/io/orkes/conductor/execution/tasks/mapper/OrkesForkJoinDynamicTaskMapper.java b/server/src/main/java/io/orkes/conductor/execution/tasks/mapper/OrkesForkJoinDynamicTaskMapper.java index a3ca7f5..c57fb5d 100644 --- a/server/src/main/java/io/orkes/conductor/execution/tasks/mapper/OrkesForkJoinDynamicTaskMapper.java +++ b/server/src/main/java/io/orkes/conductor/execution/tasks/mapper/OrkesForkJoinDynamicTaskMapper.java @@ -73,8 +73,8 @@ public OrkesForkJoinDynamicTaskMapper( } @Override - public TaskType getTaskType() { - return TaskType.FORK_JOIN_DYNAMIC; + public String getTaskType() { + return TaskType.FORK_JOIN_DYNAMIC.name(); } /**