From 2914c8c2b1362673f72e2acc8c2d7785a1538e20 Mon Sep 17 00:00:00 2001 From: Julian Purse Date: Wed, 31 Jul 2024 20:19:41 +0200 Subject: [PATCH 01/10] initial changes to add the optional executor group to the query builder --- .../internal/dao/WorkflowInstanceDao.java | 21 +++++++++++++++---- .../instance/QueryWorkflowInstances.java | 17 +++++++++++++++ .../internal/dao/WorkflowInstanceDaoTest.java | 11 ++++++++++ .../java/io/nflow/rest/v1/ResourceBase.java | 4 +++- .../v1/jaxrs/WorkflowInstanceResource.java | 7 +++++-- .../jaxrs/WorkflowInstanceResourceTest.java | 4 ++-- .../springweb/WorkflowInstanceResource.java | 6 ++++-- 7 files changed, 59 insertions(+), 11 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index afacf6a77..038fca71f 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -193,7 +193,8 @@ private long insertWorkflowInstanceWithCte(WorkflowInstance instance) { StringBuilder sqlb = new StringBuilder(256); sqlb.append("with wf as (").append(insertWorkflowInstanceSql()).append(" returning id)"); Object[] instanceValues = new Object[] { instance.type, instance.priority, instance.parentWorkflowId, - instance.parentActionId, instance.businessKey, instance.externalId, executorInfo.getExecutorGroup(), + instance.parentActionId, instance.businessKey, instance.externalId, + instance.executorGroup == null ? executorInfo.getExecutorGroup() : instance.executorGroup, instance.status.name(), instance.state, abbreviate(instance.stateText, getInstanceStateTextLength()), toTimestamp(instance.nextActivation), instance.signal.orElse(null) }; int pos = instanceValues.length; @@ -245,7 +246,11 @@ public PreparedStatement createPreparedStatement(Connection connection) throws S ps.setObject(p++, instance.parentActionId); ps.setString(p++, instance.businessKey); ps.setString(p++, instance.externalId); - ps.setString(p++, executorInfo.getExecutorGroup()); + if (instance.executorGroup!=null) { + ps.setString(p++, instance.executorGroup); + } else { + ps.setString(p++, executorInfo.getExecutorGroup()); + } ps.setString(p++, instance.status.name()); ps.setString(p++, instance.state); ps.setString(p++, abbreviate(instance.stateText, getInstanceStateTextLength())); @@ -692,7 +697,9 @@ public Stream queryWorkflowInstancesAsStream(QueryWorkflowInst List conditions = new ArrayList<>(); MapSqlParameterSource params = new MapSqlParameterSource(); queryOptionsToSqlAndParams(query, conditions, params); - conditions.add(executorInfo.getExecutorGroupCondition()); + if (!query.includeAllExecutors) { + conditions.add(executorInfo.getExecutorGroupCondition()); + } String sqlSuffix = "from nflow_workflow wf "; if (query.stateVariableKey != null) { sqlSuffix += "inner join nflow_workflow_state wfs on wf.id = wfs.workflow_id and wfs.state_key = :state_key and " + sqlVariants.clobToComparable("wfs.state_value") + " = :state_value "; @@ -701,7 +708,13 @@ public Stream queryWorkflowInstancesAsStream(QueryWorkflowInst params.addValue("state_key", query.stateVariableKey); params.addValue("state_value", query.stateVariableValue); } - sqlSuffix += "where " + collectionToDelimitedString(conditions, " and ") + " order by id desc"; + String collection = collectionToDelimitedString(conditions, " and "); + if (collection.isEmpty()){ + //remove the where clause when there are no conditions + sqlSuffix += " order by id desc"; + }else { + sqlSuffix += "where " + collection + " order by id desc"; + } long maxResults = getMaxResults(query.maxResults); String sql = sqlVariants.limit("select " + ALL_WORKFLOW_COLUMNS + ", 0 as archived " + sqlSuffix, maxResults); List results = namedJdbc.query(sql, params, workflowInstanceRowMapper); diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/QueryWorkflowInstances.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/QueryWorkflowInstances.java index 443548778..5a83a5382 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/QueryWorkflowInstances.java +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/QueryWorkflowInstances.java @@ -69,6 +69,10 @@ public class QueryWorkflowInstances extends ModelObject { * Setting this to true will make the query return also workflow actions. */ public final boolean includeActions; + /** + * Setting this to true will make the query return all executor groups + */ + public final boolean includeAllExecutors; /** * Setting this to true will make the query return also the current state variables for the workflow. @@ -114,6 +118,7 @@ public class QueryWorkflowInstances extends ModelObject { this.stateVariableKey = builder.stateVariableKey; this.stateVariableValue = builder.stateVariableValue; this.includeActions = builder.includeActions; + this.includeAllExecutors = builder.includeAllExecutors; this.includeCurrentStateVariables = builder.includeCurrentStateVariables; this.includeActionStateVariables = builder.includeActionStateVariables; this.includeChildWorkflows = builder.includeChildWorkflows; @@ -137,6 +142,7 @@ public static class Builder { String stateVariableKey; String stateVariableValue; boolean includeActions; + boolean includeAllExecutors; boolean includeCurrentStateVariables; boolean includeActionStateVariables; boolean includeChildWorkflows; @@ -162,6 +168,7 @@ public Builder(QueryWorkflowInstances copy) { this.stateVariableKey = copy.stateVariableKey; this.stateVariableValue = copy.stateVariableValue; this.includeActions = copy.includeActions; + this.includeAllExecutors = copy.includeAllExecutors; this.includeCurrentStateVariables = copy.includeCurrentStateVariables; this.includeActionStateVariables = copy.includeActionStateVariables; this.includeChildWorkflows = copy.includeChildWorkflows; @@ -273,6 +280,16 @@ public Builder setIncludeActions(boolean includeActions) { return this; } + /** + * Set whether all executor groups should be included in the results. Default is `false`. + * @param includeAllExecutors True to include all executors, false otherwise. + * @return this. + */ + public Builder setIncludeAllExecutors(boolean includeAllExecutors) { + this.includeAllExecutors = includeAllExecutors; + return this; + } + /** * Set whether current workflow state variables should be included in the results. Default is `false` * @param includeCurrentStateVariables True to include state variables, false otherwise. diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java index b326b8dd0..131f46c9f 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java @@ -698,6 +698,17 @@ public void pollNextWorkflowInstances() { assertThat(secondBatch.size(), equalTo(0)); } + /** + * Test returning all executors (default being junit) + */ + @Test +public void testCreateWorkflowWithAllExecutors() { + dao.insertWorkflowInstance(constructWorkflowInstanceBuilder().setNextActivation(now().minusMinutes(1)) + .setPriority((short)1).setExecutorGroup("test").build()); + List workflows = dao.queryWorkflowInstances(new QueryWorkflowInstances.Builder().setIncludeActions(true).setIncludeAllExecutors(true).build()); + assertThat(workflows.size(), is(1)); + } + @Test public void pollNextWorkflowInstancesReturnInstancesInCorrectOrder() { long olderLowPrio = createInstance(2, (short) 1); diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java index 8d42e33e1..5d3064807 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java @@ -68,6 +68,7 @@ public abstract class ResourceBase { + "* actionStateVariables: state variable changes for actions\n" + "* childWorkflows: map of created child workflow instance IDs by action ID\n"; protected static final String QUERY_ARCHIVED_DEFAULT_STR = "false"; + protected static final String QUERY_INCLUDE_ALL_EXECUTORS_DEFAULT_STR = "false"; protected static final boolean QUERY_ARCHIVED_DEFAULT = parseBoolean(QUERY_ARCHIVED_DEFAULT_STR); public List listWorkflowDefinitions(Collection types, @@ -146,7 +147,7 @@ public boolean updateWorkflowInstance(long id, UpdateWorkflowInstanceRequest req public Stream listWorkflowInstances(Set ids, Set types, Long parentWorkflowId, Long parentActionId, Set states, Set statuses, String businessKey, String externalId, String stateVariableKey, String stateVariableValue, Set includes, String include, - Long maxResults, Long maxActions, boolean queryArchive, WorkflowInstanceService workflowInstances, + Long maxResults, Long maxActions, boolean queryArchive, boolean includeAllExecutors, WorkflowInstanceService workflowInstances, ListWorkflowInstanceConverter listWorkflowConverter) { Set propertyIncludes = resolveIncludes(includes, include); QueryWorkflowInstances q = new QueryWorkflowInstances.Builder() @@ -157,6 +158,7 @@ public Stream listWorkflowInstances(Set ids, .addStates(states.toArray(new String[states.size()])) .addStatuses(statuses.toArray(new WorkflowInstanceStatus[statuses.size()])) .setBusinessKey(businessKey) + .setIncludeAllExecutors(includeAllExecutors) .setExternalId(externalId) .setIncludeCurrentStateVariables(propertyIncludes.contains(currentStateVariables)) .setIncludeActions(propertyIncludes.contains(actions)) diff --git a/nflow-rest-api-jax-rs/src/main/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResource.java b/nflow-rest-api-jax-rs/src/main/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResource.java index 38e27d36e..19466afcf 100644 --- a/nflow-rest-api-jax-rs/src/main/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResource.java +++ b/nflow-rest-api-jax-rs/src/main/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResource.java @@ -175,10 +175,13 @@ public Response listWorkflowInstances( description = "Maximum number of actions returned for each workflow instance") Long maxActions, @QueryParam("queryArchive") @Parameter( description = "Query also the archive if not enough results found from main tables", - schema = @Schema(defaultValue = QUERY_ARCHIVED_DEFAULT_STR)) Boolean queryArchive) { + schema = @Schema(defaultValue = QUERY_ARCHIVED_DEFAULT_STR)) Boolean queryArchive, + @QueryParam("includeAllExecutors") @Parameter( + description = "Include Executors in the search", + schema = @Schema(defaultValue = QUERY_INCLUDE_ALL_EXECUTORS_DEFAULT_STR)) Boolean includeAllExecutors) { return handleExceptions(() -> ok(super.listWorkflowInstances(ids, types, parentWorkflowId, parentActionId, states, statuses, businessKey, externalId, stateVariableKey, stateVariableValue, includes, include, maxResults, maxActions, - ofNullable(queryArchive).orElse(QUERY_ARCHIVED_DEFAULT), workflowInstances, listWorkflowConverter).iterator())); + ofNullable(queryArchive).orElse(QUERY_ARCHIVED_DEFAULT),ofNullable(includeAllExecutors).orElse(false), workflowInstances, listWorkflowConverter).iterator())); } @PUT diff --git a/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java b/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java index 7c12eeb32..51cac4db6 100644 --- a/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java +++ b/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java @@ -250,7 +250,7 @@ public void whenUpdatingBusinessKeyWithDescriptionUpdateWorkflowInstanceWorks() @Test public void listWorkflowInstancesWorks() { makeRequest(() -> resource.listWorkflowInstances(Set.of(42L), Set.of("type"), 99L, 88L, Set.of("state"), - EnumSet.of(WorkflowInstanceStatus.created), "businessKey", "externalId", null, null, null, null, null, null, true)); + EnumSet.of(WorkflowInstanceStatus.created), "businessKey", "externalId", null, null, null, null, null, null, true,false)); verify(workflowInstances).listWorkflowInstancesAsStream(queryCaptor.capture()); QueryWorkflowInstances query = queryCaptor.getValue(); @@ -277,7 +277,7 @@ public void listWorkflowInstancesWorks() { public void listWorkflowInstancesWorksWithAllIncludes() { makeRequest(() -> resource.listWorkflowInstances(Set.of(42L), Set.of("type"), 99L, 88L, Set.of("state"), EnumSet.of(WorkflowInstanceStatus.created, WorkflowInstanceStatus.executing), "businessKey", "externalId", "stateVarKey", - "stateVarValue", EnumSet.allOf(ApiWorkflowInstanceInclude.class), null, 1L, 2L, false)); + "stateVarValue", EnumSet.allOf(ApiWorkflowInstanceInclude.class), null, 1L, 2L, false,false)); verify(workflowInstances).listWorkflowInstancesAsStream(queryCaptor.capture()); QueryWorkflowInstances query = queryCaptor.getValue(); diff --git a/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java b/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java index 703aab4de..6cedd79d4 100644 --- a/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java +++ b/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java @@ -178,10 +178,12 @@ public Mono> listWorkflowInstances( @RequestParam(value = "stateVariableValue", required = false) @Parameter( description = "Current value of state variable defined by stateVariableKey") String stateVariableValue, @RequestParam(value = "queryArchive", required = false, defaultValue = QUERY_ARCHIVED_DEFAULT_STR) @Parameter( - description = "Query also the archive if not enough results found from main tables") boolean queryArchive) { + description = "Query also the archive if not enough results found from main tables") boolean queryArchive, + @RequestParam(value = "includeAllExecutors", required = false, defaultValue = QUERY_ARCHIVED_DEFAULT_STR) @Parameter( + description = "Query all executor groups, default is just the current executor's group") boolean includeAllExecutors) { return handleExceptions(() -> wrapBlocking(() -> ok(super.listWorkflowInstances(ids, types, parentWorkflowId, parentActionId, states, statuses, businessKey, externalId, stateVariableKey, stateVariableValue, includes, include, maxResults, - maxActions, queryArchive, this.workflowInstances, this.listWorkflowConverter).iterator()))); + maxActions, queryArchive, includeAllExecutors, this.workflowInstances, this.listWorkflowConverter).iterator()))); } @PutMapping(path = "/{id}/signal", consumes = APPLICATION_JSON_VALUE) From cd848e48eb7c5d743e81e31ca962b1e8779aae22 Mon Sep 17 00:00:00 2001 From: Julian Purse Date: Wed, 31 Jul 2024 20:28:00 +0200 Subject: [PATCH 02/10] fix test case using wrong executor group --- .../java/io/nflow/engine/internal/executor/BaseNflowTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/BaseNflowTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/BaseNflowTest.java index f29d65ccd..476db1f1d 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/BaseNflowTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/BaseNflowTest.java @@ -30,7 +30,7 @@ protected WorkflowInstance.Builder constructWorkflowInstanceBuilder() { .setExternalId(randomUUID().toString()) .setBusinessKey(randomUUID().toString()) .setRetries(0) - .setExecutorGroup("flowInstance1") + .setExecutorGroup("junit") .setStateVariables(new LinkedHashMap() { { put("requestData", "{ \"parameter\": \"abc\" }"); From 2d147503c0256d2a7f232a982f7957fa93e0d0d1 Mon Sep 17 00:00:00 2001 From: Julian Purse Date: Thu, 1 Aug 2024 08:27:23 +0200 Subject: [PATCH 03/10] enhance the search and add a new property default that can be changed --- .../internal/dao/WorkflowInstanceDao.java | 22 ++++++++++- .../instance/QueryWorkflowInstances.java | 18 ++++----- .../main/resources/nflow-engine.properties | 1 + .../internal/dao/WorkflowInstanceDaoTest.java | 39 +++++++++++++++++-- .../src/test/resources/junit.properties | 3 +- .../java/io/nflow/rest/v1/ResourceBase.java | 4 +- .../v1/jaxrs/WorkflowInstanceResource.java | 7 ++-- .../jaxrs/WorkflowInstanceResourceTest.java | 4 +- .../springweb/WorkflowInstanceResource.java | 6 +-- 9 files changed, 79 insertions(+), 25 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index 038fca71f..dafd2ed6b 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -114,6 +114,7 @@ public class WorkflowInstanceDao { private final long workflowInstanceQueryMaxActionsDefault; private final int workflowInstanceTypeCacheSize; private final AtomicBoolean disableBatchUpdates = new AtomicBoolean(); + private final AtomicBoolean defaultQueryAllExecutors = new AtomicBoolean(false); AtomicInteger instanceStateTextLength = new AtomicInteger(); AtomicInteger actionStateTextLength = new AtomicInteger(); AtomicInteger stateVariableValueMaxLength = new AtomicInteger(); @@ -142,6 +143,7 @@ public WorkflowInstanceDao(SQLVariants sqlVariants, @NFlow JdbcTemplate nflowJdb workflowInstanceQueryMaxActions = env.getRequiredProperty("nflow.workflow.instance.query.max.actions", Long.class); workflowInstanceQueryMaxActionsDefault = env.getRequiredProperty("nflow.workflow.instance.query.max.actions.default", Long.class); + defaultQueryAllExecutors.set(env.getRequiredProperty("nflow.db.query_all_executors", Boolean.class)); disableBatchUpdates.set(env.getRequiredProperty("nflow.db.disable_batch_updates", Boolean.class)); if (disableBatchUpdates.get()) { logger.info("nFlow DB batch updates are disabled (system property nflow.db.disable_batch_updates=true)"); @@ -150,6 +152,12 @@ public WorkflowInstanceDao(SQLVariants sqlVariants, @NFlow JdbcTemplate nflowJdb instanceStateTextLength.set(env.getProperty("nflow.workflow.instance.state.text.length", Integer.class, -1)); actionStateTextLength.set(env.getProperty("nflow.workflow.action.state.text.length", Integer.class, -1)); stateVariableValueMaxLength.set(env.getProperty("nflow.workflow.state.variable.value.length", Integer.class, -1)); + + } + + protected AtomicBoolean getDefaultQueryAllExecutors() { + + return defaultQueryAllExecutors; } private int getInstanceStateTextLength() { @@ -697,9 +705,21 @@ public Stream queryWorkflowInstancesAsStream(QueryWorkflowInst List conditions = new ArrayList<>(); MapSqlParameterSource params = new MapSqlParameterSource(); queryOptionsToSqlAndParams(query, conditions, params); - if (!query.includeAllExecutors) { + + if (!isEmpty(query.executorGroups)) { + if (query.executorGroups.size() == 1) { + conditions.add("executor_group = :executor_group"); + params.addValue("executor_group", query.executorGroups.get(0)); + } else { + conditions.add("executor_group in (:executor_groups)"); + params.addValue("executor_groups", query.executorGroups); + } + } + else if (!defaultQueryAllExecutors.get()){ + //the old behaviour of nflow is that when you query for workflows you only get the executor group of the current instance conditions.add(executorInfo.getExecutorGroupCondition()); } + String sqlSuffix = "from nflow_workflow wf "; if (query.stateVariableKey != null) { sqlSuffix += "inner join nflow_workflow_state wfs on wf.id = wfs.workflow_id and wfs.state_key = :state_key and " + sqlVariants.clobToComparable("wfs.state_value") + " = :state_value "; diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/QueryWorkflowInstances.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/QueryWorkflowInstances.java index 5a83a5382..5821a3bcf 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/QueryWorkflowInstances.java +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/QueryWorkflowInstances.java @@ -70,9 +70,9 @@ public class QueryWorkflowInstances extends ModelObject { */ public final boolean includeActions; /** - * Setting this to true will make the query return all executor groups + * optional filter of executor groups, if empty it does not return all but instead backwards compactability of the current api group */ - public final boolean includeAllExecutors; + public final List executorGroups; /** * Setting this to true will make the query return also the current state variables for the workflow. @@ -118,7 +118,7 @@ public class QueryWorkflowInstances extends ModelObject { this.stateVariableKey = builder.stateVariableKey; this.stateVariableValue = builder.stateVariableValue; this.includeActions = builder.includeActions; - this.includeAllExecutors = builder.includeAllExecutors; + this.executorGroups = builder.executorGroups; this.includeCurrentStateVariables = builder.includeCurrentStateVariables; this.includeActionStateVariables = builder.includeActionStateVariables; this.includeChildWorkflows = builder.includeChildWorkflows; @@ -142,7 +142,7 @@ public static class Builder { String stateVariableKey; String stateVariableValue; boolean includeActions; - boolean includeAllExecutors; + List executorGroups = new ArrayList<>(); boolean includeCurrentStateVariables; boolean includeActionStateVariables; boolean includeChildWorkflows; @@ -168,7 +168,7 @@ public Builder(QueryWorkflowInstances copy) { this.stateVariableKey = copy.stateVariableKey; this.stateVariableValue = copy.stateVariableValue; this.includeActions = copy.includeActions; - this.includeAllExecutors = copy.includeAllExecutors; + this.executorGroups = copy.executorGroups; this.includeCurrentStateVariables = copy.includeCurrentStateVariables; this.includeActionStateVariables = copy.includeActionStateVariables; this.includeChildWorkflows = copy.includeChildWorkflows; @@ -281,12 +281,12 @@ public Builder setIncludeActions(boolean includeActions) { } /** - * Set whether all executor groups should be included in the results. Default is `false`. - * @param includeAllExecutors True to include all executors, false otherwise. + * Set whether specific executor groups should be included in the results. + * @param executorGroups list of executor names * @return this. */ - public Builder setIncludeAllExecutors(boolean includeAllExecutors) { - this.includeAllExecutors = includeAllExecutors; + public Builder setExecutorGroups(String ... executorGroups) { + this.executorGroups.addAll(asList(executorGroups)); return this; } diff --git a/nflow-engine/src/main/resources/nflow-engine.properties b/nflow-engine/src/main/resources/nflow-engine.properties index 0599223a7..35bb936eb 100644 --- a/nflow-engine/src/main/resources/nflow-engine.properties +++ b/nflow-engine/src/main/resources/nflow-engine.properties @@ -65,6 +65,7 @@ nflow.db.max_pool_size=4 nflow.db.idle_timeout_seconds=600 nflow.db.create_on_startup=true nflow.db.disable_batch_updates=false +nflow.db.query_all_executors=false nflow.db.workflowInstanceType.cacheSize=10000 nflow.db.initialization_fail_timeout_seconds=10 diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java index 131f46c9f..f28693b55 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java @@ -698,15 +698,48 @@ public void pollNextWorkflowInstances() { assertThat(secondBatch.size(), equalTo(0)); } + /** + * Test creating a workflow with multiple executors and querying for them + */ + @Test +public void testCreateWorkflowMultipleExecutorsAndQuery() { + + dao.insertWorkflowInstance(constructWorkflowInstanceBuilder().setNextActivation(now().minusMinutes(1)) + .setPriority((short)1).setExecutorGroup("junit").build()); + dao.insertWorkflowInstance(constructWorkflowInstanceBuilder().setNextActivation(now().minusMinutes(1)) + .setPriority((short)1).setExecutorGroup("test_two").build()); + + //will return the one that matches the executor group of the current nflow + List defaultSearch = dao.queryWorkflowInstances(new QueryWorkflowInstances.Builder().setIncludeActions(true) + .build()); + assertThat(defaultSearch.size(), is(1)); + + List workflows = dao.queryWorkflowInstances(new QueryWorkflowInstances.Builder().setIncludeActions(true) + .setExecutorGroups("junit").build()); + assertThat(workflows.size(), is(1)); + + List workflowsAll = dao.queryWorkflowInstances(new QueryWorkflowInstances.Builder().setIncludeActions(true) + .setExecutorGroups("junit","test_two").build()); + + assertThat(workflowsAll.size(), is(2)); + + } /** * Test returning all executors (default being junit) */ @Test -public void testCreateWorkflowWithAllExecutors() { +public void testQueryWorkflowWithDefaultSetToTrue() { + + dao.getDefaultQueryAllExecutors().set(true); + dao.insertWorkflowInstance(constructWorkflowInstanceBuilder().setNextActivation(now().minusMinutes(1)) .setPriority((short)1).setExecutorGroup("test").build()); - List workflows = dao.queryWorkflowInstances(new QueryWorkflowInstances.Builder().setIncludeActions(true).setIncludeAllExecutors(true).build()); - assertThat(workflows.size(), is(1)); + dao.insertWorkflowInstance(constructWorkflowInstanceBuilder().setNextActivation(now().minusMinutes(1)) + .setPriority((short)1).setExecutorGroup("test_two").build()); + List workflows = dao.queryWorkflowInstances(new QueryWorkflowInstances.Builder().setIncludeActions(true) + .build()); + assertThat(workflows.size(), is(2)); + } @Test diff --git a/nflow-engine/src/test/resources/junit.properties b/nflow-engine/src/test/resources/junit.properties index aafd514b1..51964be84 100644 --- a/nflow-engine/src/test/resources/junit.properties +++ b/nflow-engine/src/test/resources/junit.properties @@ -18,5 +18,6 @@ nflow.db.max_pool_size=20 nflow.db.idle_timeout_seconds=600 nflow.db.create_on_startup=true nflow.db.disable_batch_updates=false +nflow.db.query_all_executors=false nflow.db.workflowInstanceType.cacheSize=10000 -nflow.db.initialization_fail_timeout_seconds=1 \ No newline at end of file +nflow.db.initialization_fail_timeout_seconds=1 diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java index 5d3064807..2b09406f6 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java @@ -147,7 +147,7 @@ public boolean updateWorkflowInstance(long id, UpdateWorkflowInstanceRequest req public Stream listWorkflowInstances(Set ids, Set types, Long parentWorkflowId, Long parentActionId, Set states, Set statuses, String businessKey, String externalId, String stateVariableKey, String stateVariableValue, Set includes, String include, - Long maxResults, Long maxActions, boolean queryArchive, boolean includeAllExecutors, WorkflowInstanceService workflowInstances, + Long maxResults, Long maxActions, boolean queryArchive, Set executorGroups, WorkflowInstanceService workflowInstances, ListWorkflowInstanceConverter listWorkflowConverter) { Set propertyIncludes = resolveIncludes(includes, include); QueryWorkflowInstances q = new QueryWorkflowInstances.Builder() @@ -158,7 +158,7 @@ public Stream listWorkflowInstances(Set ids, .addStates(states.toArray(new String[states.size()])) .addStatuses(statuses.toArray(new WorkflowInstanceStatus[statuses.size()])) .setBusinessKey(businessKey) - .setIncludeAllExecutors(includeAllExecutors) + .setExecutorGroups(executorGroups.toArray(new String[executorGroups.size()])) .setExternalId(externalId) .setIncludeCurrentStateVariables(propertyIncludes.contains(currentStateVariables)) .setIncludeActions(propertyIncludes.contains(actions)) diff --git a/nflow-rest-api-jax-rs/src/main/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResource.java b/nflow-rest-api-jax-rs/src/main/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResource.java index 19466afcf..956b3c9c8 100644 --- a/nflow-rest-api-jax-rs/src/main/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResource.java +++ b/nflow-rest-api-jax-rs/src/main/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResource.java @@ -176,12 +176,11 @@ public Response listWorkflowInstances( @QueryParam("queryArchive") @Parameter( description = "Query also the archive if not enough results found from main tables", schema = @Schema(defaultValue = QUERY_ARCHIVED_DEFAULT_STR)) Boolean queryArchive, - @QueryParam("includeAllExecutors") @Parameter( - description = "Include Executors in the search", - schema = @Schema(defaultValue = QUERY_INCLUDE_ALL_EXECUTORS_DEFAULT_STR)) Boolean includeAllExecutors) { + @QueryParam("executorGroups") @Parameter( + description = "Include Executors in the search") Set executorGroups) { return handleExceptions(() -> ok(super.listWorkflowInstances(ids, types, parentWorkflowId, parentActionId, states, statuses, businessKey, externalId, stateVariableKey, stateVariableValue, includes, include, maxResults, maxActions, - ofNullable(queryArchive).orElse(QUERY_ARCHIVED_DEFAULT),ofNullable(includeAllExecutors).orElse(false), workflowInstances, listWorkflowConverter).iterator())); + ofNullable(queryArchive).orElse(QUERY_ARCHIVED_DEFAULT),executorGroups, workflowInstances, listWorkflowConverter).iterator())); } @PUT diff --git a/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java b/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java index 51cac4db6..54dac0194 100644 --- a/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java +++ b/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java @@ -250,7 +250,7 @@ public void whenUpdatingBusinessKeyWithDescriptionUpdateWorkflowInstanceWorks() @Test public void listWorkflowInstancesWorks() { makeRequest(() -> resource.listWorkflowInstances(Set.of(42L), Set.of("type"), 99L, 88L, Set.of("state"), - EnumSet.of(WorkflowInstanceStatus.created), "businessKey", "externalId", null, null, null, null, null, null, true,false)); + EnumSet.of(WorkflowInstanceStatus.created), "businessKey", "externalId", null, null, null, null, null, null, true,null)); verify(workflowInstances).listWorkflowInstancesAsStream(queryCaptor.capture()); QueryWorkflowInstances query = queryCaptor.getValue(); @@ -277,7 +277,7 @@ public void listWorkflowInstancesWorks() { public void listWorkflowInstancesWorksWithAllIncludes() { makeRequest(() -> resource.listWorkflowInstances(Set.of(42L), Set.of("type"), 99L, 88L, Set.of("state"), EnumSet.of(WorkflowInstanceStatus.created, WorkflowInstanceStatus.executing), "businessKey", "externalId", "stateVarKey", - "stateVarValue", EnumSet.allOf(ApiWorkflowInstanceInclude.class), null, 1L, 2L, false,false)); + "stateVarValue", EnumSet.allOf(ApiWorkflowInstanceInclude.class), null, 1L, 2L, false,null)); verify(workflowInstances).listWorkflowInstancesAsStream(queryCaptor.capture()); QueryWorkflowInstances query = queryCaptor.getValue(); diff --git a/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java b/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java index 6cedd79d4..f20cb7802 100644 --- a/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java +++ b/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java @@ -179,11 +179,11 @@ public Mono> listWorkflowInstances( description = "Current value of state variable defined by stateVariableKey") String stateVariableValue, @RequestParam(value = "queryArchive", required = false, defaultValue = QUERY_ARCHIVED_DEFAULT_STR) @Parameter( description = "Query also the archive if not enough results found from main tables") boolean queryArchive, - @RequestParam(value = "includeAllExecutors", required = false, defaultValue = QUERY_ARCHIVED_DEFAULT_STR) @Parameter( - description = "Query all executor groups, default is just the current executor's group") boolean includeAllExecutors) { + @RequestParam(value = "executorGroups", required = false, defaultValue = QUERY_ARCHIVED_DEFAULT_STR) @Parameter( + description = "Query executor groups, default is just the current executor's group, system property to change default") Set executorGroups) { return handleExceptions(() -> wrapBlocking(() -> ok(super.listWorkflowInstances(ids, types, parentWorkflowId, parentActionId, states, statuses, businessKey, externalId, stateVariableKey, stateVariableValue, includes, include, maxResults, - maxActions, queryArchive, includeAllExecutors, this.workflowInstances, this.listWorkflowConverter).iterator()))); + maxActions, queryArchive, executorGroups, this.workflowInstances, this.listWorkflowConverter).iterator()))); } @PutMapping(path = "/{id}/signal", consumes = APPLICATION_JSON_VALUE) From fdd21aef10e57a34f259501afc4022464a97dac4 Mon Sep 17 00:00:00 2001 From: Julian Purse Date: Tue, 6 Aug 2024 20:28:25 +0200 Subject: [PATCH 04/10] enhance api to respond with the executor group --- .../nflow/rest/v1/converter/ListWorkflowInstanceConverter.java | 1 + .../io/nflow/rest/v1/msg/ListWorkflowInstanceResponse.java | 3 +++ .../io/nflow/rest/v1/springweb/WorkflowInstanceResource.java | 2 +- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverter.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverter.java index 867a8287e..4dafdbedb 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverter.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverter.java @@ -43,6 +43,7 @@ public ListWorkflowInstanceResponse convert(WorkflowInstance instance, Set> listWorkflowInstances( description = "Current value of state variable defined by stateVariableKey") String stateVariableValue, @RequestParam(value = "queryArchive", required = false, defaultValue = QUERY_ARCHIVED_DEFAULT_STR) @Parameter( description = "Query also the archive if not enough results found from main tables") boolean queryArchive, - @RequestParam(value = "executorGroups", required = false, defaultValue = QUERY_ARCHIVED_DEFAULT_STR) @Parameter( + @RequestParam(value = "executorGroups", required = false, defaultValue = "") @Parameter( description = "Query executor groups, default is just the current executor's group, system property to change default") Set executorGroups) { return handleExceptions(() -> wrapBlocking(() -> ok(super.listWorkflowInstances(ids, types, parentWorkflowId, parentActionId, states, statuses, businessKey, externalId, stateVariableKey, stateVariableValue, includes, include, maxResults, From 22e0b17cd4f8a8e1e6ce59c782e6f4ad74868e83 Mon Sep 17 00:00:00 2001 From: Julian Purse Date: Sat, 17 Aug 2024 13:42:23 +0200 Subject: [PATCH 05/10] add content type header when not MSAL --- nflow-explorer/src/service.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/nflow-explorer/src/service.ts b/nflow-explorer/src/service.ts index 1e57043e1..f40486d84 100644 --- a/nflow-explorer/src/service.ts +++ b/nflow-explorer/src/service.ts @@ -45,6 +45,9 @@ const authenticatedApiCall = (url: string, config: Config, body?: any): Promise< body: body }; if (!config.msalClient) { + options.headers = new Headers({ + "content-type": "application/json" + }); return fetch(url, options); } const request = { From c1fa29440028f01e1755c6a88289f846a8e30c57 Mon Sep 17 00:00:00 2001 From: Julian Purse Date: Sat, 17 Aug 2024 14:28:28 +0200 Subject: [PATCH 06/10] WIP on feature/explorer-executor-groups --- .../WorkflowInstanceDetailsPage.tsx | 1 + .../WorkflowInstanceListPage.tsx | 22 +++++++++++++++++-- .../WorkflowInstanceSearchForm.tsx | 18 +++++++++++++++ 3 files changed, 39 insertions(+), 2 deletions(-) diff --git a/nflow-explorer/src/workflow-instance/WorkflowInstanceDetailsPage.tsx b/nflow-explorer/src/workflow-instance/WorkflowInstanceDetailsPage.tsx index 47a9fbb36..e9ab3c3e7 100644 --- a/nflow-explorer/src/workflow-instance/WorkflowInstanceDetailsPage.tsx +++ b/nflow-explorer/src/workflow-instance/WorkflowInstanceDetailsPage.tsx @@ -47,6 +47,7 @@ const InstanceSummaryTable = ({ headerName: 'Parent workflow', fieldRender: parentLink }, + {field: 'executorGroup', headerName: 'Executor Group'}, {field: 'state', headerName: 'Current state'}, {field: 'status', headerName: 'Current status'}, { diff --git a/nflow-explorer/src/workflow-instance/WorkflowInstanceListPage.tsx b/nflow-explorer/src/workflow-instance/WorkflowInstanceListPage.tsx index 71bbe239a..ff88fc62f 100644 --- a/nflow-explorer/src/workflow-instance/WorkflowInstanceListPage.tsx +++ b/nflow-explorer/src/workflow-instance/WorkflowInstanceListPage.tsx @@ -13,8 +13,8 @@ import WorkflowInstanceSearchForm from './WorkflowInstanceSearchForm'; import {useConfig} from '../config'; import {DataTable, InternalLink, Spinner, useFeedback} from '../component'; import {formatRelativeTime, formatTimestamp} from '../utils'; -import {listWorkflowDefinitions, listWorkflowInstances} from '../service'; -import {WorkflowInstance} from '../types'; +import {listExecutors, listWorkflowDefinitions, listWorkflowInstances} from '../service'; +import {Executor, WorkflowInstance} from '../types'; import './workflow-instance.scss'; import '../index.scss'; @@ -354,6 +354,7 @@ function WorkflowInstanceListPage() { const [initialLoad, setInitialLoad] = useState(true); const [definitions, setDefinitions] = useState>([]); const [instances, setInstances] = useState>(); + const [executors, setExecutors] = useState>([]); const fetchDefinitions = useCallback(() => { if (feedback.getCurrentFeedback() !== undefined) { @@ -372,6 +373,17 @@ function WorkflowInstanceListPage() { .finally(() => setInitialLoad(false)); }, [config, feedback]); + const fetchExecutors = useCallback(() => { + listExecutors(config) + .then(data => setExecutors(data)) + .catch(error => { + // TODO error handling + console.error('Error', error); + }) + .finally(() => setInitialLoad(false)); + }, [config]); + + useEffect(() => fetchDefinitions(), [fetchDefinitions]); const searchInstances = useCallback( @@ -407,6 +419,12 @@ function WorkflowInstanceListPage() { ) : ( !executor.stopped) + .map(executor => executor.executorGroup) + .filter((value, index, self) => self.indexOf(value) === index) + } onSubmit={search} /> )} diff --git a/nflow-explorer/src/workflow-instance/WorkflowInstanceSearchForm.tsx b/nflow-explorer/src/workflow-instance/WorkflowInstanceSearchForm.tsx index 8e0698bfe..acb67d769 100644 --- a/nflow-explorer/src/workflow-instance/WorkflowInstanceSearchForm.tsx +++ b/nflow-explorer/src/workflow-instance/WorkflowInstanceSearchForm.tsx @@ -5,6 +5,7 @@ import Button from '@material-ui/core/Button'; import {makeStyles} from '@material-ui/core/styles'; import {Selection} from '../component'; +import {Executor} from "../types"; const useStyles = makeStyles(theme => ({ formControl: { @@ -41,9 +42,13 @@ const stateNames: any = { const statusNames: any = { [allMarker]: '-- All statuses --' }; +const executorGroupNames: any = { + [allMarker]: '-- Executor Groups --' +}; function WorkflowInstanceSearchForm(props: { definitions: Array; + executorGroups: Array; onSubmit: (data: any) => any; }) { const classes = useStyles(); @@ -60,6 +65,9 @@ function WorkflowInstanceSearchForm(props: { const [status, setStatus] = useState( queryParams.get('status') || allMarker ); + const [executorGroup, setExecutorGroup] = useState( + queryParams.get('executorGroup') || allMarker + ); const [businessKey, setBusinessKey] = useState( queryParams.get('businessKey') || '' ); @@ -73,6 +81,8 @@ function WorkflowInstanceSearchForm(props: { const types = [allMarker].concat(props.definitions.map(d => d.type)); + const executorGroups = [allMarker].concat(props.definitions.map(d => d.executorGroup)); + // TODO to lodash or not to lodash? const selectedWorkflow = props.definitions.filter(d => d.type === type)[0]; const selectedStates = ( @@ -146,6 +156,7 @@ function WorkflowInstanceSearchForm(props: { const setWorkflowType = (type: string) => { // when type is changed, need to reset all selections that depend on type setType(type); + setExecutorGroup(executorGroup) setState(allMarker); }; @@ -178,6 +189,13 @@ function WorkflowInstanceSearchForm(props: { statusNames[status] || status } /> + executorGroupNames[executorGroup] || executorGroup} + /> Date: Sat, 17 Aug 2024 16:45:55 +0200 Subject: [PATCH 07/10] logic for the executor groups from explorer --- .../engine/internal/dao/ExecutorDao.java | 15 ++++++ .../service/WorkflowExecutorService.java | 7 +++ .../src/executor/ExecutorListPage.tsx | 4 +- nflow-explorer/src/service.ts | 54 +++++++++++++------ .../WorkflowInstanceListPage.tsx | 39 ++++++++------ .../WorkflowInstanceSearchForm.tsx | 27 +++++++--- .../v1/jaxrs/WorkflowExecutorResource.java | 8 +++ .../springweb/WorkflowExecutorResource.java | 8 +++ .../springweb/WorkflowInstanceResource.java | 2 +- 9 files changed, 122 insertions(+), 42 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ExecutorDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ExecutorDao.java index 8fb025e01..19498584c 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ExecutorDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ExecutorDao.java @@ -176,6 +176,21 @@ public List getExecutors() { return new WorkflowExecutor(id, host, pid, executorGroup, started, active, expires, stopped, recovered); }); } + public List getAllExecutors() { + return jdbc.query("select id, host,executor_group, pid, started, active, expires, stopped, recovered from nflow_executor " + + " order by id asc", (rs, rowNum) -> { + int id = rs.getInt("id"); + String host = rs.getString("host"); + int pid = rs.getInt("pid"); + String executorGroupName = rs.getString("executor_group"); + var started = sqlVariants.getDateTime(rs, "started"); + var active = sqlVariants.getDateTime(rs, "active"); + var expires = sqlVariants.getDateTime(rs, "expires"); + var stopped = sqlVariants.getDateTime(rs, "stopped"); + var recovered = sqlVariants.getDateTime(rs, "recovered"); + return new WorkflowExecutor(id, host, pid, executorGroupName, started, active, expires, stopped, recovered); + }); + } public void markShutdown(boolean graceful) { try { diff --git a/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowExecutorService.java b/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowExecutorService.java index b6fca63ee..2f5f68c6e 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowExecutorService.java +++ b/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowExecutorService.java @@ -29,4 +29,11 @@ public WorkflowExecutorService(ExecutorDao executorDao) { public List getWorkflowExecutors() { return executorDao.getExecutors(); } + /** + * Return all workflow executors . + * @return The workflow executors. + */ + public List getAllWorkflowExecutors() { + return executorDao.getAllExecutors(); + } } diff --git a/nflow-explorer/src/executor/ExecutorListPage.tsx b/nflow-explorer/src/executor/ExecutorListPage.tsx index fd4530316..85d61b61a 100644 --- a/nflow-explorer/src/executor/ExecutorListPage.tsx +++ b/nflow-explorer/src/executor/ExecutorListPage.tsx @@ -12,7 +12,7 @@ import {formatRelativeTime, formatTimestamp} from '../utils'; import {useConfig} from '../config'; import {Spinner} from '../component'; import {Executor} from '../types'; -import {listExecutors} from '../service'; +import {listAllExecutors} from '../service'; const ExecutorTable = ({executors}: {executors: Executor[]}) => { const getMuiTheme = () => @@ -144,7 +144,7 @@ function ExecutorListPage() { const config = useConfig(); const fetchExecutors = useCallback(() => { - listExecutors(config) + listAllExecutors(config) .then(data => setExecutors(data)) .catch(error => { // TODO error handling diff --git a/nflow-explorer/src/service.ts b/nflow-explorer/src/service.ts index f40486d84..fab9c374f 100644 --- a/nflow-explorer/src/service.ts +++ b/nflow-explorer/src/service.ts @@ -39,33 +39,48 @@ const convertWorkflowInstance = (instance: any) => { ); }; -const authenticatedApiCall = (url: string, config: Config, body?: any): Promise => { +const authenticatedApiCall = ( + url: string, + config: Config, + body?: any +): Promise => { const options: RequestInit = { - method: body ? "PUT" : "GET", + method: body ? 'PUT' : 'GET', body: body }; if (!config.msalClient) { options.headers = new Headers({ - "content-type": "application/json" + 'content-type': 'application/json' }); return fetch(url, options); } const request = { - scopes: ["openid"], + scopes: ['openid'] }; - config.msalClient.setActiveAccount(config.msalClient.getAllAccounts()[0]) // required by acquireTokenSilent - return config.msalClient.acquireTokenSilent(request) - .then(tokenResponse => { - options["headers"] = new Headers({ - "Authorization": "Bearer " + tokenResponse.accessToken, - "content-type": "application/json" - }); - return fetch(url, options) + config.msalClient.setActiveAccount(config.msalClient.getAllAccounts()[0]); // required by acquireTokenSilent + return config.msalClient.acquireTokenSilent(request).then(tokenResponse => { + options['headers'] = new Headers({ + Authorization: 'Bearer ' + tokenResponse.accessToken, + 'content-type': 'application/json' }); -} + return fetch(url, options); + }); +}; const listExecutors = (config: Config): Promise> => { - return authenticatedApiCall(serviceUrl(config, '/v1/workflow-executor'), config) + return authenticatedApiCall( + serviceUrl(config, '/v1/workflow-executor'), + config + ) + .then(response => response.json()) + .then((items: any) => items.map(convertExecutor)); +}; + +const listAllExecutors = (config: Config): Promise> => { + return authenticatedApiCall( + serviceUrl(config, '/v1/workflow-executor/all'), + config + ) .then(response => response.json()) .then((items: any) => items.map(convertExecutor)); }; @@ -171,7 +186,10 @@ const listWorkflowInstances = ( query?: any ): Promise => { const params = new URLSearchParams(query).toString(); - return authenticatedApiCall(serviceUrl(config, '/v1/workflow-instance?' + params.toString()), config) + return authenticatedApiCall( + serviceUrl(config, '/v1/workflow-instance?' + params.toString()), + config + ) .then(response => response.json()) .then((items: any) => items.map(convertWorkflowInstance)); }; @@ -210,8 +228,9 @@ const createWorkflowInstance = ( data: NewWorkflowInstance ): Promise => { const url = serviceUrl(config, '/v1/workflow-instance'); - return authenticatedApiCall(url, config, JSON.stringify(data)) - .then(response => response.json()); + return authenticatedApiCall(url, config, JSON.stringify(data)).then( + response => response.json() + ); }; const updateWorkflowInstance = ( @@ -237,6 +256,7 @@ const sendWorkflowInstanceSignal = ( export { listExecutors, + listAllExecutors, listWorkflowDefinitions, getWorkflowDefinition, getWorkflowStatistics, diff --git a/nflow-explorer/src/workflow-instance/WorkflowInstanceListPage.tsx b/nflow-explorer/src/workflow-instance/WorkflowInstanceListPage.tsx index ff88fc62f..0f79ba154 100644 --- a/nflow-explorer/src/workflow-instance/WorkflowInstanceListPage.tsx +++ b/nflow-explorer/src/workflow-instance/WorkflowInstanceListPage.tsx @@ -13,7 +13,11 @@ import WorkflowInstanceSearchForm from './WorkflowInstanceSearchForm'; import {useConfig} from '../config'; import {DataTable, InternalLink, Spinner, useFeedback} from '../component'; import {formatRelativeTime, formatTimestamp} from '../utils'; -import {listExecutors, listWorkflowDefinitions, listWorkflowInstances} from '../service'; +import { + listAllExecutors, + listWorkflowDefinitions, + listWorkflowInstances +} from '../service'; import {Executor, WorkflowInstance} from '../types'; import './workflow-instance.scss'; import '../index.scss'; @@ -249,6 +253,14 @@ const InstanceTable = ({ filter: false } }, + { + name: 'executorGroup', + label: 'Executor Group', + options: { + display: false, + filter: false + } + }, { name: 'started', label: 'Started', @@ -352,6 +364,7 @@ function WorkflowInstanceListPage() { const feedback = useFeedback(); const [initialLoad, setInitialLoad] = useState(true); + const [executorLoad, setExecutorLoad] = useState(true); const [definitions, setDefinitions] = useState>([]); const [instances, setInstances] = useState>(); const [executors, setExecutors] = useState>([]); @@ -374,17 +387,17 @@ function WorkflowInstanceListPage() { }, [config, feedback]); const fetchExecutors = useCallback(() => { - listExecutors(config) - .then(data => setExecutors(data)) - .catch(error => { - // TODO error handling - console.error('Error', error); - }) - .finally(() => setInitialLoad(false)); + listAllExecutors(config) + .then(data => setExecutors(data)) + .catch(error => { + // TODO error handling + console.error('Error', error); + }) + .finally(() => setExecutorLoad(false)); }, [config]); - useEffect(() => fetchDefinitions(), [fetchDefinitions]); + useEffect(() => fetchExecutors(), [fetchExecutors]); const searchInstances = useCallback( (data: any) => { @@ -414,17 +427,13 @@ function WorkflowInstanceListPage() { style={{paddingLeft: 10, paddingRight: 10, paddingTop: 10}} xs={12} > - {initialLoad ? ( + {initialLoad && executorLoad ? ( ) : ( !executor.stopped) - .map(executor => executor.executorGroup) - .filter((value, index, self) => self.indexOf(value) === index) - } + executorGroups={executors} onSubmit={search} /> )} diff --git a/nflow-explorer/src/workflow-instance/WorkflowInstanceSearchForm.tsx b/nflow-explorer/src/workflow-instance/WorkflowInstanceSearchForm.tsx index acb67d769..ee93cf81f 100644 --- a/nflow-explorer/src/workflow-instance/WorkflowInstanceSearchForm.tsx +++ b/nflow-explorer/src/workflow-instance/WorkflowInstanceSearchForm.tsx @@ -5,7 +5,7 @@ import Button from '@material-ui/core/Button'; import {makeStyles} from '@material-ui/core/styles'; import {Selection} from '../component'; -import {Executor} from "../types"; +import {Executor} from '../types'; const useStyles = makeStyles(theme => ({ formControl: { @@ -43,12 +43,12 @@ const statusNames: any = { [allMarker]: '-- All statuses --' }; const executorGroupNames: any = { - [allMarker]: '-- Executor Groups --' + [allMarker]: '-- Default --' }; function WorkflowInstanceSearchForm(props: { definitions: Array; - executorGroups: Array; + executorGroups: Array; onSubmit: (data: any) => any; }) { const classes = useStyles(); @@ -66,7 +66,7 @@ function WorkflowInstanceSearchForm(props: { queryParams.get('status') || allMarker ); const [executorGroup, setExecutorGroup] = useState( - queryParams.get('executorGroup') || allMarker + queryParams.get('executorGroups') || allMarker ); const [businessKey, setBusinessKey] = useState( queryParams.get('businessKey') || '' @@ -81,7 +81,16 @@ function WorkflowInstanceSearchForm(props: { const types = [allMarker].concat(props.definitions.map(d => d.type)); - const executorGroups = [allMarker].concat(props.definitions.map(d => d.executorGroup)); + //concat the executor groups + // and filter where expires greater than now and not stopped + const executorGroups = [allMarker].concat( + props.executorGroups + // .filter((executorGroup: Executor) => !executorGroup.stopped) + // .filter((executorGroup: Executor) => !executorGroup.expires || new Date(executorGroup.expires) > new Date()) + .map((executorGroup: Executor) => executorGroup.executorGroup) + //remove duplicates + .filter((value, index, self) => self.indexOf(value) === index) + ); // TODO to lodash or not to lodash? const selectedWorkflow = props.definitions.filter(d => d.type === type)[0]; @@ -99,6 +108,7 @@ function WorkflowInstanceSearchForm(props: { type, state, status, + executorGroup, businessKey, externalId, id, @@ -139,6 +149,7 @@ function WorkflowInstanceSearchForm(props: { parentWorkflowId, props, state, + executorGroup, status, type ] @@ -156,7 +167,7 @@ function WorkflowInstanceSearchForm(props: { const setWorkflowType = (type: string) => { // when type is changed, need to reset all selections that depend on type setType(type); - setExecutorGroup(executorGroup) + setExecutorGroup(executorGroup); setState(allMarker); }; @@ -194,7 +205,9 @@ function WorkflowInstanceSearchForm(props: { items={executorGroups} selected={executorGroup} onChange={setExecutorGroup} - getSelectionLabel={(executorGroup: string) => executorGroupNames[executorGroup] || executorGroup} + getSelectionLabel={(executorGroup: string) => + executorGroupNames[executorGroup] || executorGroup + } /> ok(workflowExecutors.getWorkflowExecutors().stream().map(converter::convert).collect(toList()))); } + @GET + @Operation(summary = "List all workflow executors") + @Path("/all") + @ApiResponse(content = @Content(array = @ArraySchema(schema = @Schema(implementation = ListWorkflowExecutorResponse.class)))) + public Response listAllWorkflowExecutors() { + return handleExceptions( + () -> ok(workflowExecutors.getAllWorkflowExecutors().stream().map(converter::convert).collect(toList()))); + } } diff --git a/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowExecutorResource.java b/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowExecutorResource.java index 46aa4202e..31ad693c3 100644 --- a/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowExecutorResource.java +++ b/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowExecutorResource.java @@ -51,4 +51,12 @@ public Mono> listWorkflowExecutors() { return handleExceptions(() -> wrapBlocking( () -> ok(workflowExecutors.getWorkflowExecutors().stream().map(converter::convert).collect(toList())))); } + @GetMapping + @Operation(summary = "List all workflow executors") + @RequestMapping("/all") + @ApiResponse(content = @Content(array = @ArraySchema(schema = @Schema(implementation = ListWorkflowExecutorResponse.class)))) + public Mono> listAllWorkflowExecutors() { + return handleExceptions(() -> wrapBlocking( + () -> ok(workflowExecutors.getAllWorkflowExecutors().stream().map(converter::convert).collect(toList())))); + } } diff --git a/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java b/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java index 9d343a810..eaad119f7 100644 --- a/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java +++ b/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java @@ -179,7 +179,7 @@ public Mono> listWorkflowInstances( description = "Current value of state variable defined by stateVariableKey") String stateVariableValue, @RequestParam(value = "queryArchive", required = false, defaultValue = QUERY_ARCHIVED_DEFAULT_STR) @Parameter( description = "Query also the archive if not enough results found from main tables") boolean queryArchive, - @RequestParam(value = "executorGroups", required = false, defaultValue = "") @Parameter( + @RequestParam(value = "executorGroup", required = false, defaultValue = "") @Parameter( description = "Query executor groups, default is just the current executor's group, system property to change default") Set executorGroups) { return handleExceptions(() -> wrapBlocking(() -> ok(super.listWorkflowInstances(ids, types, parentWorkflowId, parentActionId, states, statuses, businessKey, externalId, stateVariableKey, stateVariableValue, includes, include, maxResults, From 1ea26204e9a0e36a1dbc1110d8a5af4fbbedc935 Mon Sep 17 00:00:00 2001 From: Julian Purse Date: Tue, 20 Aug 2024 19:57:28 +0200 Subject: [PATCH 08/10] reduce min width on selection to keep on same row --- nflow-explorer/src/component/Selection.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nflow-explorer/src/component/Selection.tsx b/nflow-explorer/src/component/Selection.tsx index d55ff56b8..5359ea772 100644 --- a/nflow-explorer/src/component/Selection.tsx +++ b/nflow-explorer/src/component/Selection.tsx @@ -25,7 +25,7 @@ function Selection(props: { let currentIndex = ++index; return ( - + {props.label}